
feat(moq-lite,moq-relay): per-category task profiling via tokio-metrics #1288

Open
kixelated wants to merge 1 commit into main from feat/task-profiling

Conversation

@kixelated
Collaborator

Summary

Adds diagnostic task-category profiling so we can answer "which class of detached task is piling up?" without eyeballing a jemalloc heap dump. This is the proactive follow-up to PR #1286 — that one fixed a specific run_send_bandwidth leak; this one makes the next such leak obvious within minutes instead of hours.

  • New moq_lite::task module exposes a typed Category handle plus spawn / snapshot / snapshot_raw / TaskSnapshot.
  • Each Category is a pub static with a lazily-initialised tokio_metrics::TaskMonitor. The typed handle means typos fail at compile time and there's no per-spawn hash lookup — first use registers self in a global Mutex<Vec<&'static Category>>, subsequent spawns are lock-free.
  • moq_relay::task::run() listens for SIGUSR2 and logs the snapshot on demand, and also logs passively every 10 minutes so slow leaks are visible from the journal without operator intervention. Wired into main.rs alongside the existing SIGUSR1 jemalloc handler.
  • All 21 web_async::spawn sites in rs/moq-lite/src/ migrated to crate::task::<CATEGORY>.spawn(...). The 3 tokio::spawn sites in moq-relay (connection loop, cluster remote, cert reload) are migrated to their own relay-side categories.
  • No RUSTFLAGS="--cfg tokio_unstable" needed: TaskMonitor::instrument works on stable tokio. (That was the pitfall that killed web-async's earlier spawn_named attempt.)
  • web-async itself is untouched — Lock<T> and FuturesExt are still used inside moq-lite.
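
For readers who want to see the registration pattern in isolation, here is a minimal std-only sketch of the "pub static with lazy registration" idea described above. It is not the code from this PR: the `Counters` struct stands in for `tokio_metrics::TaskMonitor`, `track()` stands in for `Category::spawn`, and the static names `BANDWIDTH` and `CONNECTION` are hypothetical. Only the shape (a `OnceLock` per category plus a global `Mutex<Vec<&'static Category>>` touched on first use) is taken from the description.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};

/// Assumption: a stand-in for `tokio_metrics::TaskMonitor`, reduced to two counters.
#[derive(Default)]
pub struct Counters {
    spawned: AtomicU64,
    dropped: AtomicU64,
}

/// A typed category handle, meant to live in a `pub static`.
pub struct Category {
    name: &'static str,
    counters: OnceLock<Counters>,
}

/// Global registry. It is only locked the first time a category is used.
static REGISTRY: Mutex<Vec<&'static Category>> = Mutex::new(Vec::new());

// Hypothetical category statics; a misspelled identifier fails to compile.
pub static BANDWIDTH: Category = Category::new("moq-lite/bandwidth");
pub static CONNECTION: Category = Category::new("moq-relay/connection");

/// Counts one "task" as dropped when it goes out of scope.
pub struct Guard(&'static Counters);

impl Drop for Guard {
    fn drop(&mut self) {
        self.0.dropped.fetch_add(1, Ordering::Relaxed);
    }
}

impl Category {
    pub const fn new(name: &'static str) -> Self {
        Self { name, counters: OnceLock::new() }
    }

    /// First call initialises the counters and registers the category.
    /// Later calls take the `OnceLock` fast path and never touch the mutex.
    fn counters(&'static self) -> &'static Counters {
        self.counters.get_or_init(|| {
            REGISTRY.lock().unwrap().push(self);
            Counters::default()
        })
    }

    /// Stand-in for `spawn`: records a new task and returns its drop guard.
    pub fn track(&'static self) -> Guard {
        let counters = self.counters();
        counters.spawned.fetch_add(1, Ordering::Relaxed);
        Guard(counters)
    }

    /// Tasks recorded but not yet dropped; 0 for a category never used.
    pub fn live(&self) -> u64 {
        self.counters.get().map_or(0, |c| {
            // Read `dropped` first so a concurrent spawn+drop cannot underflow.
            let dropped = c.dropped.load(Ordering::Relaxed);
            let spawned = c.spawned.load(Ordering::Relaxed);
            spawned.saturating_sub(dropped)
        })
    }
}

/// Names of every category that has been used at least once.
pub fn registered() -> Vec<&'static str> {
    REGISTRY.lock().unwrap().iter().map(|c| c.name).collect()
}
```

In this sketch an unused category never appears in `registered()`, which matches the "first use registers self" wording; whether the real `snapshot()` also omits unused categories is not stated in the description.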

Bumps moq-lite 0.15.11 → 0.15.12. Adds tokio-metrics = "0.5" as a direct dep on moq-lite.

Output shape

A snapshot line looks like (one per category):

```
moq-lite/bandwidth live= 42 spawned= 1203 dropped= 1161 slow= 2 poll_us= 12
moq-lite/lite-session live= 42 spawned= 1203 dropped= 1161 slow= 0 poll_us= 18
moq-relay/connection live= 42 spawned= 1203 dropped= 1161 slow= 0 poll_us= 9
...
```

On a memory alert the operator runs `journalctl -u moq-relay | grep task_snapshot | tail` — any category whose `live` is monotonically climbing is the leak.
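
To make "monotonically climbing" concrete, here is a small sketch of the check an operator is doing by eye. It is illustrative only: the `Sample` struct and `is_climbing` function are hypothetical names, and treating `live` as `spawned - dropped` is an assumption that merely agrees with the sample lines above (1203 - 1161 = 42).

```rust
/// One category's counters at one snapshot.
/// Assumption: `live` can be derived as `spawned - dropped`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Sample {
    pub spawned: u64,
    pub dropped: u64,
}

impl Sample {
    /// Tasks spawned but not yet dropped at this snapshot.
    pub fn live(&self) -> u64 {
        self.spawned.saturating_sub(self.dropped)
    }
}

/// True when `live` never decreases between consecutive snapshots
/// and the last snapshot is strictly higher than the first.
pub fn is_climbing(samples: &[Sample]) -> bool {
    if samples.len() < 2 {
        return false; // a single snapshot cannot show a trend
    }
    let non_decreasing = samples.windows(2).all(|w| w[0].live() <= w[1].live());
    non_decreasing && samples[samples.len() - 1].live() > samples[0].live()
}
```

A category whose `live` dips back down between snapshots is treated as healthy churn here; a real alerting rule would probably want a tolerance rather than strict monotonicity.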

Out of scope

  • Test-only `tokio::spawn` sites in `moq-native`, `moq-lite/lite/priority` tests, `hang`/`moq-mux` per-group finishers. They're noise for leak detection; add in a follow-up if ever needed.
  • Removing `web-async` entirely. Out of scope because `Lock` and `FuturesExt` are still used.
  • `console-subscriber` / tokio-console live inspection. Layer on later if cumulative snapshots aren't enough.

Test plan

  • `cargo test -p moq-lite task::` — `snapshot_tracks_live_count` passes (live count = 1 for a blocked task, 0 for an instantly-finished one)
  • `just check` passes (rustdoc intra-doc links, clippy, biome, etc.)
  • Local smoke: run `moq-relay` + `moq-clock pub` + `moq-clock sub`, `kill -USR2 $(pgrep moq-relay)`, eyeball `task_snapshot` lines in stderr showing `moq-lite/bandwidth live=2`, `moq-lite/lite-session live=2`, `moq-relay/connection live=2`. Kill the clock processes and re-signal, confirm they drop to 0.
  • Deploy to one cdn relay via `just deploy use`, wait 10 min, `ssh root@use.cdn.moq.dev journalctl -u moq-relay | grep task_snapshot`, eyeball.

🤖 Generated with Claude Code

Add a typed `Category` handle plus a `moq_lite::task` module that wraps
every detached-task spawn site in moq-lite (and moq-relay) with a
`tokio_metrics::TaskMonitor`. The intent is diagnostic: on the next
memory alert we can `kill -USR2 $(pidof moq-relay)` and immediately see
which class of task is piling up, rather than eyeballing a jemalloc
heap dump and cross-referencing stack traces.

Design notes:

- Each category is a `pub static Category` with a lazily-initialised
  `TaskMonitor` in a `OnceLock`. First use registers self in a global
  `Mutex<Vec<&'static Category>>`; subsequent spawns are lock-free.
- `Category::spawn(future) -> JoinHandle<()>` replaces `web_async::spawn`
  inside moq-lite. The typed handle means typos fail at compile time
  and there's no per-spawn hash lookup.
- `moq_lite::task::snapshot()` returns a sorted `Vec<TaskSnapshot>` of
  leak-focused fields (`live`, `spawned`, `dropped`, `slow_polls`,
  `mean_poll_us`). `snapshot_raw()` exposes the full `TaskMetrics`.
- `moq_relay::task::run()` listens for `SIGUSR2` and logs the snapshot
  on demand, and also logs passively every 10 minutes so slow leaks
  are visible from the journal without operator intervention. Wired
  into `main.rs` alongside the existing `SIGUSR1` jemalloc handler.
- No `tokio_unstable` flag needed — `TaskMonitor::instrument` works on
  stable tokio. This was the original pitfall that killed web-async's
  earlier `spawn_named` attempt.
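
The "on demand or every 10 minutes" behaviour of `moq_relay::task::run()` can be modelled without tokio. The sketch below is an assumption-laden stand-in, not the PR's code: a std `mpsc` channel plays the role of the `SIGUSR2` stream, `recv_timeout` plays the role of the periodic timer, and `Trigger` and `next_trigger` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why a snapshot should be logged now.
#[derive(Debug, PartialEq, Eq)]
pub enum Trigger {
    /// An operator asked for it (SIGUSR2 in the real relay).
    Signal,
    /// The passive interval elapsed with no signal.
    Periodic,
}

/// Blocks until the next reason to log a snapshot.
/// Returns `None` once the signal source is gone, so the caller can exit its loop.
pub fn next_trigger(signals: &Receiver<()>, interval: Duration) -> Option<Trigger> {
    match signals.recv_timeout(interval) {
        Ok(()) => Some(Trigger::Signal),
        Err(RecvTimeoutError::Timeout) => Some(Trigger::Periodic),
        Err(RecvTimeoutError::Disconnected) => None,
    }
}
```

One difference to keep in mind: in this model every signal restarts the wait, so the periodic log drifts after an on-demand one. A fixed-cadence timer, which is presumably what an async implementation would use, does not have that property.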

Scope:
- All 21 `web_async::spawn` sites in `rs/moq-lite/src/` now call
  `crate::task::<CATEGORY>.spawn(...)`.
- The 3 `tokio::spawn` sites in moq-relay (connection loop, cluster
  remote, cert reload) are migrated to their respective categories.
- `web_async` is untouched — `Lock<T>` and `FuturesExt` are still used.
- Test-only spawns and low-priority crates (hang, moq-mux, libmoq,
  moq-boy, etc.) are out of scope — add them in a follow-up if needed.

Bumps `moq-lite` 0.15.11 → 0.15.12. Adds `tokio-metrics = "0.5"` as a
direct dep on moq-lite.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Contributor

coderabbitai bot commented Apr 11, 2026

Walkthrough

A task profiling and monitoring framework is introduced across the moq-lite and moq-relay crates. The tokio-metrics dependency is added, and a new task.rs module is created in each crate to provide typed task categories with monitoring capabilities. Generic task spawning mechanisms (web_async::spawn, tokio::spawn) are systematically replaced with category-specific spawners throughout the codebase. The task module is exported as part of the public API. In moq-relay, a SIGUSR2 signal handler and periodic snapshot reporter are implemented to log task metrics at runtime.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately describes the main change: introducing per-category task profiling via tokio-metrics for both moq-lite and moq-relay. |
| Description check | ✅ Passed | The description is comprehensive and directly related to the changeset, explaining the motivation, implementation details, output shape, and test plan. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
rs/moq-relay/src/cluster.rs (1)

242-250: Remove redundant .in_current_span() call in remote task spawn.

Category::spawn (line 53 of rs/moq-lite/src/task.rs) already applies in_current_span internally, making the explicit .in_current_span() at line 249 redundant and creating double instrumentation.

Proposed fix
 			let handle = crate::task::CLUSTER_REMOTE.spawn(
 				async move {
 					match this.run_remote(node2.as_str(), None, token, origin).await {
 						Ok(()) => tracing::info!(%node2, "origin closed"),
 						Err(err) => tracing::warn!(%err, %node2, "origin error"),
 					}
 				}
-				.in_current_span(),
 			);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-relay/src/cluster.rs` around lines 242 - 250, The spawned remote task
is being instrumented twice because Category::spawn
(crate::task::CLUSTER_REMOTE.spawn) already applies .in_current_span; remove the
explicit .in_current_span() on the async block passed to CLUSTER_REMOTE.spawn to
avoid double instrumentation — locate the block that calls CLUSTER_REMOTE.spawn
with async move { match this.run_remote(node2.as_str(), None, token,
origin).await { ... } } and delete the trailing .in_current_span() call so the
task is only instrumented once.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 327d25c8-0ad1-49f6-9fdd-3bacfdb21d50

📥 Commits

Reviewing files that changed from the base of the PR and between 4ee5378 and 3903af7.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (17)
  • rs/moq-lite/Cargo.toml
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/session.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lib.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/session.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-lite/src/session.rs
  • rs/moq-lite/src/task.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/lib.rs
  • rs/moq-relay/src/main.rs
  • rs/moq-relay/src/task.rs
  • rs/moq-relay/src/web.rs

Comment on lines +209 to +220
#[tokio::test]
async fn snapshot_tracks_live_count() {
	let (tx, rx) = tokio::sync::oneshot::channel::<()>();
	TEST_BLOCKER.spawn(async move {
		let _ = rx.await;
	});
	TEST_INSTANT.spawn(async {});

	// Give the runtime time to poll both tasks once so the instant one
	// actually completes and gets counted as dropped.
	tokio::time::sleep(Duration::from_millis(20)).await;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify async tests using tokio sleep also pause time first.
rg -n --type rust -C3 '#\[tokio::test\]|tokio::time::sleep\(|tokio::time::pause\('

Repository: moq-dev/moq

Length of output: 50368


🏁 Script executed:

# Check if the file exists and examine the test
find . -name "task.rs" -path "*/moq-lite/*" 2>/dev/null | head -1

Repository: moq-dev/moq

Length of output: 80


🏁 Script executed:

# If file exists, check lines around 209-220
if [ -f "rs/moq-lite/src/task.rs" ]; then
  sed -n '200,230p' rs/moq-lite/src/task.rs | cat -n
fi

Repository: moq-dev/moq

Length of output: 1128


🏁 Script executed:

# Check for all async tests in this file
sed -n '1,50p' rs/moq-lite/src/task.rs | grep -E '#\[tokio::test\]|async fn'

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Check the entire test file for all async tests
wc -l rs/moq-lite/src/task.rs

Repository: moq-dev/moq

Length of output: 82


🏁 Script executed:

# Search for all #[tokio::test] attributes in the file
rg -n '#\[tokio::test\]' rs/moq-lite/src/task.rs -A 2

Repository: moq-dev/moq

Length of output: 179


🏁 Script executed:

# Get the complete test function
sed -n '209,240p' rs/moq-lite/src/task.rs

Repository: moq-dev/moq

Length of output: 1014


Add tokio::time::pause() to start of test.

The test calls tokio::time::sleep() on line 220 without pausing time first, making the test slower and potentially timing-sensitive. Add tokio::time::pause(); at the start of the test to use simulated time instead.

Proposed fix
 #[tokio::test]
 async fn snapshot_tracks_live_count() {
+	tokio::time::pause();
 	let (tx, rx) = tokio::sync::oneshot::channel::<()>();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/task.rs` around lines 209 - 220, In the test function
snapshot_tracks_live_count add a call to tokio::time::pause() at the very start
of the test (before spawning TEST_BLOCKER/TEST_INSTANT and before the
tokio::time::sleep call) so the test uses simulated time; update the
snapshot_tracks_live_count function to call tokio::time::pause() immediately on
entry (retaining the existing sleep/advance calls if you still need to drive the
simulated clock).

Comment on lines +32 to +33
pub async fn run() -> anyhow::Result<()> {
	let mut sig = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined2())?;
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify whether moq-relay is intentionally unix-only and whether task module is cfg-gated.
rg -n --glob '**/Cargo.toml' 'name\s*=\s*"moq-relay"|target|cfg\(unix\)|unix|windows'
rg -n 'tokio::signal::unix|cfg\(unix\)|mod task|pub mod task' rs/moq-relay/src

Repository: moq-dev/moq

Length of output: 759


🏁 Script executed:

cat -n rs/moq-relay/src/task.rs | head -40

Repository: moq-dev/moq

Length of output: 1989


🏁 Script executed:

cat -n rs/moq-relay/src/lib.rs | grep -A 2 -B 2 "pub mod task"

Repository: moq-dev/moq

Length of output: 189


Guard Unix-only signal API or add a non-Unix fallback.

Line 33 calls tokio::signal::unix::signal(...) unconditionally. The run() function lacks a #[cfg(unix)] guard, and the module is publicly exported without gating in lib.rs. This will fail compilation on non-Unix targets. Either guard this function with #[cfg(unix)] and provide a non-Unix fallback implementation, or gate the entire module declaration in lib.rs.

🔧 Suggested fix shape
+#[cfg(unix)]
 pub async fn run() -> anyhow::Result<()> {
 	let mut sig = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined2())?;
 	// ...
 }

+#[cfg(not(unix))]
+pub async fn run() -> anyhow::Result<()> {
+	std::future::pending::<anyhow::Result<()>>().await
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-relay/src/task.rs` around lines 32 - 33, The call to
tokio::signal::unix::signal inside pub async fn run() unconditionally uses a
Unix-only API and breaks non-Unix builds; fix by gating the Unix-specific
implementation with #[cfg(unix)] on the run function (or the whole module) and
provide a non-Unix fallback implementation (e.g., #[cfg(not(unix))] pub async fn
run() -> anyhow::Result<()> that uses tokio::signal::ctrl_c().await or returns a
sensible no-op), or alternatively gate the module export in lib.rs; update
references to run() accordingly so non-Unix targets compile.
