diff --git a/bun.lock b/bun.lock index ac7c11629..b4121e26a 100644 --- a/bun.lock +++ b/bun.lock @@ -77,7 +77,7 @@ }, "js/lite": { "name": "@moq/lite", - "version": "0.1.6", + "version": "0.1.7", "dependencies": { "@moq/qmux": "^0.0.6", "@moq/signals": "workspace:*", @@ -94,6 +94,18 @@ "zod": "^4.1.0", }, }, + "js/msf": { + "name": "@moq/msf", + "version": "0.1.0", + "dependencies": { + "@moq/lite": "workspace:^", + "zod": "^4.1.5", + }, + "devDependencies": { + "rimraf": "^6.0.1", + "typescript": "^5.9.2", + }, + }, "js/publish": { "name": "@moq/publish", "version": "0.2.3", @@ -173,8 +185,10 @@ "dependencies": { "@moq/hang": "workspace:^", "@moq/lite": "workspace:^", + "@moq/msf": "workspace:^", "@moq/signals": "workspace:^", "@moq/ui-core": "workspace:^", + "zod": "^4.1.5", }, "devDependencies": { "@types/audioworklet": "^0.0.77", @@ -449,6 +463,8 @@ "@moq/lite": ["@moq/lite@workspace:js/lite"], + "@moq/msf": ["@moq/msf@workspace:js/msf"], + "@moq/publish": ["@moq/publish@workspace:js/publish"], "@moq/qmux": ["@moq/qmux@0.0.6", "", {}, "sha512-ISuGz05lUvf1hzHW3Aw3VnsGRJe1w9Qdog3LQ66KS+l+5mzQsPANvW8yOioEe1Z9dJO2G3sAHoGPnzwnsY9SIQ=="], diff --git a/js/msf/package.json b/js/msf/package.json new file mode 100644 index 000000000..843e1d54d --- /dev/null +++ b/js/msf/package.json @@ -0,0 +1,24 @@ +{ + "name": "@moq/msf", + "type": "module", + "version": "0.1.0", + "description": "MSF (MOQT Streaming Format) catalog types for MoQ", + "license": "(MIT OR Apache-2.0)", + "repository": "github:moq-dev/moq", + "exports": { + ".": "./src/index.ts" + }, + "scripts": { + "build": "rimraf dist && tsc -b && bun ../common/package.ts", + "check": "tsc --noEmit", + "release": "bun ../common/release.ts" + }, + "dependencies": { + "@moq/lite": "workspace:^", + "zod": "^4.1.5" + }, + "devDependencies": { + "rimraf": "^6.0.1", + "typescript": "^5.9.2" + } +} diff --git a/js/msf/src/catalog.ts b/js/msf/src/catalog.ts new file mode 100644 index 000000000..7ee67368b --- /dev/null +++ b/js/msf/src/catalog.ts @@ -0,0 +1,61 @@ +import type * as Moq from "@moq/lite"; +import { z } from "zod"; + +export const PackagingSchema = z.enum(["loc", "cmaf", "legacy", "mediatimeline", "eventtimeline"]).or(z.string()); + +export type Packaging = z.infer; + +export const RoleSchema = z + .enum(["video", "audio", "audiodescription", "caption", "subtitle", "signlanguage"]) + .or(z.string()); + +export type Role = z.infer; + +export const TrackSchema = z.object({ + name: z.string(), + packaging: PackagingSchema, + isLive: z.boolean(), + role: RoleSchema.optional(), + codec: z.string().optional(), + width: z.number().optional(), + height: z.number().optional(), + framerate: z.number().optional(), + samplerate: z.number().optional(), + channelConfig: z.string().optional(), + bitrate: z.number().optional(), + initData: z.string().optional(), + renderGroup: z.number().optional(), + altGroup: z.number().optional(), +}); + +export type Track = z.infer; + +export const CatalogSchema = z.object({ + version: z.literal(1), + tracks: z.array(TrackSchema), +}); + +export type Catalog = z.infer; + +export function encode(catalog: Catalog): Uint8Array { + const encoder = new TextEncoder(); + return encoder.encode(JSON.stringify(catalog)); +} + +export function decode(raw: Uint8Array): Catalog { + const decoder = new TextDecoder(); + const str = decoder.decode(raw); + try { + const json = JSON.parse(str); + return CatalogSchema.parse(json); + } catch (error) { + console.warn("invalid MSF catalog", str); + throw error; + } +} + +export async function fetch(track: Moq.Track): Promise { + const frame = await track.readFrame(); + if (!frame) return undefined; + return decode(frame); +} diff --git a/js/msf/src/index.ts b/js/msf/src/index.ts new file mode 100644 index 000000000..669ac6719 --- /dev/null +++ b/js/msf/src/index.ts @@ -0,0 +1 @@ +export * from "./catalog"; diff --git a/js/msf/tsconfig.json b/js/msf/tsconfig.json new file mode 100644 index 000000000..0f506334d --- /dev/null +++ b/js/msf/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "./src" + }, + "include": ["src"] +} diff --git a/js/watch/package.json b/js/watch/package.json index 9b032beb1..637355c6b 100644 --- a/js/watch/package.json +++ b/js/watch/package.json @@ -26,8 +26,10 @@ "dependencies": { "@moq/hang": "workspace:^", "@moq/lite": "workspace:^", + "@moq/msf": "workspace:^", "@moq/signals": "workspace:^", - "@moq/ui-core": "workspace:^" + "@moq/ui-core": "workspace:^", + "zod": "^4.1.5" }, "devDependencies": { "@types/audioworklet": "^0.0.77", diff --git a/js/watch/src/broadcast.ts b/js/watch/src/broadcast.ts index 5ebc9181c..c607cb6bb 100644 --- a/js/watch/src/broadcast.ts +++ b/js/watch/src/broadcast.ts @@ -1,7 +1,20 @@ import * as Catalog from "@moq/hang/catalog"; import type * as Moq from "@moq/lite"; import { Path } from "@moq/lite"; +import * as Msf from "@moq/msf"; import { Effect, type Getter, Signal } from "@moq/signals"; +import { z } from "zod"; + +import { toHang } from "./msf"; + +export const catalogFormatSchema = z.enum(["hang", "msf"]); +export type CatalogFormat = z.infer; + +export const catalogAttrSchema = z.enum(["hang", "msf", "auto"]); +export type CatalogAttr = z.infer; + +/** Delay (ms) before attempting MSF catalog fetch, giving hang format a headstart. */ +const HANG_HEADSTART_MS = 100; export interface BroadcastProps { connection?: Moq.Connection.Established | Signal; @@ -19,6 +32,9 @@ export interface BroadcastProps { // Whether to reload the broadcast when it goes offline. // Defaults to false; pass true to wait for an announcement before subscribing. reload?: boolean | Signal; + + // Which catalog formats to try. Default: ["hang"] + catalog?: CatalogFormat[] | Signal; } // A catalog source that (optionally) reloads automatically when live/offline. @@ -30,6 +46,8 @@ export class Broadcast { status = new Signal<"offline" | "loading" | "live">("offline"); reload: Signal; + catalogFormats: Signal; + #active = new Signal(undefined); readonly active: Getter = this.#active; @@ -46,6 +64,7 @@ export class Broadcast { this.name = Signal.from(props?.name ?? Path.empty()); this.enabled = Signal.from(props?.enabled ?? false); this.reload = Signal.from(props?.reload ?? false); + this.catalogFormats = Signal.from(props?.catalog ?? (["hang"] as CatalogFormat[])); this.#announced = props?.announced ?? new Signal(new Set()); @@ -84,21 +103,75 @@ export class Broadcast { if (!values) return; const [_, broadcast] = values; + const formats = effect.get(this.catalogFormats); this.status.set("loading"); - const catalog = broadcast.subscribe("catalog.json", Catalog.PRIORITY.catalog); - effect.cleanup(() => catalog.close()); + const hangTrack = formats.includes("hang") + ? broadcast.subscribe("catalog.json", Catalog.PRIORITY.catalog) + : undefined; + const msfTrack = formats.includes("msf") ? broadcast.subscribe("catalog", Catalog.PRIORITY.catalog) : undefined; + + if (hangTrack) effect.cleanup(() => hangTrack.close()); + if (msfTrack) effect.cleanup(() => msfTrack.close()); effect.spawn(async () => { try { - for (;;) { - const update = await Promise.race([effect.cancel, Catalog.fetch(catalog)]); - if (!update) break; + // Race the first catalog fetch, giving hang a headstart. + // Wrap each fetch so undefined results reject, ensuring only + // successful fetches compete (via Promise.any). + const hangFetch = hangTrack + ? Catalog.fetch(hangTrack).then((r) => { + if (r) return { kind: "hang" as const, root: r }; + throw new Error("hang catalog empty"); + }) + : undefined; + + const msfFetch = msfTrack + ? new Promise((r) => setTimeout(r, HANG_HEADSTART_MS)) + .then(() => Msf.fetch(msfTrack)) + .then((c) => { + if (c) return { kind: "msf" as const, root: toHang(c) }; + throw new Error("msf catalog empty"); + }) + : undefined; + + const candidates = [hangFetch, msfFetch].filter((c): c is NonNullable => c != null); + if (candidates.length === 0) return; + + const first = await Promise.race([effect.cancel.then(() => undefined), Promise.any(candidates)]); + if (!first) return; + + // Close the loser + if (first.kind === "hang") { + msfTrack?.close(); + } else { + hangTrack?.close(); + } - console.debug("received catalog", this.name.peek(), update); + console.debug("received catalog", first.kind, this.name.peek(), first.root); + this.#catalog.set(first.root); + this.status.set("live"); + + // Continue reading updates from the winner + const fetchNext = + first.kind === "hang" + ? async () => { + const update = await Promise.race([ + effect.cancel, + Catalog.fetch(hangTrack as Moq.Track), + ]); + return update; + } + : async () => { + const update = await Promise.race([effect.cancel, Msf.fetch(msfTrack as Moq.Track)]); + return update ? toHang(update) : undefined; + }; - this.#catalog.set(update); - this.status.set("live"); + for (;;) { + const root = await fetchNext(); + if (!root) break; + console.debug("received catalog", first.kind, this.name.peek(), root); + this.#catalog.set(root); } } catch (err) { console.warn("error fetching catalog", this.name.peek(), err); diff --git a/js/watch/src/element.ts b/js/watch/src/element.ts index 6302083dd..92f2971ae 100644 --- a/js/watch/src/element.ts +++ b/js/watch/src/element.ts @@ -2,10 +2,17 @@ import type { Time } from "@moq/lite"; import * as Moq from "@moq/lite"; import { Effect, Signal } from "@moq/signals"; import { MultiBackend } from "./backend"; -import { Broadcast } from "./broadcast"; +import { Broadcast, type CatalogAttr, type CatalogFormat, catalogAttrSchema } from "./broadcast"; import { Sync } from "./sync"; -const OBSERVED = ["url", "name", "paused", "volume", "muted", "reload", "jitter"] as const; +function parseCatalogAttr(value: string | null): CatalogFormat[] { + const parsed = catalogAttrSchema.safeParse(value); + if (!parsed.success) return ["hang"]; + if (parsed.data === "auto") return ["hang", "msf"]; + return [parsed.data]; +} + +const OBSERVED = ["url", "name", "paused", "volume", "muted", "reload", "jitter", "catalog"] as const; type Observed = (typeof OBSERVED)[number]; // Close everything when this element is garbage collected. @@ -152,6 +159,8 @@ export default class MoqWatch extends HTMLElement { this.broadcast.reload.set(newValue !== null); } else if (name === "jitter") { this.backend.jitter.set((newValue ? Number.parseFloat(newValue) : 100) as Time.Milli); + } else if (name === "catalog") { + this.broadcast.catalogFormats.set(parseCatalogAttr(newValue)); } else { const exhaustive: never = name; throw new Error(`Invalid attribute: ${exhaustive}`); @@ -213,6 +222,18 @@ export default class MoqWatch extends HTMLElement { set jitter(value: number) { this.backend.jitter.set(value as Time.Milli); } + + get catalog(): CatalogFormat[] { + return this.broadcast.catalogFormats.peek(); + } + + set catalog(value: CatalogAttr | CatalogFormat[]) { + if (typeof value === "string") { + this.broadcast.catalogFormats.set(parseCatalogAttr(value)); + } else { + this.broadcast.catalogFormats.set(value); + } + } } customElements.define("moq-watch", MoqWatch); diff --git a/js/watch/src/msf.ts b/js/watch/src/msf.ts new file mode 100644 index 000000000..37e864671 --- /dev/null +++ b/js/watch/src/msf.ts @@ -0,0 +1,88 @@ +import type * as Catalog from "@moq/hang/catalog"; +import { u53 } from "@moq/hang/catalog"; +import type * as Msf from "@moq/msf"; + +const DEFAULT_SAMPLE_RATE = 48000; +const DEFAULT_NUMBER_OF_CHANNELS = 2; + +// Convert base64 string to hex string, returning undefined on invalid input. +function base64ToHex(b64: string): string | undefined { + try { + const raw = atob(b64); + let hex = ""; + for (let i = 0; i < raw.length; i++) { + hex += raw.charCodeAt(i).toString(16).padStart(2, "0"); + } + return hex; + } catch { + return undefined; + } +} + +function toContainer(track: Msf.Track): Catalog.Container { + if (track.packaging === "cmaf" && track.initData) { + return { kind: "cmaf", initData: track.initData }; + } + return { kind: "legacy" }; +} + +function toVideoConfig(track: Msf.Track): Catalog.VideoConfig | undefined { + if (!track.codec) return undefined; + + return { + codec: track.codec, + container: toContainer(track), + description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined, + codedWidth: track.width != null ? u53(track.width) : undefined, + codedHeight: track.height != null ? u53(track.height) : undefined, + framerate: track.framerate, + bitrate: track.bitrate != null ? u53(track.bitrate) : undefined, + }; +} + +function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined { + if (!track.codec) return undefined; + + return { + codec: track.codec, + container: toContainer(track), + description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined, + sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE), + numberOfChannels: u53( + (() => { + if (!track.channelConfig) return DEFAULT_NUMBER_OF_CHANNELS; + const parsed = Number.parseInt(track.channelConfig, 10); + return Number.isFinite(parsed) ? parsed : DEFAULT_NUMBER_OF_CHANNELS; + })(), + ), + bitrate: track.bitrate != null ? u53(track.bitrate) : undefined, + }; +} + +/** Convert an MSF catalog to a hang catalog Root. */ +export function toHang(msf: Msf.Catalog): Catalog.Root { + const videoRenditions: Record = {}; + const audioRenditions: Record = {}; + + for (const track of msf.tracks) { + if (track.role === "video") { + const config = toVideoConfig(track); + if (config) videoRenditions[track.name] = config; + } else if (track.role === "audio") { + const config = toAudioConfig(track); + if (config) audioRenditions[track.name] = config; + } + } + + const root: Catalog.Root = {}; + + if (Object.keys(videoRenditions).length > 0) { + root.video = { renditions: videoRenditions }; + } + + if (Object.keys(audioRenditions).length > 0) { + root.audio = { renditions: audioRenditions }; + } + + return root; +} diff --git a/package.json b/package.json index d08b306c8..039e35e29 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "js/clock", "js/token", "js/hang", + "js/msf", "js/ui-core", "js/watch", "js/publish", diff --git a/rs/hang/examples/subscribe.rs b/rs/hang/examples/subscribe.rs index 1e936cbee..b707d19ac 100644 --- a/rs/hang/examples/subscribe.rs +++ b/rs/hang/examples/subscribe.rs @@ -49,8 +49,9 @@ async fn run_subscribe(mut consumer: moq_lite::OriginConsumer) -> anyhow::Result tracing::info!(%path, "broadcast announced"); // Read the catalog to discover available tracks. - let catalog_track = broadcast.subscribe_track(&hang::Catalog::default_track())?; - let mut catalog = hang::CatalogConsumer::new(catalog_track); + let catalog_track = broadcast.consume_track(&hang::Catalog::default_track())?; + let catalog_sub = catalog_track.subscribe(moq_lite::Subscription::default()).await?; + let mut catalog = hang::CatalogConsumer::new(catalog_sub); let info = catalog.next().await?.ok_or_else(|| anyhow::anyhow!("no catalog"))?; @@ -71,16 +72,14 @@ async fn run_subscribe(mut consumer: moq_lite::OriginConsumer) -> anyhow::Result ); // Subscribe to the video track. - let track = moq_lite::Track { - name: name.clone(), - priority: 1, - }; + let track = moq_lite::Track::new(name.clone()); - let track_consumer = broadcast.subscribe_track(&track)?; + let track_consumer = broadcast.consume_track(&track)?; + let track_sub = track_consumer.subscribe(moq_lite::Subscription::default()).await?; // Skip over groups where all frames are older than 500ms to maintain low latency. let mut ordered = - moq_mux::consumer::OrderedConsumer::new(track_consumer, moq_mux::consumer::Legacy, Duration::from_millis(500)); + moq_mux::consumer::OrderedConsumer::new(track_sub, moq_mux::consumer::Legacy, Duration::from_millis(500)); // Read frames in presentation order. while let Some(frame) = ordered.read().await? { diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index b25c87135..a592ea606 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -40,10 +40,7 @@ async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> { // The catalog can contain multiple tracks, used by the viewer to choose the best track. fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> anyhow::Result { // Basic information about the video track. - let video_track = moq_lite::Track { - name: "video".to_string(), - priority: 1, // Video typically has lower priority than audio - }; + let video_track = moq_lite::Track::new("video"); // Example video configuration // In a real application, you would get this from the encoder diff --git a/rs/hang/src/catalog/audio/mod.rs b/rs/hang/src/catalog/audio/mod.rs index 1958183c2..65c7dcef2 100644 --- a/rs/hang/src/catalog/audio/mod.rs +++ b/rs/hang/src/catalog/audio/mod.rs @@ -38,8 +38,7 @@ impl Audio { if let btree_map::Entry::Vacant(entry) = self.renditions.entry(name.clone()) { entry.insert(config.clone()); - // TODO: Remove priority - return moq_lite::Track { name, priority: 2 }; + return moq_lite::Track::new(name); } } diff --git a/rs/hang/src/catalog/consumer.rs b/rs/hang/src/catalog/consumer.rs index 1339249a4..33b042716 100644 --- a/rs/hang/src/catalog/consumer.rs +++ b/rs/hang/src/catalog/consumer.rs @@ -2,18 +2,17 @@ use crate::{Catalog, Result}; /// A catalog consumer, used to receive catalog updates and discover tracks. /// -/// This wraps a `moq_lite::TrackConsumer` and automatically deserializes JSON +/// This wraps a `moq_lite::TrackSubscriber` and automatically deserializes JSON /// catalog data to discover available audio and video tracks in a broadcast. -#[derive(Clone)] pub struct CatalogConsumer { - /// Access to the underlying track consumer. - pub track: moq_lite::TrackConsumer, + /// Access to the underlying track subscriber. + pub track: moq_lite::TrackSubscriber, group: Option, } impl CatalogConsumer { - /// Create a new catalog consumer from a MoQ track consumer. - pub fn new(track: moq_lite::TrackConsumer) -> Self { + /// Create a new catalog consumer from a MoQ track subscriber. + pub fn new(track: moq_lite::TrackSubscriber) -> Self { Self { track, group: None } } @@ -42,15 +41,10 @@ impl CatalogConsumer { } } } - - /// Wait until the catalog track is closed. - pub async fn closed(&self) -> Result<()> { - Ok(self.track.closed().await?) - } } -impl From for CatalogConsumer { - fn from(inner: moq_lite::TrackConsumer) -> Self { +impl From for CatalogConsumer { + fn from(inner: moq_lite::TrackSubscriber) -> Self { Self::new(inner) } } diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 731005b25..5d8ae5fe9 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -77,10 +77,7 @@ impl Catalog { } pub fn default_track() -> moq_lite::Track { - moq_lite::Track { - name: Catalog::DEFAULT_NAME.to_string(), - priority: 100, - } + moq_lite::Track::new(Catalog::DEFAULT_NAME) } } diff --git a/rs/hang/src/catalog/video/mod.rs b/rs/hang/src/catalog/video/mod.rs index f3ef4ff9b..b293f9a84 100644 --- a/rs/hang/src/catalog/video/mod.rs +++ b/rs/hang/src/catalog/video/mod.rs @@ -58,8 +58,7 @@ impl Video { }; if let btree_map::Entry::Vacant(entry) = self.renditions.entry(name.clone()) { entry.insert(config.clone()); - // TODO: Remove priority - return moq_lite::Track { name, priority: 1 }; + return moq_lite::Track::new(name); } } @@ -85,7 +84,7 @@ pub struct Display { /// This struct contains all the information needed to initialize a video decoder, /// including codec-specific parameters, resolution, and optional metadata. /// -/// Reference: +/// Reference: #[serde_with::serde_as] #[serde_with::skip_serializing_none] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] diff --git a/rs/libmoq/src/consume.rs b/rs/libmoq/src/consume.rs index d018589f4..49d988a05 100644 --- a/rs/libmoq/src/consume.rs +++ b/rs/libmoq/src/consume.rs @@ -50,7 +50,7 @@ impl Consume { pub fn catalog(&mut self, broadcast: Id, on_catalog: OnStatus) -> Result { let broadcast = self.broadcast.get(broadcast).ok_or(Error::BroadcastNotFound)?.clone(); - let catalog = broadcast.subscribe_track(&hang::catalog::Catalog::default_track())?; + let track = broadcast.consume_track(&hang::catalog::Catalog::default_track())?; let channel = oneshot::channel(); let entry = TaskEntry { @@ -61,7 +61,7 @@ impl Consume { tokio::spawn(async move { let res = tokio::select! { - res = Self::run_catalog(id, broadcast, catalog.into()) => res, + res = Self::run_catalog(id, broadcast, track) => res, _ = channel.1 => Ok(()), }; @@ -77,8 +77,10 @@ impl Consume { async fn run_catalog( task_id: Id, broadcast: moq_lite::BroadcastConsumer, - mut catalog: hang::CatalogConsumer, + track: moq_lite::TrackConsumer, ) -> Result<(), Error> { + let subscriber = track.subscribe(moq_lite::Subscription::default()).await?; + let mut catalog = hang::CatalogConsumer::new(subscriber); while let Some(catalog) = catalog.next().await? { // Unfortunately we need to store the codec information on the heap. let audio_codec = catalog @@ -219,11 +221,9 @@ impl Consume { .nth(index) .ok_or(Error::NoIndex)?; - let track = consume.broadcast.subscribe_track(&moq_lite::Track { - name: rendition.clone(), - priority: 1, // TODO: Remove priority - })?; - let track = LegacyConsumer::new(track, moq_mux::consumer::Legacy, latency); + let track = consume + .broadcast + .consume_track(&moq_lite::Track::new(rendition.clone()))?; let channel = oneshot::channel(); let entry = TaskEntry { @@ -234,7 +234,7 @@ impl Consume { tokio::spawn(async move { let res = tokio::select! { - res = Self::run_track(id, track) => res, + res = Self::run_track(id, track, latency) => res, _ = channel.1 => Ok(()), }; @@ -263,11 +263,9 @@ impl Consume { .nth(index) .ok_or(Error::NoIndex)?; - let track = consume.broadcast.subscribe_track(&moq_lite::Track { - name: rendition.clone(), - priority: 2, // TODO: Remove priority - })?; - let track = LegacyConsumer::new(track, moq_mux::consumer::Legacy, latency); + let track = consume + .broadcast + .consume_track(&moq_lite::Track::new(rendition.clone()))?; let channel = oneshot::channel(); let entry = TaskEntry { @@ -278,7 +276,7 @@ impl Consume { tokio::spawn(async move { let res = tokio::select! { - res = Self::run_track(id, track) => res, + res = Self::run_track(id, track, latency) => res, _ = channel.1 => Ok(()), }; @@ -291,7 +289,9 @@ impl Consume { Ok(id) } - async fn run_track(task_id: Id, mut track: LegacyConsumer) -> Result<(), Error> { + async fn run_track(task_id: Id, track: moq_lite::TrackConsumer, latency: std::time::Duration) -> Result<(), Error> { + let subscriber = track.subscribe(moq_lite::Subscription::default()).await?; + let mut track = LegacyConsumer::new(subscriber, moq_mux::consumer::Legacy, latency); while let Some(mut ordered) = track.read().await? { // TODO add a chunking API so we don't have to (potentially) allocate a contiguous buffer for the frame. let mut new_payload = hang::container::BufList::new(); diff --git a/rs/moq-cli/src/subscribe.rs b/rs/moq-cli/src/subscribe.rs index 200549d58..4f3545497 100644 --- a/rs/moq-cli/src/subscribe.rs +++ b/rs/moq-cli/src/subscribe.rs @@ -41,8 +41,8 @@ impl Subscribe { let cmaf_consumer = cmaf_output.consume(); let converter = moq_mux::convert::Fmp4::new(self.broadcast, cmaf_output); - // Subscribe to the catalog before the converter starts, so we don't miss it. - let catalog_track = cmaf_consumer.subscribe_track(&hang::Catalog::default_track())?; + // Consume the catalog track before the converter starts, so we don't miss it. + let catalog_track = cmaf_consumer.consume_track(&hang::Catalog::default_track())?; let max_latency = std::time::Duration::from_millis(self.args.max_latency); @@ -64,7 +64,8 @@ async fn mux_fmp4( ) -> anyhow::Result<()> { let mut stdout = tokio::io::stdout(); - let mut catalog_consumer = hang::CatalogConsumer::new(catalog_track); + let catalog_sub = catalog_track.subscribe(moq_lite::Subscription::default()).await?; + let mut catalog_consumer = hang::CatalogConsumer::new(catalog_sub); let catalog = catalog_consumer.next().await?.context("empty catalog")?; // Build exporter from catalog (for init segment) @@ -79,10 +80,9 @@ async fn mux_fmp4( let mut muxer_tracks = Vec::new(); for (name, config) in &catalog.video.renditions { - let track = cmaf_consumer.subscribe_track(&moq_lite::Track { - name: name.clone(), - priority: 1, - })?; + let track = cmaf_consumer + .subscribe_track(&moq_lite::Track::new(name.clone()), moq_lite::Subscription::default()) + .await?; let timescale = match &config.container { hang::catalog::Container::Cmaf { init_data } => parse_timescale_from_init(init_data)?, @@ -97,10 +97,9 @@ async fn mux_fmp4( } for (name, config) in &catalog.audio.renditions { - let track = cmaf_consumer.subscribe_track(&moq_lite::Track { - name: name.clone(), - priority: 2, - })?; + let track = cmaf_consumer + .subscribe_track(&moq_lite::Track::new(name.clone()), moq_lite::Subscription::default()) + .await?; let timescale = match &config.container { hang::catalog::Container::Cmaf { init_data } => parse_timescale_from_init(init_data)?, diff --git a/rs/moq-clock/src/clock.rs b/rs/moq-clock/src/clock.rs index 527f218a6..67e16f3e3 100644 --- a/rs/moq-clock/src/clock.rs +++ b/rs/moq-clock/src/clock.rs @@ -71,11 +71,11 @@ impl Publisher { } } pub struct Subscriber { - track: TrackConsumer, + track: TrackSubscriber, } impl Subscriber { - pub fn new(track: TrackConsumer) -> Self { + pub fn new(track: TrackSubscriber) -> Self { Self { track } } diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index 1ebb5f21c..44408f05f 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -53,10 +53,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!(url = ?config.url, "connecting to server"); - let track = Track { - name: config.track, - priority: 0, - }; + let track = Track::new(config.track); let origin = moq_lite::Origin::produce(); @@ -97,8 +94,9 @@ async fn main() -> anyhow::Result<()> { Some(announce) = origin.announced() => match announce { (path, Some(broadcast)) => { tracing::info!(broadcast = %path, "broadcast is online, subscribing to track"); - let track = broadcast.subscribe_track(&track)?; - clock = Some(clock::Subscriber::new(track)); + let track = broadcast.consume_track(&track)?; + let subscriber = track.subscribe(Subscription::default()).await?; + clock = Some(clock::Subscriber::new(subscriber)); } (path, None) => { tracing::warn!(broadcast = %path, "broadcast is offline, waiting..."); diff --git a/rs/moq-ffi/src/consumer.rs b/rs/moq-ffi/src/consumer.rs index 06ef9cfaa..2ac012788 100644 --- a/rs/moq-ffi/src/consumer.rs +++ b/rs/moq-ffi/src/consumer.rs @@ -76,10 +76,10 @@ impl Media { #[uniffi::export] impl MoqBroadcastConsumer { /// Subscribe to the catalog for this broadcast. - pub fn subscribe_catalog(&self) -> Result, MoqError> { - let _guard = crate::ffi::RUNTIME.enter(); - let track = self.inner.subscribe_track(&hang::catalog::Catalog::default_track())?; - let consumer = hang::CatalogConsumer::from(track); + pub async fn subscribe_catalog(&self) -> Result, MoqError> { + let track = self.inner.consume_track(&hang::catalog::Catalog::default_track())?; + let subscriber = track.subscribe(moq_lite::Subscription::default()).await?; + let consumer = hang::CatalogConsumer::new(subscriber); Ok(Arc::new(MoqCatalogConsumer { task: Task::new(Catalog { inner: consumer }), })) @@ -88,11 +88,11 @@ impl MoqBroadcastConsumer { /// Subscribe to a media track by name, delivering frames in decode order. /// /// `max_latency_ms` controls the maximum buffering before skipping a GoP. - pub fn subscribe_media(&self, name: String, max_latency_ms: u64) -> Result, MoqError> { - let _guard = crate::ffi::RUNTIME.enter(); - let track = self.inner.subscribe_track(&moq_lite::Track { name, priority: 0 })?; + pub async fn subscribe_media(&self, name: String, max_latency_ms: u64) -> Result, MoqError> { + let track = self.inner.consume_track(&moq_lite::Track::new(name))?; + let subscriber = track.subscribe(moq_lite::Subscription::default()).await?; let latency = std::time::Duration::from_millis(max_latency_ms); - let consumer = LegacyConsumer::new(track, moq_mux::consumer::Legacy, latency); + let consumer = LegacyConsumer::new(subscriber, moq_mux::consumer::Legacy, latency); Ok(Arc::new(MoqMediaConsumer { task: Task::new(Media { inner: consumer }), })) diff --git a/rs/moq-ffi/src/test.rs b/rs/moq-ffi/src/test.rs index 10150767a..030b06eff 100644 --- a/rs/moq-ffi/src/test.rs +++ b/rs/moq-ffi/src/test.rs @@ -83,7 +83,7 @@ async fn local_publish_consume_audio() { assert_eq!(announcement.path(), "live"); let broadcast_consumer = announcement.broadcast(); - let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap(); + let catalog_consumer = broadcast_consumer.subscribe_catalog().await.unwrap(); let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next()) .await @@ -98,7 +98,10 @@ async fn local_publish_consume_audio() { assert_eq!(audio.channel_count, 2); assert!(catalog.video.is_empty()); - let media_consumer = broadcast_consumer.subscribe_media(track_name.clone(), 10_000).unwrap(); + let media_consumer = broadcast_consumer + .subscribe_media(track_name.clone(), 10_000) + .await + .unwrap(); let payload = b"opus audio payload data".to_vec(); media.write_frame(payload.clone(), 1_000_000).unwrap(); @@ -131,7 +134,7 @@ async fn video_publish_consume() { .expect("expected announcement"); let broadcast_consumer = announcement.broadcast(); - let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap(); + let catalog_consumer = broadcast_consumer.subscribe_catalog().await.unwrap(); let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next()) .await @@ -151,7 +154,10 @@ async fn video_publish_consume() { assert_eq!(coded.height, 720); assert!(catalog.audio.is_empty()); - let media_consumer = broadcast_consumer.subscribe_media(track_name.clone(), 10_000).unwrap(); + let media_consumer = broadcast_consumer + .subscribe_media(track_name.clone(), 10_000) + .await + .unwrap(); let keyframe = vec![0x00, 0x00, 0x00, 0x01, 0x65, 0xAA, 0xBB, 0xCC]; media.write_frame(keyframe, 0).unwrap(); @@ -183,7 +189,7 @@ async fn multiple_frames_ordering() { .unwrap(); let broadcast_consumer = announcement.broadcast(); - let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap(); + let catalog_consumer = broadcast_consumer.subscribe_catalog().await.unwrap(); let catalog = tokio::time::timeout(TIMEOUT, catalog_consumer.next()) .await .unwrap() @@ -191,7 +197,7 @@ async fn multiple_frames_ordering() { .unwrap(); let track_name = catalog.audio.keys().next().unwrap().clone(); - let media_consumer = broadcast_consumer.subscribe_media(track_name, 10_000).unwrap(); + let media_consumer = broadcast_consumer.subscribe_media(track_name, 10_000).await.unwrap(); let timestamps: [u64; 5] = [0, 20_000, 40_000, 60_000, 80_000]; for (i, &ts) in timestamps.iter().enumerate() { @@ -229,7 +235,7 @@ async fn catalog_update_on_new_track() { .unwrap(); let broadcast_consumer = announcement.broadcast(); - let catalog_consumer = broadcast_consumer.subscribe_catalog().unwrap(); + let catalog_consumer = broadcast_consumer.subscribe_catalog().await.unwrap(); let catalog1 = tokio::time::timeout(TIMEOUT, catalog_consumer.next()) .await @@ -278,7 +284,7 @@ async fn announced_broadcast() { .expect("expected announcement"); assert_eq!(announcement.path(), "test/broadcast"); - let _catalog = announcement.broadcast().subscribe_catalog().unwrap(); + let _catalog = announcement.broadcast().subscribe_catalog().await.unwrap(); } #[test] diff --git a/rs/moq-lite/src/error.rs b/rs/moq-lite/src/error.rs index 09d30d5ec..d4c08cab8 100644 --- a/rs/moq-lite/src/error.rs +++ b/rs/moq-lite/src/error.rs @@ -47,9 +47,16 @@ pub enum Error { #[error("app code={0}")] App(u16), + #[deprecated(note = "Use UnknownBroadcast or UnknownTrack instead")] #[error("not found")] NotFound, + #[error("unknown broadcast")] + UnknownBroadcast, + + #[error("unknown track")] + UnknownTrack, + #[error("wrong frame size")] WrongSize, @@ -84,6 +91,7 @@ pub enum Error { Closed, } +#[allow(deprecated)] impl Error { /// An integer code that is sent over the wire. pub fn to_code(&self) -> u32 { @@ -100,6 +108,8 @@ impl Error { Self::BoundsExceeded => 11, Self::Duplicate => 12, Self::NotFound => 13, + Self::UnknownBroadcast => 26, + Self::UnknownTrack => 27, Self::WrongSize => 14, Self::ProtocolViolation => 15, Self::UnexpectedMessage => 16, @@ -138,6 +148,8 @@ impl Error { 20 => Self::InvalidRole, 24 => Self::Dropped, 25 => Self::Closed, + 26 => Self::UnknownBroadcast, + 27 => Self::UnknownTrack, code if code >= 64 => match u16::try_from(code - 64) { Ok(app) => Self::App(app), Err(_) => Self::ProtocolViolation, diff --git a/rs/moq-lite/src/ietf/adapter.rs b/rs/moq-lite/src/ietf/adapter.rs index 6fda7848c..42080d93d 100644 --- a/rs/moq-lite/src/ietf/adapter.rs +++ b/rs/moq-lite/src/ietf/adapter.rs @@ -634,7 +634,7 @@ impl ControlStreamAdapter { .unwrap() .get(&ns) .copied() - .ok_or(Error::NotFound) + .ok_or(Error::UnknownBroadcast) } } diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 4997d4348..d559df3a8 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -5,7 +5,7 @@ use web_async::FuturesExt; use web_transport_trait::SendStream; use crate::{ - AsPath, Error, Origin, OriginConsumer, Track, TrackConsumer, + AsPath, Error, Origin, OriginConsumer, Subscription, Track, TrackSubscriber, coding::{Stream, Writer}, ietf::{self, Control, FetchHeader, FetchType, FilterType, GroupOrder, Location, RequestId}, model::GroupConsumer, @@ -111,12 +111,9 @@ impl Publisher { return Ok(()); }; - let track = Track { - name: msg.track_name.to_string(), - priority: msg.subscriber_priority, - }; + let track = Track::new(msg.track_name.to_string()); - let track = match broadcast.subscribe_track(&track) { + let track = match broadcast.consume_track(&track) { Ok(track) => track, Err(err) => { self.write_subscribe_error(&mut stream.writer, request_id, 404, &err.to_string()) @@ -125,6 +122,15 @@ impl Publisher { } }; + let subscriber = match track.subscribe(Subscription::default()).await { + Ok(sub) => sub, + Err(err) => { + self.write_subscribe_error(&mut stream.writer, request_id, 404, &err.to_string()) + .await?; + return Ok(()); + } + }; + // Send SubscribeOk on the stream stream.writer.encode(&ietf::SubscribeOk::ID).await?; stream @@ -140,7 +146,7 @@ impl Publisher { // Run the track, cancelling on reader close (Unsubscribe or stream close) let res = tokio::select! { - res = self.run_track(track, request_id) => res, + res = self.run_track(subscriber, request_id) => res, _ = stream.reader.closed() => Ok(()), _ = self.session.closed() => Ok(()), }; @@ -215,7 +221,7 @@ impl Publisher { } /// Serve a track using FuturesUnordered for unlimited concurrent groups. - async fn run_track(&self, mut track: TrackConsumer, request_id: RequestId) -> Result<(), Error> { + async fn run_track(&self, mut track: TrackSubscriber, request_id: RequestId) -> Result<(), Error> { let mut tasks = FuturesUnordered::new(); loop { @@ -240,8 +246,7 @@ impl Publisher { flags: Default::default(), }; - tasks - .push(Self::run_group(self.session.clone(), msg, track.info.priority, group, self.version).map(|_| ())); + tasks.push(Self::run_group(self.session.clone(), msg, 0, group, self.version).map(|_| ())); } } diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index 477dad01c..f1da84a1a 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -454,7 +454,7 @@ impl Subscriber { entry.remove(); } } - Entry::Vacant(_) => return Err(Error::NotFound), + Entry::Vacant(_) => return Err(Error::UnknownBroadcast), }; Ok(()) @@ -463,11 +463,7 @@ impl Subscriber { fn start_publish(&mut self, msg: &ietf::Publish<'_>) -> Result<(), Error> { let request_id = msg.request_id; - let track = Track { - name: msg.track_name.to_string(), - priority: 0, - } - .produce(); + let track = Track::new(msg.track_name.to_string()).produce(); let mut state = self.state.lock(); match state.subscribes.entry(request_id) { @@ -501,8 +497,8 @@ impl Subscriber { async fn run_broadcast(&self, path: Path<'_>, mut broadcast: BroadcastDynamic) -> Result<(), Error> { loop { let track = tokio::select! { - producer = broadcast.requested_track() => match producer { - Ok(producer) => producer, + track = broadcast.requested_track() => match track { + Ok(track) => track, Err(err) => { tracing::debug!(%err, "broadcast closed"); break; @@ -627,7 +623,7 @@ impl Subscriber { request_id, track_namespace: broadcast.to_owned(), track_name: (&track.info.name).into(), - subscriber_priority: track.info.priority, + subscriber_priority: 0, group_order: GroupOrder::Descending, filter_type: FilterType::LargestObject, }) @@ -678,7 +674,7 @@ impl Subscriber { RequestId(group.track_alias) } }; - let track = state.subscribes.get_mut(&request_id).ok_or(Error::NotFound)?; + let track = state.subscribes.get_mut(&request_id).ok_or(Error::UnknownTrack)?; let group = Group { sequence: group.group_id, diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index aaef28ed4..f7e9e768d 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -5,7 +5,7 @@ use web_async::FuturesExt; use web_transport_trait::Stats; use crate::{ - AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, Track, TrackConsumer, + AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, Subscription, Track, TrackSubscriber, coding::{Stream, Writer}, lite::{ self, @@ -255,20 +255,22 @@ impl Publisher { priority: PriorityQueue, version: Version, ) -> Result<(), Error> { - let track = Track { - name: subscribe.track.to_string(), - priority: subscribe.priority, - }; - - let broadcast = consumer.ok_or(Error::NotFound)?; - let track = broadcast.subscribe_track(&track)?; - - // TODO wait until track.info() to get the *real* priority + let track = Track::new(subscribe.track.to_string()); + + let broadcast = consumer.ok_or(Error::UnknownBroadcast)?; + let track = broadcast.consume_track(&track)?; + let subscriber = track + .subscribe(Subscription { + start: subscribe.start_group, + end: subscribe.end_group, + ..Default::default() + }) + .await?; let info = lite::SubscribeOk { - priority: track.info.priority, + priority: 0, ordered: false, - max_latency: std::time::Duration::ZERO, + max_latency: Duration::ZERO, start_group: None, end_group: None, }; @@ -276,7 +278,7 @@ impl Publisher { stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?; tokio::select! { - res = Self::run_track(session, track, subscribe, priority, version) => res?, + res = Self::run_track(session, subscriber, subscribe, priority, version) => res?, res = stream.reader.closed() => res?, } @@ -286,18 +288,13 @@ impl Publisher { async fn run_track( session: S, - mut track: TrackConsumer, + mut track: TrackSubscriber, subscribe: &lite::Subscribe<'_>, priority: PriorityQueue, version: Version, ) -> Result<(), Error> { let mut tasks = FuturesUnordered::new(); - // Start the consumer at the specified sequence, otherwise start at the latest group. - if let Some(start_group) = subscribe.start_group.or_else(|| track.latest()) { - track.start_at(start_group); - } - loop { let group = tokio::select! { // Poll all active group futures; never matches but keeps them running. @@ -317,7 +314,7 @@ impl Publisher { sequence, }; - let priority = priority.insert(track.info.priority, sequence); + let priority = priority.insert(subscribe.priority, sequence); tasks.push(Self::serve_group(session.clone(), msg, priority, group, version).map(|_| ())); } } diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 92bb52428..250dedbeb 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -110,7 +110,7 @@ impl Subscriber { tracing::debug!(broadcast = %self.log_path(&path), "unannounced"); // Abort the producer. - let mut producer = producers.remove(&path.into_owned()).ok_or(Error::NotFound)?; + let mut producer = producers.remove(&path.into_owned()).ok_or(Error::UnknownBroadcast)?; producer.abort(Error::Cancel).ok(); } } @@ -160,8 +160,8 @@ impl Subscriber { // Keep serving requests until there are no more consumers. // This way we'll clean up the task when the broadcast is no longer needed. let track = tokio::select! { - producer = broadcast.requested_track() => match producer { - Ok(producer) => producer, + track = broadcast.requested_track() => match track { + Ok(track) => track, Err(err) => { tracing::debug!(%err, "broadcast closed"); break; @@ -181,21 +181,29 @@ impl Subscriber { } } - async fn run_subscribe(&mut self, id: u64, broadcast: Path<'_>, mut track: TrackProducer) { + async fn run_subscribe(&mut self, id: u64, broadcast_path: Path<'_>, mut track: TrackProducer) { + let track_name = track.info.name.clone(); + self.subscribes.lock().insert(id, track.clone()); + // Read the initial subscription parameters from the consumer. + let sub = match track.subscription().await { + Some(sub) => sub, + None => return, // Consumer already dropped. + }; + let msg = lite::Subscribe { id, - broadcast: broadcast.to_owned(), + broadcast: broadcast_path.to_owned(), track: (&track.info.name).into(), - priority: track.info.priority, - ordered: true, - max_latency: std::time::Duration::ZERO, - start_group: None, - end_group: None, + priority: sub.priority, + ordered: sub.ordered, + max_latency: sub.max_latency, + start_group: sub.start, + end_group: sub.end, }; - tracing::info!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, "subscribe started"); + tracing::info!(id, broadcast = %self.log_path(&broadcast_path), track = %track_name, "subscribe started"); let res = tokio::select! { _ = track.unused() => Err(Error::Cancel), @@ -204,15 +212,15 @@ impl Subscriber { match res { Err(Error::Cancel) => { - tracing::info!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, "subscribe cancelled"); + tracing::info!(id, broadcast = %self.log_path(&broadcast_path), track = %track_name, "subscribe cancelled"); let _ = track.abort(Error::Cancel); } Err(err) => { - tracing::warn!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, %err, "subscribe error"); + tracing::warn!(id, broadcast = %self.log_path(&broadcast_path), track = %track_name, %err, "subscribe error"); let _ = track.abort(err); } _ => { - tracing::info!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, "subscribe complete"); + tracing::info!(id, broadcast = %self.log_path(&broadcast_path), track = %track_name, "subscribe complete"); let _ = track.finish(); } } diff --git a/rs/moq-lite/src/model/broadcast.rs b/rs/moq-lite/src/model/broadcast.rs index f1d93c73c..c38b40d70 100644 --- a/rs/moq-lite/src/model/broadcast.rs +++ b/rs/moq-lite/src/model/broadcast.rs @@ -5,7 +5,7 @@ use std::{ use std::ops::Deref; -use crate::{Error, TrackConsumer, TrackProducer, model::track::TrackWeak}; +use crate::{Error, Subscription, TrackConsumer, TrackProducer, TrackSubscriber, model::track::TrackWeak}; use super::Track; @@ -39,17 +39,35 @@ struct State { // Weak references for deduplication. Doesn't prevent track auto-close. tracks: HashMap, - // Dynamic tracks that have been requested. - requests: Vec, + // Track producers queued by consume_track for the dynamic handler. + requested: Vec, // The current number of dynamic producers. - // If this is 0, requests must be empty. + // If this is 0, requests will fail with NotFound. dynamic: usize, // The error that caused the broadcast to be aborted, if any. abort: Option, } +impl State { + /// Insert a track into the lookup, returning an error if a live track with the same name exists. + fn insert_track(&mut self, track: &TrackProducer) -> Result<(), Error> { + match self.tracks.entry(track.info.name.clone()) { + hash_map::Entry::Occupied(mut entry) => { + if !entry.get().is_closed() { + return Err(Error::Duplicate); + } + entry.insert(track.weak()); + } + hash_map::Entry::Vacant(entry) => { + entry.insert(track.weak()); + } + } + Ok(()) + } +} + fn modify(state: &conducer::Producer) -> Result, Error> { match state.write() { Ok(state) => Ok(state), @@ -85,21 +103,13 @@ impl BroadcastProducer { /// /// NOTE: You probably want to [TrackProducer::clone] first to keep publishing to the track. pub fn insert_track(&mut self, track: &TrackProducer) -> Result<(), Error> { - let mut state = modify(&self.state)?; - - let hash_map::Entry::Vacant(entry) = state.tracks.entry(track.info.name.clone()) else { - return Err(Error::Duplicate); - }; - - entry.insert(track.weak()); - - Ok(()) + insert_track_impl(&self.state, track) } /// Remove a track from the lookup. pub fn remove_track(&mut self, name: &str) -> Result<(), Error> { let mut state = modify(&self.state)?; - state.tracks.remove(name).ok_or(Error::NotFound)?; + state.tracks.remove(name).ok_or(Error::UnknownTrack)?; Ok(()) } @@ -132,9 +142,9 @@ impl BroadcastProducer { weak.abort(err.clone()); } - // Abort any pending dynamic track requests. - for mut request in guard.requests.drain(..) { - request.abort(err.clone()).ok(); + // Abort any pending requested track producers. + for mut track in guard.requested.drain(..) { + let _ = track.abort(err.clone()); } guard.abort = Some(err); @@ -167,11 +177,18 @@ impl BroadcastProducer { } } +/// Insert a track into the broadcast lookup. +fn insert_track_impl(state: &conducer::Producer, track: &TrackProducer) -> Result<(), Error> { + let mut guard = modify(state)?; + guard.insert_track(track)?; + Ok(()) +} + /// Handles on-demand track creation for a broadcast. /// -/// When a consumer requests a track that doesn't exist, a [TrackProducer] is created -/// and queued for the dynamic producer to fulfill via [Self::requested_track]. -/// Dropped when no longer needed; pending requests are automatically aborted. +/// When a consumer requests a track that doesn't exist, a [`TrackProducer`] is +/// created and queued. The dynamic handler receives it via [`Self::requested_track`] +/// and starts filling it with data. #[derive(Clone)] pub struct BroadcastDynamic { info: Broadcast, @@ -181,7 +198,6 @@ pub struct BroadcastDynamic { impl BroadcastDynamic { fn new(info: Broadcast, state: conducer::Producer) -> Self { if let Ok(mut state) = state.write() { - // If the broadcast is already closed, we can't handle any new requests. state.dynamic += 1; } @@ -200,17 +216,25 @@ impl BroadcastDynamic { } pub fn poll_requested_track(&mut self, waiter: &conducer::Waiter) -> Poll> { - self.poll(waiter, |state| match state.requests.pop() { + self.poll(waiter, |state| match state.requested.pop() { Some(producer) => Poll::Ready(producer), None => Poll::Pending, }) } - /// Block until a consumer requests a track, returning its producer. + /// Block until a consumer requests a track, returning the producer. + /// + /// The handler should start filling the [`TrackProducer`] with data + /// (e.g., by subscribing upstream). pub async fn requested_track(&mut self) -> Result { conducer::wait(|waiter| self.poll_requested_track(waiter)).await } + /// Insert a track into the broadcast lookup. + pub fn insert_track(&self, track: &TrackProducer) -> Result<(), Error> { + insert_track_impl(&self.state, track) + } + /// Create a consumer that can subscribe to tracks in this broadcast. pub fn consume(&self) -> BroadcastConsumer { BroadcastConsumer { @@ -228,9 +252,9 @@ impl BroadcastDynamic { weak.abort(err.clone()); } - // Abort any pending dynamic track requests. - for mut request in guard.requests.drain(..) { - request.abort(err.clone()).ok(); + // Abort any pending requested track producers. + for mut track in guard.requested.drain(..) { + let _ = track.abort(err.clone()); } guard.abort = Some(err); @@ -253,9 +277,9 @@ impl Drop for BroadcastDynamic { return; } - // Abort all pending requests since there's no dynamic producer to handle them. - for mut request in state.requests.drain(..) { - request.abort(Error::Cancel).ok(); + // Abort all pending requested tracks since there's no dynamic producer to handle them. + for mut track in state.requested.drain(..) { + let _ = track.abort(Error::Cancel); } } } @@ -294,8 +318,8 @@ impl Deref for BroadcastConsumer { } impl BroadcastConsumer { - pub fn subscribe_track(&self, track: &Track) -> Result { - // Upgrade to a temporary producer so we can modify the state. + /// Returns the track if it exists, otherwise tries to route it to [`BroadcastDynamic`]. + pub fn consume_track(&self, track: &Track) -> Result { let producer = self .state .produce() @@ -310,42 +334,27 @@ impl BroadcastConsumer { state.tracks.remove(&track.name); } - // Otherwise we have never seen this track before and need to create a new producer. - let producer = track.clone().produce(); - let consumer = producer.consume(); - if state.dynamic == 0 { - return Err(Error::NotFound); + return Err(Error::UnknownTrack); } - // Insert a weak reference for deduplication. - let weak = producer.weak(); - state.tracks.insert(producer.info.name.clone(), weak.clone()); - state.requests.push(producer); - - // Remove the track from the lookup when it's unused. - let consumer_state = self.state.clone(); - web_async::spawn(async move { - let _ = weak.unused().await; - - let Some(producer) = consumer_state.produce() else { - return; - }; - let Ok(mut state) = producer.write() else { - return; - }; - - // Remove the entry, but reinsert if it was replaced by a different reference. - if let Some(current) = state.tracks.remove(&weak.info.name) - && !current.is_clone(&weak) - { - state.tracks.insert(current.info.name.clone(), current); - } - }); + // Create a new TrackProducer, insert into lookup, and queue for dynamic handler. + let track_producer = TrackProducer::new(track.clone()); + state.insert_track(&track_producer)?; + let consumer = track_producer.consume(); + state.requested.push(track_producer); Ok(consumer) } + /// Subscribe to a track, blocking until the first group exists (or finish/abort). + /// + /// Convenience: calls [`Self::consume_track`] then [`TrackConsumer::subscribe`]. + pub async fn subscribe_track(&self, track: &Track, sub: Subscription) -> Result { + let consumer = self.consume_track(track)?; + consumer.subscribe(sub).await + } + pub async fn closed(&self) -> Error { self.state.closed().await; self.state.read().abort.clone().unwrap_or(Error::Dropped) @@ -359,8 +368,8 @@ impl BroadcastConsumer { #[cfg(test)] impl BroadcastConsumer { - pub fn assert_subscribe_track(&self, track: &Track) -> TrackConsumer { - self.subscribe_track(track).expect("should not have errored") + pub fn assert_consume_track(&self, track: &Track) -> TrackConsumer { + self.consume_track(track).expect("should not have errored") } pub fn assert_not_closed(&self) { @@ -387,168 +396,163 @@ mod test { let consumer = producer.consume(); - let mut track1_sub = consumer.assert_subscribe_track(&Track::new("track1")); - track1_sub.assert_group(); + let track1_consumer = consumer.assert_consume_track(&Track::new("track1")); + assert_eq!(track1_consumer.latest(), Some(0)); let mut track2 = Track::new("track2").produce(); producer.assert_insert_track(&track2); let consumer2 = producer.consume(); - let mut track2_consumer = consumer2.assert_subscribe_track(&Track::new("track2")); - track2_consumer.assert_no_group(); + let track2_consumer = consumer2.assert_consume_track(&Track::new("track2")); + assert_eq!(track2_consumer.latest(), None); track2.append_group().unwrap(); - - track2_consumer.assert_group(); + assert_eq!(track2_consumer.latest(), Some(0)); } #[tokio::test] async fn closed() { let mut producer = Broadcast::new().produce(); - let _dynamic = producer.dynamic(); + let dynamic = producer.dynamic(); let consumer = producer.consume(); consumer.assert_not_closed(); // Create a new track and insert it into the broadcast. let track1 = producer.assert_create_track(&Track::new("track1")); - let track1c = consumer.assert_subscribe_track(&track1.info); - let track2 = consumer.assert_subscribe_track(&Track::new("track2")); + let track1c = consumer.assert_consume_track(&track1.info); // Explicitly aborting the broadcast should cascade to child tracks. + drop(dynamic); producer.abort(Error::Cancel).unwrap(); - // The requested TrackProducer should have been aborted. - track2.assert_error(); - - // track1 should also be closed because close() cascades. + // track1 should be closed because close() cascades. track1c.assert_error(); - - // track1's producer should also be closed. assert!(track1.is_closed()); } #[tokio::test] async fn requests() { - let mut producer = Broadcast::new().produce().dynamic(); + let broadcast = Broadcast::new().produce(); + let mut dynamic = broadcast.dynamic(); - let consumer = producer.consume(); + let consumer = broadcast.consume(); let consumer2 = consumer.clone(); - let mut track1 = consumer.assert_subscribe_track(&Track::new("track1")); - track1.assert_not_closed(); - track1.assert_no_group(); - - // Make sure we deduplicate requests while track1 is still active. - let mut track2 = consumer2.assert_subscribe_track(&Track::new("track1")); - track2.assert_is_clone(&track1); - - // Get the requested track, and there should only be one. - let mut track3 = producer.assert_request(); - producer.assert_no_request(); + // consume_track with dynamic handler should create a producer and queue it. + let track1_consumer = consumer.assert_consume_track(&Track::new("track1")); + assert_eq!(track1_consumer.latest(), None); - // Make sure the consumer is the same. - track3.consume().assert_is_clone(&track1); + // Get the request -- there should be exactly one. + let mut track1_producer = dynamic.assert_request(); + dynamic.assert_no_request(); + assert_eq!(track1_producer.info.name, "track1"); - // Append a group and make sure they all get it. - track3.append_group().unwrap(); - track1.assert_group(); - track2.assert_group(); + // Dedup: consuming the same track again should return the existing one. + let track1_dup = consumer2.assert_consume_track(&Track::new("track1")); + track1_dup.assert_is_clone(&track1_consumer); - // Make sure that tracks are cancelled when the producer is dropped. - let track4 = consumer.assert_subscribe_track(&Track::new("track2")); - drop(producer); + // No new request should be queued. + dynamic.assert_no_request(); - // Make sure the track is errored, not closed. - track4.assert_error(); - - let track5 = consumer2.subscribe_track(&Track::new("track3")); - assert!(track5.is_err(), "should have errored"); + // Append a group and make sure both see it. + track1_producer.append_group().unwrap(); + assert_eq!(track1_consumer.latest(), Some(0)); + assert_eq!(track1_dup.latest(), Some(0)); } #[tokio::test] async fn stale_producer() { - let mut broadcast = Broadcast::new().produce().dynamic(); + tokio::time::pause(); + + let broadcast = Broadcast::new().produce(); + let mut dynamic = broadcast.dynamic(); let consumer = broadcast.consume(); - // Subscribe to a track, creating a request - let track1 = consumer.assert_subscribe_track(&Track::new("track1")); + // Subscribe to a track (creates producer via dynamic). + let track1_consumer = consumer.assert_consume_track(&Track::new("track1")); + + // Handle the request. + let mut producer1 = dynamic.assert_request(); - // Get the requested producer and close it (simulating publisher disconnect) - let mut producer1 = broadcast.assert_request(); + // Close the producer (simulating publisher disconnect). producer1.append_group().unwrap(); producer1.finish().unwrap(); drop(producer1); - // The consumer should see the track as closed - track1.assert_closed(); + // The consumer should see the track as closed. + track1_consumer.assert_closed(); + drop(track1_consumer); - // Subscribe again to the same track - should get a NEW producer, not the stale one - let mut track2 = consumer.assert_subscribe_track(&Track::new("track1")); - track2.assert_not_closed(); - track2.assert_not_clone(&track1); + // Subscribe again to the same track -- should get a new request + // because the old producer was dropped (weak ref is closed). + let track2_consumer = consumer.assert_consume_track(&Track::new("track1")); - // There should be a new request for the track - let mut producer2 = broadcast.assert_request(); - producer2.append_group().unwrap(); + let mut producer2 = dynamic.assert_request(); - // The new consumer should receive the new group - track2.assert_group(); + producer2.append_group().unwrap(); + assert_eq!(track2_consumer.latest(), Some(0)); } #[tokio::test] async fn requested_unused() { - tokio::time::pause(); - let mut broadcast = Broadcast::new().produce().dynamic(); + let broadcast = Broadcast::new().produce(); + let mut dynamic = broadcast.dynamic(); - // Subscribe to a track that doesn't exist - this creates a request - let consumer1 = broadcast.consume().assert_subscribe_track(&Track::new("unknown_track")); + // Subscribe to a track. + let c1 = broadcast.consume(); + let consumer1 = c1.assert_consume_track(&Track::new("unknown_track")); - // Get the requested track producer - let producer1 = broadcast.assert_request(); + // Handle the request. + let producer1 = dynamic.assert_request(); - // The track producer should NOT be unused yet because there's a consumer + // The track producer should NOT be unused yet because there's a consumer. assert!( producer1.unused().now_or_never().is_none(), "track producer should be used" ); - // Making a new consumer will keep the producer alive - let consumer2 = broadcast.consume().assert_subscribe_track(&Track::new("unknown_track")); + // Making a new consumer will keep the producer alive. + let consumer2 = c1.assert_consume_track(&Track::new("unknown_track")); consumer2.assert_is_clone(&consumer1); - // Drop the consumer subscription drop(consumer1); - - // The track producer should NOT be unused yet because there's a consumer assert!( producer1.unused().now_or_never().is_none(), "track producer should be used" ); - // Drop the second consumer, now the producer should be unused + // Drop the second consumer, now the producer should be unused. drop(consumer2); - - // BUG: The track producer should become unused after dropping the consumer, - // but it won't because the broadcast keeps a reference in the lookup HashMap - // This assertion will fail, demonstrating the bug assert!( producer1.unused().now_or_never().is_some(), "track producer should be unused after consumer is dropped" ); - // Advance paused time to let the async cleanup task run. - tokio::time::advance(std::time::Duration::from_millis(1)).await; + // Drop the producer so the weak ref is closed. + drop(producer1); - // Now the cleanup task should have run and we can subscribe again to the unknown track. - let consumer3 = broadcast.consume().subscribe_track(&Track::new("unknown_track")); - let producer2 = broadcast.assert_request(); + // Now consume_track finds the stale entry, removes it, and creates a new request. + let c2 = broadcast.consume(); + let _consumer3 = c2.assert_consume_track(&Track::new("unknown_track")); - // Drop the consumer, now the producer should be unused - drop(consumer3); - assert!( - producer2.unused().now_or_never().is_some(), - "track producer should be unused after consumer is dropped" - ); + let _producer2 = dynamic.assert_request(); + dynamic.assert_no_request(); + } + + #[tokio::test] + async fn pending_requests_rejected_on_drop() { + let broadcast = Broadcast::new().produce(); + let dynamic = broadcast.dynamic(); + let consumer = broadcast.consume(); + + // consume_track creates a producer and queues it. + let track_consumer = consumer.assert_consume_track(&Track::new("track2")); + + // Drop dynamic -- pending producers should be aborted. + drop(dynamic); + + // Track consumer should see an error. + track_consumer.assert_error(); } } diff --git a/rs/moq-lite/src/model/group.rs b/rs/moq-lite/src/model/group.rs index 504189032..07bc85382 100644 --- a/rs/moq-lite/src/model/group.rs +++ b/rs/moq-lite/src/model/group.rs @@ -201,6 +201,11 @@ impl GroupProducer { .await .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) } + + /// Returns true if this group was aborted (closed with an error). + pub fn is_aborted(&self) -> bool { + self.state.read().abort.is_some() + } } impl Clone for GroupProducer { diff --git a/rs/moq-lite/src/model/track.rs b/rs/moq-lite/src/model/track.rs index 5fd931d31..c1ac86130 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -26,20 +26,16 @@ use std::{ // TODO: Replace with a configurable cache size. const MAX_GROUP_AGE: Duration = Duration::from_secs(30); -/// A track is a collection of groups, delivered out-of-order until expired. -#[derive(Clone, Debug, PartialEq, Eq)] +/// A track is a collection of groups, identified by name. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Track { pub name: String, - pub priority: u8, } impl Track { pub fn new>(name: T) -> Self { - Self { - name: name.into(), - priority: 0, - } + Self { name: name.into() } } pub fn produce(self) -> TrackProducer { @@ -47,6 +43,23 @@ impl Track { } } +/// Subscription preferences for a subscription or producer cap. +/// +/// Describes how groups should be delivered: priority, ordering, latency bounds, +/// and the range of groups requested. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct Subscription { + pub priority: u8, + pub ordered: bool, + /// Maximum cache/latency. `Duration::ZERO` means unlimited. + pub max_latency: Duration, + /// First group sequence to deliver. `None` means no preference. + /// Use [`TrackSubscriber::update`] to set a concrete value once `latest()` is known. + pub start: Option, + /// Last group sequence to deliver. `None` means no preference (live). + pub end: Option, +} + #[derive(Default)] struct State { /// Groups in arrival order. `None` entries are tombstones for evicted groups. @@ -56,6 +69,9 @@ struct State { max_sequence: Option, final_sequence: Option, abort: Option, + + /// Per-subscriber subscription values, read by the producer for aggregation. + subscriptions: Vec>, } impl State { @@ -143,6 +159,77 @@ impl State { } } + /// Compute the aggregate subscription from all active entries. + /// + /// Returns `None` if there are no subscriptions. + fn subscription(&self) -> Option { + if self.subscriptions.is_empty() { + return None; + } + + // Read each subscriber's current subscription value into owned copies. + let subs: Vec = self + .subscriptions + .iter() + .filter_map(|c| { + let r = c.read(); + if r.is_closed() { None } else { Some(r.clone()) } + }) + .collect(); + + if subs.is_empty() { + return None; + } + + let priority = subs.iter().map(|s| s.priority).max().unwrap(); + + // ordered is true only if ALL subscribers want ordered. + let ordered = subs.iter().all(|s| s.ordered); + + // max_latency: max across subscriptions. ZERO = unlimited wins. + let max_latency = subs + .iter() + .map(|s| s.max_latency) + .reduce(|a, b| { + if a.is_zero() || b.is_zero() { + Duration::ZERO + } else { + a.max(b) + } + }) + .unwrap(); + + // start: min across all concrete values. None = no preference (defer to others). + let start = subs + .iter() + .map(|s| s.start) + .reduce(|a, b| match (a, b) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) | (None, Some(a)) => Some(a), + (None, None) => None, + }) + .unwrap(); + + // end: max across all concrete values. None = no end (defer to others). + let end = subs + .iter() + .map(|s| s.end) + .reduce(|a, b| match (a, b) { + (Some(a), Some(b)) => Some(a.max(b)), + (Some(a), None) | (None, Some(a)) => Some(a), + (None, None) => None, + }) + .unwrap(); + + Some(Subscription { + priority, + ordered, + max_latency, + start, + end, + }) + } + fn poll_finished(&self) -> Poll> { if let Some(fin) = self.final_sequence { Poll::Ready(Ok(fin)) @@ -152,12 +239,26 @@ impl State { Poll::Pending } } + + /// Check if the track is "ready" for a subscriber: has at least one group, + /// is finished (with 0+ groups), or is aborted. + fn poll_ready(&self) -> Poll> { + if self.max_sequence.is_some() || self.final_sequence.is_some() { + Poll::Ready(Ok(())) + } else if let Some(err) = &self.abort { + Poll::Ready(Err(err.clone())) + } else { + Poll::Pending + } + } } /// A producer for a track, used to create new groups. pub struct TrackProducer { pub info: Track, state: conducer::Producer, + /// The last aggregate subscription returned by [`Self::subscription`]. + prev_subscription: Option, } impl TrackProducer { @@ -165,10 +266,15 @@ impl TrackProducer { Self { info, state: conducer::Producer::default(), + prev_subscription: None, } } /// Create a new group with the given sequence number. + /// + /// If a group with the same sequence already exists but was aborted (e.g. due to a + /// cancelled subscription), it will be replaced with a tombstone and the new group + /// pushed to the end. Successfully completed groups return `Err(Error::Duplicate)`. pub fn create_group(&mut self, info: Group) -> Result { let group = info.produce(); @@ -180,7 +286,25 @@ impl TrackProducer { } if !state.duplicates.insert(group.info.sequence) { - return Err(Error::Duplicate); + // Sequence exists -- check if the existing group was aborted. + for slot in state.groups.iter_mut() { + if let Some((existing, _)) = slot { + if existing.info.sequence == group.info.sequence { + if !existing.is_aborted() { + return Err(Error::Duplicate); + } + // Tombstone the old entry and push the new group at the end. + *slot = None; + break; + } + } + } + + let now = tokio::time::Instant::now(); + state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence)); + state.groups.push_back(Some((group.clone(), now))); + state.evict_expired(now); + return Ok(group); } let now = tokio::time::Instant::now(); @@ -285,8 +409,6 @@ impl TrackProducer { TrackConsumer { info: self.info.clone(), state: self.state.consume(), - index: 0, - min_sequence: 0, } } @@ -316,6 +438,49 @@ impl TrackProducer { } } + /// Poll for changes to the aggregate subscription. + /// + /// Returns `Ready(sub)` when the aggregate differs from the last value returned. + /// Returns `Ready(None)` when no subscriptions are active (or the track is closed). + pub fn poll_subscription(&mut self, waiter: &conducer::Waiter) -> Poll> { + let prev = self.prev_subscription.as_ref(); + match self.state.poll(waiter, |state| { + // Remove closed subscription consumers. + state.subscriptions.retain(|c| !c.read().is_closed()); + + // Also poll each subscription consumer so the waiter is registered + // on inner changes too. + for sub in &state.subscriptions { + // Register the waiter on each subscription's conducer channel. + let _ = sub.poll(waiter, |_| Poll::<()>::Pending); + } + + let current = state.subscription(); + if current.as_ref() != prev { + Poll::Ready(current) + } else { + Poll::Pending + } + }) { + Poll::Ready(Ok(sub)) => { + self.prev_subscription = sub.clone(); + Poll::Ready(sub) + } + Poll::Ready(Err(_)) => { + self.prev_subscription = None; + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } + } + + /// Block until the aggregate subscription changes. + /// + /// Returns `None` when all subscriptions are dropped or the track is closed. + pub async fn subscription(&mut self) -> Option { + conducer::wait(|waiter| self.poll_subscription(waiter)).await + } + fn modify(&self) -> Result> { self.state .write() @@ -328,13 +493,14 @@ impl Clone for TrackProducer { Self { info: self.info.clone(), state: self.state.clone(), + prev_subscription: self.prev_subscription.clone(), } } } -impl From for TrackProducer { - fn from(info: Track) -> Self { - TrackProducer::new(info) +impl> From for TrackProducer { + fn from(info: T) -> Self { + TrackProducer::new(info.into()) } } @@ -366,20 +532,95 @@ impl TrackWeak { TrackConsumer { info: self.info.clone(), state: self.state.consume(), - index: 0, - min_sequence: 0, } } +} - pub async fn unused(&self) -> crate::Result<()> { - self.state - .unused() - .await - .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) +/// Iterates groups from a track while managing this subscriber's subscription lifecycle. +/// +/// Created via [`TrackConsumer::subscribe`]. Registers a subscription in the +/// shared state on creation; automatically removes it on drop. +/// Blocks in `subscribe()` until the first group exists (or finish/abort). +pub struct TrackSubscriber { + pub info: Track, + state: conducer::Consumer, + sub: conducer::Producer, + index: usize, +} + +impl TrackSubscriber { + /// Receive the next group respecting the subscription start/end range. + pub async fn recv_group(&mut self) -> Result> { + conducer::wait(|waiter| self.poll_recv_group(waiter)).await } - pub fn is_clone(&self, other: &Self) -> bool { - self.state.same_channel(&other.state) + pub fn poll_recv_group(&mut self, waiter: &conducer::Waiter) -> Poll>> { + let sub = self.sub.read(); + let min_sequence = sub.start.unwrap_or(0); + let end = sub.end; + drop(sub); + + let Some((consumer, found_index)) = + ready!(self.poll(waiter, |state| { state.poll_next_group(self.index, min_sequence) })?) + else { + return Poll::Ready(Ok(None)); + }; + + // Check end bound. + if let Some(end) = end { + if consumer.info.sequence > end { + return Poll::Ready(Ok(None)); + } + } + + self.index = found_index + 1; + Poll::Ready(Ok(Some(consumer))) + } + + /// Update this subscription's preferences. + pub fn update(&mut self, sub: Subscription) { + if let Ok(mut guard) = self.sub.write() { + *guard = sub; + } + } + + /// Return the latest sequence number in the track. + pub fn latest(&self) -> Option { + self.state.read().max_sequence + } + + /// The current subscription preferences. + pub fn subscription(&self) -> Subscription { + self.sub.read().clone() + } + + /// Poll for track closure, without blocking. + pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll> { + self.poll(waiter, |state| state.poll_closed()) + } + + /// Block until the track is closed. + pub async fn closed(&self) -> Result<()> { + conducer::wait(|waiter| self.poll_closed(waiter)).await + } + + // A helper to automatically apply Dropped if the state is closed without an error. + fn poll(&self, waiter: &conducer::Waiter, f: F) -> Poll> + where + F: Fn(&conducer::Ref<'_, State>) -> Poll>, + { + Poll::Ready(match ready!(self.state.poll(waiter, f)) { + Ok(res) => res, + Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)), + }) + } +} + +impl Drop for TrackSubscriber { + fn drop(&mut self) { + // Close the subscription producer so the TrackProducer knows to remove it. + // The producer's poll_subscription will clean up closed consumers. + let _ = self.sub.close(); } } @@ -388,9 +629,6 @@ impl TrackWeak { pub struct TrackConsumer { pub info: Track, state: conducer::Consumer, - index: usize, - - min_sequence: u64, } impl TrackConsumer { @@ -406,46 +644,16 @@ impl TrackConsumer { }) } - /// Poll for the next group received over the network, without blocking. - /// - /// Groups may arrive out of order or with gaps due to network conditions. - /// Use `OrderedConsumer` if you need groups in sequence order, - /// skipping those that arrive too late. - /// - /// Returns `Poll::Ready(Ok(Some(group)))` when a group is available, - /// `Poll::Ready(Ok(None))` when the track is finished, - /// `Poll::Ready(Err(e))` when the track has been aborted, or - /// `Poll::Pending` when no group is available yet. - pub fn poll_recv_group(&mut self, waiter: &conducer::Waiter) -> Poll>> { - let Some((consumer, found_index)) = - ready!(self.poll(waiter, |state| state.poll_next_group(self.index, self.min_sequence))?) - else { - return Poll::Ready(Ok(None)); - }; - - self.index = found_index + 1; - Poll::Ready(Ok(Some(consumer))) - } - - /// Receive the next group available on this track. - /// - /// Groups may arrive out of order or with gaps due to network conditions. - /// Use `OrderedConsumer` if you need groups in sequence order, - /// skipping those that arrive too late. - pub async fn recv_group(&mut self) -> Result> { - conducer::wait(|waiter| self.poll_recv_group(waiter)).await - } - - /// Deprecated: Use [`recv_group`](Self::recv_group) instead. - #[deprecated(note = "Use recv_group instead")] + /// Removed: Use [`TrackSubscriber::recv_group`] instead. + #[deprecated(note = "Use TrackSubscriber::recv_group instead")] pub async fn next_group(&mut self) -> Result> { - self.recv_group().await + unimplemented!("Use TrackSubscriber::recv_group instead") } - /// Deprecated: Use [`poll_recv_group`](Self::poll_recv_group) instead. - #[deprecated(note = "Use poll_recv_group instead")] - pub fn poll_next_group(&mut self, waiter: &conducer::Waiter) -> Poll>> { - self.poll_recv_group(waiter) + /// Removed: Use [`TrackSubscriber::poll_recv_group`] instead. + #[deprecated(note = "Use TrackSubscriber::poll_recv_group instead")] + pub fn poll_next_group(&mut self, _waiter: &conducer::Waiter) -> Poll>> { + unimplemented!("Use TrackSubscriber::poll_recv_group instead") } /// Poll for the group with the given sequence, without blocking. @@ -486,9 +694,31 @@ impl TrackConsumer { conducer::wait(|waiter| self.poll_finished(waiter)).await } - /// Start the consumer at the specified sequence. - pub fn start_at(&mut self, sequence: u64) { - self.min_sequence = sequence; + /// Register a subscription and block until the first group exists (or finish/abort). + /// + /// The returned [`TrackSubscriber`] iterates groups respecting start/end range. + /// Dropping it removes the subscription from the aggregate. + pub async fn subscribe(&self, sub: Subscription) -> Result { + // Create a conducer channel for this subscription's preferences. + let sub_producer = conducer::Producer::new(sub); + let sub_consumer = sub_producer.consume(); + + { + // Upgrade to a temporary producer to insert the subscription consumer. + let producer = self.state.produce().ok_or(Error::Dropped)?; + let mut state = producer.write().map_err(|_| Error::Dropped)?; + state.subscriptions.push(sub_consumer); + } + + // Wait until the track is "ready": has at least one group, is finished, or aborted. + conducer::wait(|waiter| self.poll(waiter, |state| state.poll_ready())).await?; + + Ok(TrackSubscriber { + info: self.info.clone(), + state: self.state.clone(), + sub: sub_producer, + index: 0, + }) } /// Return the latest sequence number in the track. @@ -501,9 +731,10 @@ impl TrackConsumer { use futures::FutureExt; #[cfg(test)] +#[allow(deprecated)] impl TrackConsumer { pub fn assert_group(&mut self) -> GroupConsumer { - self.recv_group() + self.next_group() .now_or_never() .expect("group would have blocked") .expect("would have errored") @@ -512,8 +743,8 @@ impl TrackConsumer { pub fn assert_no_group(&mut self) { assert!( - self.recv_group().now_or_never().is_none(), - "recv_group would not have blocked" + self.next_group().now_or_never().is_none(), + "next_group would not have blocked" ); } @@ -540,6 +771,12 @@ impl TrackConsumer { pub fn assert_not_clone(&self, other: &Self) { assert!(!self.is_clone(other), "should not be clone"); } + + pub async fn assert_subscribe(&self) -> TrackSubscriber { + self.subscribe(Subscription::default()) + .await + .expect("subscribe should not have errored") + } } #[cfg(test)] @@ -631,19 +868,20 @@ mod test { } #[tokio::test] - async fn consumer_skips_evicted_groups() { + async fn subscriber_skips_evicted_groups() { tokio::time::pause(); let mut producer = Track::new("test").produce(); producer.append_group().unwrap(); // seq 0 - let mut consumer = producer.consume(); + let consumer = producer.consume(); + let mut subscriber = consumer.subscribe(Subscription::default()).await.unwrap(); tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await; producer.append_group().unwrap(); // seq 1 - // Group 0 was evicted. Consumer should get group 1. - let group = consumer.assert_group(); + // Group 0 was evicted. Subscriber should get group 1. + let group = subscriber.recv_group().now_or_never().unwrap().unwrap().unwrap(); assert_eq!(group.info.sequence, 1); } @@ -724,10 +962,11 @@ mod test { assert!(state.duplicates.contains(&2)); } - // Consumer should still be able to read through the hole. - let mut consumer = producer.consume(); - let group = consumer.assert_group(); - // consume() starts at index 0, first non-tombstoned group is seq 5. + // Subscriber should still be able to read through the hole. + let consumer = producer.consume(); + let mut subscriber = consumer.subscribe(Subscription::default()).await.unwrap(); + let group = subscriber.recv_group().now_or_never().unwrap().unwrap().unwrap(); + // subscribe() starts at index 0, first non-tombstoned group is seq 5. assert_eq!(group.info.sequence, 5); } @@ -776,10 +1015,21 @@ mod test { producer.create_group(Group { sequence: 1 }).unwrap(); producer.finish_at(1).unwrap(); - let mut consumer = producer.consume(); - assert_eq!(consumer.assert_group().info.sequence, 1); + let consumer = producer.consume(); + let mut subscriber = consumer.subscribe(Subscription::default()).await.unwrap(); + assert_eq!( + subscriber + .recv_group() + .now_or_never() + .expect("should not block") + .expect("would have errored") + .expect("track was closed") + .info + .sequence, + 1 + ); - let done = consumer + let done = subscriber .recv_group() .now_or_never() .expect("should not block") @@ -820,4 +1070,120 @@ mod test { assert!(matches!(producer.append_group(), Err(Error::BoundsExceeded))); } + + #[tokio::test] + async fn create_group_replaces_aborted() { + let mut producer = Track::new("test").produce(); + + // Create and abort a group. + let mut group = producer.create_group(Group { sequence: 5 }).unwrap(); + group.abort(Error::Cancel).unwrap(); + + // Creating the same group again should succeed (replaces aborted). + let group2 = producer.create_group(Group { sequence: 5 }); + assert!(group2.is_ok(), "should replace aborted group"); + + // Creating again on a non-aborted group should fail. + let group3 = producer.create_group(Group { sequence: 5 }); + assert!( + matches!(group3, Err(Error::Duplicate)), + "should not replace active group" + ); + } + + #[tokio::test] + async fn create_group_does_not_replace_finished() { + let mut producer = Track::new("test").produce(); + + // Create and finish a group. + let mut group = producer.create_group(Group { sequence: 5 }).unwrap(); + group.finish().unwrap(); + + // Should fail because the group finished successfully. + let result = producer.create_group(Group { sequence: 5 }); + assert!(matches!(result, Err(Error::Duplicate))); + } + + #[tokio::test] + async fn subscribe_unblocks_on_first_group() { + let mut producer = Track::new("test").produce(); + let consumer = producer.consume(); + + // subscribe blocks until a group exists. + let handle = tokio::spawn(async move { consumer.subscribe(Subscription::default()).await }); + + // Yield to let subscribe start waiting. + tokio::task::yield_now().await; + + // Write a group to unblock subscribe. + producer.append_group().unwrap(); + + let subscriber = handle.await.unwrap().unwrap(); + assert_eq!(subscriber.latest(), Some(0)); + } + + #[tokio::test] + async fn subscribe_unblocks_on_finish() { + let mut producer = Track::new("test").produce(); + let consumer = producer.consume(); + + let handle = tokio::spawn(async move { consumer.subscribe(Subscription::default()).await }); + + tokio::task::yield_now().await; + + // Finishing without any groups should also unblock subscribe. + producer.finish().unwrap(); + + let subscriber = handle.await.unwrap().unwrap(); + assert!(subscriber.latest().is_none()); + } + + #[tokio::test] + async fn subscribe_unblocks_on_abort() { + let mut producer = Track::new("test").produce(); + let consumer = producer.consume(); + + let handle = tokio::spawn(async move { consumer.subscribe(Subscription::default()).await }); + + tokio::task::yield_now().await; + + // Aborting should unblock subscribe with an error. + producer.abort(Error::Cancel).unwrap(); + + let result = handle.await.unwrap(); + assert!(result.is_err(), "subscribe should return error on abort"); + } + + #[tokio::test] + async fn subscribe_returns_immediately_if_group_exists() { + let mut producer = Track::new("test").produce(); + producer.append_group().unwrap(); + + let consumer = producer.consume(); + + // Should not block since a group already exists. + let subscriber = consumer + .subscribe(Subscription::default()) + .now_or_never() + .expect("should not block") + .expect("should not error"); + + assert_eq!(subscriber.latest(), Some(0)); + } + + #[tokio::test] + async fn subscribe_returns_immediately_if_finished() { + let mut producer = Track::new("test").produce(); + producer.finish().unwrap(); + + let consumer = producer.consume(); + + let subscriber = consumer + .subscribe(Subscription::default()) + .now_or_never() + .expect("should not block") + .expect("should not error"); + + assert!(subscriber.latest().is_none()); + } } diff --git a/rs/moq-mux/src/catalog.rs b/rs/moq-mux/src/catalog.rs index a99b85a02..b51573ce0 100644 --- a/rs/moq-mux/src/catalog.rs +++ b/rs/moq-mux/src/catalog.rs @@ -29,10 +29,7 @@ impl CatalogProducer { catalog: hang::Catalog, ) -> Result { let hang_track = broadcast.create_track(hang::Catalog::default_track())?; - let msf_track = broadcast.create_track(moq_lite::Track { - name: moq_msf::DEFAULT_NAME.to_string(), - priority: 100, - })?; + let msf_track = broadcast.create_track(moq_lite::Track::new(moq_msf::DEFAULT_NAME))?; Ok(Self { hang_track, @@ -57,8 +54,10 @@ impl CatalogProducer { } /// Create a consumer for this catalog, receiving updates as they're published. - pub fn consume(&self) -> hang::CatalogConsumer { - hang::CatalogConsumer::new(self.hang_track.consume()) + pub async fn consume(&self) -> Result { + let track = self.hang_track.consume(); + let subscriber = track.subscribe(moq_lite::Subscription::default()).await?; + Ok(hang::CatalogConsumer::new(subscriber)) } /// Finish publishing to this catalog. diff --git a/rs/moq-mux/src/consumer/ordered.rs b/rs/moq-mux/src/consumer/ordered.rs index cb1f8a94d..cc0c4d249 100644 --- a/rs/moq-mux/src/consumer/ordered.rs +++ b/rs/moq-mux/src/consumer/ordered.rs @@ -8,7 +8,7 @@ use super::{ContainerFormat, OrderedFrame, Timestamp}; /// A consumer for media tracks with timestamp reordering. /// -/// This wraps a `moq_lite::TrackConsumer` and adds functionality +/// This wraps a `moq_lite::TrackSubscriber` and adds functionality /// like timestamp decoding, latency management, and frame buffering. /// /// Generic over `F: ContainerFormat` to support different container encodings. @@ -18,7 +18,7 @@ use super::{ContainerFormat, OrderedFrame, Timestamp}; /// The consumer can skip groups that are too far behind to maintain low latency. /// Configure the maximum acceptable delay through the consumer's latency settings. pub struct OrderedConsumer { - pub track: moq_lite::TrackConsumer, + pub track: moq_lite::TrackSubscriber, format: F, @@ -38,7 +38,7 @@ pub struct OrderedConsumer { impl OrderedConsumer { /// Create a new OrderedConsumer wrapping the given moq-lite consumer. - pub fn new(track: moq_lite::TrackConsumer, format: F, max_latency: std::time::Duration) -> Self { + pub fn new(track: moq_lite::TrackSubscriber, format: F, max_latency: std::time::Duration) -> Self { Self { track, format, @@ -218,14 +218,14 @@ impl OrderedConsumer { } } -impl From> for moq_lite::TrackConsumer { +impl From> for moq_lite::TrackSubscriber { fn from(inner: OrderedConsumer) -> Self { inner.track } } impl std::ops::Deref for OrderedConsumer { - type Target = moq_lite::TrackConsumer; + type Target = moq_lite::TrackSubscriber; fn deref(&self) -> &Self::Target { &self.track @@ -421,7 +421,11 @@ mod tests { #[tokio::test] async fn read_single_group() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 0, &[ts(0)]); @@ -439,7 +443,11 @@ mod tests { #[tokio::test] async fn read_multiple_frames_single_group() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 0, &[ts(0), ts(33_000), ts(66_000)]); @@ -459,7 +467,11 @@ mod tests { #[tokio::test] async fn read_multiple_groups_within_latency() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); // 5 groups, 20ms spacing. Total span = 80ms, well within 500ms latency. @@ -478,7 +490,11 @@ mod tests { async fn latency_skip_delivers_recent_groups() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(100)); // Group 0: 5 frames, NOT finished (blocks consumer) @@ -515,7 +531,11 @@ mod tests { async fn zero_latency_skips_aggressively() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::ZERO); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); @@ -547,7 +567,11 @@ mod tests { async fn latency_skip_correctness() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(100)); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); @@ -585,7 +609,11 @@ mod tests { async fn groups_delivered_in_sequence_order() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); @@ -616,7 +644,11 @@ mod tests { #[tokio::test] async fn adjacent_group_flushed_immediately() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 0, &[ts(0)]); @@ -634,7 +666,11 @@ mod tests { #[tokio::test] async fn bframes_within_group() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 0, &[ts(0), ts(66_000), ts(33_000)]); @@ -653,7 +689,11 @@ mod tests { async fn empty_track_returns_none() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); track.finish().unwrap(); @@ -671,7 +711,11 @@ mod tests { async fn track_closed_with_error() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 0, &[ts(0)]); @@ -693,7 +737,11 @@ mod tests { async fn closed_resolves_when_track_ends() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); assert!( @@ -716,7 +764,11 @@ mod tests { #[tokio::test] async fn gap_in_group_sequence_recovery() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(100)); write_group(&mut track, 0, &[ts(0), ts(20_000)]); @@ -734,7 +786,11 @@ mod tests { #[tokio::test] async fn gap_at_start_of_sequence() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(80)); write_group(&mut track, 5, &[ts(0), ts(20_000)]); @@ -752,7 +808,11 @@ mod tests { #[tokio::test] async fn frame_timestamp_and_index_decoding() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 0, &[ts(0), ts(33_333), ts(66_666)]); @@ -774,7 +834,11 @@ mod tests { #[tokio::test] async fn frame_payload_preserved() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); let payload_bytes = vec![0x01, 0x02, 0x03, 0x04, 0x05]; @@ -806,7 +870,11 @@ mod tests { async fn no_infinite_loop_with_buffered_frames() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_secs(10)); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); @@ -847,7 +915,11 @@ mod tests { #[tokio::test] async fn large_timestamps() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_secs(3700)); let one_hour = 3_600_000_000u64; @@ -863,7 +935,11 @@ mod tests { #[tokio::test] async fn set_max_latency_changes_behavior() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_secs(10)); write_group(&mut track, 0, &[ts(0)]); @@ -881,7 +957,11 @@ mod tests { async fn max_timestamp_tracks_through_bframes() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); // max_latency must exceed (group1_max - group0_min) = 100ms - 0ms = 100ms // to avoid the latency skip and test B-frame timestamp tracking. let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(110)); @@ -928,7 +1008,11 @@ mod tests { async fn startup_selects_earliest_group() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(100)); write_group(&mut track, 3, &[ts(0)]); @@ -974,7 +1058,11 @@ mod tests { async fn startup_skips_groups_without_data() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); let _group5 = track.create_group(moq_lite::Group { sequence: 5 }).unwrap(); @@ -998,7 +1086,11 @@ mod tests { #[tokio::test] async fn startup_single_group_mid_stream() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 100, &[ts(3_000_000)]); @@ -1013,7 +1105,11 @@ mod tests { async fn multiple_sequential_latency_skips() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(50)); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); @@ -1043,7 +1139,11 @@ mod tests { async fn latency_skip_boundary_exact() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(100)); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); @@ -1075,7 +1175,11 @@ mod tests { async fn single_newer_group_triggers_skip() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(100)); // Group 0: stalled at ts=0, NOT finished @@ -1110,7 +1214,11 @@ mod tests { async fn single_missing_sequence_near_eof_skips() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(100)); // Group 0: finished normally @@ -1129,7 +1237,11 @@ mod tests { #[tokio::test] async fn group_error_skips_to_next() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); @@ -1147,7 +1259,11 @@ mod tests { async fn track_finishes_while_reading() { tokio::time::pause(); let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); write_group(&mut track, 0, &[ts(0)]); @@ -1176,7 +1292,11 @@ mod tests { #[tokio::test] async fn empty_group_advances() { let mut track = moq_lite::Track::new("test").produce(); - let consumer_track = track.consume(); + let consumer_track = track + .consume() + .subscribe(moq_lite::Subscription::default()) + .await + .unwrap(); let mut consumer = OrderedConsumer::new(consumer_track, Legacy, Duration::from_millis(500)); let mut group0 = track.create_group(moq_lite::Group { sequence: 0 }).unwrap(); diff --git a/rs/moq-mux/src/convert/fmp4.rs b/rs/moq-mux/src/convert/fmp4.rs index b83c62cef..d4648fc91 100644 --- a/rs/moq-mux/src/convert/fmp4.rs +++ b/rs/moq-mux/src/convert/fmp4.rs @@ -27,97 +27,151 @@ impl Fmp4 { let mut broadcast = self.output; let catalog_producer = crate::CatalogProducer::new(&mut broadcast)?; - let catalog_track = self.input.subscribe_track(&hang::Catalog::default_track())?; + let catalog_track = self + .input + .subscribe_track(&hang::Catalog::default_track(), moq_lite::Subscription::default()) + .await?; let mut catalog_consumer = hang::CatalogConsumer::new(catalog_track); let catalog = catalog_consumer.next().await?.context("empty catalog")?; let mut output_catalog = catalog_producer.clone(); - let mut guard = output_catalog.lock(); let mut tasks = tokio::task::JoinSet::new(); + // Collect track subscriptions and output tracks first (async), + // then update the catalog guard only for the minimal mutation window. + struct VideoTask { + name: String, + config: VideoConfig, + input: moq_lite::TrackSubscriber, + output: moq_lite::TrackProducer, + convert: Option<(Vec, u64)>, // (init_data, timescale) for Legacy conversion + } + struct AudioTask { + name: String, + config: AudioConfig, + input: moq_lite::TrackSubscriber, + output: moq_lite::TrackProducer, + convert: Option<(Vec, u64)>, // (init_data, timescale) for Legacy conversion + } + + let mut video_tasks = Vec::new(); for (name, config) in &catalog.video.renditions { - let input_track = self.input.subscribe_track(&moq_lite::Track { - name: name.clone(), - priority: 1, - })?; - - match &config.container { - Container::Cmaf { .. } => { - guard.video.renditions.insert(name.clone(), config.clone()); - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 1, - })?; - let track_name = name.clone(); - tasks.spawn(async move { - if let Err(e) = passthrough_track(input_track, output_track).await { - tracing::error!(%e, track = %track_name, "passthrough_track failed"); - } - }); - } + let input_track = self + .input + .subscribe_track(&moq_lite::Track::new(name.clone()), moq_lite::Subscription::default()) + .await?; + let output_track = broadcast.create_track(moq_lite::Track::new(name.clone()))?; + + let convert = match &config.container { + Container::Cmaf { .. } => None, Container::Legacy => { let init_data = build_video_init(config)?; let timescale = guess_video_timescale(config); + Some((init_data, timescale)) + } + }; - let mut cmaf_config = config.clone(); - cmaf_config.container = Container::Cmaf { - init_data: base64::engine::general_purpose::STANDARD.encode(&init_data), - }; - guard.video.renditions.insert(name.clone(), cmaf_config); + video_tasks.push(VideoTask { + name: name.clone(), + config: config.clone(), + input: input_track, + output: output_track, + convert, + }); + } - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 1, - })?; + let mut audio_tasks = Vec::new(); + for (name, config) in &catalog.audio.renditions { + let input_track = self + .input + .subscribe_track(&moq_lite::Track::new(name.clone()), moq_lite::Subscription::default()) + .await?; + let output_track = broadcast.create_track(moq_lite::Track::new(name.clone()))?; + + let convert = match &config.container { + Container::Cmaf { .. } => None, + Container::Legacy => { + let init_data = build_audio_init(config)?; + let timescale = config.sample_rate as u64; + Some((init_data, timescale)) + } + }; + + audio_tasks.push(AudioTask { + name: name.clone(), + config: config.clone(), + input: input_track, + output: output_track, + convert, + }); + } - let track_name = name.clone(); + // Now acquire the guard only for catalog mutations (no awaits). + { + let mut guard = output_catalog.lock(); + for task in &video_tasks { + match &task.convert { + None => { + guard.video.renditions.insert(task.name.clone(), task.config.clone()); + } + Some((init_data, _)) => { + let mut cmaf_config = task.config.clone(); + cmaf_config.container = Container::Cmaf { + init_data: base64::engine::general_purpose::STANDARD.encode(init_data), + }; + guard.video.renditions.insert(task.name.clone(), cmaf_config); + } + } + } + for task in &audio_tasks { + match &task.convert { + None => { + guard.audio.renditions.insert(task.name.clone(), task.config.clone()); + } + Some((init_data, _)) => { + let mut cmaf_config = task.config.clone(); + cmaf_config.container = Container::Cmaf { + init_data: base64::engine::general_purpose::STANDARD.encode(init_data), + }; + guard.audio.renditions.insert(task.name.clone(), cmaf_config); + } + } + } + } + + // Spawn track conversion/passthrough tasks. + for task in video_tasks { + let track_name = task.name; + match task.convert { + None => { + tasks.spawn(async move { + if let Err(e) = passthrough_track(task.input, task.output).await { + tracing::error!(%e, track = %track_name, "passthrough_track failed"); + } + }); + } + Some((_, timescale)) => { tasks.spawn(async move { - if let Err(e) = convert_legacy_to_cmaf(input_track, output_track, timescale, true).await { + if let Err(e) = convert_legacy_to_cmaf(task.input, task.output, timescale, true).await { tracing::error!(%e, track = %track_name, "convert_legacy_to_cmaf failed"); } }); } } } - - for (name, config) in &catalog.audio.renditions { - let input_track = self.input.subscribe_track(&moq_lite::Track { - name: name.clone(), - priority: 2, - })?; - - match &config.container { - Container::Cmaf { .. } => { - guard.audio.renditions.insert(name.clone(), config.clone()); - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 2, - })?; - let track_name = name.clone(); + for task in audio_tasks { + let track_name = task.name; + match task.convert { + None => { tasks.spawn(async move { - if let Err(e) = passthrough_track(input_track, output_track).await { + if let Err(e) = passthrough_track(task.input, task.output).await { tracing::error!(%e, track = %track_name, "passthrough_track failed"); } }); } - Container::Legacy => { - let init_data = build_audio_init(config)?; - - let mut cmaf_config = config.clone(); - cmaf_config.container = Container::Cmaf { - init_data: base64::engine::general_purpose::STANDARD.encode(&init_data), - }; - guard.audio.renditions.insert(name.clone(), cmaf_config); - - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 2, - })?; - - let timescale = config.sample_rate as u64; - let track_name = name.clone(); + Some((_, timescale)) => { tasks.spawn(async move { - if let Err(e) = convert_legacy_to_cmaf(input_track, output_track, timescale, false).await { + if let Err(e) = convert_legacy_to_cmaf(task.input, task.output, timescale, false).await { tracing::error!(%e, track = %track_name, "convert_legacy_to_cmaf failed"); } }); @@ -125,8 +179,6 @@ impl Fmp4 { } } - drop(guard); - // Keep broadcast and catalog alive until all track tasks complete. while tasks.join_next().await.is_some() {} @@ -135,7 +187,7 @@ impl Fmp4 { } async fn passthrough_track( - mut input: moq_lite::TrackConsumer, + mut input: moq_lite::TrackSubscriber, mut output: moq_lite::TrackProducer, ) -> anyhow::Result<()> { while let Some(group) = input.recv_group().await? { @@ -151,7 +203,7 @@ async fn passthrough_track( } async fn convert_legacy_to_cmaf( - input: moq_lite::TrackConsumer, + input: moq_lite::TrackSubscriber, mut output: moq_lite::TrackProducer, timescale: u64, is_video: bool, diff --git a/rs/moq-mux/src/convert/hang.rs b/rs/moq-mux/src/convert/hang.rs index e9745203c..aa983df0d 100644 --- a/rs/moq-mux/src/convert/hang.rs +++ b/rs/moq-mux/src/convert/hang.rs @@ -32,7 +32,10 @@ impl Hang { let catalog_producer = crate::CatalogProducer::new(&mut broadcast)?; // Subscribe to the input catalog - let catalog_track = self.input.subscribe_track(&hang::Catalog::default_track())?; + let catalog_track = self + .input + .subscribe_track(&hang::Catalog::default_track(), moq_lite::Subscription::default()) + .await?; let mut catalog_consumer = hang::CatalogConsumer::new(catalog_track); let catalog = catalog_consumer.next().await?.context("empty catalog")?; @@ -42,19 +45,16 @@ impl Hang { // Convert video tracks for (name, config) in &catalog.video.renditions { - let input_track = self.input.subscribe_track(&moq_lite::Track { - name: name.clone(), - priority: 1, - })?; + let input_track = self + .input + .subscribe_track(&moq_lite::Track::new(name.clone()), moq_lite::Subscription::default()) + .await?; match &config.container { Container::Legacy => { // Already Legacy — pass through guard.video.renditions.insert(name.clone(), config.clone()); - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 1, - })?; + let output_track = broadcast.create_track(moq_lite::Track::new(name.clone()))?; let track_name = name.clone(); tasks.spawn(async move { if let Err(e) = passthrough_track(input_track, output_track).await { @@ -73,10 +73,7 @@ impl Hang { legacy_config.container = Container::Legacy; guard.video.renditions.insert(name.clone(), legacy_config); - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 1, - })?; + let output_track = broadcast.create_track(moq_lite::Track::new(name.clone()))?; let track_name = name.clone(); tasks.spawn(async move { @@ -90,18 +87,15 @@ impl Hang { // Convert audio tracks for (name, config) in &catalog.audio.renditions { - let input_track = self.input.subscribe_track(&moq_lite::Track { - name: name.clone(), - priority: 2, - })?; + let input_track = self + .input + .subscribe_track(&moq_lite::Track::new(name.clone()), moq_lite::Subscription::default()) + .await?; match &config.container { Container::Legacy => { guard.audio.renditions.insert(name.clone(), config.clone()); - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 2, - })?; + let output_track = broadcast.create_track(moq_lite::Track::new(name.clone()))?; let track_name = name.clone(); tasks.spawn(async move { if let Err(e) = passthrough_track(input_track, output_track).await { @@ -120,10 +114,7 @@ impl Hang { legacy_config.container = Container::Legacy; guard.audio.renditions.insert(name.clone(), legacy_config); - let output_track = broadcast.create_track(moq_lite::Track { - name: name.clone(), - priority: 2, - })?; + let output_track = broadcast.create_track(moq_lite::Track::new(name.clone()))?; let track_name = name.clone(); tasks.spawn(async move { @@ -158,7 +149,7 @@ fn parse_timescale(init_data: &[u8]) -> anyhow::Result { /// Pass a track through unchanged. async fn passthrough_track( - mut input: moq_lite::TrackConsumer, + mut input: moq_lite::TrackSubscriber, mut output: moq_lite::TrackProducer, ) -> anyhow::Result<()> { while let Some(group) = input.recv_group().await? { @@ -175,7 +166,7 @@ async fn passthrough_track( /// Convert CMAF moof+mdat frames to hang Legacy frames. async fn convert_cmaf_to_legacy( - mut input: moq_lite::TrackConsumer, + mut input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer, timescale: u64, is_video: bool, diff --git a/rs/moq-mux/src/convert/test.rs b/rs/moq-mux/src/convert/test.rs index 1582c978d..dc064840a 100644 --- a/rs/moq-mux/src/convert/test.rs +++ b/rs/moq-mux/src/convert/test.rs @@ -79,12 +79,7 @@ fn setup_input( group.write_frame(catalog_json).unwrap(); group.finish().unwrap(); - let video_track = broadcast - .create_track(moq_lite::Track { - name: "video".to_string(), - priority: 1, - }) - .unwrap(); + let video_track = broadcast.create_track(moq_lite::Track::new("video")).unwrap(); let consumer = broadcast.consume(); @@ -141,7 +136,7 @@ fn write_cmaf_frames(track: &mut moq_lite::TrackProducer, frames: &[(u64, Vec Vec<(Timestamp, Vec, bool)> { +async fn read_legacy_frames(track: moq_lite::TrackSubscriber) -> Vec<(Timestamp, Vec, bool)> { let mut ordered = crate::consumer::OrderedConsumer::new(track, crate::consumer::Legacy, Duration::MAX); let mut result = Vec::new(); @@ -159,7 +154,7 @@ async fn read_legacy_frames(track: moq_lite::TrackConsumer) -> Vec<(Timestamp, V } /// Read all raw CMAF frames from a track consumer (must be subscribed before converter finishes). -async fn read_cmaf_raw_frames(mut track: moq_lite::TrackConsumer) -> Vec { +async fn read_cmaf_raw_frames(mut track: moq_lite::TrackSubscriber) -> Vec { let mut result = Vec::new(); while let Some(group) = tokio::time::timeout(Duration::from_millis(500), track.recv_group()) .await @@ -203,13 +198,13 @@ fn parse_cmaf_frame(data: &Bytes, timescale: u64) -> (Timestamp, Vec, bool) } /// Subscribe to the video track, retrying until it appears. -async fn subscribe_video(consumer: &moq_lite::BroadcastConsumer) -> moq_lite::TrackConsumer { - let track = moq_lite::Track { - name: "video".to_string(), - priority: 1, - }; +async fn subscribe_video(consumer: &moq_lite::BroadcastConsumer) -> moq_lite::TrackSubscriber { + let track = moq_lite::Track::new("video"); loop { - match consumer.subscribe_track(&track) { + match consumer + .subscribe_track(&track, moq_lite::Subscription::default()) + .await + { Ok(t) => return t, Err(_) => tokio::task::yield_now().await, } diff --git a/rs/moq-native/examples/chat.rs b/rs/moq-native/examples/chat.rs index c459d2f91..f28cfd279 100644 --- a/rs/moq-native/examples/chat.rs +++ b/rs/moq-native/examples/chat.rs @@ -41,10 +41,7 @@ async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { // Create a track that we'll insert into the broadcast. // A track is a series of groups representing a live stream. - let mut track = broadcast.create_track(moq_lite::Track { - name: "chat".to_string(), - priority: 0, - })?; + let mut track = broadcast.create_track(moq_lite::Track::new("chat"))?; // NOTE: The path is empty because we're using the URL to scope the broadcast. // If you put "alice" here, it would be published as "anon/chat-example/alice". diff --git a/rs/moq-native/tests/backend.rs b/rs/moq-native/tests/backend.rs index c1a4410a1..239572443 100644 --- a/rs/moq-native/tests/backend.rs +++ b/rs/moq-native/tests/backend.rs @@ -69,11 +69,13 @@ async fn backend_test(scheme: &str, backend: moq_native::QuicBackend) { assert_eq!(path.as_str(), "test"); let bc = bc.expect("expected announce, got unannounce"); - let mut track_sub = bc - .subscribe_track(&Track::new("video")) - .expect("subscribe_track failed"); + let track_sub = bc.consume_track(&Track::new("video")).expect("consume_track failed"); + let mut subscriber = tokio::time::timeout(TIMEOUT, track_sub.subscribe(Default::default())) + .await + .expect("subscribe timed out") + .expect("subscribe failed"); - let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group()) + let mut group_sub = tokio::time::timeout(TIMEOUT, subscriber.recv_group()) .await .expect("recv_group timed out") .expect("recv_group failed") @@ -221,11 +223,13 @@ async fn iroh_connect() { assert_eq!(path.as_str(), "test"); let bc = bc.expect("expected announce, got unannounce"); - let mut track_sub = bc - .subscribe_track(&Track::new("video")) - .expect("subscribe_track failed"); + let track_sub = bc.consume_track(&Track::new("video")).expect("consume_track failed"); + let mut subscriber = tokio::time::timeout(TIMEOUT, track_sub.subscribe(Default::default())) + .await + .expect("subscribe timed out") + .expect("subscribe failed"); - let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group()) + let mut group_sub = tokio::time::timeout(TIMEOUT, subscriber.recv_group()) .await .expect("recv_group timed out") .expect("recv_group failed") diff --git a/rs/moq-native/tests/broadcast.rs b/rs/moq-native/tests/broadcast.rs index ecd910492..0c9f7f254 100644 --- a/rs/moq-native/tests/broadcast.rs +++ b/rs/moq-native/tests/broadcast.rs @@ -87,12 +87,14 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi let bc = bc.expect("expected announce, got unannounce"); // Subscribe to the track. - let mut track_sub = bc - .subscribe_track(&Track::new("video")) - .expect("subscribe_track failed"); + let track_sub = bc.consume_track(&Track::new("video")).expect("consume_track failed"); + let mut subscriber = tokio::time::timeout(TIMEOUT, track_sub.subscribe(Default::default())) + .await + .expect("subscribe timed out") + .expect("subscribe failed"); // Read one group. - let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group()) + let mut group_sub = tokio::time::timeout(TIMEOUT, subscriber.recv_group()) .await .expect("recv_group timed out") .expect("recv_group failed") @@ -427,12 +429,14 @@ async fn broadcast_websocket() { let bc = bc.expect("expected announce, got unannounce"); // Subscribe to the track. - let mut track_sub = bc - .subscribe_track(&Track::new("video")) - .expect("subscribe_track failed"); + let track_sub = bc.consume_track(&Track::new("video")).expect("consume_track failed"); + let mut subscriber = tokio::time::timeout(TIMEOUT, track_sub.subscribe(Default::default())) + .await + .expect("subscribe timed out") + .expect("subscribe failed"); // Read one group. - let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group()) + let mut group_sub = tokio::time::timeout(TIMEOUT, subscriber.recv_group()) .await .expect("recv_group timed out") .expect("recv_group failed") @@ -534,11 +538,13 @@ async fn broadcast_websocket_fallback() { let bc = bc.expect("expected announce, got unannounce"); // Subscribe to the track. - let mut track_sub = bc - .subscribe_track(&Track::new("video")) - .expect("subscribe_track failed"); + let track_sub = bc.consume_track(&Track::new("video")).expect("consume_track failed"); + let mut subscriber = tokio::time::timeout(TIMEOUT, track_sub.subscribe(Default::default())) + .await + .expect("subscribe timed out") + .expect("subscribe failed"); - let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group()) + let mut group_sub = tokio::time::timeout(TIMEOUT, subscriber.recv_group()) .await .expect("recv_group timed out") .expect("recv_group failed") diff --git a/rs/moq-relay/src/web.rs b/rs/moq-relay/src/web.rs index e48fa3721..63f563332 100644 --- a/rs/moq-relay/src/web.rs +++ b/rs/moq-relay/src/web.rs @@ -277,34 +277,34 @@ async fn serve_fetch( tracing::info!(%broadcast, %track, "fetching track"); - let track = moq_lite::Track { - name: track, - priority: 0, - }; + let track = moq_lite::Track::new(track); // NOTE: The auth token is already scoped to the broadcast. let broadcast = origin.consume_broadcast("").ok_or(StatusCode::NOT_FOUND)?; - let mut track = broadcast.subscribe_track(&track).map_err(|err| match err { - moq_lite::Error::NotFound => StatusCode::NOT_FOUND, + let track = broadcast.consume_track(&track).map_err(|err| match err { + moq_lite::Error::UnknownTrack | moq_lite::Error::UnknownBroadcast => StatusCode::NOT_FOUND, _ => StatusCode::INTERNAL_SERVER_ERROR, })?; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(30); let result = tokio::time::timeout_at(deadline, async { + let to_err = |_: moq_lite::Error| StatusCode::INTERNAL_SERVER_ERROR; + let group = match params.group { FetchGroup::Latest => match track.latest() { - Some(sequence) => track.get_group(sequence).await, - None => track.recv_group().await, + Some(sequence) => track.get_group(sequence).await.map_err(to_err)?, + None => { + let mut sub = track + .subscribe(moq_lite::Subscription::default()) + .await + .map_err(to_err)?; + sub.recv_group().await.map_err(to_err)? + } }, - FetchGroup::Num(sequence) => track.get_group(sequence).await, - }; - - let group = match group { - Ok(Some(group)) => group, - Ok(None) => return Err(StatusCode::NOT_FOUND), - Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR), - }; + FetchGroup::Num(sequence) => track.get_group(sequence).await.map_err(to_err)?, + } + .ok_or(StatusCode::NOT_FOUND)?; tracing::info!(track = %track.info.name, group = %group.info.sequence, "serving group");