OpenLineage: add execute_in_thread to emit task events without forking#68708

Open
mobuchowski wants to merge 4 commits into apache:main from mobuchowski:openlineage-execute-in-thread

@mobuchowski mobuchowski commented Jun 18, 2026

Contributor

Why

By default, the OpenLineage listener emits each task-level event from a forked child process (os.fork() with no exec). That child inherits the task runner's connection to the Airflow supervisor, and if something goes wrong with that connection, the task can hang.

Several issues with this were fixed in the 3.2.x release (#66574, #65714, #66573, #67115, #66572), but we still support earlier release lines such as 3.1.x.

What

Add an opt-in [openlineage] execute_in_thread option (default False). When enabled, task-level emission runs in a time-bounded daemon thread instead of forking: nothing is inherited, so a blocked emission can never strand the task, and the task runner waits at most [openlineage] execution_timeout for emission before proceeding. Metadata extraction still runs in-process with full access to the task runtime, so Operators whose extractors resolve Connections, Variables or XComs keep working.

The default (fork) path is unchanged.
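
To illustrate the time-bounded daemon thread pattern described above, here is a minimal sketch. It is not the provider's implementation: `run_with_timeout` is a hypothetical helper name, and the callable passed in stands in for whatever emits the event.

```python
import threading
import time


def run_with_timeout(fn, timeout):
    """Run fn in a daemon thread and wait at most `timeout` seconds.

    Returns True if fn finished in time, False if it was abandoned.
    (Hypothetical helper, for illustration only.)
    """
    done = threading.Event()

    def _target():
        try:
            fn()
        finally:
            # Mark completion even if fn raised.
            done.set()

    # daemon=True: an abandoned thread does not keep the process alive.
    worker = threading.Thread(target=_target, daemon=True)
    worker.start()
    # The caller blocks here for at most `timeout` seconds.
    worker.join(timeout)
    return done.is_set()


if __name__ == "__main__":
    print(run_with_timeout(lambda: None, timeout=1.0))
    print(run_with_timeout(lambda: time.sleep(1), timeout=0.05))
```

The caller proceeds once the timeout elapses whether or not the work finished; the abandoned thread keeps running in the background until it completes or the process exits.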

Verified on AWS MWAA and a custom GKE Airflow environment.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: [Claude Code] following the guidelines

By default the OpenLineage listener emits each task-level event from a
forked child process (os.fork() with no exec). That child inherits the
task runner's connection to the Airflow supervisor; if the child's event
emission blocks (e.g. a slow or unreachable lineage backend), the
inherited connection can be left in a state that prevents the task from
being marked complete, leaving it stuck in the `running` state.

Add an opt-in `[openlineage] execute_in_thread` option (default False).
When enabled, task-level emission runs in a time-bounded daemon thread
instead of forking: nothing is inherited, so a blocked emission can never
strand the task, and the task runner waits at most
`[openlineage] execution_timeout` for emission before proceeding.
Metadata extraction still runs in-process with full access to the task
runtime, so Operators whose extractors resolve Connections, Variables or
XComs keep working.

The default (fork) path is unchanged.

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>

@kacpermuda kacpermuda left a comment

Collaborator

The PR adds a well-motivated opt-in thread-based emission path that cleanly solves the supervisor-connection hang for most cases, and the default fork path is unchanged. Two nits (see inline) document genuine semantic differences between fork and thread isolation that slightly qualify the "can never block" guarantee in the docstring — but neither causes a runtime failure under normal conditions.

Comment thread providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py Outdated
@kacpermuda

Collaborator

Can we add a short paragraph about this in providers/openlineage/docs/troubleshooting.rst? It would help us know which config to use if we ever hit this again. Also, let's state clearly there that it was observed on Airflow 3.1 and may not occur on other Airflow versions, since 3.2 has already applied some fixes.

@kacpermuda kacpermuda left a comment

Collaborator

Left a few nit comments; looks solid otherwise. A clean opt-in that allows a different execution mode and can prevent a rare bug.

@abhipalsingh


Adding a data point from the API server side, since this fork behavior isn't only a scheduler concern.

On Airflow 3.2.1 with apache-airflow-providers-openlineage==2.14.0 (pre-#65677), manual task-instance state changes via the REST API (mark success/failed/skipped, clear) fire on_task_instance_* on the api-server, a multithreaded async process (FastAPI/uvicorn under gunicorn). The listener's _fork_execute calls os.fork() from that multithreaded worker; a fraction of children deadlock immediately on an inherited lock (py-spy showed them parked in futex_wait_queue, never reaching the post-fork setproctitle) and are never reaped → ~350–400 MB each → unbounded accumulation → api-server OOM.

#65677 helps the manual-state-change path (it routes that path through the ProcessPoolExecutor instead of a raw fork), but (a) that still forks a pool from the multithreaded async server, and (b) the natural-lifecycle handlers still use use_fork=True. So thread-based emission (this PR) is the cleaner fit for async contexts like the api-server, where os.fork() is fundamentally unsafe.

We worked around it by disabling OpenLineage on the api-server (no transport configured there anyway), but big +1 for execute_in_thread as the general fix.

@mobuchowski

Contributor Author

@abhipalsingh we should not run any extraction on the API server, so I think that fix should be separate: never fork there at all, and not behind a configuration flag.

I think it should be closer to what we do in the scheduler with ProcessPoolExecutor maybe.

…nfig

The update-providers-build-files prek hook regenerates get_provider_info.py
from provider.yaml. Adding execute_in_thread to provider.yaml requires the
generated file to be updated in sync.

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
…avior

The thread path also reaches the Airflow supervisor during in-process
metadata extraction, through the shared SUPERVISOR_COMMS threading lock.
That lock prevents the byte interleaving that corrupted the protocol under
fork, but it does mean the task runner can briefly wait on the lock while an
abandoned emission thread finishes a round trip. Reword the config docs,
docstring, and inline comment to describe this accurately instead of
claiming emission can "never block" the task runner.
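
A rough sketch of the lock behavior described above, under stated assumptions: `comms_lock`, `round_trip`, and `runner_wait` are hypothetical stand-ins, not Airflow names. A shared `threading.Lock` keeps round trips from overlapping, so the main thread may wait while an abandoned thread finishes the round trip it is holding the lock for.

```python
import threading
import time

# Hypothetical stand-in for the shared supervisor-comms lock.
comms_lock = threading.Lock()


def round_trip(duration, holding=None):
    """Simulate one request/response exchange guarded by the shared lock."""
    with comms_lock:
        if holding is not None:
            # Signal that this thread now owns the lock.
            holding.set()
        time.sleep(duration)


def runner_wait(abandoned_duration):
    """Return how long the main thread waits behind an abandoned thread."""
    holding = threading.Event()
    threading.Thread(
        target=round_trip, args=(abandoned_duration, holding), daemon=True
    ).start()
    # Wait until the background thread actually holds the lock.
    holding.wait()
    start = time.monotonic()
    # The main thread's own round trip blocks until the lock is released.
    round_trip(0)
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"main thread waited {runner_wait(0.3):.2f}s")
```

The wait is bounded by the length of a single round trip, which is why the wording is "can briefly wait" rather than "never block".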

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
The update-providers-build-files generator preserves provider.yaml option
order (execution_timeout then execute_in_thread), so place the generated
entry accordingly to keep the file in sync with the generator output.

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>