Add partition clear support to REST API to match the CLI #68702

Open
Lee-W wants to merge 2 commits into apache:main from astronomer:partition-clear-rest-api

Conversation

@Lee-W Lee-W commented Jun 18, 2026

Member
Why
  • clearDagRuns could only select runs by an explicit list — no clearing a partitioned Dag by partition_key or date window, which the CLI already does.
  • No REST equivalent of airflow partitions clear (resetting partition_key/partition_date).
  • So UI/API clients had to fall back to the CLI for any partition-aware clear.
What
  • POST /dags/{dag_id}/clearDagRuns: BulkDAGRunClearBody now takes partition_key or a partition_date_start/partition_date_end window as an alternative to an explicit run list (exactly one selector; ~ dag_id rejected). Also closes an authz gap where an empty dag_runs list skipped the access check.
  • New POST /dags/{dag_id}/clearPartitions: resets partition_key/partition_date on matching runs (mirrors airflow partitions clear), with optional clear_task_instances; dry_run returns counts.
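For illustration, the two request shapes described above might look like the sketch below, which builds the requests with the standard library but does not send them. The endpoint paths and body field names come from this PR. The base URL, the `/api/v2` prefix, the Dag id `example_dag`, and the sample values are placeholders assumed for the example, and any other fields the bodies may require are omitted.

```python
import json
from urllib.request import Request

# Assumed placeholder; the real host and API prefix depend on the deployment.
BASE_URL = "http://localhost:8080/api/v2"


def build_post(path: str, body: dict) -> Request:
    """Build a JSON POST request without sending it."""
    return Request(
        url=f"{BASE_URL}{path}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Clear runs of one Dag selected by a partition date window instead of a run list.
clear_runs = build_post(
    "/dags/example_dag/clearDagRuns",
    {
        "partition_date_start": "2026-06-01T00:00:00Z",
        "partition_date_end": "2026-06-07T00:00:00Z",
    },
)

# Reset partition fields on runs matching a key; with dry_run only counts are returned.
clear_partitions = build_post(
    "/dags/example_dag/clearPartitions",
    {"partition_key": "2026-06-01", "clear_task_instances": False, "dry_run": True},
)
```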

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: [Claude] following the guidelines



@boring-cyborg boring-cyborg Bot added the area:API Airflow's REST/HTTP API label Jun 18, 2026
@Lee-W Lee-W force-pushed the partition-clear-rest-api branch 2 times, most recently from d5b88c4 to a49ae18 Compare June 18, 2026 13:11
@Lee-W Lee-W changed the title API: Add partition clear support to REST API to match the CLI Add partition clear support to REST API to match the CLI Jun 18, 2026
@Lee-W Lee-W added the backport-to-v3-3-test Backport to v3-3-test label Jun 18, 2026
@Lee-W Lee-W marked this pull request as ready for review June 18, 2026 13:36
@Lee-W Lee-W force-pushed the partition-clear-rest-api branch 2 times, most recently from 531a2d8 to aa09898 Compare June 18, 2026 15:33
clearDagRuns now accepts partition_key / partition_date window selectors
as an alternative to an explicit run list. Add POST /dags/{dag_id}/clearPartitions
to reset partition_key/partition_date on matching runs, with optional
task-instance clear — REST parity with `airflow dags clear` / `airflow partitions clear`.
@Lee-W Lee-W force-pushed the partition-clear-rest-api branch from aa09898 to 292ce16 Compare June 19, 2026 01:12

@pierrejeambrun pierrejeambrun left a comment

Member

A few suggestions / fixes.

Also the date-window semantics here don't actually match airflow partitions clear, which the PR aims to mirror. The CLI compares raw values (partition_date >= start / <= end, partition_command.py:100-103), while this resolves calendar-day bounds (>= resolve_day_bound(start) / < resolve_day_bound(end+1day)). For the same inputs the two clear different runs at the end boundary (and across timezones). Your calendar-day version actually matches the documented "inclusive, calendar-day granular" intent better than the CLI's <= end does — so I think the right fix is to make the CLI use the same resolve_day_bound logic (or share a helper), rather than leave the two interfaces inconsistent. Worth confirming the intended boundary behavior either way.
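To make the end-boundary difference concrete, here is a small sketch of the two predicates as described in this comment. `resolve_day_bound` below is a hypothetical stand-in that returns midnight UTC for a given day; the real timetable method may resolve the bound in the timetable's own timezone, and the function names `matches_raw` and `matches_calendar_day` are made up for the example.

```python
from datetime import date, datetime, time, timedelta, timezone


def resolve_day_bound(day: date) -> datetime:
    """Hypothetical stand-in: midnight UTC at the start of `day`."""
    return datetime.combine(day, time.min, tzinfo=timezone.utc)


def matches_raw(partition_date: datetime, start: datetime, end: datetime) -> bool:
    # CLI-style comparison as described in the review: start <= value <= end.
    return start <= partition_date <= end


def matches_calendar_day(partition_date: datetime, start: datetime, end: datetime) -> bool:
    # REST-style comparison: [start of the start day, start of the day after end).
    lower = resolve_day_bound(start.date())
    upper = resolve_day_bound(end.date() + timedelta(days=1))
    return lower <= partition_date < upper


start = datetime(2026, 6, 17, tzinfo=timezone.utc)
end = datetime(2026, 6, 18, tzinfo=timezone.utc)
# A run whose partition_date falls at noon on the end day.
noon_on_end_day = datetime(2026, 6, 18, 12, 0, tzinfo=timezone.utc)

print(matches_raw(noon_on_end_day, start, end))           # False
print(matches_calendar_day(noon_on_end_day, start, end))  # True
```

With the same inputs, the raw comparison drops a run at noon on the end day, while the calendar-day version keeps it.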

Comment on lines +282 to +326
    run_id: str | None = Field(
        default=None,
        description="Select runs by exact run_id. Mutually exclusive with ``partition_key`` and partition date window.",
    )
    partition_key: str | None = Field(
        default=None,
        description="Select runs by exact partition key match. Mutually exclusive with ``run_id`` and partition date window.",
    )
    partition_date_start: datetime | None = Field(
        default=None,
        description="Inclusive start of the partition date window (calendar-day granular). Mutually exclusive with ``run_id`` and ``partition_key``.",
    )
    partition_date_end: datetime | None = Field(
        default=None,
        description="Inclusive end of the partition date window (calendar-day granular). Mutually exclusive with ``run_id`` and ``partition_key``.",
    )
    clear_task_instances: bool = Field(
        default=False,
        description="Also clear task instances on the matched runs.",
    )
    dry_run: bool = Field(
        default=True,
        description="If True, compute counts without writing any changes.",
    )

    @model_validator(mode="after")
    def validate_exactly_one_selector(self) -> ClearPartitionsBody:
        has_run_id = self.run_id is not None
        has_partition_key = self.partition_key is not None
        has_partition_date_window = (
            self.partition_date_start is not None or self.partition_date_end is not None
        )
        selectors_active = sum([has_run_id, has_partition_key, has_partition_date_window])
        if selectors_active != 1:
            raise ValueError(
                "Exactly one of run_id, partition_key, or a partition date window "
                "(partition_date_start / partition_date_end) must be provided."
            )
        if (
            self.partition_date_start is not None
            and self.partition_date_end is not None
            and self.partition_date_start > self.partition_date_end
        ):
            raise ValueError("partition_date_start must be on or before partition_date_end.")
        return self

Member

Some fields and the validator are completely duplicated from BulkDAGRunClearBody. Can we factor these out into a base class?
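One possible shape for that base class is sketched below, using stdlib dataclasses as a stand-in for the Pydantic models so the example runs on its own. `PartitionSelectorBase` and `_other_selectors` are hypothetical names, and `__post_init__` plays the role of the `model_validator`; the real change would keep the `Field(...)` definitions and the existing error messages.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class PartitionSelectorBase:
    """Hypothetical shared base holding the partition selector fields."""

    partition_key: Optional[str] = None
    partition_date_start: Optional[datetime] = None
    partition_date_end: Optional[datetime] = None

    def _other_selectors(self) -> List[bool]:
        # Subclasses report their own extra selectors (run_id, dag_runs, ...).
        return []

    def __post_init__(self) -> None:
        has_window = self.partition_date_start is not None or self.partition_date_end is not None
        active = sum([self.partition_key is not None, has_window, *self._other_selectors()])
        if active != 1:
            raise ValueError("Exactly one selector must be provided.")
        if (
            self.partition_date_start is not None
            and self.partition_date_end is not None
            and self.partition_date_start > self.partition_date_end
        ):
            raise ValueError("partition_date_start must be on or before partition_date_end.")


@dataclass
class ClearPartitionsBody(PartitionSelectorBase):
    run_id: Optional[str] = None
    clear_task_instances: bool = False
    dry_run: bool = True

    def _other_selectors(self) -> List[bool]:
        return [self.run_id is not None]
```

BulkDAGRunClearBody would subclass the same base and report its explicit run list through the same hook, so the exactly-one-selector rule lives in one place.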

Comment on lines +826 to +834
    partition_selectors_present = (
        body.partition_key is not None
        or body.partition_date_start is not None
        or body.partition_date_end is not None
    )
    if not body.dag_runs and partition_selectors_present:
        if dag_id and dag_id != "~":
            entity_methods.append((dag_id, "PUT"))

Member

Why is this necessary? This code path doesn't appear to be hit by the code mentioned.

flushed = 0
if ti_buffer_run_ids:
    chunk_tis = list(
        session.scalars(select(TaskInstance).where(TaskInstance.run_id.in_(ti_buffer_run_ids)))

Member

Probably add a safeguard on the dag_id here too, because run_id is unique per Dag, not across Dags.
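A small sketch of the failure mode, using plain `sqlite3` and a cut-down `task_instance` table as a stand-in for the SQLAlchemy session and model (the table layout and sample rows are assumptions for the example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [
        ("dag_a", "scheduled__2026-06-18", "extract"),
        ("dag_a", "scheduled__2026-06-18", "load"),
        # A different Dag that happens to reuse the same run_id.
        ("dag_b", "scheduled__2026-06-18", "report"),
    ],
)

run_ids = ["scheduled__2026-06-18"]
placeholders = ", ".join("?" for _ in run_ids)

# Filtering on run_id alone also picks up dag_b's task instance.
unscoped = conn.execute(
    f"SELECT COUNT(*) FROM task_instance WHERE run_id IN ({placeholders})", run_ids
).fetchone()[0]

# Adding the dag_id predicate restricts the match to the intended Dag.
scoped = conn.execute(
    f"SELECT COUNT(*) FROM task_instance WHERE dag_id = ? AND run_id IN ({placeholders})",
    ["dag_a", *run_ids],
).fetchone()[0]

print(unscoped, scoped)  # 3 2
```

In the SQLAlchemy statement this would presumably amount to one more `.where(...)` clause on the task instance's dag_id alongside the `run_id.in_(...)` filter.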

Comment on lines +227 to +228
run_tis = list(session.scalars(select(TaskInstance).where(TaskInstance.run_id == run.run_id)))
tis_dry_total += len(run_tis)

Member

This scales the number of queries with the number of runs (the size of the partition).

Is it possible to replace this with a single query instead?
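One way this could look is a single aggregate over a join between runs and task instances, so the dry-run count no longer depends on how many runs match. The sketch uses plain `sqlite3` with simplified `dag_run` and `task_instance` tables as stand-ins for the real models; the schema and sample rows are assumptions for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, partition_key TEXT);
    CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT);
    INSERT INTO dag_run VALUES
        ('dag_a', 'run_1', 'p1'), ('dag_a', 'run_2', 'p1'), ('dag_a', 'run_3', 'p2');
    INSERT INTO task_instance VALUES
        ('dag_a', 'run_1', 't1'), ('dag_a', 'run_1', 't2'),
        ('dag_a', 'run_2', 't1'),
        ('dag_a', 'run_3', 't1');
    """
)

# One aggregate query over the join, instead of one SELECT per matched run.
tis_dry_total = conn.execute(
    """
    SELECT COUNT(*)
    FROM task_instance AS ti
    JOIN dag_run AS dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
    WHERE dr.dag_id = ? AND dr.partition_key = ?
    """,
    ("dag_a", "p1"),
).fetchone()[0]

print(tis_dry_total)  # 3
```

Joining on both dag_id and run_id also keeps the count scoped to the intended Dag.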

Comment on lines +180 to +186
stmt = stmt.where(or_(DagRun.partition_key.is_not(None), DagRun.partition_date.is_not(None)))
if body.partition_date_start is not None:
    lower = dag.timetable.resolve_day_bound(body.partition_date_start.date())
    stmt = stmt.where(DagRun.partition_date >= lower)
if body.partition_date_end is not None:
    upper = dag.timetable.resolve_day_bound(body.partition_date_end.date() + timedelta(days=1))
    stmt = stmt.where(DagRun.partition_date < upper)

Member

The partition-selector + resolve_day_bound date-window logic is now duplicated between clear_dag_runs (partition branch) and clear_partition_fields. They differ slightly (one selects DagRun.run_id, the other DagRun), but the window resolution is identical — worth extracting a small shared helper so the two can't drift?
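A rough sketch of what such a helper could look like: it only computes the half-open bounds and leaves each caller to apply them to its own statement. `partition_window_bounds` and `utc_midnight` are hypothetical names; in the PR the resolver would be `dag.timetable.resolve_day_bound`.

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Callable, Optional, Tuple


def partition_window_bounds(
    start: Optional[datetime],
    end: Optional[datetime],
    resolve_day_bound: Callable[[date], datetime],
) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Return (lower, upper) so that lower <= partition_date < upper.

    Either bound is None when the corresponding side of the window is open.
    """
    lower = resolve_day_bound(start.date()) if start is not None else None
    upper = resolve_day_bound(end.date() + timedelta(days=1)) if end is not None else None
    return lower, upper


def utc_midnight(day: date) -> datetime:
    # Hypothetical stand-in for dag.timetable.resolve_day_bound.
    return datetime.combine(day, time.min, tzinfo=timezone.utc)


lower, upper = partition_window_bounds(
    datetime(2026, 6, 1, 9, 30, tzinfo=timezone.utc),
    datetime(2026, 6, 7, 18, 0, tzinfo=timezone.utc),
    utc_midnight,
)
print(lower.isoformat(), upper.isoformat())
# 2026-06-01T00:00:00+00:00 2026-06-08T00:00:00+00:00
```

Both clear_dag_runs and clear_partition_fields could then add `DagRun.partition_date >= lower` and `DagRun.partition_date < upper` for whichever bounds are not None.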

Comment on lines +826 to +830
    partition_selectors_present = (
        body.partition_key is not None
        or body.partition_date_start is not None
        or body.partition_date_end is not None
    )

Member

Extract this into a BulkDAGRunClearBody method / computed field to avoid repeating this here and in partition_mode above.
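As a sketch of that suggestion, the check could live on the body as a read-only property. The class below is a stdlib dataclass standing in for the Pydantic model, and the property name `has_partition_selectors` is hypothetical; on the real model a plain `@property` or Pydantic's `computed_field` would be the likely options.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class BulkDAGRunClearBody:
    # Simplified stand-in: only the selector fields relevant to the check.
    dag_runs: List[str] = field(default_factory=list)
    partition_key: Optional[str] = None
    partition_date_start: Optional[datetime] = None
    partition_date_end: Optional[datetime] = None

    @property
    def has_partition_selectors(self) -> bool:
        """True when any partition-based selector is set."""
        return (
            self.partition_key is not None
            or self.partition_date_start is not None
            or self.partition_date_end is not None
        )


print(BulkDAGRunClearBody(partition_key="p1").has_partition_selectors)    # True
print(BulkDAGRunClearBody(dag_runs=["run_1"]).has_partition_selectors)   # False
```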

@Lee-W Lee-W added this to the Airflow 3.3.0 milestone Jun 19, 2026

2 participants