Changes from all commits

18 commits
b7cb1aa
implement canonical metrics, feature flagged to preserve legacy where…
chrishagglund-ship-it Apr 29, 2026
9d671ca
validate the multiprocessing start method choice
chrishagglund-ship-it Apr 29, 2026
98cf8ac
remove unnecessary sleep
chrishagglund-ship-it Apr 29, 2026
6fed505
fixes for a few canonical metrics
chrishagglund-ship-it Apr 30, 2026
63974df
cleaner reporting on which metrics implementation is used in the test…
chrishagglund-ship-it May 6, 2026
9bca754
update docs regarding metrics
chrishagglund-ship-it May 6, 2026
a6e721d
additional update of docs re: metrics
chrishagglund-ship-it May 6, 2026
98255f8
add or update a changelog
chrishagglund-ship-it May 7, 2026
cf49ea8
wip fix payload type to not break upgraders that might refer to it
chrishagglund-ship-it May 11, 2026
d2c5fa0
wip attempt to address directory and db clobbering concern
chrishagglund-ship-it May 11, 2026
ce03f71
wip, attempting to not impose new behaviors on upgraders
chrishagglund-ship-it May 11, 2026
9d118b1
directory handling for metrics db location was broken by recent change
chrishagglund-ship-it May 12, 2026
c65cf83
wip, dealing with cardinality in url labels of metrics
chrishagglund-ship-it May 12, 2026
b2978cb
getting template strings output in canonical metrics uris - wip
chrishagglund-ship-it May 12, 2026
39d59e0
fix startup issue with db cleanup and child processes
chrishagglund-ship-it May 12, 2026
ca92647
addressing hygiene issues
chrishagglund-ship-it May 12, 2026
99c9457
mostly doc fixes, minor tweak to sensible default
chrishagglund-ship-it May 12, 2026
443e31d
restore default version label in one legacy metric
chrishagglund-ship-it May 12, 2026
19 changes: 19 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,19 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Canonical metrics mode: opt-in harmonized metric surface via `WORKER_CANONICAL_METRICS=true` -- [details](METRICS.md#detailed-technical-notes--unreleased)
- `MetricsSettings` gains `clean_directory` and `clean_dead_pids` for opt-in stale `.db` file cleanup (both default to `False`)
- `CONDUCTOR_MP_START_METHOD` env var to control the worker pool's multiprocessing start method

### Changed

- Legacy metrics emit unchanged by default; no env var required
- `metrics_collector.py` is now a compatibility shim; `from conductor.client.telemetry.metrics_collector import MetricsCollector` continues to work
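The opt-in knobs listed above can be combined in a short sketch. The env var names and `MetricsSettings` keyword names are taken from this changelog; the surrounding setup (directory path, `spawn` choice) is illustrative, not a recommendation:

```python
import os

# Opt in before any worker process starts; both values are read at startup.
os.environ["WORKER_CANONICAL_METRICS"] = "true"    # canonical metric catalog
os.environ["CONDUCTOR_MP_START_METHOD"] = "spawn"  # worker pool start method

# Keyword names below come from this changelog; pass them to MetricsSettings.
metrics_kwargs = {
    "directory": "/tmp/conductor-metrics",
    "clean_directory": True,   # opt-in stale .db cleanup (default False)
    "clean_dead_pids": True,   # opt-in cleanup of dead-PID .db files (default False)
}
```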
628 changes: 334 additions & 294 deletions METRICS.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
@@ -320,7 +320,7 @@ with TaskHandler(configuration=config, metrics_settings=metrics, scan_for_annota
curl http://localhost:8000/metrics
```

See [examples/metrics_example.py](examples/metrics_example.py) and [METRICS.md](METRICS.md) for details on all tracked metrics.
Legacy metrics are emitted by default. Set `WORKER_CANONICAL_METRICS=true` before starting workers to use the canonical metric catalog. See [examples/metrics_example.py](examples/metrics_example.py) and [METRICS.md](METRICS.md) for the full legacy and canonical reference.
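Because the toggle is read at startup, it has to reach the worker processes. One hedged way to do that from a launcher script (the child command below is a stand-in for your real worker entry point):

```python
import os
import subprocess
import sys

# Build a child environment with the toggle set; worker processes inherit it.
env = dict(os.environ, WORKER_CANONICAL_METRICS="true")

# Demonstration child just echoes the inherited value; in practice this would
# be your worker entry point (e.g. a script that builds a TaskHandler).
out = subprocess.run(
    [sys.executable, "-c",
     "import os; print(os.environ['WORKER_CANONICAL_METRICS'])"],
    env=env, capture_output=True, text=True, check=True,
).stdout.strip()
```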

### Managing Workflow Executions

7 changes: 1 addition & 6 deletions WORKER_CONFIGURATION.md
@@ -330,7 +330,6 @@ export conductor.worker.process_order.paused=true
When a worker is paused:
- It stops polling for new tasks
- Already-executing tasks complete normally
- The `task_paused_total` metric is incremented for each skipped poll
- No code changes or process restarts required

**Use cases:**
@@ -346,11 +345,7 @@ unset conductor.worker.all.paused
export conductor.worker.all.paused=false
```
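The pause/resume convention above can be sketched as a flag lookup. This is a hedged illustration only: the per-worker-over-global precedence and the `"true"` matching are assumptions about the documented env vars, not confirmed SDK internals, and `is_worker_paused` is a hypothetical helper name:

```python
import os

def is_worker_paused(task_type: str) -> bool:
    """Sketch of the documented env-var convention; precedence is assumed."""
    # Check the per-worker flag (conductor.worker.<task_type>.paused) before
    # the global flag (conductor.worker.all.paused); unset means not paused.
    for key in (f"conductor.worker.{task_type}.paused",
                "conductor.worker.all.paused"):
        value = os.environ.get(key)
        if value is not None:
            return value.strip().lower() == "true"
    return False
```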

**Monitor paused workers** using the `task_paused_total` metric:
```promql
# Check how many times workers were paused
task_paused_total{taskType="process_order"}
```
See [METRICS.md](METRICS.md) for the current Python SDK metrics catalog.

### Multi-Region Deployment

115 changes: 7 additions & 108 deletions docs/design/WORKER_DESIGN.md
@@ -767,114 +767,13 @@ Workers package - all worker modules auto-discovered

## Metrics & Monitoring

The SDK provides comprehensive Prometheus metrics collection with two deployment modes:
This design document describes how worker events flow through the SDK. The
current user-facing metrics setup, legacy and canonical metric catalogs,
`WORKER_CANONICAL_METRICS` behavior, Prometheus examples, and migration guidance
are maintained in [`../../METRICS.md`](../../METRICS.md).

### Configuration

**HTTP Mode (Recommended - Metrics served from memory):**
```python
from conductor.client.configuration.settings.metrics_settings import MetricsSettings

metrics_settings = MetricsSettings(
    directory="/tmp/conductor-metrics",  # .db files for multiprocess coordination
    update_interval=0.1,                 # Update every 100ms
    http_port=8000                       # Expose metrics via HTTP
)

with TaskHandler(
    configuration=config,
    metrics_settings=metrics_settings
) as handler:
    handler.start_processes()
```

**File Mode (Metrics written to file):**
```python
metrics_settings = MetricsSettings(
    directory="/tmp/conductor-metrics",
    file_name="metrics.prom",
    update_interval=1.0,
    http_port=None  # No HTTP server - write to file instead
)
```

### Modes

| Mode | HTTP Server | File Writes | Use Case |
|------|-------------|-------------|----------|
| HTTP (`http_port` set) | ✅ Built-in | ❌ Disabled | Prometheus scraping, production |
| File (`http_port=None`) | ❌ Disabled | ✅ Enabled | File-based monitoring, testing |

**HTTP Mode Benefits:**
- Metrics served directly from memory (no file I/O)
- Built-in HTTP server with `/metrics` and `/health` endpoints
- Automatic aggregation across worker processes (no PID labels)
- Ready for Prometheus scraping out-of-the-box

### Key Metrics

**Task Metrics:**
- `task_poll_time_seconds{taskType,quantile}` - Poll latency (includes batch polling)
- `task_execute_time_seconds{taskType,quantile}` - Actual execution time (async tasks: from submission to completion)
- `task_execute_error_total{taskType,exception}` - Execution errors by type
- `task_poll_total{taskType}` - Total poll count
- `task_result_size_bytes{taskType,quantile}` - Task output size

**API Metrics:**
- `http_api_client_request{method,uri,status,quantile}` - API request latency
- `http_api_client_request_count{method,uri,status}` - Request count by endpoint
- `http_api_client_request_sum{method,uri,status}` - Total request time

**Labels:**
- `taskType`: Task definition name
- `method`: HTTP method (GET, POST, PUT)
- `uri`: API endpoint path
- `status`: HTTP status code
- `exception`: Exception type (for errors)
- `quantile`: 0.5, 0.75, 0.9, 0.95, 0.99

**Important Notes:**
- **No PID labels**: Metrics are automatically aggregated across processes
- **Async execution time**: Includes actual execution time, not just coroutine submission time
- **Multiprocess safe**: Uses SQLite .db files in `directory` for coordination

### Prometheus Integration

**Scrape Config:**
```yaml
scrape_configs:
  - job_name: 'conductor-workers'
    static_configs:
      - targets: ['localhost:8000']
    scrape_interval: 15s
```

**Accessing Metrics:**
```bash
# Metrics endpoint
curl http://localhost:8000/metrics

# Health check
curl http://localhost:8000/health

# Watch specific metric
watch -n 1 'curl -s http://localhost:8000/metrics | grep task_execute_time_seconds'
```

**PromQL Examples:**
```promql
# Average execution time
rate(task_execute_time_seconds_sum[5m]) / rate(task_execute_time_seconds_count[5m])

# Success rate
sum(rate(task_execute_time_seconds_count{status="SUCCESS"}[5m])) / sum(rate(task_execute_time_seconds_count[5m]))

# p95 latency
task_execute_time_seconds{quantile="0.95"}

# Error rate
sum(rate(task_execute_error_total[5m])) by (taskType)
```
Keep metric names and PromQL examples out of this design document so the SDK has
one source of truth for legacy and canonical metrics.

---

@@ -1264,8 +1163,8 @@ class CostTracker(TaskRunnerEventsListener):
```python
handler = TaskHandler(
    configuration=config,
    metrics_settings=metrics_settings,
    event_listeners=[
        PrometheusMetricsCollector(),
        SLAMonitor(threshold_ms=5000),
        CostTracker(cost_per_second={'ml_task': 0.05}),
        CustomAuditLogger()
43 changes: 7 additions & 36 deletions docs/design/WORKER_SDK_IMPLEMENTATION_GUIDE.md
@@ -1789,44 +1789,15 @@ Response: void

## 15. Metrics & Monitoring

### 15.1 Required Metrics
The Python SDK's current metrics behavior is documented in
[`../../METRICS.md`](../../METRICS.md). That file is the source of truth for:

**Via Event System (Recommended):**
- Enabling metrics with `MetricsSettings`
- Selecting canonical metrics with `WORKER_CANONICAL_METRICS`
- The complete legacy and canonical Prometheus catalogs
- Migration guidance from legacy quantile gauges to canonical histograms

Implement MetricsCollector as EventListener:

```
class MetricsCollector implements TaskRunnerEventsListener {
    on_poll_started(event):
        increment_counter("task_poll_total", labels={taskType: event.taskType})

    on_poll_completed(event):
        record_histogram("task_poll_time_seconds", event.durationMs / 1000)
        increment_counter("task_poll_total", labels={taskType: event.taskType})

    on_task_execution_completed(event):
        record_histogram("task_execute_time_seconds", event.durationMs / 1000)
        record_histogram("task_result_size_bytes", event.outputSizeBytes)

    on_task_execution_failure(event):
        increment_counter("task_execute_error_total",
            labels={taskType: event.taskType, exception: event.cause.type})

    on_task_update_failure(event):
        increment_counter("task_update_failed_total",
            labels={taskType: event.taskType})
        // CRITICAL: Alert operations team
}
```

**Metric Names (Prometheus format):**
- `task_poll_total{taskType}`
- `task_poll_time_seconds{taskType,quantile}`
- `task_execute_time_seconds{taskType,quantile}`
- `task_execute_error_total{taskType,exception}`
- `task_result_size_bytes{taskType,quantile}`
- `task_update_error_total{taskType,exception}`
- `task_update_failed_total{taskType}` ← CRITICAL metric
Do not duplicate metric names or PromQL examples in this implementation guide.

---
