diff --git a/airflow-core/newsfragments/68705.bugfix.rst b/airflow-core/newsfragments/68705.bugfix.rst new file mode 100644 index 0000000000000..5d48af708e134 --- /dev/null +++ b/airflow-core/newsfragments/68705.bugfix.rst @@ -0,0 +1 @@ +A failed backfill creation no longer leaves an orphaned backfill record that blocks all future backfills for the dag. If creating the backfill runs fails, the backfill is now rolled back so it can be retried. diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index 52019fa5d984b..bc42ff98e9069 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -677,34 +677,41 @@ def _create_backfill( ) session.add(br) session.commit() + backfill_id = br.id - session.scalars(select(DagModel).where(DagModel.dag_id == dag_id)).one() - - dagrun_info_list = _get_info_list( - from_date=from_date, - to_date=to_date, - reverse=reverse, - dag=dag, - ) - if not dagrun_info_list: - raise RuntimeError(f"No runs to create for Dag {dag_id}") + try: + session.scalars(select(DagModel).where(DagModel.dag_id == dag_id)).one() - first_info = dagrun_info_list[0] - if first_info.partition_key: - _create_runs_partitioned( - br=br, + dagrun_info_list = _get_info_list( + from_date=from_date, + to_date=to_date, + reverse=reverse, dag=dag, - dagrun_info_list=dagrun_info_list, - session=session, - ) - else: - _create_runs_non_partitioned( - br=br, - dag=dag, - dagrun_info_list=dagrun_info_list, - run_on_latest_version=run_on_latest_version, - session=session, ) + if not dagrun_info_list: + raise RuntimeError(f"No runs to create for Dag {dag_id}") + + first_info = dagrun_info_list[0] + if first_info.partition_key: + _create_runs_partitioned( + br=br, + dag=dag, + dagrun_info_list=dagrun_info_list, + session=session, + ) + else: + _create_runs_non_partitioned( + br=br, + dag=dag, + dagrun_info_list=dagrun_info_list, + run_on_latest_version=run_on_latest_version, + session=session, + ) + except Exception: + session.rollback() + session.execute(sa.delete(Backfill).where(Backfill.id == backfill_id)) + session.commit() + raise return br diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py index 538c33a24612d..24b315718ecba 100644 --- a/airflow-core/tests/unit/models/test_backfill.py +++ b/airflow-core/tests/unit/models/test_backfill.py @@ -20,10 +20,11 @@ from contextlib import nullcontext from datetime import datetime, timedelta from typing import TYPE_CHECKING +from unittest import mock import pendulum import pytest -from sqlalchemy import select +from sqlalchemy import func, select from airflow._shared.timezones import timezone from airflow.models import DagModel, DagRun, TaskInstance @@ -106,6 +107,49 @@ def test_reverse_and_depends_on_past_fails(dep_on_past, dag_maker, session): assert b is not None +def test_create_backfill_no_orphan_on_run_creation_failure(dag_maker, session): + """A failure while creating backfill runs must not leave an orphaned Backfill row. + + The Backfill row is created first; if run creation then fails, the row must be + rolled back, otherwise the ``num_active > 0`` check blocks all future backfills + for the dag. + """ + with dag_maker(schedule="@daily") as dag: + PythonOperator(task_id="hi", python_callable=print) + session.commit() + + with mock.patch( + "airflow.models.backfill._create_backfill_dag_run_non_partitioned", + side_effect=RuntimeError("boom"), + ): + with pytest.raises(RuntimeError, match="boom"): + _create_backfill( + dag_id=dag.dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-05"), + max_active_runs=2, + reverse=False, + triggering_user_name="pytest", + dag_run_conf={}, + ) + + assert ( + session.scalar(select(func.count()).select_from(Backfill).where(Backfill.dag_id == dag.dag_id)) == 0 + ) + + # A subsequent backfill must not be blocked by a leftover row. + b = _create_backfill( + dag_id=dag.dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-05"), + max_active_runs=2, + reverse=False, + triggering_user_name="pytest", + dag_run_conf={}, + ) + assert b is not None + + @pytest.mark.parametrize("reverse", [True, False]) @pytest.mark.parametrize("existing", [["2021-01-02", "2021-01-03"], []]) def test_create_backfill_simple(reverse, existing, dag_maker, session):