OpenLineage: add execute_in_thread to emit task events without forking#68708
OpenLineage: add execute_in_thread to emit task events without forking#68708mobuchowski wants to merge 4 commits into
Conversation
By default the OpenLineage listener emits each task-level event from a forked child process (os.fork() with no exec). That child inherits the task runner's connection to the Airflow supervisor; if the child's event emission blocks (e.g. a slow or unreachable lineage backend), the inherited connection can be left in a state that prevents the task from being marked complete, leaving it stuck in the `running` state. Add an opt-in `[openlineage] execute_in_thread` option (default False). When enabled, task-level emission runs in a time-bounded daemon thread instead of forking: nothing is inherited, so a blocked emission can never strand the task, and the task runner waits at most `[openlineage] execution_timeout` for emission before proceeding. Metadata extraction still runs in-process with full access to the task runtime, so Operators whose extractors resolve Connections, Variables or XComs keep working. The default (fork) path is unchanged. Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
There was a problem hiding this comment.
The PR adds a well-motivated opt-in thread-based emission path that cleanly solves the supervisor-connection hang for most cases, and the default fork path is unchanged. Two nits (see inline) document genuine semantic differences between fork and thread isolation that slightly qualify the "can never block" guarantee in the docstring — but neither causes a runtime failure under normal conditions.
|
Can we add some small paragraph about this in |
kacpermuda
left a comment
There was a problem hiding this comment.
Left few nit comments, looks solid otherwise - clean opt-in allowing a different execution mode, that can prevent some rare bug.
|
Adding a data point from the API server side, since this fork behavior isn't only a scheduler concern. On Airflow 3.2.1 with apache-airflow-providers-openlineage==2.14.0 (pre-#65677), manual task-instance state changes via the REST #65677 helps the manual-state-change path (routes it through the ProcessPoolExecutor instead of a raw fork), but (a) that still forks a pool from the multithreaded async server, and (b) the natural-lifecycle handlers still use use_fork=True. So thread-based emission (this issue) is the cleaner fit for async contexts like the api-server, where os.fork() is fundamentally We worked around it by disabling OpenLineage on the api-server (no transport configured there anyway), but big +1 for |
|
@abhipalsingh we should not run any extraction on API server, so I think fix should be separate - to not fork anyway; but do it not behind the configuration flag. I think it should be closer to what we do in the scheduler with ProcessPoolExecutor maybe. |
…nfig The update-providers-build-files prek hook regenerates get_provider_info.py from provider.yaml. Adding execute_in_thread to provider.yaml requires the generated file to be updated in sync. Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
…avior The thread path also reaches the Airflow supervisor during in-process metadata extraction, through the shared SUPERVISOR_COMMS threading lock. That lock prevents the byte interleaving that corrupted the protocol under fork, but it does mean the task runner can briefly wait on the lock while an abandoned emission thread finishes a round trip. Reword the config docs, docstring, and inline comment to describe this accurately instead of claiming emission can "never block" the task runner. Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
The update-providers-build-files generator preserves provider.yaml option order (execution_timeout then execute_in_thread), so place the generated entry accordingly to keep the file in sync with the generator output. Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Why
By default the OpenLineage listener emits each task-level event from a forked child process (os.fork() with no exec). That child inherits the task runner's connection to the Airflow supervisor; if there are issues with that connection - it can hang.
There were a bunch of issues with that fixed in 3.2.x release - like #66574 #65714 #66573 #67115 #66572 - but we still support earlier ones like 3.1.x line.
What
Add an opt-in
[openlineage] execute_in_threadoption (default False). When enabled, task-level emission runs in a time-bounded daemon thread instead of forking: nothing is inherited, so a blocked emission can never strand the task, and the task runner waits at most[openlineage] execution_timeoutfor emission before proceeding. Metadata extraction still runs in-process with full access to the task runtime, so Operators whose extractors resolve Connections, Variables or XComs keep working.The default (fork) path is unchanged.
Verified on AWS MWAA and custom GKE Airflow environment.
Was generative AI tooling used to co-author this PR?
Generated-by: [Claude Code] following the guidelines