From f19ba770490011c8674ab05a934cc0b786d35bb1 Mon Sep 17 00:00:00 2001 From: AbdulRehman Date: Thu, 18 Jun 2026 13:18:15 +0500 Subject: [PATCH 1/2] Roll back orphaned backfill when run creation fails --- airflow-core/src/airflow/models/backfill.py | 55 +++++++++++-------- .../tests/unit/models/test_backfill.py | 46 +++++++++++++++- 2 files changed, 76 insertions(+), 25 deletions(-) diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index 52019fa5d984b..bc42ff98e9069 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -677,34 +677,41 @@ def _create_backfill( ) session.add(br) session.commit() + backfill_id = br.id - session.scalars(select(DagModel).where(DagModel.dag_id == dag_id)).one() - - dagrun_info_list = _get_info_list( - from_date=from_date, - to_date=to_date, - reverse=reverse, - dag=dag, - ) - if not dagrun_info_list: - raise RuntimeError(f"No runs to create for Dag {dag_id}") + try: + session.scalars(select(DagModel).where(DagModel.dag_id == dag_id)).one() - first_info = dagrun_info_list[0] - if first_info.partition_key: - _create_runs_partitioned( - br=br, + dagrun_info_list = _get_info_list( + from_date=from_date, + to_date=to_date, + reverse=reverse, dag=dag, - dagrun_info_list=dagrun_info_list, - session=session, - ) - else: - _create_runs_non_partitioned( - br=br, - dag=dag, - dagrun_info_list=dagrun_info_list, - run_on_latest_version=run_on_latest_version, - session=session, ) + if not dagrun_info_list: + raise RuntimeError(f"No runs to create for Dag {dag_id}") + + first_info = dagrun_info_list[0] + if first_info.partition_key: + _create_runs_partitioned( + br=br, + dag=dag, + dagrun_info_list=dagrun_info_list, + session=session, + ) + else: + _create_runs_non_partitioned( + br=br, + dag=dag, + dagrun_info_list=dagrun_info_list, + run_on_latest_version=run_on_latest_version, + session=session, + ) + except Exception: + session.rollback() + 
session.execute(sa.delete(Backfill).where(Backfill.id == backfill_id)) + session.commit() + raise return br diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py index 538c33a24612d..24b315718ecba 100644 --- a/airflow-core/tests/unit/models/test_backfill.py +++ b/airflow-core/tests/unit/models/test_backfill.py @@ -20,10 +20,11 @@ from contextlib import nullcontext from datetime import datetime, timedelta from typing import TYPE_CHECKING +from unittest import mock import pendulum import pytest -from sqlalchemy import select +from sqlalchemy import func, select from airflow._shared.timezones import timezone from airflow.models import DagModel, DagRun, TaskInstance @@ -106,6 +107,49 @@ def test_reverse_and_depends_on_past_fails(dep_on_past, dag_maker, session): assert b is not None +def test_create_backfill_no_orphan_on_run_creation_failure(dag_maker, session): + """A failure while creating backfill runs must not leave an orphaned Backfill row. + + The Backfill row is created first; if run creation then fails, the row must be + rolled back, otherwise the ``num_active > 0`` check blocks all future backfills + for the dag. + """ + with dag_maker(schedule="@daily") as dag: + PythonOperator(task_id="hi", python_callable=print) + session.commit() + + with mock.patch( + "airflow.models.backfill._create_backfill_dag_run_non_partitioned", + side_effect=RuntimeError("boom"), + ): + with pytest.raises(RuntimeError, match="boom"): + _create_backfill( + dag_id=dag.dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-05"), + max_active_runs=2, + reverse=False, + triggering_user_name="pytest", + dag_run_conf={}, + ) + + assert ( + session.scalar(select(func.count()).select_from(Backfill).where(Backfill.dag_id == dag.dag_id)) == 0 + ) + + # A subsequent backfill must not be blocked by a leftover row. 
+ b = _create_backfill( + dag_id=dag.dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-05"), + max_active_runs=2, + reverse=False, + triggering_user_name="pytest", + dag_run_conf={}, + ) + assert b is not None + + @pytest.mark.parametrize("reverse", [True, False]) @pytest.mark.parametrize("existing", [["2021-01-02", "2021-01-03"], []]) def test_create_backfill_simple(reverse, existing, dag_maker, session): From ff390ff2efe758f1e3fa836a84cf08c89bd9c33d Mon Sep 17 00:00:00 2001 From: AbdulRehman Date: Thu, 18 Jun 2026 13:18:51 +0500 Subject: [PATCH 2/2] Add newsfragment for backfill rollback fix --- airflow-core/newsfragments/68705.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 airflow-core/newsfragments/68705.bugfix.rst diff --git a/airflow-core/newsfragments/68705.bugfix.rst b/airflow-core/newsfragments/68705.bugfix.rst new file mode 100644 index 0000000000000..5d48af708e134 --- /dev/null +++ b/airflow-core/newsfragments/68705.bugfix.rst @@ -0,0 +1 @@ +A failed backfill creation no longer leaves an orphaned backfill record that blocks all future backfills for the dag. If creating the runs for a backfill fails, the backfill record is now removed so that the backfill can be retried.