Under which category would you file this issue?
Airflow Core
Apache Airflow version
3.2.1
What happened and how to reproduce it?
What happened
When the API server serves logs for a RUNNING task instance, it ends up constructing a KubernetesExecutor (to obtain get_task_log). KubernetesExecutor.__init__ creates a multiprocessing.Manager(), which forks a serve_forever server process. The API server never runs or shuts down this executor, so that Manager process is orphaned and runs forever (~350–400 MB resident each).
Because the executor is cached per process, it's ~1 leaked Manager per gunicorn/uvicorn worker (e.g. up to 4 per API-server pod). It gets worse with gunicorn worker refresh / any worker recycling: when a worker is replaced, its Manager child reparents to PID 1 and is never reaped, so orphaned Managers accumulate over time and contribute to API-server OOMs.
Confirmed with py-spy dump --native of a hung child (the api-server gunicorn: worker proc with PPID = another worker):
Thread (idle): "AnyIO worker thread"
serve_forever (multiprocessing/managers.py:176)
_run_server (multiprocessing/managers.py:599)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:314)
_launch (multiprocessing/popen_fork.py:71)
_Popen (multiprocessing/context.py:282)
start (multiprocessing/managers.py:562)
Manager (multiprocessing/context.py:57)
__init__ (.../cncf/kubernetes/executors/kubernetes_executor.py:104) # self._manager = multiprocessing.Manager()
load_executor (airflow/executors/executor_loader.py:368)
get_default_executor (airflow/executors/executor_loader.py:294)
_get_executor_get_task_log (airflow/utils/log/file_task_handler.py:577)
_read (airflow/utils/log/file_task_handler.py:637)
read (airflow/utils/log/file_task_handler.py:767)
read_log_stream (airflow/utils/log/log_reader.py:132)
_buffered_ndjson_stream (airflow/api_fastapi/core_api/routes/public/log.py:70)
The relevant call is unconditional for running tasks:
# airflow/utils/log/file_task_handler.py (_read)
if ti.state == TaskInstanceState.RUNNING:
    executor_get_task_log = self._get_executor_get_task_log(ti)  # -> get_default_executor() -> KubernetesExecutor()
    response = executor_get_task_log(ti, try_number)

# providers/cncf/kubernetes/executors/kubernetes_executor.py (__init__, ~line 104)
self._manager = multiprocessing.Manager()
self.task_queue = self._manager.JoinableQueue()
self.result_queue = self._manager.JoinableQueue()
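The effect of those three lines can be seen outside Airflow. The following is a minimal sketch, not Airflow code: it assumes a Unix host and the "fork" start method (to match the forked child in the trace above), and the function name is made up for illustration. It counts live child processes before a Manager is created, while it exists, and after an explicit shutdown.

```python
import multiprocessing


def count_children_around_manager() -> tuple[int, int, int]:
    """Return live child-process counts before, during, and after a Manager's lifetime."""
    # Assumption: the "fork" start method (Unix only), matching the forked child in the trace.
    ctx = multiprocessing.get_context("fork")

    before = len(multiprocessing.active_children())
    manager = ctx.Manager()            # starts the serve_forever server process
    queue = manager.JoinableQueue()    # proxy object; the real queue lives in the child
    during = len(multiprocessing.active_children())

    manager.shutdown()                 # the child runs until the manager is shut down or finalized
    after = len(multiprocessing.active_children())
    return before, during, after


if __name__ == "__main__":
    before, during, after = count_children_around_manager()
    print(f"children before={before} during={during} after={after}")
```

In the API server nothing ever reaches the equivalent of manager.shutdown(), because the executor is cached and never ended, so the extra child stays alive.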
How to reproduce
- Deploy Airflow 3.2.1 with executor = KubernetesExecutor and a multi-replica API server (gunicorn).
- Start a DAG so a task is in the running state.
- Open that running task's logs in the UI (or GET /api/v2/dags/{dag}/dagRuns/{run}/taskInstances/{task}/logs/{n}).
- On the API-server pod, observe a new forked process per serving worker:
kubectl exec <api-server-pod> -c api-server -- ps -eo pid,ppid,rss,args | grep gunicorn
# a "gunicorn: worker" whose PPID is another worker (not the master); py-spy shows it in
# multiprocessing managers.serve_forever -> KubernetesExecutor.__init__
- It is never reaped; with worker refresh enabled it orphans to PID 1 and accumulates.
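To make the check in the ps step repeatable, a small parser can flag the suspicious processes. This is a rough sketch under stated assumptions: the function name is hypothetical, the sample rows (PIDs, sizes, and the bracketed suffix) are invented for illustration and are not captured output, and the "gunicorn: master" title is assumed. Only the rule itself comes from the description above: a worker-titled process whose parent is another worker-titled process.

```python
# Invented sample in the shape of `ps -eo pid,ppid,rss,args`; not real captured output.
SAMPLE = """\
  PID  PPID    RSS COMMAND
    7     1 120000 gunicorn: master [example]
   21     7 410000 gunicorn: worker [example]
   22     7 405000 gunicorn: worker [example]
   58    21 372000 gunicorn: worker [example]
"""


def find_nested_workers(ps_output: str, title: str = "gunicorn: worker") -> list[tuple[int, int]]:
    """Return (pid, rss_kib) for worker-titled processes whose parent is also worker-titled."""
    procs: dict[int, tuple[int, int, str]] = {}
    for line in ps_output.splitlines():
        fields = line.split(None, 3)
        if len(fields) < 4 or not fields[0].isdigit():
            continue  # skip the header row and blank lines
        pid, ppid, rss = (int(f) for f in fields[:3])
        procs[pid] = (ppid, rss, fields[3])

    suspects = []
    for pid, (ppid, rss, args) in sorted(procs.items()):
        parent = procs.get(ppid)
        # A real worker is a child of the master; a forked Manager inherits the
        # worker's process title but has a worker as its parent.
        if title in args and parent is not None and title in parent[2]:
            suspects.append((pid, rss))
    return suspects


if __name__ == "__main__":
    for pid, rss in find_nested_workers(SAMPLE):
        print(f"suspected leaked Manager: pid={pid} rss={rss} KiB")
```

This rule does not catch Managers that have already been reparented to PID 1; those need a separate check, since PID 1 in the container may itself be the gunicorn master.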
What you think should happen instead?
Reading a running task's logs on the API server should not instantiate a full executor with a multiprocessing.Manager() (the API server never runs the executor's scheduling loop). Options:
- Fetch get_task_log without constructing/initializing the executor's runtime resources (e.g. a lightweight/classmethod path, or lazily create the Manager only when the scheduler actually starts the executor), and/or
- Ensure any executor instantiated purely for get_task_log is properly shut down (Manager.shutdown() / executor.end()), so no serve_forever process is leaked.
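Both options can be sketched together. The class below is a hypothetical stand-in, not the real KubernetesExecutor and not a proposed patch: the class name, the placeholder get_task_log body, and the use of the "fork" start method are all assumptions for illustration. It shows the shape of the idea: the constructor allocates nothing, start() creates the Manager, and end() shuts it down.

```python
from __future__ import annotations

import multiprocessing
from multiprocessing.managers import SyncManager


class LazyManagerExecutor:
    """Hypothetical stand-in for an executor; NOT the real KubernetesExecutor."""

    def __init__(self) -> None:
        # No Manager here: constructing the object must stay cheap and process-free.
        self._manager: SyncManager | None = None
        self.task_queue = None
        self.result_queue = None

    def start(self) -> None:
        """Allocate runtime resources; only the component that runs the executor calls this."""
        if self._manager is None:
            # Assumption: "fork" start method (Unix only), as in the reported trace.
            self._manager = multiprocessing.get_context("fork").Manager()
            self.task_queue = self._manager.JoinableQueue()
            self.result_queue = self._manager.JoinableQueue()

    def get_task_log(self, ti: object, try_number: int) -> tuple[list[str], list[str]]:
        """Placeholder log fetch that needs neither queues nor a Manager."""
        return [f"placeholder log lookup for {ti!r}, try {try_number}"], []

    def end(self) -> None:
        """Release the Manager so no serve_forever child outlives the executor."""
        if self._manager is not None:
            self._manager.shutdown()
            self._manager = None
            self.task_queue = None
            self.result_queue = None
```

With this shape, a log-serving caller can construct the object and call get_task_log without forking anything, while the scheduler path pays for the Manager only between start() and end().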
Operating System
Linux (EKS / Debian-based official image)
Deployment
Official Apache Airflow Helm Chart
Apache Airflow Provider(s)
No response
Versions of Apache Airflow Providers
No response
Official Helm Chart version
1.17.0
Kubernetes Version
1.35
Helm Chart configuration
No response
Docker Image customizations
No response
Anything else?
- Impact scales with worker recycling: each worker_refresh (or max_requests) cycle orphans the per-worker Manager, so leaked Managers grow over time and push the API server toward OOM.
- Workaround we used: disable worker refresh to cap it at ~1 Manager/worker, and restart the deployment to clear orphans. This bounds but does not eliminate the leak.
Are you willing to submit PR?
Code of Conduct