Add dual MSF + Hang catalog support in @moq/watch#1147
Add dual MSF + Hang catalog support in @moq/watch#1147
Conversation
- Add `ordered` and `max_latency` fields to `Track` struct - Create `Subscription` struct for per-consumer preferences - Create `TrackSubscription` type with RAII lifecycle (auto-removes on drop) - Add `TrackProducer::poll_max` for aggregate subscription change detection - Add `TrackConsumer::subscribe()` to register subscriptions - Add `GroupProducer::is_aborted()` for duplicate group handling - Allow `create_group` to replace aborted groups (for subscription restart) - Replace `BroadcastDynamic::requested_track() -> TrackProducer` with `requested_track() -> TrackRequest` (handler creates producer, responds) - Make `BroadcastConsumer::subscribe_track` async with request/response - Add `BroadcastDynamic::insert_track` for handler-side track insertion - Add dedup for pending track requests (multiple subscribers share reply) - Update all callers across the workspace Known issues: - libmoq FFI tests deadlock (likely BroadcastConsumer upgrade in async context) - moq-mux tests may deadlock similarly Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Track is now just `{ name: String }` — no priority/ordered/max_latency
- Subscription gains `start: Option<u64>` and `end: Option<u64>` fields
- Aggregation: start = min (None wins), end = max (None wins)
- No cap/ceiling from producer — aggregation is purely across subscribers
- TrackProducer::subscription() is async, blocks until aggregate changes
- TrackProducer stores prev_subscription internally (no prev arg needed)
- Remove TrackConsumer::start_at() — use Subscription.start instead
- Remove subscription tests (will rewrite in follow-up)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Revert broadcast.rs to sync: consume_track() looks up or creates tracks synchronously, requested_track() returns TrackProducer directly - Remove TrackRequest and oneshot channel machinery - Replace TrackSubscription with TrackSubscriber using conducer-based subscriptions for two-level poll aggregation - TrackConsumer::subscribe() is async, blocks until first group exists - TrackSubscriber::recv_group() respects start/end range - Fix create_group aborted replacement: tombstone old entry, push new - Update all callers to use consume_track (sync) or subscribe_track - Fix moq-ffi to remove RUNTIME.block_on() now that consume_track is sync Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- libmoq: move consume_track + consumer construction before tokio::spawn so errors surface synchronously - lite/ietf subscriber: take TrackProducer by value instead of &mut Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
None now means "no preference" rather than "latest wins all." Callers wanting "latest" should start with None, then update() with a concrete value once latest() is known. Also: replace expect() with ? in consume_track, remove dead comment. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove recv_group/poll_recv_group from TrackConsumer (deprecated stubs remain) - Remove index field from TrackConsumer - Add poll_recv_group, recv_group, closed, poll_closed to TrackSubscriber - Rename OrderedConsumer → OrderedSubscriber (takes TrackSubscriber) - CatalogConsumer now takes TrackSubscriber - Update all protocol publishers to subscribe() before iterating groups - Update moq-ffi: subscribe_catalog/subscribe_media are now async - Update libmoq: move subscribe into spawned tasks - Update moq-mux: CatalogProducer::consume() is now async - Fix hang tests: write first group before subscribe to avoid deadlock - Fix hang tests: await tokio::time::sleep in paused-time tests - Add unit tests for subscribe() blocking on first group, finish, abort Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Deprecate the generic NotFound error in favor of more specific variants to make debugging easier. Each maps to its own wire code. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The cleanup task was firing immediately (unused() resolved with zero consumers), removing tracks from the lookup before any subscriber could find them. This matches main's behavior: stale entries are cleaned up lazily by consume_track when it finds closed weak refs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Take dev's moq_mux::consumer types (OrderedConsumer, LegacyConsumer) - Update them to use TrackSubscriber instead of TrackConsumer - Take dev's deprecated stubs in hang::container::consumer - Fix moq-cli subscribe to use consume_track + subscribe - Fix moq-ffi to use LegacyConsumer with TrackSubscriber Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Create @moq/msf package with Zod schemas for MSF catalog format, and update the watch broadcast to subscribe to both catalog formats in parallel. Hang gets a 100ms headstart; first catalog to arrive wins. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
WalkthroughAdds a new JavaScript package for MSF catalogs with Zod schemas and encode/decode utilities, exposes it in the monorepo workspace, and integrates MSF support into the watch broadcast flow (runtime selection between "hang" and "msf"). Introduces package tsconfig and build scripts. In Rust, the consumer API shifts from synchronous 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-mux/src/convert/fmp4.rs (1)
37-50:⚠️ Potential issue | 🟠 MajorAvoid keeping
CatalogGuardalive across the awaited track setup.
CatalogGuardpublishes on drop, but here it stays live across everysubscribe_track(...).awaitand gets mutated before latercreate_track(...)calls can fail. That means one slow/missing source track stalls the first catalog publish, and a mid-loop error can still drop/publish a partial catalog that points at tracks you never finished wiring up.Also applies to: 80-106, 119-123
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/convert/fmp4.rs` around lines 37 - 50, The CatalogGuard (output_catalog.lock() -> guard) is held across await points and possible failures (subscribe_track.await and broadcast.create_track), causing premature or partial catalog publishes; instead, call subscribe_track(...).await and attempt broadcast.create_track(...) first without holding the guard, and only after create_track succeeds acquire output_catalog.lock(), insert the rendition into catalog.video.renditions (and any other catalog mutations), then drop the guard immediately so the lock is not held across awaits or long ops. Update the loop around subscribe_track, create_track, and guard usage in the code that touches output_catalog, subscribe_track, and broadcast.create_track so the guard is only alive for the minimal mutation window.
🧹 Nitpick comments (6)
js/watch/src/msf.ts (2)
43-44: Consider extracting default audio constants.The default
sampleRateof 48000 andnumberOfChannelsof 2 are reasonable but could be named constants for clarity.♻️ Optional: Named constants
+const DEFAULT_SAMPLE_RATE = 48000; +const DEFAULT_CHANNEL_COUNT = 2; + function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined { if (!track.codec) return undefined; return { codec: track.codec, container: toContainer(track), description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined, - sampleRate: u53(track.samplerate ?? 48000), - numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : 2), + sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE), + numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_CHANNEL_COUNT), bitrate: track.bitrate != null ? u53(track.bitrate) : undefined, }; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/msf.ts` around lines 43 - 44, Extract the literal defaults into named constants (e.g., DEFAULT_SAMPLE_RATE = 48000 and DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the inline literals in the sampleRate and numberOfChannels expressions so the code becomes u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare the constants near the top of js/watch/src/msf.ts so they are reusable and self-documenting.
6-13: Consider handling invalid base64 gracefully.
atob()throws aDOMExceptionif the input is not valid base64. If an MSF catalog contains malformedinitData, this will cause an uncaught exception without helpful context.♻️ Suggested defensive wrapper
function base64ToHex(b64: string): string { - const raw = atob(b64); + let raw: string; + try { + raw = atob(b64); + } catch { + console.warn("invalid base64 initData, skipping conversion"); + return ""; + } let hex = ""; for (let i = 0; i < raw.length; i++) { hex += raw.charCodeAt(i).toString(16).padStart(2, "0"); } return hex; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/msf.ts` around lines 6 - 13, The base64ToHex function currently calls atob(b64) which will throw a DOMException for invalid base64; wrap the atob call in a try/catch inside base64ToHex, handle the error by either returning a safe sentinel (e.g., null or empty string) or rethrowing a clearer Error with context (include the original exception and the offending b64 value), and ensure callers of base64ToHex handle the sentinel/exception. Update base64ToHex to validate or catch atob errors and document the chosen failure behavior so callers (where initData is parsed) can react appropriately.js/watch/src/broadcast.ts (2)
146-148: Redundant nullish coalescing.
updateis alreadyCatalog.Root | undefined, soupdate ?? undefinedis equivalent to justupdate.- return update ?? undefined; + return update;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/broadcast.ts` around lines 146 - 148, The return statement unnecessarily uses nullish coalescing on a value already typed as Catalog.Root | undefined; replace the redundant "return update ?? undefined;" with a plain "return update;" (locate the occurrence where the variable update is returned after the array close in broadcast.ts) so the behavior and types remain unchanged while removing the superfluous operator.
117-117: Consider extracting the headstart delay as a named constant.The 100ms delay is a tuning parameter that may need adjustment. A named constant would clarify intent and make it easier to find/change.
♻️ Optional: Named constant
+const HANG_HEADSTART_MS = 100; + // In `#runCatalog`: const msfFetch = msfTrack - ? new Promise((r) => setTimeout(r, 100)) + ? new Promise((r) => setTimeout(r, HANG_HEADSTART_MS)) .then(() => Msf.fetch(msfTrack))🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/broadcast.ts` at line 117, Extract the magic number 100ms used in the headstart delay into a named top-level constant (e.g., HEADSTART_DELAY_MS or BROADCAST_HEADSTART_MS) in js/watch/src/broadcast.ts and replace the inline literal in the ternary new Promise((r) => setTimeout(r, 100)) with that constant; declare it near the top of the module (and export it if other modules may tune it) so the tuning parameter is clearly named and easy to change.rs/moq-mux/src/catalog.rs (1)
57-60: Make the consumed catalog format explicit.
CatalogProducernow publishes bothcatalog.jsonandcatalog, butconsume()still hardcodes the Hang path. Renaming this to something likeconsume_hang()or taking a format parameter would make the public API less surprising.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/catalog.rs` around lines 57 - 60, The public method consume() currently always subscribes to the Hang path but CatalogProducer now publishes both catalog.json and catalog, so update the API to make format explicit: either rename consume() to consume_hang() and leave behavior unchanged, or add a parameter (e.g., format: CatalogFormat enum with variants Hang and Json) to consume() and branch on it to subscribe to the appropriate track (e.g., use self.hang_track.consume() for Hang and the JSON track for Json). Update hang::CatalogConsumer construction or add a new consumer type if needed, and adjust all call sites and tests to use the new name or pass the format argument; also update public docs/comments for the changed function.rs/moq-lite/src/ietf/subscriber.rs (1)
521-522: Minor: Redundant variable rename.The parameter is renamed to
broadcast_pathand immediately aliased back tobroadcast. Consider keeping the original parameter namebroadcastif the rest of the function uses that name.Suggested simplification
- async fn run_subscribe(&mut self, broadcast_path: Path<'_>, mut track: TrackProducer) { - let broadcast = broadcast_path; + async fn run_subscribe(&mut self, broadcast: Path<'_>, mut track: TrackProducer) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 521 - 522, The function run_subscribe has a redundant alias: parameter broadcast_path is immediately assigned to broadcast; update the function signature async fn run_subscribe(&mut self, broadcast: Path<'_>, mut track: TrackProducer) and remove the temporary let broadcast = broadcast_path; so all uses refer directly to the broadcast parameter (or alternatively replace subsequent uses with broadcast_path if you prefer that name) — adjust the signature and remove the redundant binding in run_subscribe to eliminate the unnecessary rename.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@js/watch/src/broadcast.ts`:
- Around line 111-126: The current Promise.race can return undefined if
hangFetch resolves to undefined before msfFetch wakes up; change the logic so
only successful fetches compete: wrap hangFetch and msfFetch so they reject when
they would yield undefined (or use Promise.any with those wrapped promises) and
keep effect.cancel in the cancel slot; update the array built from hangFetch,
msfFetch and effect.cancel accordingly and then await the winner (e.g., await
Promise.any(wrappedCandidates)) so a quick failed hangFetch no longer causes an
early return and MSF can be selected when hang returns nothing.
In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 192-201: The Subscribe message is hardcoded with priority: 0 in
run_subscribe, preventing per-track priority; update the subscriber API to
accept and propagate a priority and set msg.priority accordingly: either add a
priority field to Track or TrackProducer (e.g., Track::priority or
TrackProducer::priority) and read it when building lite::Subscribe, or add a
priority parameter to run_subscribe so callers can pass the desired priority;
ensure the created Subscribe (in run_subscribe) uses that priority value instead
of 0 so the publisher can honor subscribe.priority when ordering deliveries.
---
Outside diff comments:
In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 37-50: The CatalogGuard (output_catalog.lock() -> guard) is held
across await points and possible failures (subscribe_track.await and
broadcast.create_track), causing premature or partial catalog publishes;
instead, call subscribe_track(...).await and attempt broadcast.create_track(...)
first without holding the guard, and only after create_track succeeds acquire
output_catalog.lock(), insert the rendition into catalog.video.renditions (and
any other catalog mutations), then drop the guard immediately so the lock is not
held across awaits or long ops. Update the loop around subscribe_track,
create_track, and guard usage in the code that touches output_catalog,
subscribe_track, and broadcast.create_track so the guard is only alive for the
minimal mutation window.
---
Nitpick comments:
In `@js/watch/src/broadcast.ts`:
- Around line 146-148: The return statement unnecessarily uses nullish
coalescing on a value already typed as Catalog.Root | undefined; replace the
redundant "return update ?? undefined;" with a plain "return update;" (locate
the occurrence where the variable update is returned after the array close in
broadcast.ts) so the behavior and types remain unchanged while removing the
superfluous operator.
- Line 117: Extract the magic number 100ms used in the headstart delay into a
named top-level constant (e.g., HEADSTART_DELAY_MS or BROADCAST_HEADSTART_MS) in
js/watch/src/broadcast.ts and replace the inline literal in the ternary new
Promise((r) => setTimeout(r, 100)) with that constant; declare it near the top
of the module (and export it if other modules may tune it) so the tuning
parameter is clearly named and easy to change.
In `@js/watch/src/msf.ts`:
- Around line 43-44: Extract the literal defaults into named constants (e.g.,
DEFAULT_SAMPLE_RATE = 48000 and DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the
inline literals in the sampleRate and numberOfChannels expressions so the code
becomes u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare
the constants near the top of js/watch/src/msf.ts so they are reusable and
self-documenting.
- Around line 6-13: The base64ToHex function currently calls atob(b64) which
will throw a DOMException for invalid base64; wrap the atob call in a try/catch
inside base64ToHex, handle the error by either returning a safe sentinel (e.g.,
null or empty string) or rethrowing a clearer Error with context (include the
original exception and the offending b64 value), and ensure callers of
base64ToHex handle the sentinel/exception. Update base64ToHex to validate or
catch atob errors and document the chosen failure behavior so callers (where
initData is parsed) can react appropriately.
In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 521-522: The function run_subscribe has a redundant alias:
parameter broadcast_path is immediately assigned to broadcast; update the
function signature async fn run_subscribe(&mut self, broadcast: Path<'_>, mut
track: TrackProducer) and remove the temporary let broadcast = broadcast_path;
so all uses refer directly to the broadcast parameter (or alternatively replace
subsequent uses with broadcast_path if you prefer that name) — adjust the
signature and remove the redundant binding in run_subscribe to eliminate the
unnecessary rename.
In `@rs/moq-mux/src/catalog.rs`:
- Around line 57-60: The public method consume() currently always subscribes to
the Hang path but CatalogProducer now publishes both catalog.json and catalog,
so update the API to make format explicit: either rename consume() to
consume_hang() and leave behavior unchanged, or add a parameter (e.g., format:
CatalogFormat enum with variants Hang and Json) to consume() and branch on it to
subscribe to the appropriate track (e.g., use self.hang_track.consume() for Hang
and the JSON track for Json). Update hang::CatalogConsumer construction or add a
new consumer type if needed, and adjust all call sites and tests to use the new
name or pass the format argument; also update public docs/comments for the
changed function.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 10bb7c3e-9894-4069-ad11-b10ffd7b82a2
⛔ Files ignored due to path filters (1)
bun.lockis excluded by!**/*.lock
📒 Files selected for processing (38)
js/msf/package.jsonjs/msf/src/catalog.tsjs/msf/src/index.tsjs/msf/tsconfig.jsonjs/watch/package.jsonjs/watch/src/broadcast.tsjs/watch/src/msf.tspackage.jsonrs/hang/examples/subscribe.rsrs/hang/examples/video.rsrs/hang/src/catalog/audio/mod.rsrs/hang/src/catalog/consumer.rsrs/hang/src/catalog/root.rsrs/hang/src/catalog/video/mod.rsrs/libmoq/src/consume.rsrs/moq-cli/src/subscribe.rsrs/moq-clock/src/clock.rsrs/moq-clock/src/main.rsrs/moq-ffi/src/consumer.rsrs/moq-ffi/src/test.rsrs/moq-lite/src/error.rsrs/moq-lite/src/ietf/adapter.rsrs/moq-lite/src/ietf/publisher.rsrs/moq-lite/src/ietf/subscriber.rsrs/moq-lite/src/lite/publisher.rsrs/moq-lite/src/lite/subscriber.rsrs/moq-lite/src/model/broadcast.rsrs/moq-lite/src/model/group.rsrs/moq-lite/src/model/track.rsrs/moq-mux/src/catalog.rsrs/moq-mux/src/consumer/ordered.rsrs/moq-mux/src/convert/fmp4.rsrs/moq-mux/src/convert/hang.rsrs/moq-mux/src/convert/test.rsrs/moq-native/examples/chat.rsrs/moq-native/tests/backend.rsrs/moq-native/tests/broadcast.rsrs/moq-relay/src/web.rs
…cross awaits - broadcast.ts: Use Promise.any so a failed hang fetch no longer short-circuits MSF - lite/subscriber.rs: Read subscription params from consumer instead of hardcoding priority: 0 - fmp4.rs: Restructure to hold CatalogGuard only during catalog mutation, not across awaits - Minor: extract magic numbers, harden base64ToHex, remove redundant alias/nullish coalescing Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
rs/moq-mux/src/convert/fmp4.rs (1)
142-180: Consider extracting duplicated spawn logic.The video and audio task spawning blocks (lines 143-161 and 162-180) are nearly identical, differing only in the
is_videoboolean passed toconvert_legacy_to_cmaf. A helper could reduce duplication.♻️ Optional: extract common spawn logic
fn spawn_track_task( tasks: &mut tokio::task::JoinSet<()>, name: String, input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer, convert: Option<(Vec<u8>, u64)>, is_video: bool, ) { match convert { None => { tasks.spawn(async move { if let Err(e) = passthrough_track(input, output).await { tracing::error!(%e, track = %name, "passthrough_track failed"); } }); } Some((_, timescale)) => { tasks.spawn(async move { if let Err(e) = convert_legacy_to_cmaf(input, output, timescale, is_video).await { tracing::error!(%e, track = %name, "convert_legacy_to_cmaf failed"); } }); } } }Then call:
for task in video_tasks { spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, true); } for task in audio_tasks { spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, false); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/convert/fmp4.rs` around lines 142 - 180, The video/audio spawn blocks duplicate logic; extract a helper (e.g., spawn_track_task) that accepts &mut tokio::task::JoinSet<()>, name: String, input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer, convert: Option<(Vec<u8>, u64)>, and is_video: bool, and moves the match that calls passthrough_track or convert_legacy_to_cmaf and logs errors; then replace the for loops over video_tasks and audio_tasks to call spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, true/false) respectively to remove duplication while keeping existing error logging via tracing::error!.js/watch/src/msf.ts (1)
8-8: Prefer documenting rationale, not just behavior, in helper comment.Consider briefly stating why invalid base64 is tolerated (e.g., keep catalog conversion resilient per-track).
As per coding guidelines, "Write comments that explain the 'why', not just the 'what'".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/msf.ts` at line 8, The helper comment that currently states "Convert base64 string to hex string, returning undefined on invalid input." should be expanded to include the rationale for tolerating invalid base64: explain that the base64→hex helper (the function that converts base64 strings to hex and returns undefined on invalid input) deliberately accepts and returns undefined for bad input to keep per-track catalog conversion resilient and avoid failing the whole conversion pipeline; update the comment to briefly state this "why" and keep the existing behavioral note (returning undefined) so future readers understand both behavior and intent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@js/watch/src/msf.ts`:
- Around line 43-53: In toAudioConfig, guard parsing of track.channelConfig
before calling u53: parse channelConfig with Number.parseInt only when it is a
valid numeric string (e.g., test with /^\s*\d+\s*$/ or Number.isFinite after
parse), fall back to DEFAULT_NUMBER_OF_CHANNELS when parse yields NaN or invalid
values, and then pass the safe integer to u53; update the computation that
currently calls u53(track.channelConfig ? Number.parseInt(track.channelConfig,
10) : DEFAULT_NUMBER_OF_CHANNELS) to validate the parse result first so u53
never receives NaN or malformed input.
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 258-270: The code currently constructs a default Subscription
(Subscription::default()) which ignores the subscriber's requested
start_group/end_group from the incoming Subscribe message, so replace the
default with a Subscription built from those fields: when handling subscribe
(the incoming Subscribe struct), map its start_group and end_group into
Subscription's start and end fields (e.g., Subscription { start:
subscribe.start_group.map(|g| g.into()), end: subscribe.end_group.map(|g|
g.into()), ..Default::default() }) and pass that into track.subscribe(...)
instead of Subscription::default(); keep the rest of the flow (Track::new,
consumer.ok_or, broadcast.consume_track, lite::SubscribeOk) unchanged.
---
Nitpick comments:
In `@js/watch/src/msf.ts`:
- Line 8: The helper comment that currently states "Convert base64 string to hex
string, returning undefined on invalid input." should be expanded to include the
rationale for tolerating invalid base64: explain that the base64→hex helper (the
function that converts base64 strings to hex and returns undefined on invalid
input) deliberately accepts and returns undefined for bad input to keep
per-track catalog conversion resilient and avoid failing the whole conversion
pipeline; update the comment to briefly state this "why" and keep the existing
behavioral note (returning undefined) so future readers understand both behavior
and intent.
In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 142-180: The video/audio spawn blocks duplicate logic; extract a
helper (e.g., spawn_track_task) that accepts &mut tokio::task::JoinSet<()>,
name: String, input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer,
convert: Option<(Vec<u8>, u64)>, and is_video: bool, and moves the match that
calls passthrough_track or convert_legacy_to_cmaf and logs errors; then replace
the for loops over video_tasks and audio_tasks to call spawn_track_task(&mut
tasks, task.name, task.input, task.output, task.convert, true/false)
respectively to remove duplication while keeping existing error logging via
tracing::error!.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e144d820-8975-4097-ba6a-47f515ad7a3d
📒 Files selected for processing (8)
js/watch/src/broadcast.tsjs/watch/src/msf.tsrs/moq-lite/src/ietf/adapter.rsrs/moq-lite/src/ietf/publisher.rsrs/moq-lite/src/ietf/subscriber.rsrs/moq-lite/src/lite/publisher.rsrs/moq-lite/src/lite/subscriber.rsrs/moq-mux/src/convert/fmp4.rs
✅ Files skipped from review due to trivial changes (1)
- js/watch/src/broadcast.ts
🚧 Files skipped from review as they are similar to previous changes (4)
- rs/moq-lite/src/ietf/adapter.rs
- rs/moq-lite/src/ietf/subscriber.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/ietf/publisher.rs
| let track = Track::new(subscribe.track.to_string()); | ||
|
|
||
| // TODO wait until track.info() to get the *real* priority | ||
| let broadcast = consumer.ok_or(Error::UnknownBroadcast)?; | ||
| let track = broadcast.consume_track(&track)?; | ||
| let subscriber = track.subscribe(Subscription::default()).await?; | ||
|
|
||
| let info = lite::SubscribeOk { | ||
| priority: track.info.priority, | ||
| priority: 0, | ||
| ordered: false, | ||
| max_latency: std::time::Duration::ZERO, | ||
| max_latency: Duration::ZERO, | ||
| start_group: None, | ||
| end_group: None, | ||
| }; |
There was a problem hiding this comment.
Subscriber's requested start_group/end_group are ignored.
The Subscribe message (per context snippet 4) contains start_group: Option<u64> and end_group: Option<u64> that subscribers use to request specific group ranges. Using Subscription::default() here ignores these fields entirely, since Subscription::default() sets start: None and end: None (meaning "no preference").
This breaks the protocol contract—subscribers requesting playback from a specific group will instead receive groups from wherever the publisher decides.
🔧 Proposed fix to honor subscriber's requested range
- let subscriber = track.subscribe(Subscription::default()).await?;
+ let subscriber = track
+ .subscribe(moq_lite::Subscription {
+ priority: subscribe.priority,
+ ordered: subscribe.ordered,
+ max_latency: subscribe.max_latency,
+ start: subscribe.start_group,
+ end: subscribe.end_group,
+ })
+ .await?;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let track = Track::new(subscribe.track.to_string()); | |
| // TODO wait until track.info() to get the *real* priority | |
| let broadcast = consumer.ok_or(Error::UnknownBroadcast)?; | |
| let track = broadcast.consume_track(&track)?; | |
| let subscriber = track.subscribe(Subscription::default()).await?; | |
| let info = lite::SubscribeOk { | |
| priority: track.info.priority, | |
| priority: 0, | |
| ordered: false, | |
| max_latency: std::time::Duration::ZERO, | |
| max_latency: Duration::ZERO, | |
| start_group: None, | |
| end_group: None, | |
| }; | |
| let track = Track::new(subscribe.track.to_string()); | |
| let broadcast = consumer.ok_or(Error::UnknownBroadcast)?; | |
| let track = broadcast.consume_track(&track)?; | |
| let subscriber = track | |
| .subscribe(moq_lite::Subscription { | |
| priority: subscribe.priority, | |
| ordered: subscribe.ordered, | |
| max_latency: subscribe.max_latency, | |
| start: subscribe.start_group, | |
| end: subscribe.end_group, | |
| }) | |
| .await?; | |
| let info = lite::SubscribeOk { | |
| priority: 0, | |
| ordered: false, | |
| max_latency: Duration::ZERO, | |
| start_group: None, | |
| end_group: None, | |
| }; |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/lite/publisher.rs` around lines 258 - 270, The code currently
constructs a default Subscription (Subscription::default()) which ignores the
subscriber's requested start_group/end_group from the incoming Subscribe
message, so replace the default with a Subscription built from those fields:
when handling subscribe (the incoming Subscribe struct), map its start_group and
end_group into Subscription's start and end fields (e.g., Subscription { start:
subscribe.start_group.map(|g| g.into()), end: subscribe.end_group.map(|g|
g.into()), ..Default::default() }) and pass that into track.subscribe(...)
instead of Subscription::default(); keep the rest of the flow (Track::new,
consumer.ok_or, broadcast.consume_track, lite::SubscribeOk) unchanged.
- Add `catalog` attribute to <moq-watch> accepting "hang", "msf", or "auto" with zod validation; default changed from racing both to "hang" only - Guard channelConfig parsing in MSF catalog conversion to handle NaN - Propagate subscriber's start_group/end_group into Subscription Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 270-276: The SubscribeOk response is hardcoding priority: 0 while
scheduling uses subscribe.priority, causing a mismatch; update the code that
constructs lite::SubscribeOk (the info variable) to set priority to the incoming
subscribe.priority (or otherwise reflect the actual subscription priority) so
the client-visible SubscribeOk matches the scheduler behavior; ensure you
reference the request variable named subscribe and assign its priority into
lite::SubscribeOk::priority (handling any necessary cloning/copying or type
conversion).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e8c0efad-31a7-4768-9960-0f7e975085e7
⛔ Files ignored due to path filters (1)
bun.lockis excluded by!**/*.lock
📒 Files selected for processing (5)
js/watch/package.jsonjs/watch/src/broadcast.tsjs/watch/src/element.tsjs/watch/src/msf.tsrs/moq-lite/src/lite/publisher.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- js/watch/package.json
- js/watch/src/msf.ts
- js/watch/src/broadcast.ts
| let info = lite::SubscribeOk { | ||
| priority: track.info.priority, | ||
| priority: 0, | ||
| ordered: false, | ||
| max_latency: std::time::Duration::ZERO, | ||
| max_latency: Duration::ZERO, | ||
| start_group: None, | ||
| end_group: None, | ||
| }; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if the IETF publisher follows the same pattern for consistency
rg -n -A5 'SubscribeOk\s*\{' rs/moq-lite/src/ietf/publisher.rsRepository: moq-dev/moq
Length of output: 244
🏁 Script executed:
# Check the actual code snippet in lite/publisher.rs around lines 270-276
sed -n '265,280p' rs/moq-lite/src/lite/publisher.rsRepository: moq-dev/moq
Length of output: 363
🏁 Script executed:
# Check line 317 and surrounding context to see how subscribe.priority is used
sed -n '310,325p' rs/moq-lite/src/lite/publisher.rsRepository: moq-dev/moq
Length of output: 488
🏁 Script executed:
# Find the SubscribeOk struct definition to understand its fields
rg -n 'struct SubscribeOk|pub struct SubscribeOk' -A10 rs/Repository: moq-dev/moq
Length of output: 1489
🏁 Script executed:
# Check IETF publisher around the same area to compare priority handling
sed -n '130,160p' rs/moq-lite/src/ietf/publisher.rsRepository: moq-dev/moq
Length of output: 833
🏁 Script executed:
# Check context around the SubscribeOk creation for any comments
sed -n '250,290p' rs/moq-lite/src/lite/publisher.rsRepository: moq-dev/moq
Length of output: 1064
🏁 Script executed:
# Search for other uses of priority field in lite messages
rg -n 'priority:' rs/moq-lite/src/lite/publisher.rsRepository: moq-dev/moq
Length of output: 285
🏁 Script executed:
# Check if subscribe.priority has any constraints or is documented
rg -n 'subscribe\.priority|pub priority' rs/moq-lite/src/lite/ -B2 -A2Repository: moq-dev/moq
Length of output: 1629
🏁 Script executed:
# Check if there are tests that might clarify the expected behavior
find rs/moq-lite -name '*.rs' -exec grep -l 'SubscribeOk' {} \; | head -10Repository: moq-dev/moq
Length of output: 257
🏁 Script executed:
# Search for all SubscribeOk creations in the codebase
rg -n 'SubscribeOk\s*\{' rs/moq-lite/src/ -A5 | head -50Repository: moq-dev/moq
Length of output: 2942
🏁 Script executed:
# Check for any TODOs or FIXMEs related to priority or SubscribeOk
rg -n 'TODO|FIXME' rs/moq-lite/src/lite/publisher.rsRepository: moq-dev/moq
Length of output: 159
🏁 Script executed:
# Check the adapter to see how it handles SubscribeOk
rg -n -B5 -A10 'SubscribeOk' rs/moq-lite/src/ietf/adapter.rsRepository: moq-dev/moq
Length of output: 1808
SubscribeOk.priority hardcoded to 0 does not reflect the actual priority used for group scheduling.
The SubscribeOk response at line 271 hardcodes priority: 0, but line 317 schedules groups using subscribe.priority from the client request. This means clients are told the subscription priority is 0, while their requested priority is actually used for scheduling, creating a mismatch between advertised and actual behavior.
Either use subscribe.priority in the SubscribeOk response, or document why the priority is always reported as 0.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/lite/publisher.rs` around lines 270 - 276, The SubscribeOk
response is hardcoding priority: 0 while scheduling uses subscribe.priority,
causing a mismatch; update the code that constructs lite::SubscribeOk (the info
variable) to set priority to the incoming subscribe.priority (or otherwise
reflect the actual subscription priority) so the client-visible SubscribeOk
matches the scheduler behavior; ensure you reference the request variable named
subscribe and assign its priority into lite::SubscribeOk::priority (handling any
necessary cloning/copying or type conversion).
Summary
@moq/msfpackage with Zod schemas for MSF catalog format (matchingrs/moq-msftypes)@moq/watchbroadcast to subscribe to bothcatalog.json(hang) andcatalog(MSF) in paralleltoHang()) so the player works with MSF-only publishersUnknownBroadcast/UnknownTrackerror split,TrackSubscriberredesign, andmoq_muxconsumer updatesTest plan
just checkpassescatalog: ["hang"]andcatalog: ["msf"]for single-format mode🤖 Generated with Claude Code