Skip to content

Fix deferrable BeamRunPythonPipelineOperator fails with 400 when job-id absent from launcher stdout#68720

Open
GayathriSrividya wants to merge 1 commit into
apache:mainfrom
GayathriSrividya:fix/beam-deferrable-dataflow-missing-job-id-68279
Open

Fix deferrable BeamRunPythonPipelineOperator fails with 400 when job-id absent from launcher stdout#68720
GayathriSrividya wants to merge 1 commit into
apache:mainfrom
GayathriSrividya:fix/beam-deferrable-dataflow-missing-job-id-68279

Conversation

@GayathriSrividya

@GayathriSrividya GayathriSrividya commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Closes #68279

Problem

A BeamRunPythonPipelineOperator (or Java variant) with deferrable=True and runner="DataflowRunner" raises:

airflow.exceptions.AirflowException: 400 Request must contain a job and project id.

whenever the Beam launcher subprocess stdout does not contain the Created job with id: [...] line. This happens routinely when the pipeline does not configure INFO-level logging, since the Beam SDK emits that line at INFO while Python's root logger defaults to WARNING.

The synchronous path (deferrable=False) already handles this: DataflowHook.wait_for_done() falls back to resolving the job by name prefix when job_id=None. The deferrable path had no such fallback — it passed job_id=None directly into the trigger, which the Dataflow API immediately rejected.

Fix

Before building the trigger in execute_on_dataflow(), if self.dataflow_job_id is still None after the launcher finishes, call a new DataflowHook.get_job_id_by_name() helper that resolves the ID via the Dataflow REST API by name prefix. This mirrors the existing synchronous fallback so both paths behave consistently.

The fix applies to both BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator.

Changes

  • providers/google/.../hooks/dataflow.py: add DataflowHook.get_job_id_by_name() — looks up the most recently submitted job whose name starts with the given prefix and returns its ID.
  • providers/apache/beam/.../operators/beam.py: in execute_on_dataflow() for both Python and Java operators, call the new helper when dataflow_job_id is None before deferring.
  • providers/apache/beam/tests/.../test_beam.py: add test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing for both operators, asserting get_job_id_by_name is called and the trigger carries the resolved ID.

…auncher stdout

When a DataflowRunner pipeline does not configure INFO-level logging the Beam
SDK's 'Created job with id: [...]' line is suppressed, so JOB_ID_PATTERN never
matches and self.dataflow_job_id stays None.  The synchronous wait_for_done()
path already handles this by resolving the job by name prefix; the deferrable
path did not, so it deferred with job_id=None and the Dataflow API immediately
rejected the trigger with '400 Request must contain a job and project id.'

Fix: before constructing the trigger in execute_on_dataflow() for both
BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator, if
dataflow_job_id is still None call a new DataflowHook.get_job_id_by_name()
helper that looks up the job by name prefix via the Dataflow REST API.  This
mirrors the existing synchronous fallback and ensures the trigger always
receives a valid ID.

Closes apache#68279
@boring-cyborg boring-cyborg Bot added area:providers provider:apache-beam provider:google Google (including GCP) related issues labels Jun 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

1 participant