Ensuring Exclusive Sub-Task Execution in Multiple Data Pipelines

In modern data engineering, it’s common to have multiple data pipelines running concurrently. However, when these pipelines share a common sub-task, it becomes crucial to ensure that the sub-task does not run simultaneously across multiple pipelines. This can prevent data corruption, race conditions, and other issues related to concurrent executions.

To address this challenge, a locking mechanism can be implemented. A centralized lock management system, such as Redis, ZooKeeper, or a relational database, can be used to control the execution flow. The key steps involve:

Identifying the Sub-Task: Clearly define the critical sub-task that must not run concurrently across pipelines. Scanning a GUI-based pipeline/workflow by hand is error-prone, so I suggest modeling the DAGs in relational tables: for example, tasks, pipelines, and dependencies tables, where dependencies records which tasks belong to which pipeline and what each task depends on. With that metadata in place, we can query across distinct pipelines with their sub-pipelines/tasks to find the reused ones.
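As a minimal sketch of that query, the following uses an in-memory SQLite database; the table and column names (pipelines, tasks, dependencies) are illustrative, not taken from any particular orchestrator:

```python
import sqlite3

# Illustrative minimal schema for storing DAG metadata.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE pipelines (pipeline_id TEXT PRIMARY KEY);
CREATE TABLE tasks (task_id TEXT PRIMARY KEY);
CREATE TABLE dependencies (
    pipeline_id TEXT REFERENCES pipelines(pipeline_id),
    task_id     TEXT REFERENCES tasks(task_id),
    upstream_id TEXT  -- task that must finish first (NULL for entry tasks)
);
""")
conn.executemany("INSERT INTO pipelines VALUES (?)", [("DAG1",), ("DAG2",)])
conn.executemany("INSERT INTO tasks VALUES (?)",
                 [("A",), ("B",), ("C",), ("D",), ("E",)])
conn.executemany("INSERT INTO dependencies VALUES (?, ?, ?)", [
    ("DAG1", "A", None), ("DAG1", "B", "A"), ("DAG1", "C", "B"),
    ("DAG2", "D", None), ("DAG2", "B", "D"), ("DAG2", "E", "B"),
])

# Tasks referenced by more than one distinct pipeline are the
# candidates for an exclusive lock.
shared = conn.execute("""
    SELECT task_id, COUNT(DISTINCT pipeline_id) AS n_pipelines
    FROM dependencies
    GROUP BY task_id
    HAVING n_pipelines > 1
""").fetchall()
print(shared)  # -> [('B', 2)]
```

Here Node B shows up in both DAGs, so it is flagged as a shared sub-task.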

Creating a Locking Mechanism: Use a centralized system to manage locks, such as a database, distributed lock service (e.g., ZooKeeper, etcd, Redis), or a file system with atomic operations.

CREATE TABLE locks (
    lock_name VARCHAR(255) PRIMARY KEY,  -- The name of the lock; the primary key guarantees at most one owner per lock
    owner_id VARCHAR(255) NOT NULL,      -- Unique identifier for the process/pipeline holding the lock
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- When the lock was acquired
    expires_at TIMESTAMP NOT NULL        -- When the lock expires
);

Implementing Lock Acquisition and Release: Ensuring that pipelines acquire a lock before starting the sub-task and release it upon completion.

-- Acquire a lock; the insert fails if the lock is already held,
-- because lock_name is the primary key
INSERT INTO locks (lock_name, owner_id, expires_at)
VALUES ('sub_task_lock', 'pipeline_1', CURRENT_TIMESTAMP + INTERVAL '10 minutes');

-- Check if the lock is held (and still valid) by the current process
SELECT owner_id, expires_at
FROM locks
WHERE lock_name = 'sub_task_lock'
  AND owner_id = 'pipeline_1'
  AND expires_at > CURRENT_TIMESTAMP;

-- Extend the lock
UPDATE locks
SET expires_at = CURRENT_TIMESTAMP + INTERVAL '10 minutes'
WHERE lock_name = 'sub_task_lock' AND owner_id = 'pipeline_1';

-- Release the lock
DELETE FROM locks
WHERE lock_name = 'sub_task_lock' AND owner_id = 'pipeline_1';

-- Clean up expired locks (to be run periodically)
DELETE FROM locks
WHERE expires_at < CURRENT_TIMESTAMP;
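The acquire/release cycle above can be sketched in Python against SQLite; this is an illustration of the pattern (expiry handling is omitted for brevity), and in practice the table would live in a shared database reachable by all pipelines:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE locks (
    lock_name  TEXT PRIMARY KEY,
    owner_id   TEXT NOT NULL,
    expires_at TEXT NOT NULL
)""")

def acquire(conn, lock_name, owner_id, duration_minutes=10):
    """Try to take the lock; the PRIMARY KEY makes a duplicate insert fail."""
    expires = (datetime.now(timezone.utc)
               + timedelta(minutes=duration_minutes)).isoformat()
    try:
        conn.execute("INSERT INTO locks VALUES (?, ?, ?)",
                     (lock_name, owner_id, expires))
        return True
    except sqlite3.IntegrityError:
        return False  # someone else already holds the lock

def release(conn, lock_name, owner_id):
    """Drop the row only if we are the owner."""
    conn.execute("DELETE FROM locks WHERE lock_name = ? AND owner_id = ?",
                 (lock_name, owner_id))

r1 = acquire(conn, "sub_task_lock", "pipeline_1")   # True: lock taken
r2 = acquire(conn, "sub_task_lock", "pipeline_2")   # False: already held
release(conn, "sub_task_lock", "pipeline_1")
r3 = acquire(conn, "sub_task_lock", "pipeline_2")   # True: free again
print(r1, r2, r3)
```

Relying on the primary-key constraint makes acquisition atomic: two pipelines racing to insert the same lock_name cannot both succeed.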

Monitoring and Alerts:

  • Monitor the locking mechanism to ensure it’s working as expected.
  • Set up alerts for situations where a lock might be held for too long, indicating potential issues in the pipeline.
Dashboards: Use monitoring tools like Grafana or Kibana to visualize the data from your lock table. This can include:

  • Number of active locks.
  • Locks close to expiration.
  • Average lock hold time.
  • Locks held by each owner.
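The first two metrics reduce to simple queries over the locks table; a sketch against SQLite (column names follow the schema above, thresholds are illustrative):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE locks (
    lock_name TEXT PRIMARY KEY, owner_id TEXT, expires_at TEXT)""")
now = datetime.now(timezone.utc)
conn.executemany("INSERT INTO locks VALUES (?, ?, ?)", [
    ("sub_task_lock", "pipeline_1", (now + timedelta(minutes=8)).isoformat()),
    ("load_lock",     "pipeline_2", (now + timedelta(seconds=30)).isoformat()),
])

# Number of active (unexpired) locks
active = conn.execute("SELECT COUNT(*) FROM locks WHERE expires_at > ?",
                      (now.isoformat(),)).fetchone()[0]

# Locks expiring within the next minute
closing = conn.execute("SELECT lock_name FROM locks WHERE expires_at < ?",
                       ((now + timedelta(minutes=1)).isoformat(),)).fetchall()
print(active, closing)  # 2 [('load_lock',)]
```

A dashboard tool would run the same queries on a schedule and plot the results.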

Easy maintenance:

Once we configure our DAG pipelines in these tables, it becomes easy to add a lock as a feature of a task.

# Take these two DAGs for example; Node B is used twice.
DAG1: Start -> Node A -> Node B -> Node C -> End
DAG2: Start -> Node D -> Node B -> Node E -> End

# We can set up the lock feature when defining Node B.
Name: NodeB
Type: Task
Input: input_path/XXX.csv
Output: output_path/table_name
Notebook: notebook_path
lock: {lock_status: enable, lock_duration: 10 mins}
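A pipeline runner could then read this hypothetical lock feature before executing the node; the field names below mirror the example definition and are assumptions, not an existing API:

```python
# Hypothetical Node B definition as it might be stored in the config tables.
node_b = {
    "name": "NodeB",
    "type": "Task",
    "lock": {"lock_status": "enable", "lock_duration": "10 mins"},
}

def needs_lock(node):
    """True when the node's lock feature is enabled."""
    return node.get("lock", {}).get("lock_status") == "enable"

def lock_duration_minutes(node):
    """Parse the '10 mins'-style duration into an integer minute count."""
    value = node.get("lock", {}).get("lock_duration", "0 mins")
    return int(value.split()[0])

print(needs_lock(node_b), lock_duration_minutes(node_b))  # True 10
```

With this in place, enabling exclusive execution for a reused node is a one-line config change rather than an edit to every pipeline that uses it.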

This approach can be adapted to various systems and technologies, providing a robust solution for managing exclusive sub-task execution in concurrent pipelines.
