Mastering Airflow XComs: Advanced Patterns for Exclusive Data Sharing
Use these strategies depending on your requirement:
    @classmethod
    def get_value(cls, key, dag_id, task_id, run_id, map_index):
        # Enforce exclusive pull: only if (dag_id, calling_task, target_task) is allowed.
        # Note: in a real implementation, you'd need to resolve the caller;
        # here the target task_id stands in for it.
        calling_task = task_id
        allowed_keys = cls.ALLOWED_PULLS.get((dag_id, calling_task), [])
        if key not in allowed_keys:
            raise AirflowException(
                f"XCom exclusive violation: task {calling_task} "
                f"not allowed to pull key '{key}'"
            )
        return super().get_value(key, dag_id, task_id, run_id, map_index)
Simple design:
Instead of saving the payload to the Airflow metadata database (for example, PostgreSQL), a custom XCom backend intercepts the XCom write, uploads the actual payload to an external object store (such as Amazon S3, Google Cloud Storage, or Azure Blob Storage), and saves only the URI string in the metadata database. When a downstream task pulls the XCom, the backend automatically fetches and deserializes the object from cloud storage.
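The offload pattern described above can be sketched without Airflow at all. This is a minimal illustration, not a working backend: FAKE_OBJECT_STORE, URI_PREFIX, offload_value and load_value are hypothetical names, and an in-memory dict stands in for the real object store. In an actual custom backend, the two functions would correspond roughly to the serialize_value and deserialize_value hooks that a BaseXCom subclass overrides.

```python
import json
import uuid
from typing import Any

# Hypothetical stand-in for an object store such as S3 or GCS.
FAKE_OBJECT_STORE: dict[str, bytes] = {}

# Assumed URI scheme for this sketch only; a real backend would use
# something like an s3:// or gs:// path.
URI_PREFIX = "fakestore://xcom/"


def offload_value(value: Any) -> str:
    """Upload the payload and return the URI string to keep in the metadata DB."""
    uri = f"{URI_PREFIX}{uuid.uuid4()}.json"
    FAKE_OBJECT_STORE[uri] = json.dumps(value).encode("utf-8")
    return uri


def load_value(uri: str) -> Any:
    """Fetch the payload that a stored URI points to and deserialize it."""
    return json.loads(FAKE_OBJECT_STORE[uri].decode("utf-8"))


# Write path: only the short URI would be stored in the metadata database.
stored_uri = offload_value({"rows": [1, 2, 3]})

# Read path: the downstream task gets the original object back.
restored = load_value(stored_uri)
```

The metadata database only ever sees the short URI string, so its row size stays constant regardless of how large the payload is.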
With the TaskFlow API, you simply return a value from a Python function, and Airflow manages the XCom lifecycle for you.
While a task runs, it can push an XCom under a specific key. By default, if a Python operator returns a value, Airflow automatically serializes that value and pushes it with the key return_value.
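The push side can be pictured with a toy model. The sketch below does not use Airflow; xcom_table, xcom_push, xcom_pull and run_task are hypothetical helpers that only mimic the behavior described above, with a dict standing in for the XCom table.

```python
from typing import Any, Callable

# Local constant mirroring the default key named in the text above.
XCOM_RETURN_KEY = "return_value"

# Toy stand-in for the XCom table: (task_id, key) -> value.
xcom_table: dict[tuple[str, str], Any] = {}


def xcom_push(task_id: str, key: str, value: Any) -> None:
    """Store a value under an explicit key for the given task."""
    xcom_table[(task_id, key)] = value


def run_task(task_id: str, fn: Callable[[], Any]) -> None:
    """Run a callable and push its return value under the default key."""
    xcom_push(task_id, XCOM_RETURN_KEY, fn())


def xcom_pull(task_id: str, key: str = XCOM_RETURN_KEY) -> Any:
    """Pull a value from a named task; None if nothing was pushed."""
    return xcom_table.get((task_id, key))


# The returned dict lands under "return_value" without an explicit push.
run_task("extract", lambda: {"row_count": 42})

# An explicit push under a custom key sits alongside it.
xcom_push("extract", "source_file", "input.csv")
```

Pulling with only the task name reads the default key, while a custom key has to be requested by name.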
Use .output explicitly, or pull the value inside a Jinja template string: {{ ti.xcom_pull(task_ids='...') }}.
A task "pushes" data into the system, and a downstream task "pulls" it out.
Starting with Airflow 3, calling xcom_pull() without a task_ids argument pulls only from the current task. In earlier versions, it would search all tasks. Always pass task_ids explicitly to avoid unexpected behavior.