feat(moq-lite,moq-relay): per-category task profiling via tokio-metrics #1288
Add a typed `Category` handle plus a `moq_lite::task` module that wraps every detached-task spawn site in moq-lite (and moq-relay) with a `tokio_metrics::TaskMonitor`. The intent is diagnostic: on the next memory alert we can `kill -USR2 $(pidof moq-relay)` and immediately see which class of task is piling up, rather than eyeballing a jemalloc heap dump and cross-referencing stack traces.

Design notes:
- Each category is a `pub static Category` with a lazily-initialised `TaskMonitor` in a `OnceLock`. First use registers the category in a global `Mutex<Vec<&'static Category>>`; subsequent spawns are lock-free.
- `Category::spawn(future) -> JoinHandle<()>` replaces `web_async::spawn` inside moq-lite. The typed handle means typos fail at compile time and there's no per-spawn hash lookup.
- `moq_lite::task::snapshot()` returns a sorted `Vec<TaskSnapshot>` of leak-focused fields (`live`, `spawned`, `dropped`, `slow_polls`, `mean_poll_us`). `snapshot_raw()` exposes the full `TaskMetrics`.
- `moq_relay::task::run()` listens for `SIGUSR2` and logs the snapshot on demand, and also logs passively every 10 minutes so slow leaks are visible from the journal without operator intervention. Wired into `main.rs` alongside the existing `SIGUSR1` jemalloc handler.
- No `tokio_unstable` flag needed: `TaskMonitor::instrument` works on stable tokio. This was the original pitfall that killed web-async's earlier `spawn_named` attempt.

Scope:
- All 21 `web_async::spawn` sites in `rs/moq-lite/src/` now call `crate::task::<CATEGORY>.spawn(...)`.
- The 3 `tokio::spawn` sites in moq-relay (connection loop, cluster remote, cert reload) are migrated to their respective categories.
- `web_async` is untouched: `Lock<T>` and `FuturesExt` are still used.
- Test-only spawns and low-priority crates (hang, moq-mux, libmoq, moq-boy, etc.) are out of scope; add them in a follow-up if needed.

Bumps `moq-lite` 0.15.11 → 0.15.12. Adds `tokio-metrics = "0.5"` as a direct dep on moq-lite.
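The lazy-registration pattern from the design notes can be sketched with the standard library alone. This is an illustration under stated assumptions, not the PR's code: `Monitor` is a hypothetical stand-in for `tokio_metrics::TaskMonitor` that only counts spawns, and `record_spawn`, `spawned`, `registered`, `BANDWIDTH`, and `LITE_SESSION` are made-up names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};

/// Hypothetical stand-in for `tokio_metrics::TaskMonitor`: it only counts spawns.
#[derive(Default)]
pub struct Monitor {
    spawned: AtomicU64,
}

/// A category is a plain static; its monitor is created on first use.
pub struct Category {
    name: &'static str,
    monitor: OnceLock<Monitor>,
}

/// Global list of every category that has been used at least once.
static REGISTRY: Mutex<Vec<&'static Category>> = Mutex::new(Vec::new());

impl Category {
    pub const fn new(name: &'static str) -> Self {
        Self { name, monitor: OnceLock::new() }
    }

    /// The first call takes the registry mutex once; later calls only read the `OnceLock`.
    fn monitor(&'static self) -> &'static Monitor {
        self.monitor.get_or_init(|| {
            REGISTRY.lock().unwrap().push(self);
            Monitor::default()
        })
    }

    /// Illustrative replacement for the real spawn path: just bump the counter.
    pub fn record_spawn(&'static self) {
        self.monitor().spawned.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads the counter without registering the category.
    pub fn spawned(&self) -> u64 {
        self.monitor.get().map_or(0, |m| m.spawned.load(Ordering::Relaxed))
    }
}

// Hypothetical categories; the names mirror the sample snapshot output.
pub static BANDWIDTH: Category = Category::new("moq-lite/bandwidth");
pub static LITE_SESSION: Category = Category::new("moq-lite/lite-session");

/// Names of all categories used so far, sorted for stable output.
pub fn registered() -> Vec<&'static str> {
    let mut names: Vec<&'static str> = REGISTRY.lock().unwrap().iter().map(|c| c.name).collect();
    names.sort_unstable();
    names
}
```

Because each category is a typed static, a misspelled category is an unresolved identifier at compile time rather than a silent new bucket at run time, and a category that is never used never appears in the registry.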
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Walkthrough
A task profiling and monitoring framework is introduced across the moq-lite and moq-relay crates.
🚥 Pre-merge checks | ✅ 3 passed
Actionable comments posted: 2
🧹 Nitpick comments (1)
rs/moq-relay/src/cluster.rs (1)
242-250: Remove redundant `.in_current_span()` call in remote task spawn.
`Category::spawn` (line 53 of rs/moq-lite/src/task.rs) already applies `in_current_span` internally, making the explicit `.in_current_span()` at line 249 redundant and creating double instrumentation.
Proposed fix
 let handle = crate::task::CLUSTER_REMOTE.spawn(
     async move {
         match this.run_remote(node2.as_str(), None, token, origin).await {
             Ok(()) => tracing::info!(%node2, "origin closed"),
             Err(err) => tracing::warn!(%err, %node2, "origin error"),
         }
     }
-    .in_current_span(),
 );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-relay/src/cluster.rs` around lines 242 - 250, The spawned remote task is being instrumented twice because Category::spawn (crate::task::CLUSTER_REMOTE.spawn) already applies .in_current_span; remove the explicit .in_current_span() on the async block passed to CLUSTER_REMOTE.spawn to avoid double instrumentation — locate the block that calls CLUSTER_REMOTE.spawn with async move { match this.run_remote(node2.as_str(), None, token, origin).await { ... } } and delete the trailing .in_current_span() call so the task is only instrumented once.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 327d25c8-0ad1-49f6-9fdd-3bacfdb21d50
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (17)
- rs/moq-lite/Cargo.toml
- rs/moq-lite/src/ietf/publisher.rs
- rs/moq-lite/src/ietf/session.rs
- rs/moq-lite/src/ietf/subscriber.rs
- rs/moq-lite/src/lib.rs
- rs/moq-lite/src/lite/publisher.rs
- rs/moq-lite/src/lite/session.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/model/broadcast.rs
- rs/moq-lite/src/model/origin.rs
- rs/moq-lite/src/session.rs
- rs/moq-lite/src/task.rs
- rs/moq-relay/src/cluster.rs
- rs/moq-relay/src/lib.rs
- rs/moq-relay/src/main.rs
- rs/moq-relay/src/task.rs
- rs/moq-relay/src/web.rs
#[tokio::test]
async fn snapshot_tracks_live_count() {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
    TEST_BLOCKER.spawn(async move {
        let _ = rx.await;
    });
    TEST_INSTANT.spawn(async {});

    // Give the runtime time to poll both tasks once so the instant one
    // actually completes and gets counted as dropped.
    tokio::time::sleep(Duration::from_millis(20)).await;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify async tests using tokio sleep also pause time first.
rg -n --type rust -C3 '#\[tokio::test\]|tokio::time::sleep\(|tokio::time::pause\('
Repository: moq-dev/moq
Length of output: 50368
🏁 Script executed:
# Check if the file exists and examine the test
find . -name "task.rs" -path "*/moq-lite/*" 2>/dev/null | head -1
Repository: moq-dev/moq
Length of output: 80
🏁 Script executed:
# If file exists, check lines around 209-220
if [ -f "rs/moq-lite/src/task.rs" ]; then
  sed -n '200,230p' rs/moq-lite/src/task.rs | cat -n
fi
Repository: moq-dev/moq
Length of output: 1128
🏁 Script executed:
# Check for all async tests in this file
sed -n '1,50p' rs/moq-lite/src/task.rs | grep -E '#\[tokio::test\]|async fn'
Repository: moq-dev/moq
Length of output: 37
🏁 Script executed:
# Check the entire test file for all async tests
wc -l rs/moq-lite/src/task.rs
Repository: moq-dev/moq
Length of output: 82
🏁 Script executed:
# Search for all #[tokio::test] attributes in the file
rg -n '#\[tokio::test\]' rs/moq-lite/src/task.rs -A 2
Repository: moq-dev/moq
Length of output: 179
🏁 Script executed:
# Get the complete test function
sed -n '209,240p' rs/moq-lite/src/task.rs
Repository: moq-dev/moq
Length of output: 1014
Add tokio::time::pause() to start of test.
The test calls tokio::time::sleep() on line 220 without pausing time first, making the test slower and potentially timing-sensitive. Add tokio::time::pause(); at the start of the test to use simulated time instead.
Proposed fix
 #[tokio::test]
 async fn snapshot_tracks_live_count() {
+    tokio::time::pause();
     let (tx, rx) = tokio::sync::oneshot::channel::<()>();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/task.rs` around lines 209 - 220, In the test function
snapshot_tracks_live_count add a call to tokio::time::pause() at the very start
of the test (before spawning TEST_BLOCKER/TEST_INSTANT and before the
tokio::time::sleep call) so the test uses simulated time; update the
snapshot_tracks_live_count function to call tokio::time::pause() immediately on
entry (retaining the existing sleep/advance calls if you still need to drive the
simulated clock).
pub async fn run() -> anyhow::Result<()> {
    let mut sig = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined2())?;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify whether moq-relay is intentionally unix-only and whether task module is cfg-gated.
rg -n --glob '**/Cargo.toml' 'name\s*=\s*"moq-relay"|target|cfg\(unix\)|unix|windows'
rg -n 'tokio::signal::unix|cfg\(unix\)|mod task|pub mod task' rs/moq-relay/src
Repository: moq-dev/moq
Length of output: 759
🏁 Script executed:
cat -n rs/moq-relay/src/task.rs | head -40
Repository: moq-dev/moq
Length of output: 1989
🏁 Script executed:
cat -n rs/moq-relay/src/lib.rs | grep -A 2 -B 2 "pub mod task"
Repository: moq-dev/moq
Length of output: 189
Guard Unix-only signal API or add a non-Unix fallback.
Line 33 calls tokio::signal::unix::signal(...) unconditionally. The run() function lacks a #[cfg(unix)] guard, and the module is publicly exported without gating in lib.rs. This will fail compilation on non-Unix targets. Either guard this function with #[cfg(unix)] and provide a non-Unix fallback implementation, or gate the entire module declaration in lib.rs.
🔧 Suggested fix shape
+#[cfg(unix)]
 pub async fn run() -> anyhow::Result<()> {
     let mut sig = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined2())?;
     // ...
 }
+#[cfg(not(unix))]
+pub async fn run() -> anyhow::Result<()> {
+    std::future::pending::<anyhow::Result<()>>().await
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-relay/src/task.rs` around lines 32 - 33, The call to
tokio::signal::unix::signal inside pub async fn run() unconditionally uses a
Unix-only API and breaks non-Unix builds; fix by gating the Unix-specific
implementation with #[cfg(unix)] on the run function (or the whole module) and
provide a non-Unix fallback implementation (e.g., #[cfg(not(unix))] pub async fn
run() -> anyhow::Result<()> that uses tokio::signal::ctrl_c().await or returns a
sensible no-op), or alternatively gate the module export in lib.rs; update
references to run() accordingly so non-Unix targets compile.
Summary
Adds diagnostic task-category profiling so we can answer "which class of detached task is piling up?" without eyeballing a jemalloc heap dump. This is the proactive follow-up to PR #1286: that one fixed a specific `run_send_bandwidth` leak; this one makes the next such leak obvious within minutes instead of hours.
- `moq_lite::task` module exposes a typed `Category` handle plus `spawn` / `snapshot` / `snapshot_raw` / `TaskSnapshot`.
- `Category` is a `pub static` with a lazily-initialised `tokio_metrics::TaskMonitor`. The typed handle means typos fail at compile time and there's no per-spawn hash lookup: first use registers the category in a global `Mutex<Vec<&'static Category>>`, subsequent spawns are lock-free.
- `moq_relay::task::run()` listens for `SIGUSR2` and logs the snapshot on demand, and also logs passively every 10 minutes so slow leaks are visible from the journal without operator intervention. Wired into `main.rs` alongside the existing `SIGUSR1` jemalloc handler.
- `web_async::spawn` sites in `rs/moq-lite/src/` migrated to `crate::task::<CATEGORY>.spawn(...)`. The 3 `tokio::spawn` sites in moq-relay (connection loop, cluster remote, cert reload) are migrated to their own relay-side categories.
- No `RUSTFLAGS="--cfg tokio_unstable"` needed: `TaskMonitor::instrument` works on stable tokio. (That was the pitfall that killed `web-async`'s earlier `spawn_named` attempt.)
- `web-async` itself is untouched: `Lock<T>` and `FuturesExt` are still used inside moq-lite.

Bumps `moq-lite` 0.15.11 → 0.15.12. Adds `tokio-metrics = "0.5"` as a direct dep on moq-lite.
Output shape
A snapshot line looks like (one per category):
```
moq-lite/bandwidth live= 42 spawned= 1203 dropped= 1161 slow= 2 poll_us= 12
moq-lite/lite-session live= 42 spawned= 1203 dropped= 1161 slow= 0 poll_us= 18
moq-relay/connection live= 42 spawned= 1203 dropped= 1161 slow= 0 poll_us= 9
...
```
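The sample rows are consistent with `live = spawned - dropped` (1203 - 1161 = 42). A minimal sketch of one snapshot row and its formatting follows. It assumes that relationship holds and uses the field names listed in the description; `from_counts`, `line`, and the column widths are hypothetical and are not taken from the relay's actual log format.

```rust
/// One row of the snapshot; field names follow the PR description.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskSnapshot {
    pub name: &'static str,
    pub live: u64,
    pub spawned: u64,
    pub dropped: u64,
    pub slow_polls: u64,
    pub mean_poll_us: u64,
}

impl TaskSnapshot {
    /// Assumption: a task is "live" if it was spawned and has not been dropped yet.
    pub fn from_counts(
        name: &'static str,
        spawned: u64,
        dropped: u64,
        slow_polls: u64,
        mean_poll_us: u64,
    ) -> Self {
        Self {
            name,
            live: spawned.saturating_sub(dropped),
            spawned,
            dropped,
            slow_polls,
            mean_poll_us,
        }
    }

    /// Fixed-width columns so rows line up in the journal (widths are illustrative).
    pub fn line(&self) -> String {
        format!(
            "{:<24} live={:>6} spawned={:>8} dropped={:>8} slow={:>4} poll_us={:>6}",
            self.name, self.live, self.spawned, self.dropped, self.slow_polls, self.mean_poll_us
        )
    }
}
```

Padding the numeric columns keeps successive snapshots visually aligned, which makes a growing `live` column easy to spot when scanning a tail of the log.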
On a memory alert the operator runs `journalctl -u moq-relay | grep task_snapshot | tail` — any category whose `live` is monotonically climbing is the leak.
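The "monotonically climbing" check can also be done mechanically once the `live` values for one category have been pulled out of successive snapshots. The sketch below is one possible reading of that rule, assuming "climbing" means the series never decreases and ends higher than it started; `is_climbing` is a hypothetical helper, not part of the PR.

```rust
/// Returns true when `live` never decreases between consecutive samples
/// and the last sample is strictly greater than the first.
pub fn is_climbing(live: &[u64]) -> bool {
    match (live.first(), live.last()) {
        (Some(first), Some(last)) => {
            last > first && live.windows(2).all(|pair| pair[0] <= pair[1])
        }
        // An empty series carries no evidence either way.
        _ => false,
    }
}
```

A healthy category typically oscillates as connections come and go, so a single dip is enough to clear it under this definition; a stricter or more tolerant rule may suit noisy workloads better.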
Out of scope
Test plan
🤖 Generated with Claude Code