Fix deferrable BeamRunPythonPipelineOperator fails with 400 when job-id absent from launcher stdout#68720
Open
GayathriSrividya wants to merge 1 commit into
Conversation
…auncher stdout When a DataflowRunner pipeline does not configure INFO-level logging the Beam SDK's 'Created job with id: [...]' line is suppressed, so JOB_ID_PATTERN never matches and self.dataflow_job_id stays None. The synchronous wait_for_done() path already handles this by resolving the job by name prefix; the deferrable path did not, so it deferred with job_id=None and the Dataflow API immediately rejected the trigger with '400 Request must contain a job and project id.' Fix: before constructing the trigger in execute_on_dataflow() for both BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator, if dataflow_job_id is still None call a new DataflowHook.get_job_id_by_name() helper that looks up the job by name prefix via the Dataflow REST API. This mirrors the existing synchronous fallback and ensures the trigger always receives a valid ID. Closes apache#68279
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #68279
Problem
A
BeamRunPythonPipelineOperator(or Java variant) withdeferrable=Trueandrunner="DataflowRunner"raises:whenever the Beam launcher subprocess stdout does not contain the
Created job with id: [...]line. This happens routinely when the pipeline does not configure INFO-level logging, since the Beam SDK emits that line at INFO while Python's root logger defaults to WARNING.The synchronous path (
deferrable=False) already handles this:DataflowHook.wait_for_done()falls back to resolving the job by name prefix whenjob_id=None. The deferrable path had no such fallback — it passedjob_id=Nonedirectly into the trigger, which the Dataflow API immediately rejected.Fix
Before building the trigger in
execute_on_dataflow(), ifself.dataflow_job_idis stillNoneafter the launcher finishes, call a newDataflowHook.get_job_id_by_name()helper that resolves the ID via the Dataflow REST API by name prefix. This mirrors the existing synchronous fallback so both paths behave consistently.The fix applies to both
BeamRunPythonPipelineOperatorandBeamRunJavaPipelineOperator.Changes
providers/google/.../hooks/dataflow.py: addDataflowHook.get_job_id_by_name()— looks up the most recently submitted job whose name starts with the given prefix and returns its ID.providers/apache/beam/.../operators/beam.py: inexecute_on_dataflow()for both Python and Java operators, call the new helper whendataflow_job_id is Nonebefore deferring.providers/apache/beam/tests/.../test_beam.py: addtest_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missingfor both operators, assertingget_job_id_by_nameis called and the trigger carries the resolved ID.