Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68705.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
A failed backfill creation no longer leaves an orphaned backfill record that blocks all future backfills for the dag. If creating the backfill's dag runs fails, the backfill record is now removed so that the backfill can be retried.
55 changes: 31 additions & 24 deletions airflow-core/src/airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,34 +677,41 @@ def _create_backfill(
)
session.add(br)
session.commit()
backfill_id = br.id

session.scalars(select(DagModel).where(DagModel.dag_id == dag_id)).one()

dagrun_info_list = _get_info_list(
from_date=from_date,
to_date=to_date,
reverse=reverse,
dag=dag,
)
if not dagrun_info_list:
raise RuntimeError(f"No runs to create for Dag {dag_id}")
try:
session.scalars(select(DagModel).where(DagModel.dag_id == dag_id)).one()

first_info = dagrun_info_list[0]
if first_info.partition_key:
_create_runs_partitioned(
br=br,
dagrun_info_list = _get_info_list(
from_date=from_date,
to_date=to_date,
reverse=reverse,
dag=dag,
dagrun_info_list=dagrun_info_list,
session=session,
)
else:
_create_runs_non_partitioned(
br=br,
dag=dag,
dagrun_info_list=dagrun_info_list,
run_on_latest_version=run_on_latest_version,
session=session,
)
if not dagrun_info_list:
raise RuntimeError(f"No runs to create for Dag {dag_id}")

first_info = dagrun_info_list[0]
if first_info.partition_key:
_create_runs_partitioned(
br=br,
dag=dag,
dagrun_info_list=dagrun_info_list,
session=session,
)
else:
_create_runs_non_partitioned(
br=br,
dag=dag,
dagrun_info_list=dagrun_info_list,
run_on_latest_version=run_on_latest_version,
session=session,
)
except Exception:
session.rollback()
session.execute(sa.delete(Backfill).where(Backfill.id == backfill_id))
session.commit()
raise
return br


Expand Down
46 changes: 45 additions & 1 deletion airflow-core/tests/unit/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
from contextlib import nullcontext
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from unittest import mock

import pendulum
import pytest
from sqlalchemy import select
from sqlalchemy import func, select

from airflow._shared.timezones import timezone
from airflow.models import DagModel, DagRun, TaskInstance
Expand Down Expand Up @@ -106,6 +107,49 @@ def test_reverse_and_depends_on_past_fails(dep_on_past, dag_maker, session):
assert b is not None


def test_create_backfill_no_orphan_on_run_creation_failure(dag_maker, session):
    """Run-creation failure must not strand a Backfill row for the dag.

    ``_create_backfill`` persists the Backfill row before it creates the runs.
    This test forces run creation to raise, then checks that no Backfill row
    remains for the dag and that a second, unpatched attempt succeeds.
    """
    with dag_maker(schedule="@daily") as dag:
        PythonOperator(task_id="hi", python_callable=print)
    session.commit()

    def attempt_backfill():
        # Build the arguments fresh on every call so both attempts are independent.
        return _create_backfill(
            dag_id=dag.dag_id,
            from_date=pendulum.parse("2021-01-01"),
            to_date=pendulum.parse("2021-01-05"),
            max_active_runs=2,
            reverse=False,
            triggering_user_name="pytest",
            dag_run_conf={},
        )

    patch_target = "airflow.models.backfill._create_backfill_dag_run_non_partitioned"
    failing_run_creation = mock.patch(patch_target, side_effect=RuntimeError("boom"))
    with failing_run_creation, pytest.raises(RuntimeError, match="boom"):
        attempt_backfill()

    # The failed attempt must not have left any Backfill row behind for this dag.
    leftover_query = select(func.count()).select_from(Backfill).where(Backfill.dag_id == dag.dag_id)
    assert session.scalar(leftover_query) == 0

    # A subsequent backfill must not be blocked by a leftover row.
    retried = attempt_backfill()
    assert retried is not None


@pytest.mark.parametrize("reverse", [True, False])
@pytest.mark.parametrize("existing", [["2021-01-02", "2021-01-03"], []])
def test_create_backfill_simple(reverse, existing, dag_maker, session):
Expand Down
Loading