From 4cac9a55190e2446a2c90be031d8edca740c072f Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Thu, 18 Jun 2026 11:35:18 +0800 Subject: [PATCH] Add airflowctl dags clear command --- airflow-ctl/docs/images/command_hashes.txt | 4 +- airflow-ctl/docs/images/output_dagrun.svg | 74 ++--- airflow-ctl/docs/images/output_dags.svg | 134 ++++----- airflow-ctl/src/airflowctl/api/operations.py | 108 ++++++- airflow-ctl/src/airflowctl/ctl/cli_config.py | 64 ++++- .../airflowctl/ctl/commands/dag_command.py | 149 +++++++++- .../src/airflowctl/ctl/help_texts.yaml | 1 + .../tests/airflow_ctl/api/test_operations.py | 76 +++++ .../ctl/commands/test_dag_command.py | 265 ++++++++++++++++++ 9 files changed, 763 insertions(+), 112 deletions(-) diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 53c93e7546d1e..ddcf98428f6df 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -4,8 +4,8 @@ auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 config:a3d936cb15fe3b547bf6c82cf93d923f connections:942f9f88cb908c28bf5c19159fc5065b -dags:6b38e6bcd491bc1941e7814b77e63bde -dagrun:c32e0011aa9a845456c778786717208e +dags:b6e20d6088a503f5c9fb999ecfa2bba3 +dagrun:61b4a64027651b3b61bb1445cdcec623 jobs:a5b644c5da8889443bb40ee10b599270 pools:19efe105b9515ab1926ebcaf0e028d71 providers:34502fe09dc0b8b0a13e7e46efdffda6 diff --git a/airflow-ctl/docs/images/output_dagrun.svg b/airflow-ctl/docs/images/output_dagrun.svg index 48e96c894208e..ec75de92702cf 100644 --- a/airflow-ctl/docs/images/output_dagrun.svg +++ b/airflow-ctl/docs/images/output_dagrun.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + + + + + + + - + - + - - Usage:airflowctl dagrun [-hCOMMAND... 
- -Perform DagRun operations - -Positional Arguments: -COMMAND -getRetrieve a Dag run by Dag ID and run ID -listList Dag runs, optionally filtered by state and date range - -Options: --h--helpshow this help message and exit + + Usage:airflowctl dagrun [-hCOMMAND... + +Perform DagRun operations + +Positional Arguments: +COMMAND +-clear-task-instances +getRetrieve a Dag run by Dag ID and run ID +listList Dag runs, optionally filtered by state and  +date range + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/docs/images/output_dags.svg b/airflow-ctl/docs/images/output_dags.svg index 7f95b26bdbfe0..100a4de4963a5 100644 --- a/airflow-ctl/docs/images/output_dags.svg +++ b/airflow-ctl/docs/images/output_dags.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + - + - + - - Usage:airflowctl dags [-hCOMMAND... - -Perform Dags operations - -Positional Arguments: -COMMAND -deleteDelete a Dag by its ID -getRetrieve a Dag by its ID -get-detailsRetrieve detailed information for a Dag -get-import-errorRetrieve a Dag import error by its ID -get-statsRetrieve run statistics for one or more Dags -get-tagsList all tags used across Dags -get-versionRetrieve a specific version of a Dag -listList all Dags -list-import-errors -List all Dag import errors -list-versionList all versions of a Dag -list-warningList all Dag warnings -next-executionShow the next scheduled execution time for a Dag -pausePause a Dag -triggerTrigger a new Dag run -unpauseUnpause a Dag -updateUpdate properties of a Dag - -Options: --h--helpshow this help message and exit + + Usage:airflowctl dags [-hCOMMAND... 
+ +Perform Dags operations + +Positional Arguments: +COMMAND +clearClear task instances for Dag runs selected by run ID, +partition key, or partition date +deleteDelete a Dag by its ID +getRetrieve a Dag by its ID +get-detailsRetrieve detailed information for a Dag +get-import-errorRetrieve a Dag import error by its ID +get-statsRetrieve run statistics for one or more Dags +get-tagsList all tags used across Dags +get-versionRetrieve a specific version of a Dag +listList all Dags +list-import-errors +List all Dag import errors +list-versionList all versions of a Dag +list-warningList all Dag warnings +next-executionShow the next scheduled execution time for a Dag +pausePause a Dag +triggerTrigger a new Dag run +unpauseUnpause a Dag +updateUpdate properties of a Dag + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index e250b66e127dd..63ab3ad60b97c 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -39,6 +39,7 @@ BulkBodyPoolBody, BulkBodyVariableBody, BulkResponse, + ClearTaskInstancesBody, Config, ConnectionBody, ConnectionCollectionResponse, @@ -68,6 +69,7 @@ ProviderCollectionResponse, QueuedEventCollectionResponse, QueuedEventResponse, + TaskInstanceCollectionResponse, TriggerDAGRunPostBody, VariableBody, VariableCollectionResponse, @@ -522,7 +524,7 @@ def get(self, dag_id: str) -> DAGResponse | ServerResponseError: raise e def get_details(self, dag_id: str) -> DAGDetailsResponse | ServerResponseError: - """Get a DAG details.""" + """Get a Dag details.""" try: self.response = self.client.get(f"dags/{dag_id}/details") return DAGDetailsResponse.model_validate_json(self.response.content) @@ -530,11 +532,11 @@ def get_details(self, dag_id: str) -> DAGDetailsResponse | ServerResponseError: raise e def get_tags(self) -> DAGTagCollectionResponse | ServerResponseError: - """Get all DAG tags.""" + """Get 
all Dag tags.""" return super().execute_list(path="dagTags", data_model=DAGTagCollectionResponse) def list(self) -> DAGCollectionResponse | ServerResponseError: - """List DAGs.""" + """List Dags.""" return super().execute_list(path="dags", data_model=DAGCollectionResponse) def update(self, dag_id: str, dag_body: DAGPatchBody) -> DAGResponse | ServerResponseError: @@ -586,7 +588,7 @@ def list_warning(self) -> DAGWarningCollectionResponse | ServerResponseError: def trigger( self, dag_id: str, trigger_dag_run: TriggerDAGRunPostBody ) -> DAGRunResponse | ServerResponseError: - """Create a dag run.""" + """Create a Dag run.""" if trigger_dag_run.conf is None: trigger_dag_run.conf = {} try: @@ -613,31 +615,91 @@ def list( self, state: str | None = None, limit: int = 100, + offset: int | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, + logical_date_gte: datetime.datetime | None = None, + logical_date_gt: datetime.datetime | None = None, + logical_date_lte: datetime.datetime | None = None, + logical_date_lt: datetime.datetime | None = None, + partition_date_gte: datetime.datetime | None = None, + partition_date_gt: datetime.datetime | None = None, + partition_date_lte: datetime.datetime | None = None, + partition_date_lt: datetime.datetime | None = None, + partition_date_start: datetime.date | None = None, + partition_date_end: datetime.date | None = None, + order_by: str | None = None, + run_id_pattern: str | None = None, + partition_key_pattern: str | None = None, + partition_key_prefix_pattern: str | None = None, dag_id: str | None = None, ) -> DAGRunCollectionResponse | ServerResponseError: """ - List dag runs (at most `limit` results). + List Dag runs (at most `limit` results). 
Args: - state: Filter dag runs by state (optional; no filter applied when omitted) - start_date: Filter dag runs by start date (optional) - end_date: Filter dag runs by end date (optional) + state: Filter Dag runs by state (optional; no filter applied when omitted) + start_date: Filter Dag runs by start date (optional) + end_date: Filter Dag runs by end date (optional) limit: Limit the number of results returned - dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~"). + offset: Offset to start returning results from + logical_date_gte: Filter Dag runs with logical date greater than or equal to this value + logical_date_gt: Filter Dag runs with logical date greater than this value + logical_date_lte: Filter Dag runs with logical date less than or equal to this value + logical_date_lt: Filter Dag runs with logical date less than this value + partition_date_gte: Filter Dag runs with partition date greater than or equal to this value + partition_date_gt: Filter Dag runs with partition date greater than this value + partition_date_lte: Filter Dag runs with partition date less than or equal to this value + partition_date_lt: Filter Dag runs with partition date less than this value + partition_date_start: Filter Dag runs whose partition date is on or after this local day + partition_date_end: Filter Dag runs whose partition date is on or before this local day + order_by: Field or fields to sort by + run_id_pattern: Filter Dag runs by run ID pattern + partition_key_pattern: Filter Dag runs by partition key pattern + partition_key_prefix_pattern: Filter Dag runs by partition key prefix pattern + dag_id: The Dag ID to filter by. If None, retrieves Dag runs for all Dags (using "~"). """ - # Use "~" for all DAGs if dag_id is not specified + # Use "~" for all Dags if dag_id is not specified. 
if not dag_id: dag_id = "~" params: dict[str, Any] = {"limit": limit} + if offset is not None: + params["offset"] = offset if state is not None: params["state"] = str(state) if start_date is not None: params["start_date"] = start_date.isoformat() if end_date is not None: params["end_date"] = end_date.isoformat() + if logical_date_gte is not None: + params["logical_date_gte"] = logical_date_gte.isoformat() + if logical_date_gt is not None: + params["logical_date_gt"] = logical_date_gt.isoformat() + if logical_date_lte is not None: + params["logical_date_lte"] = logical_date_lte.isoformat() + if logical_date_lt is not None: + params["logical_date_lt"] = logical_date_lt.isoformat() + if partition_date_gte is not None: + params["partition_date_gte"] = partition_date_gte.isoformat() + if partition_date_gt is not None: + params["partition_date_gt"] = partition_date_gt.isoformat() + if partition_date_lte is not None: + params["partition_date_lte"] = partition_date_lte.isoformat() + if partition_date_lt is not None: + params["partition_date_lt"] = partition_date_lt.isoformat() + if partition_date_start is not None: + params["partition_date_start"] = partition_date_start.isoformat() + if partition_date_end is not None: + params["partition_date_end"] = partition_date_end.isoformat() + if order_by is not None: + params["order_by"] = order_by + if run_id_pattern is not None: + params["run_id_pattern"] = run_id_pattern + if partition_key_pattern is not None: + params["partition_key_pattern"] = partition_key_pattern + if partition_key_prefix_pattern is not None: + params["partition_key_prefix_pattern"] = partition_key_prefix_pattern try: self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params) @@ -645,6 +707,32 @@ def list( except ServerResponseError as e: raise e + def _clear_task_instances( + self, + dag_id: str, + dag_run_id: str, + *, + dry_run: bool = False, + only_failed: bool = False, + only_running: bool = False, + ) -> TaskInstanceCollectionResponse | 
ServerResponseError: + """Clear task instances for a Dag run.""" + body = ClearTaskInstancesBody( + dag_run_id=dag_run_id, + dry_run=dry_run, + only_failed=only_failed, + only_running=only_running, + reset_dag_runs=True, + ) + try: + self.response = self.client.post( + f"/dags/{dag_id}/clearTaskInstances", + json=body.model_dump(mode="json", by_alias=True, exclude_none=True), + ) + return TaskInstanceCollectionResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e + class JobsOperations(BaseOperations): """Job operations.""" diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 11ff4542e01ef..16c9e39a4c1aa 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -265,7 +265,52 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: ARG_DAG_ID = Arg( flags=("dag_id",), type=str, - help="The Dag ID of the Dag to pause or unpause", + help="The Dag ID", +) + +ARG_DAG_RUN_ID = Arg( + flags=("--run-id",), + type=str, + help="The Dag run ID to clear", +) +ARG_DAG_PARTITION_KEY = Arg( + flags=("--partition-key",), + type=str, + help="The Dag run partition key to clear", +) +ARG_DAG_PARTITION_DATE_START = Arg( + flags=("--partition-date-start",), + type=str, + help=( + "Inclusive lower bound of the partition_date window, interpreted as a local calendar " + "day in the Dag's timetable timezone. Any time-of-day component is ignored." + ), +) +ARG_DAG_PARTITION_DATE_END = Arg( + flags=("--partition-date-end",), + type=str, + help=( + "Inclusive upper bound of the partition_date window, interpreted as a local calendar " + "day in the Dag's timetable timezone. Any time-of-day component is ignored." 
+ ), +) +ARG_DAG_CLEAR_ONLY_FAILED = Arg( + flags=("-f", "--only-failed"), + default=False, + action="store_true", + help="Only clear failed task instances", +) +ARG_DAG_CLEAR_ONLY_RUNNING = Arg( + flags=("-r", "--only-running"), + default=False, + action="store_true", + help="Only clear running task instances", +) +ARG_DAG_CLEAR_YES = Arg( + flags=("-y", "--yes"), + default=False, + action="store_true", + help="Do not prompt to confirm clearing task instances", ) ARG_ACTION_ON_EXISTING_KEY = Arg( @@ -484,6 +529,7 @@ def _is_primitive_type(type_name: str) -> bool: "dict", "tuple", "set", + "datetime.date", "datetime.datetime", } # Handle Optional types (e.g., "datetime.datetime | None", "str | None") @@ -514,6 +560,7 @@ def _python_type_from_string(type_name: str | type) -> type | Callable: "dict": dict, "tuple": tuple, "set": set, + "datetime.date": datetime.date, "datetime.datetime": datetime.datetime, "dict[str, typing.Any]": dict, } @@ -959,6 +1006,21 @@ def merge_commands( ) DAG_COMMANDS = ( + ActionCommand( + name="clear", + help="Clear task instances for Dag runs selected by run ID, partition key, or partition date", + func=lazy_load_command("airflowctl.ctl.commands.dag_command.clear"), + args=( + ARG_DAG_ID, + ARG_DAG_RUN_ID, + ARG_DAG_PARTITION_KEY, + ARG_DAG_PARTITION_DATE_START, + ARG_DAG_PARTITION_DATE_END, + ARG_DAG_CLEAR_ONLY_FAILED, + ARG_DAG_CLEAR_ONLY_RUNNING, + ARG_DAG_CLEAR_YES, + ), + ), ActionCommand( name="next-execution", help="Show the next scheduled execution time for a Dag", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py index 301821f9c2c3c..7cf65c890a315 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py @@ -17,6 +17,7 @@ from __future__ import annotations +import datetime import sys from typing import Literal @@ -84,17 +85,17 @@ def unpause(args, api_client=NEW_API_CLIENT) -> None: 
def _parse_partition_date(value: str | None, *, option: str) -> datetime.date | None:
    """
    Parse a ``--partition-date-*`` option value into a calendar date.

    A plain ISO 8601 date is used as written. For an ISO 8601 datetime only the date
    part as written is kept: the time of day is dropped and no timezone conversion
    is applied.

    Args:
        value: Raw option value, or None when the option was not supplied.
        option: Option name to show in the error message.

    Returns:
        The parsed date, or None when ``value`` is None.

    Raises:
        SystemExit: If ``value`` is neither an ISO 8601 date nor an ISO 8601 datetime.
    """
    if value is None:
        return None

    # Try the bare-date form first instead of guessing the format from a literal
    # "T"/space separator, so the separator's case does not decide which parser runs.
    try:
        return datetime.date.fromisoformat(value)
    except ValueError:
        pass  # Not a bare date; fall through and try a full datetime.

    text = value
    # ``datetime.fromisoformat`` only understands a trailing "Z" from Python 3.11 on.
    if text[-1:] in {"Z", "z"}:
        text = f"{text[:-1]}+00:00"
    try:
        return datetime.datetime.fromisoformat(text).date()
    except ValueError:
        raise SystemExit(
            f"Invalid {option}: {value!r}. Use YYYY-MM-DD or ISO 8601 datetime; only the date is used."
        ) from None


def _validate_clear_args(args) -> None:
    """
    Validate the selector and filter options of ``dags clear``.

    Exactly one selector must be present: ``--run-id``, ``--partition-key``, or the
    ``--partition-date-start``/``--partition-date-end`` pair (both bounds required).
    ``--only-failed`` and ``--only-running`` cannot be combined.

    Args:
        args: Parsed command-line namespace.

    Raises:
        SystemExit: With an explanatory message when a rule above is violated.
    """
    date_bounds = (args.partition_date_start, args.partition_date_end)
    has_run_id = args.run_id is not None
    has_partition_key = args.partition_key is not None
    has_partition_date = any(bound is not None for bound in date_bounds)

    if has_run_id + has_partition_key + has_partition_date != 1:
        raise SystemExit(
            "Exactly one selector is required: --run-id, --partition-key, "
            "or --partition-date-start with --partition-date-end."
        )
    if has_partition_date and any(bound is None for bound in date_bounds):
        raise SystemExit("--partition-date-start and --partition-date-end must be provided together.")
    if args.only_failed and args.only_running:
        raise SystemExit("--only-failed and --only-running are mutually exclusive.")


def _list_dag_runs(api_client, dag_id: str, *, order_by: str = "logical_date", **filters) -> list:
    """
    Collect every page of Dag runs for ``dag_id`` that matches ``filters``.

    Pages are requested with an increasing ``offset`` until a page comes back empty,
    the response carries no ``total_entries``, or ``offset`` has reached
    ``total_entries``.

    Args:
        api_client: Client exposing ``dag_runs.list``.
        dag_id: Dag whose runs are listed.
        order_by: Sort field forwarded to the API on every page request.
        **filters: Additional keyword filters forwarded unchanged to ``dag_runs.list``.

    Returns:
        All Dag runs from the fetched pages, in the order the API returned them.
    """
    collected: list = []
    offset = 0
    while True:
        page = api_client.dag_runs.list(
            dag_id=dag_id,
            offset=offset,
            order_by=order_by,
            **filters,
        )
        runs = page.dag_runs
        collected.extend(runs)
        offset += len(runs)

        exhausted = not runs or page.total_entries is None or offset >= page.total_entries
        if exhausted:
            return collected


def _get_dag_runs_to_clear(args, api_client) -> list:
    """
    Resolve the command-line selector into the list of Dag runs to clear.

    Args:
        args: Parsed command-line namespace (expected to have passed
            ``_validate_clear_args``).
        api_client: Client exposing ``dag_runs.get`` and ``dag_runs.list``.

    Returns:
        The selected Dag runs; an empty list when nothing matches.

    Raises:
        SystemExit: If a partition date cannot be parsed or the start date is after
            the end date.
    """
    if args.run_id is not None:
        return [api_client.dag_runs.get(dag_id=args.dag_id, dag_run_id=args.run_id)]

    if args.partition_key is not None:
        candidates = _list_dag_runs(
            api_client,
            args.dag_id,
            order_by="partition_date",
            partition_key_pattern=args.partition_key,
        )
        # The API filter is a pattern and may return broader matches (presumably
        # substring matching); keep only runs whose key is exactly the requested one.
        return [run for run in candidates if run.partition_key == args.partition_key]

    partition_date_start = _parse_partition_date(args.partition_date_start, option="--partition-date-start")
    partition_date_end = _parse_partition_date(args.partition_date_end, option="--partition-date-end")
    if partition_date_start is None or partition_date_end is None:
        return []

    if partition_date_start > partition_date_end:
        raise SystemExit("--partition-date-start must be before or equal to --partition-date-end.")
    return _list_dag_runs(
        api_client,
        args.dag_id,
        order_by="partition_date",
        partition_date_start=partition_date_start,
        partition_date_end=partition_date_end,
    )


def _print_dag_runs_to_clear(dag_id: str, dag_runs: list) -> None:
    """
    Print the Dag ID, the number of selected Dag runs and one line per run.

    A missing logical date or partition date is shown as ``-``.
    """
    rich.print(f"[yellow]Dag:[/yellow] {dag_id}")
    rich.print(f"[yellow]Dag runs to clear:[/yellow] {len(dag_runs)}")
    for dag_run in dag_runs:
        logical_date = dag_run.logical_date.isoformat() if dag_run.logical_date is not None else "-"
        # ``partition_date`` is read defensively: not every run object is assumed to carry it.
        partition_date = getattr(dag_run, "partition_date", None)
        partition_date_display = partition_date.isoformat() if partition_date is not None else "-"
        rich.print(
            f" - {dag_run.dag_run_id} (logical date: {logical_date}, partition date: {partition_date_display})"
        )


def _confirm_clear(dag_id: str, dag_runs: list) -> bool:
    """
    Show the selected Dag runs and ask the user to confirm clearing them.

    Returns:
        True only when the answer is ``y`` or ``yes`` (case-insensitive). A closed or
        exhausted stdin counts as a refusal, so the command cancels cleanly instead
        of failing with an ``EOFError`` traceback.
    """
    _print_dag_runs_to_clear(dag_id, dag_runs)
    try:
        answer = input("Clear task instances for these Dag runs? [y/N] ")
    except EOFError:
        return False
    return answer.strip().lower() in {"y", "yes"}


def _get_dag_run_sort_key(dag_run) -> tuple[str, str, str]:
    """
    Build the sort key for a Dag run: partition date, then logical date, then run ID.

    Timezone-aware timestamps are converted to UTC before being rendered, so the
    string comparison stays chronological even when runs carry different UTC
    offsets. Missing dates become ``""`` and therefore sort first.
    """

    def _sortable(moment: datetime.datetime | None) -> str:
        if moment is None:
            return ""
        # Only aware datetimes can be normalised; naive values and plain dates are
        # rendered exactly as they are.
        if isinstance(moment, datetime.datetime) and moment.utcoffset() is not None:
            moment = moment.astimezone(datetime.timezone.utc)
        return moment.isoformat()

    return (
        _sortable(getattr(dag_run, "partition_date", None)),
        _sortable(dag_run.logical_date),
        dag_run.dag_run_id,
    )
@provide_api_client(kind=ClientKind.CLI)
def clear(args, api_client=NEW_API_CLIENT) -> dict[str, int | bool]:
    """
    Clear task instances for the Dag runs selected on the command line.

    The selection is validated, resolved to Dag runs, sorted, and (unless ``--yes``
    was given) confirmed interactively. Each run is then cleared with one API call.

    Args:
        args: Parsed command-line namespace of ``dags clear``.
        api_client: API client; injected by ``provide_api_client`` when not passed.

    Returns:
        A summary with ``dag_run_count`` and ``cleared_task_instances``; it also has
        ``cancelled: True`` when the user declined the confirmation prompt.

    Raises:
        SystemExit: On invalid arguments, or with exit code 1 when the API reports an
            error while looking up or clearing Dag runs.
    """
    _validate_clear_args(args)

    # Report API failures the same way as the other Dag commands in this module:
    # a red message and exit code 1 rather than a raw traceback.
    try:
        dag_runs = _get_dag_runs_to_clear(args, api_client)
    except ServerResponseError as e:
        rich.print(f"[red]Error retrieving Dag runs for Dag {args.dag_id}: {e}[/red]")
        sys.exit(1)

    if not dag_runs:
        rich.print(f"[yellow]No matching Dag runs found for {args.dag_id}.[/yellow]")
        return {"dag_run_count": 0, "cleared_task_instances": 0}

    dag_runs = sorted(dag_runs, key=_get_dag_run_sort_key)

    if not args.yes and not _confirm_clear(args.dag_id, dag_runs):
        rich.print("[yellow]Cancelled.[/yellow]")
        return {"dag_run_count": len(dag_runs), "cleared_task_instances": 0, "cancelled": True}

    cleared_task_instances = 0
    cleared_dag_runs = 0
    for dag_run in dag_runs:
        try:
            response = api_client.dag_runs._clear_task_instances(
                dag_id=args.dag_id,
                dag_run_id=dag_run.dag_run_id,
                dry_run=False,
                only_failed=args.only_failed,
                only_running=args.only_running,
            )
        except ServerResponseError as e:
            # Earlier runs are already cleared; say how far the command got so the
            # user knows what is left to retry.
            rich.print(f"[red]Error clearing Dag run {dag_run.dag_run_id}: {e}[/red]")
            rich.print(
                f"[yellow]Cleared {cleared_task_instances} task instance(s) across "
                f"{cleared_dag_runs} Dag run(s) before the failure.[/yellow]"
            )
            sys.exit(1)
        cleared_task_instances += response.total_entries or 0
        cleared_dag_runs += 1

    rich.print(
        f"[green]Cleared {cleared_task_instances} task instance(s) across {len(dag_runs)} Dag run(s).[/green]"
    )
    return {"dag_run_count": len(dag_runs), "cleared_task_instances": cleared_task_instances}
httpx.Response: + assert request.url.path == f"/api/v2/dags/{self.dag_id}/dagRuns" + params = dict(request.url.params) + assert params["limit"] == "50" + assert params["offset"] == "100" + assert params["logical_date_gte"] == logical_date_start.isoformat() + assert params["logical_date_lte"] == logical_date_end.isoformat() + assert params["partition_date_gte"] == partition_date_start.isoformat() + assert params["partition_date_gt"] == partition_date_after_start.isoformat() + assert params["partition_date_lt"] == partition_date_before_end.isoformat() + assert params["partition_date_lte"] == partition_date_end.isoformat() + assert params["partition_date_start"] == partition_day_start.isoformat() + assert params["partition_date_end"] == partition_day_end.isoformat() + assert params["order_by"] == "logical_date" + assert params["run_id_pattern"] == "manual__" + assert params["partition_key_pattern"] == "2025-01-01" + return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.dag_runs.list( + dag_id=self.dag_id, + limit=50, + offset=100, + logical_date_gte=logical_date_start, + logical_date_lte=logical_date_end, + partition_date_gte=partition_date_start, + partition_date_gt=partition_date_after_start, + partition_date_lt=partition_date_before_end, + partition_date_lte=partition_date_end, + partition_date_start=partition_day_start, + partition_date_end=partition_day_end, + order_by="logical_date", + run_id_pattern="manual__", + partition_key_pattern="2025-01-01", + ) + + assert response == self.dag_run_collection_response + + def test_clear_task_instances(self): + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.method == "POST" + assert request.url.path == f"/api/v2/dags/{self.dag_id}/clearTaskInstances" + body = json.loads(request.content) + assert body == { + "dry_run": False, + "only_failed": True, + 
"only_running": False, + "reset_dag_runs": True, + "dag_run_id": self.dag_run_id, + "include_upstream": False, + "include_downstream": False, + "include_future": False, + "include_past": False, + "prevent_running_task": False, + } + return httpx.Response(200, json={"task_instances": [], "total_entries": 3}) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.dag_runs._clear_task_instances( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + only_failed=True, + ) + + assert response.total_entries == 3 + @pytest.mark.parametrize( ( "dag_id_input", diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py index 405eb030065f8..1b3b3add77eda 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py @@ -17,6 +17,8 @@ from __future__ import annotations import datetime +from types import SimpleNamespace +from unittest.mock import Mock, call import pytest @@ -120,6 +122,27 @@ class TestDagCommands: is_stale=False, ) + @staticmethod + def _dag_run( + dag_run_id: str, + *, + logical_date: datetime.datetime | None = datetime.datetime(2025, 1, 1, 0, 0, 0), + partition_key: str | None = None, + partition_date: datetime.datetime | None = datetime.datetime(2025, 1, 1, 0, 0, 0), + ): + return SimpleNamespace( + dag_run_id=dag_run_id, + logical_date=logical_date, + partition_key=partition_key, + partition_date=partition_date, + ) + + @staticmethod + def _api_client_mock(): + api_client = Mock(spec_set=["dag_runs"]) + api_client.dag_runs = Mock(spec_set=["get", "list", "_clear_task_instances"]) + return api_client + def test_pause_dag(self, api_client_maker, monkeypatch): api_client = api_client_maker( path=f"/api/v2/dags/{self.dag_id}", @@ -215,3 +238,245 @@ def test_next_execution_fail(self, api_client_maker): self.parser.parse_args(["dags", "next-execution", 
self.dag_id]), api_client=api_client, ) + + def test_clear_by_run_id(self): + api_client = self._api_client_mock() + api_client.dag_runs.get.return_value = self._dag_run("scheduled__2025-01-01") + api_client.dag_runs._clear_task_instances.return_value = SimpleNamespace(total_entries=2) + + result = dag_command.clear( + self.parser.parse_args( + ["dags", "clear", self.dag_id, "--run-id", "scheduled__2025-01-01", "--yes"] + ), + api_client=api_client, + ) + + assert result == {"dag_run_count": 1, "cleared_task_instances": 2} + api_client.dag_runs.get.assert_called_once_with( + dag_id=self.dag_id, dag_run_id="scheduled__2025-01-01" + ) + api_client.dag_runs._clear_task_instances.assert_called_once_with( + dag_id=self.dag_id, + dag_run_id="scheduled__2025-01-01", + dry_run=False, + only_failed=False, + only_running=False, + ) + + def test_clear_by_partition_key_filters_exact_match_and_paginates(self): + api_client = self._api_client_mock() + api_client.dag_runs.list.side_effect = [ + SimpleNamespace( + dag_runs=[ + self._dag_run( + "scheduled__2025-01-01", + logical_date=datetime.datetime(2025, 1, 1, 0, 0, 0), + partition_key="customer-a", + ), + self._dag_run( + "scheduled__2025-01-02", + logical_date=datetime.datetime(2025, 1, 2, 0, 0, 0), + partition_key="customer-a-suffix", + ), + ], + total_entries=3, + ), + SimpleNamespace( + dag_runs=[ + self._dag_run( + "scheduled__2025-01-03", + logical_date=datetime.datetime(2025, 1, 3, 0, 0, 0), + partition_key="customer-a", + ) + ], + total_entries=3, + ), + ] + api_client.dag_runs._clear_task_instances.side_effect = [ + SimpleNamespace(total_entries=1), + SimpleNamespace(total_entries=2), + ] + + result = dag_command.clear( + self.parser.parse_args(["dags", "clear", self.dag_id, "--partition-key", "customer-a", "--yes"]), + api_client=api_client, + ) + + assert result == {"dag_run_count": 2, "cleared_task_instances": 3} + assert api_client.dag_runs.list.call_args_list == [ + call( + dag_id=self.dag_id, + offset=0, + 
order_by="partition_date", + partition_key_pattern="customer-a", + ), + call( + dag_id=self.dag_id, + offset=2, + order_by="partition_date", + partition_key_pattern="customer-a", + ), + ] + assert api_client.dag_runs._clear_task_instances.call_args_list == [ + call( + dag_id=self.dag_id, + dag_run_id="scheduled__2025-01-01", + dry_run=False, + only_failed=False, + only_running=False, + ), + call( + dag_id=self.dag_id, + dag_run_id="scheduled__2025-01-03", + dry_run=False, + only_failed=False, + only_running=False, + ), + ] + + def test_clear_by_partition_date_uses_partition_date_filters(self): + api_client = self._api_client_mock() + api_client.dag_runs.list.return_value = SimpleNamespace( + dag_runs=[self._dag_run("scheduled__2025-01-01")], + total_entries=1, + ) + api_client.dag_runs._clear_task_instances.return_value = SimpleNamespace(total_entries=1) + + result = dag_command.clear( + self.parser.parse_args( + [ + "dags", + "clear", + self.dag_id, + "--partition-date-start", + "2025-01-01", + "--partition-date-end", + "2025-01-02", + "--only-running", + "--yes", + ] + ), + api_client=api_client, + ) + + assert result == {"dag_run_count": 1, "cleared_task_instances": 1} + api_client.dag_runs.list.assert_called_once_with( + dag_id=self.dag_id, + offset=0, + order_by="partition_date", + partition_date_start=datetime.date(2025, 1, 1), + partition_date_end=datetime.date(2025, 1, 2), + ) + api_client.dag_runs._clear_task_instances.assert_called_once_with( + dag_id=self.dag_id, + dag_run_id="scheduled__2025-01-01", + dry_run=False, + only_failed=False, + only_running=True, + ) + + def test_clear_by_partition_date_uses_calendar_dates_from_datetimes(self): + api_client = self._api_client_mock() + api_client.dag_runs.list.return_value = SimpleNamespace( + dag_runs=[self._dag_run("scheduled__2025-01-01")], + total_entries=1, + ) + api_client.dag_runs._clear_task_instances.return_value = SimpleNamespace(total_entries=1) + + result = dag_command.clear( + 
self.parser.parse_args( + [ + "dags", + "clear", + self.dag_id, + "--partition-date-start", + "2025-01-01T08:00:00+08:00", + "--partition-date-end", + "2025-01-02T17:00:00+08:00", + "--yes", + ] + ), + api_client=api_client, + ) + + assert result == {"dag_run_count": 1, "cleared_task_instances": 1} + api_client.dag_runs.list.assert_called_once_with( + dag_id=self.dag_id, + offset=0, + order_by="partition_date", + partition_date_start=datetime.date(2025, 1, 1), + partition_date_end=datetime.date(2025, 1, 2), + ) + + def test_clear_by_partition_date_accepts_naive_datetime_as_calendar_date(self): + api_client = self._api_client_mock() + api_client.dag_runs.list.return_value = SimpleNamespace( + dag_runs=[self._dag_run("scheduled__2025-01-01")], + total_entries=1, + ) + api_client.dag_runs._clear_task_instances.return_value = SimpleNamespace(total_entries=1) + + result = dag_command.clear( + self.parser.parse_args( + [ + "dags", + "clear", + self.dag_id, + "--partition-date-start", + "2025-01-01T00:00:00", + "--partition-date-end", + "2025-01-02T00:00:00", + "--yes", + ] + ), + api_client=api_client, + ) + + assert result == {"dag_run_count": 1, "cleared_task_instances": 1} + api_client.dag_runs.list.assert_called_once_with( + dag_id=self.dag_id, + offset=0, + order_by="partition_date", + partition_date_start=datetime.date(2025, 1, 1), + partition_date_end=datetime.date(2025, 1, 2), + ) + + def test_clear_prompts_before_clearing(self, monkeypatch): + api_client = self._api_client_mock() + api_client.dag_runs.get.return_value = self._dag_run("scheduled__2025-01-01") + monkeypatch.setattr("builtins.input", lambda _: "n") + + result = dag_command.clear( + self.parser.parse_args(["dags", "clear", self.dag_id, "--run-id", "scheduled__2025-01-01"]), + api_client=api_client, + ) + + assert result == {"dag_run_count": 1, "cleared_task_instances": 0, "cancelled": True} + api_client.dag_runs._clear_task_instances.assert_not_called() + + @pytest.mark.parametrize( + "command", + [ 
+ ["dags", "clear", dag_id], + ["dags", "clear", dag_id, "--run-id", "run", "--partition-key", "key"], + [ + "dags", + "clear", + dag_id, + "--partition-date-start", + "2025-01-01", + ], + [ + "dags", + "clear", + dag_id, + "--run-id", + "run", + "--only-failed", + "--only-running", + ], + ], + ) + def test_clear_validates_selectors(self, command): + with pytest.raises(SystemExit): + dag_command.clear(self.parser.parse_args(command), api_client=self._api_client_mock())