Add partition clear support to REST API to match the CLI#68702
Conversation
d5b88c4 to
a49ae18
Compare
531a2d8 to
aa09898
Compare
clearDagRuns now accepts partition_key / partition_date window selectors
as an alternative to an explicit run list. Add POST /dags/{dag_id}/clearPartitions
to reset partition_key/partition_date on matching runs, with optional
task-instance clear — REST parity with `airflow dags clear` / `airflow partitions clear`.
aa09898 to
292ce16
Compare
There was a problem hiding this comment.
A few suggestions / fix.
Also the date-window semantics here don't actually match airflow partitions clear, which the PR aims to mirror. The CLI compares raw values (partition_date >= start / <= end, partition_command.py:100-103), while this resolves calendar-day bounds (>= resolve_day_bound(start) / < resolve_day_bound(end+1day)). For the same inputs the two clear different runs at the end boundary (and across timezones). Your calendar-day version actually matches the documented "inclusive, calendar-day granular" intent better than the CLI's <= end does — so I think the right fix is to make the CLI use the same resolve_day_bound logic (or share a helper), rather than leave the two interfaces inconsistent. Worth confirming the intended boundary behavior either way.
| run_id: str | None = Field( | ||
| default=None, | ||
| description="Select runs by exact run_id. Mutually exclusive with ``partition_key`` and partition date window.", | ||
| ) | ||
| partition_key: str | None = Field( | ||
| default=None, | ||
| description="Select runs by exact partition key match. Mutually exclusive with ``run_id`` and partition date window.", | ||
| ) | ||
| partition_date_start: datetime | None = Field( | ||
| default=None, | ||
| description="Inclusive start of the partition date window (calendar-day granular). Mutually exclusive with ``run_id`` and ``partition_key``.", | ||
| ) | ||
| partition_date_end: datetime | None = Field( | ||
| default=None, | ||
| description="Inclusive end of the partition date window (calendar-day granular). Mutually exclusive with ``run_id`` and ``partition_key``.", | ||
| ) | ||
| clear_task_instances: bool = Field( | ||
| default=False, | ||
| description="Also clear task instances on the matched runs.", | ||
| ) | ||
| dry_run: bool = Field( | ||
| default=True, | ||
| description="If True, compute counts without writing any changes.", | ||
| ) | ||
|
|
||
| @model_validator(mode="after") | ||
| def validate_exactly_one_selector(self) -> ClearPartitionsBody: | ||
| has_run_id = self.run_id is not None | ||
| has_partition_key = self.partition_key is not None | ||
| has_partition_date_window = ( | ||
| self.partition_date_start is not None or self.partition_date_end is not None | ||
| ) | ||
| selectors_active = sum([has_run_id, has_partition_key, has_partition_date_window]) | ||
| if selectors_active != 1: | ||
| raise ValueError( | ||
| "Exactly one of run_id, partition_key, or a partition date window " | ||
| "(partition_date_start / partition_date_end) must be provided." | ||
| ) | ||
| if ( | ||
| self.partition_date_start is not None | ||
| and self.partition_date_end is not None | ||
| and self.partition_date_start > self.partition_date_end | ||
| ): | ||
| raise ValueError("partition_date_start must be on or before partition_date_end.") | ||
| return self |
There was a problem hiding this comment.
Some fields and the validator are completely duplicated with BulkDAGRunClearBody. Can we factorize this with a base class ?
| partition_selectors_present = ( | ||
| body.partition_key is not None | ||
| or body.partition_date_start is not None | ||
| or body.partition_date_end is not None | ||
| ) | ||
| if not body.dag_runs and partition_selectors_present: | ||
| if dag_id and dag_id != "~": | ||
| entity_methods.append((dag_id, "PUT")) | ||
|
|
There was a problem hiding this comment.
Why is that necessary? It doesn't appear like this code path is hit in the code mentioned
| flushed = 0 | ||
| if ti_buffer_run_ids: | ||
| chunk_tis = list( | ||
| session.scalars(select(TaskInstance).where(TaskInstance.run_id.in_(ti_buffer_run_ids))) |
There was a problem hiding this comment.
Probably add a safeguard on the dag_id here too. because run_id is unique per dag id, not cross dags.
| run_tis = list(session.scalars(select(TaskInstance).where(TaskInstance.run_id == run.run_id))) | ||
| tis_dry_total += len(run_tis) |
There was a problem hiding this comment.
This scales the number of query with the number of run (the size of the partition).
Is is possible to replace this with one call instead?
| stmt = stmt.where(or_(DagRun.partition_key.is_not(None), DagRun.partition_date.is_not(None))) | ||
| if body.partition_date_start is not None: | ||
| lower = dag.timetable.resolve_day_bound(body.partition_date_start.date()) | ||
| stmt = stmt.where(DagRun.partition_date >= lower) | ||
| if body.partition_date_end is not None: | ||
| upper = dag.timetable.resolve_day_bound(body.partition_date_end.date() + timedelta(days=1)) | ||
| stmt = stmt.where(DagRun.partition_date < upper) |
There was a problem hiding this comment.
The partition-selector + resolve_day_bound date-window logic is now duplicated between clear_dag_runs (partition branch) and clear_partition_fields. They differ slightly (one selects DagRun.run_id, the other DagRun), but the window resolution is identical — worth extracting a small shared helper so the two can't drift?
| partition_selectors_present = ( | ||
| body.partition_key is not None | ||
| or body.partition_date_start is not None | ||
| or body.partition_date_end is not None | ||
| ) |
There was a problem hiding this comment.
Extract this into a BulkDAGRunClearBody method / computed field to avoid repeating this here and in partition_mode above.
Why
clearDagRunscould only select runs by an explicit list — no clearing a partitioned Dag bypartition_keyor date window, which the CLI already does.airflow partitions clear(resettingpartition_key/partition_date).What
POST /dags/{dag_id}/clearDagRuns:BulkDAGRunClearBodynow takespartition_keyor apartition_date_start/partition_date_endwindow as an alternative to an explicit run list (exactly one selector;~dag_id rejected). Also closes an authz gap where an emptydag_runslist skipped the access check.POST /dags/{dag_id}/clearPartitions: resetspartition_key/partition_dateon matching runs (mirrorsairflow partitions clear), with optionalclear_task_instances;dry_runreturns counts.Was generative AI tooling used to co-author this PR?
Generated-by: [Claude] following the guidelines
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.