
API server leaks a KubernetesExecutor multiprocessing.Manager process per worker when viewing RUNNING task logs #68693

@abhipalsingh

Under which category would you file this issue?

Airflow Core

Apache Airflow version

3.2.1

What happened and how to reproduce it?

What happened

When the API server serves logs for a RUNNING task instance, it constructs a KubernetesExecutor solely to obtain its get_task_log method. KubernetesExecutor.__init__ creates a multiprocessing.Manager(), which forks a server process that sits in serve_forever. The API server never runs or shuts down this executor, so the Manager process is never stopped and runs forever (~350–400 MB resident each).

Because the executor is cached per process, the leak is ~1 Manager per gunicorn/uvicorn worker (e.g. up to 4 per API-server pod). Gunicorn worker refresh, or any other worker recycling, makes it worse: when a worker is replaced, its Manager child is reparented to PID 1 and never reaped, so orphaned Managers accumulate over time and contribute to API-server OOMs.
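To make the mechanism concrete, here is a minimal standalone sketch (not Airflow code) showing that creating a multiprocessing.Manager starts one extra child process, and that the child only goes away once shutdown() is called. The function name manager_child_lifecycle is made up for illustration, and the "fork" start method is selected explicitly to mirror the fork-based stack trace in this report, so the sketch assumes a POSIX system.

```python
import multiprocessing


def manager_child_lifecycle():
    """Return the live child-process count before, during and after a Manager's lifetime."""
    # Assumption: use the "fork" start method, as in the reported stack trace (POSIX only).
    ctx = multiprocessing.get_context("fork")

    before = len(multiprocessing.active_children())

    # Starting the Manager launches a separate server process.
    manager = ctx.Manager()
    # Proxy objects such as queues live inside that server process.
    queue = manager.JoinableQueue()
    during = len(multiprocessing.active_children())

    # Without this call the server process keeps running for as long as nothing stops it.
    del queue
    manager.shutdown()
    after = len(multiprocessing.active_children())

    return before, during, after


if __name__ == "__main__":
    print(manager_child_lifecycle())
```

In the API server nothing ever calls the equivalent of manager.shutdown() on the cached executor, which is why the extra process stays around.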

Confirmed with py-spy dump --native on one such hung child (an API-server process titled "gunicorn: worker" whose PPID is another worker rather than the master):

Thread (idle): "AnyIO worker thread"
  serve_forever (multiprocessing/managers.py:176)
  _run_server (multiprocessing/managers.py:599)
  run (multiprocessing/process.py:108)
  _bootstrap (multiprocessing/process.py:314)
  _launch (multiprocessing/popen_fork.py:71)
  _Popen (multiprocessing/context.py:282)
  start (multiprocessing/managers.py:562)
  Manager (multiprocessing/context.py:57)
  __init__ (.../cncf/kubernetes/executors/kubernetes_executor.py:104)   # self._manager = multiprocessing.Manager()
  load_executor (airflow/executors/executor_loader.py:368)
  get_default_executor (airflow/executors/executor_loader.py:294)
  _get_executor_get_task_log (airflow/utils/log/file_task_handler.py:577)
  _read (airflow/utils/log/file_task_handler.py:637)
  read (airflow/utils/log/file_task_handler.py:767)
  read_log_stream (airflow/utils/log/log_reader.py:132)
  _buffered_ndjson_stream (airflow/api_fastapi/core_api/routes/public/log.py:70)

The relevant call is unconditional for running tasks:

# airflow/utils/log/file_task_handler.py  (_read)
if ti.state == TaskInstanceState.RUNNING:
    executor_get_task_log = self._get_executor_get_task_log(ti)   # -> get_default_executor() -> KubernetesExecutor()
    response = executor_get_task_log(ti, try_number)

# providers/cncf/kubernetes/executors/kubernetes_executor.py  (__init__, ~line 104)
self._manager = multiprocessing.Manager()
self.task_queue   = self._manager.JoinableQueue()
self.result_queue = self._manager.JoinableQueue()

How to reproduce

  1. Deploy Airflow 3.2.1 with executor = KubernetesExecutor and a multi-replica API server (gunicorn).
  2. Start a DAG so a task is in the running state.
  3. Open that running task's logs in the UI (or GET /api/v2/dags/{dag}/dagRuns/{run}/taskInstances/{task}/logs/{n}).
  4. On the API-server pod, observe a new forked process per serving worker:
   kubectl exec <api-server-pod> -c api-server -- ps -eo pid,ppid,rss,args | grep gunicorn
   # a "gunicorn: worker" whose PPID is another worker (not the master); py-spy shows it in
   # multiprocessing managers.serve_forever -> KubernetesExecutor.__init__
  5. The process is never reaped; with worker refresh enabled it is reparented to PID 1, and such processes accumulate.
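The check in step 4 can be scripted. The following is a rough sketch, assuming the output format of ps -eo pid,ppid,rss,args and that leaked children keep the "gunicorn: worker" process title they inherit on fork. The names Proc, parse_ps and nested_workers are hypothetical helpers, not part of Airflow or gunicorn.

```python
from typing import List, NamedTuple


class Proc(NamedTuple):
    pid: int
    ppid: int
    rss_kib: int  # ps reports RSS in KiB on Linux
    args: str


def parse_ps(text: str) -> List[Proc]:
    """Parse the output of `ps -eo pid,ppid,rss,args` into Proc records."""
    procs = []
    for line in text.splitlines():
        parts = line.split(None, 3)
        # Skip the header row and anything that does not look like a process line.
        if len(parts) < 4 or not all(p.isdigit() for p in parts[:3]):
            continue
        procs.append(Proc(int(parts[0]), int(parts[1]), int(parts[2]), parts[3]))
    return procs


def nested_workers(procs: List[Proc], marker: str = "gunicorn: worker") -> List[Proc]:
    """Return worker-titled processes whose parent is itself a worker-titled process."""
    worker_pids = {p.pid for p in procs if marker in p.args}
    return [p for p in procs if marker in p.args and p.ppid in worker_pids]
```

Feeding it the ps output captured from the pod lists the candidates to inspect with py-spy. It only catches children whose parent worker is still alive; a child already reparented to PID 1 has to be spotted by its PPID instead.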

What you think should happen instead?

Reading a running task's logs on the API server should not instantiate a full executor with a multiprocessing.Manager() (the API server never runs the executor's scheduling loop). Options:

  • Fetch get_task_log without constructing/initializing the executor's runtime resources (e.g. a lightweight/classmethod path, or lazily create the Manager only when the scheduler actually starts the executor), and/or
  • Ensure any executor instantiated purely for get_task_log is properly shut down (Manager.shutdown() / executor.end()), so no serve_forever process is leaked.
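Both options can be sketched together on a toy class. This is only an illustration under stated assumptions: LazyManagerExecutor and executor_for_log_read are hypothetical names, not Airflow's KubernetesExecutor or any existing Airflow helper, and the get_task_log body is a placeholder. The idea is that the Manager is created in start() rather than __init__, and that a context manager guarantees end() is called for executors built only to read logs.

```python
import multiprocessing
from contextlib import contextmanager


class LazyManagerExecutor:
    """Hypothetical stand-in for an executor; NOT Airflow's KubernetesExecutor."""

    def __init__(self):
        # No Manager here: constructing the object starts no child process.
        self._manager = None
        self.task_queue = None
        self.result_queue = None

    def start(self):
        """Create runtime resources only when the executor is actually run."""
        if self._manager is None:
            # Assumption: "fork" start method, as in the reported stack trace (POSIX only).
            self._manager = multiprocessing.get_context("fork").Manager()
            self.task_queue = self._manager.JoinableQueue()
            self.result_queue = self._manager.JoinableQueue()

    def end(self):
        """Stop the Manager server process if one was started."""
        if self._manager is not None:
            self._manager.shutdown()
            self._manager = None
            self.task_queue = None
            self.result_queue = None

    def get_task_log(self, ti, try_number):
        # Placeholder body: a real executor would fetch pod logs here.
        return [f"reading logs for {ti} (try {try_number})"], []


@contextmanager
def executor_for_log_read(executor_cls=LazyManagerExecutor):
    """Yield an executor for log reading and always clean it up afterwards."""
    executor = executor_cls()
    try:
        yield executor
    finally:
        executor.end()
```

With this shape, a log read such as `with executor_for_log_read() as ex: ex.get_task_log(ti, 1)` never starts a Manager, and even an executor that did start one is shut down when the block exits.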

Operating System

Linux (EKS / Debian-based official image)

Deployment

Official Apache Airflow Helm Chart

Apache Airflow Provider(s)

No response

Versions of Apache Airflow Providers

No response

Official Helm Chart version

1.17.0

Kubernetes Version

1.35

Helm Chart configuration

No response

Docker Image customizations

No response

Anything else?

  • Impact scales with worker recycling: each worker_refresh (or max_requests) cycle orphans the per-worker Manager, so leaked Managers grow over time and push the API server toward OOM.
  • Workaround we used: disable worker refresh to cap it at ~1 Manager/worker, and restart the deployment to clear orphans. This bounds but does not eliminate the leak.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels: area:API, area:Executors-core, kind:bug, needs-triage, provider:cncf-kubernetes
