Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Coordinators are registered in ``airflow.cfg`` (or via environment variables) un
coordinators = {
"my-coordinator": {
"classpath": "path.to.CoordinatorClass",
"kwargs": {}
"kwargs": {},
"extra": {}
}
}

Expand All @@ -147,6 +148,14 @@ Coordinators are registered in ``airflow.cfg`` (or via environment variables) un
of each coordinator (e.g. :ref:`java-sdk/coordinator-config` for
:class:`~airflow.sdk.coordinators.java.JavaCoordinator`).

``extra`` is an optional object for any additional information you want to associate with a
coordinator without coupling it to the coordinator instance. The coordinator itself never
receives it; other components read it as needed. For example, KubernetesExecutor reads
``extra.pod_template_file`` to launch a queue's worker pod from a specific pod template, and
``extra.worker_container_repository`` + ``extra.worker_container_tag`` to override that queue's
worker base image (both keys are required), e.g. an image that bundles the JVM for a Java
coordinator.

Comment thread
phanikumv marked this conversation as resolved.
``queue_to_coordinator``
A JSON object mapping Celery queue names to coordinator names:

Expand Down
18 changes: 17 additions & 1 deletion airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2045,13 +2045,29 @@ sdk:
multiple times under different names with different ``kwargs`` (for
example, two ``JavaCoordinator`` instances pinned to different JDK
versions).

An entry may also carry an optional ``extra`` object for additional
information associated with the coordinator that the coordinator itself
does not receive; other components read it as needed. For example,
KubernetesExecutor reads ``extra.pod_template_file`` to launch a queue's
worker pod from a specific pod template, and
``extra.worker_container_repository`` + ``extra.worker_container_tag`` to
override the worker base image for that queue (both are required).
version_added: 3.3.0
type: string
example: |
{
"jdk-17": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"]}
"kwargs": {
"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
"jvm_args": ["-Xmx1024m"]
},
"extra": {
"pod_template_file": "/opt/airflow/pod_templates/java.yaml",
"worker_container_repository": "myrepo/airflow-java",
"worker_container_tag": "3.3.0"
}
}
}
default: ~
Expand Down
29 changes: 29 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ def execute_task(
class _CoordinatorSpec(pydantic.BaseModel):
classpath: str
kwargs: dict[str, Any] = pydantic.Field(default_factory=dict)
# Optional metadata read by other components; kept separate from ``kwargs``
# so it is never passed to the coordinator constructor.
extra: dict[str, Any] | None = None


class _PythonCoordinator(BaseCoordinator):
Expand Down Expand Up @@ -184,6 +187,18 @@ class CoordinatorManager:
routed to a JDK 11 coordinator, and a ``"modern-java"`` queue routed to a
JDK 17 coordinator).

A coordinator entry may also carry an optional ``extra`` mapping: metadata
that other components read as needed. It is kept separate from ``kwargs`` and
never passed to the coordinator constructor::

{
"java": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {...},
"extra": {"pod_template_file": "/opt/airflow/pod_templates/java.yaml"},
}
}

:meta private:
"""

Expand Down Expand Up @@ -234,6 +249,20 @@ def for_queue(self, queue: str) -> BaseCoordinator:
log.debug("Coordinator found for queue", coordinator=coordinator, queue=queue)
return coordinator

def extra_for_queue(self, queue: str) -> dict[str, Any] | None:
"""
Return the optional ``extra`` mapping configured for *queue*'s coordinator.

Returns ``None`` when the queue is not routed to a coordinator or its
coordinator declares no ``extra``. Only the declarative spec is read; the
coordinator is never instantiated.
"""
if (key := self._queue_to_coordinator.get(queue)) is None:
return None
if (spec := self._coordinator_specs.get(key)) is None:
return None
return spec.extra


@functools.cache
def get_coordinator_manager() -> CoordinatorManager:
Expand Down
79 changes: 79 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class _CoordinatorB(BaseCoordinator):
pass


class _ExplodingCoordinator(BaseCoordinator):
def __init__(self):
raise RuntimeError("coordinator must not be instantiated")


@pytest.fixture
def sdk_config(monkeypatch):
"""Set the ``[sdk]`` env vars consumed by :meth:`CoordinatorManager.from_config`.
Expand Down Expand Up @@ -120,3 +125,77 @@ def test_get_coordinator_manager_is_cached(self, monkeypatch):
m1 = get_coordinator_manager()
m2 = get_coordinator_manager()
assert m1 is m2

@pytest.mark.parametrize(
("queue", "expected"),
[
pytest.param(
"queue-java",
{"pod_template_file": "/opt/airflow/pod_templates/java.yaml"},
id="mapped-with-extra",
),
pytest.param("queue-go", None, id="mapped-without-extra"),
pytest.param("queue-unmapped", None, id="unmapped-queue"),
],
)
def test_extra_for_queue(self, sdk_config, queue, expected):
sdk_config(
coordinators=json.dumps(
{
"java": {
"classpath": f"{_CoordinatorA.__module__}._CoordinatorA",
"extra": {"pod_template_file": "/opt/airflow/pod_templates/java.yaml"},
},
"go": {"classpath": f"{_CoordinatorB.__module__}._CoordinatorB"},
}
),
queue_to_coordinator=json.dumps({"queue-java": "java", "queue-go": "go"}),
)
manager = CoordinatorManager.from_config()
# Resolving the extra must not instantiate the coordinator.
assert manager.extra_for_queue(queue) == expected
assert manager._created_coordinators == {}

def test_extra_not_forwarded_to_constructor(self, sdk_config):
"""``extra`` is kept separate from ``kwargs`` and never reaches the coordinator constructor."""
sdk_config(
coordinators=json.dumps(
{
"java": {
"classpath": f"{_CoordinatorA.__module__}._CoordinatorA",
"kwargs": {"label": "java-label"},
"extra": {"pod_template_file": "/opt/airflow/pod_templates/java.yaml"},
},
}
),
queue_to_coordinator=json.dumps({"queue-java": "java"}),
)
manager = CoordinatorManager.from_config()
# _CoordinatorA only accepts ``label``; construction would raise TypeError
# if ``extra`` were passed through.
coordinator = manager.for_queue("queue-java")
assert isinstance(coordinator, _CoordinatorA)
assert coordinator.label == "java-label"
# The extra is still readable from the spec without instantiation cost.
assert manager.extra_for_queue("queue-java") == {
"pod_template_file": "/opt/airflow/pod_templates/java.yaml"
}

def test_extra_for_queue_does_not_instantiate_coordinator(self, sdk_config):
"""Reading ``extra`` reads only the spec; a failing constructor must never run."""
sdk_config(
coordinators=json.dumps(
{
"boom": {
"classpath": f"{_ExplodingCoordinator.__module__}._ExplodingCoordinator",
"extra": {"pod_template_file": "/opt/airflow/pod_templates/boom.yaml"},
},
}
),
queue_to_coordinator=json.dumps({"queue-boom": "boom"}),
)
manager = CoordinatorManager.from_config()
assert manager.extra_for_queue("queue-boom") == {
"pod_template_file": "/opt/airflow/pod_templates/boom.yaml"
}
assert manager._created_coordinators == {}
Loading