Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Coordinators are registered in ``airflow.cfg`` (or via environment variables) un
coordinators = {
"my-coordinator": {
"classpath": "path.to.CoordinatorClass",
"kwargs": {}
"kwargs": {},
"extra": {}
}
}

Expand All @@ -147,6 +148,14 @@ Coordinators are registered in ``airflow.cfg`` (or via environment variables) un
of each coordinator (e.g. :ref:`java-sdk/coordinator-config` for
:class:`~airflow.sdk.coordinators.java.JavaCoordinator`).

``extra`` is an optional object for any additional information you want to associate with a
coordinator without coupling it to the coordinator instance. The coordinator itself never
receives it; other components read it as needed. For example, KubernetesExecutor reads
``extra.pod_template_file`` to launch a queue's worker pod from a specific pod template, and
``extra.worker_container_repository`` together with ``extra.worker_container_tag`` to override
that queue's worker base image, e.g. with an image that bundles the JVM for a Java coordinator.
Both image keys must be set; if only one is present, the executor's default image is used.

``queue_to_coordinator``
A JSON object mapping Celery queue names to coordinator names:

Expand Down
18 changes: 17 additions & 1 deletion airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2045,13 +2045,29 @@ sdk:
multiple times under different names with different ``kwargs`` (for
example, two ``JavaCoordinator`` instances pinned to different JDK
versions).

An entry may also carry an optional ``extra`` object for additional
information associated with the coordinator that the coordinator itself
does not receive; other components read it as needed. For example,
KubernetesExecutor reads ``extra.pod_template_file`` to launch a queue's
worker pod from a specific pod template, and
``extra.worker_container_repository`` together with
``extra.worker_container_tag`` to override the worker base image for that
queue (both keys must be set, otherwise the default image is used).
version_added: 3.3.0
type: string
example: |
{
"jdk-17": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"]}
"kwargs": {
"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
"jvm_args": ["-Xmx1024m"]
},
"extra": {
"pod_template_file": "/opt/airflow/pod_templates/java.yaml",
"worker_container_repository": "myrepo/airflow-java",
"worker_container_tag": "3.3.0"
}
}
}
default: ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,64 @@ def start(self) -> None:
scheduler_job_id=self.scheduler_job_id,
)

def _coordinator_extra(self, queue: str | None) -> dict[str, Any] | None:
    """
    Look up the ``extra`` mapping declared by the coordinator serving *queue*.

    The lookup goes through the coordinator manager's declarative ``[sdk]``
    config, so the coordinator class is neither imported nor instantiated.

    ``None`` is returned in each of these cases:

    * *queue* is empty or ``None``;
    * the coordinator manager cannot be imported (it only exists on
      Airflow 3.3+, so older Task SDKs raise ``ImportError``);
    * the ``[sdk] coordinators`` / ``queue_to_coordinator`` config is
      invalid -- a warning is logged instead of letting the error crash the
      scheduler on this first lookup.

    The exception types are imported from ``airflow.sdk`` so they match
    whatever Task SDK actually raised them.
    """
    if not queue:
        return None

    try:
        from airflow.sdk.exceptions import AirflowConfigException
        from airflow.sdk.execution_time.coordinator import get_coordinator_manager
    except ImportError:
        # Task SDK predates the coordinator manager: nothing to look up.
        return None

    invalid_config_errors = (AirflowConfigException, ValueError)
    try:
        declared_extra = get_coordinator_manager().extra_for_queue(queue)
    except invalid_config_errors:
        self.log.warning(
            "Ignoring coordinator config for queue %s: invalid [sdk] coordinator config",
            queue,
            exc_info=True,
        )
        declared_extra = None
    return declared_extra

def _coordinator_pod_template_file(self, queue: str | None) -> str | None:
    """
    Look up the pod template declared by the coordinator serving *queue*.

    This allows a queue routed to a non-Python coordinator (via ``[sdk]
    queue_to_coordinator``) to start its worker pod from a template specific
    to that coordinator, such as one whose image carries the JVM for a Java
    coordinator.

    Returns the ``pod_template_file`` entry of the coordinator's ``extra``,
    or ``None`` when there is no ``extra`` or the key is absent.
    """
    coordinator_extra = self._coordinator_extra(queue)
    if coordinator_extra is None:
        return None
    return coordinator_extra.get("pod_template_file")

def _coordinator_kube_image(self, queue: str | None) -> str | None:
    """
    Look up the worker base image declared by the coordinator serving *queue*.

    A pod template never supplies the base container image; that comes from
    ``kube_image`` (``worker_container_repository:worker_container_tag``) or
    from a per-task ``pod_override``. A coordinator can therefore declare
    ``worker_container_repository`` and ``worker_container_tag`` in its
    ``extra`` (e.g. a JRE-bearing image for a Java coordinator).

    Returns ``"<repository>:<tag>"`` when both keys hold truthy values.
    Returns ``None`` when there is no ``extra`` or either key is missing or
    empty, so the executor default applies.
    """
    coordinator_extra = self._coordinator_extra(queue)
    if coordinator_extra is None:
        return None

    repository = coordinator_extra.get("worker_container_repository")
    image_tag = coordinator_extra.get("worker_container_tag")
    # Both halves are needed to compose an image reference.
    if not (repository and image_tag):
        return None
    return f"{repository}:{image_tag}"

def execute_async(
self,
key: TaskInstanceKey,
Expand Down Expand Up @@ -226,8 +284,31 @@ def execute_async(
pod_template_file = executor_config.get("pod_template_file", None)
else:
pod_template_file = None

# A coordinator-level pod_template wins (e.g. a JVM image for JavaCoordinator)
if (coordinator_pod_template_file := self._coordinator_pod_template_file(queue)) is not None:
self.log.debug(
"Using coordinator-declared pod template %s for task %s in queue %s",
coordinator_pod_template_file,
key,
queue,
)
pod_template_file = coordinator_pod_template_file

# The base image is not carried by a pod template, so a coordinator routes
# its worker base image separately (e.g. a JRE image for a Java queue).
if (coordinator_kube_image := self._coordinator_kube_image(queue)) is not None:
self.log.debug(
"Using coordinator-declared base image %s for task %s in queue %s",
coordinator_kube_image,
key,
queue,
)

self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id)
self.task_queue.put(KubernetesJob(key, command, kube_executor_config, pod_template_file))
self.task_queue.put(
KubernetesJob(key, command, kube_executor_config, pod_template_file, coordinator_kube_image)
)
# We keep a temporary local record that we've handled this so we don't
# try and remove it from the QUEUED state while we process it
self.last_handled[key] = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class KubernetesJob(NamedTuple):
command: Sequence[str]
kube_executor_config: Any
pod_template_file: str | None
kube_image: str | None = None


ALL_NAMESPACES = "ALL_NAMESPACES"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ def run_next(self, next_job: KubernetesJob) -> None:
command = next_job.command
kube_executor_config = next_job.kube_executor_config
pod_template_file = next_job.pod_template_file
kube_image = next_job.kube_image or self.kube_config.kube_image

dag_id, task_id, run_id, try_number, map_index = key
if len(command) == 1:
Expand Down Expand Up @@ -586,7 +587,7 @@ def run_next(self, next_job: KubernetesJob) -> None:
pod_id=create_unique_id(dag_id, task_id),
dag_id=dag_id,
task_id=task_id,
kube_image=self.kube_config.kube_image,
kube_image=kube_image,
try_number=try_number,
map_index=map_index,
date=None,
Expand Down
Loading
Loading