Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions js/msf/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "@moq/msf",
"type": "module",
"version": "0.1.0",
"description": "MSF (MOQT Streaming Format) catalog types for MoQ",
"license": "(MIT OR Apache-2.0)",
"repository": "github:moq-dev/moq",
"exports": {
".": "./src/index.ts"
},
"scripts": {
"build": "rimraf dist && tsc -b && bun ../common/package.ts",
"check": "tsc --noEmit",
"release": "bun ../common/release.ts"
},
"dependencies": {
"@moq/lite": "workspace:^",
"zod": "^4.1.5"
},
"devDependencies": {
"rimraf": "^6.0.1",
"typescript": "^5.9.2"
}
}
61 changes: 61 additions & 0 deletions js/msf/src/catalog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import type * as Moq from "@moq/lite";
import { z } from "zod";

export const PackagingSchema = z.enum(["loc", "cmaf", "legacy", "mediatimeline", "eventtimeline"]).or(z.string());

export type Packaging = z.infer<typeof PackagingSchema>;

export const RoleSchema = z
.enum(["video", "audio", "audiodescription", "caption", "subtitle", "signlanguage"])
.or(z.string());

export type Role = z.infer<typeof RoleSchema>;

export const TrackSchema = z.object({
name: z.string(),
packaging: PackagingSchema,
isLive: z.boolean(),
role: RoleSchema.optional(),
codec: z.string().optional(),
width: z.number().optional(),
height: z.number().optional(),
framerate: z.number().optional(),
samplerate: z.number().optional(),
channelConfig: z.string().optional(),
bitrate: z.number().optional(),
initData: z.string().optional(),
renderGroup: z.number().optional(),
altGroup: z.number().optional(),
});

export type Track = z.infer<typeof TrackSchema>;

export const CatalogSchema = z.object({
version: z.literal(1),
tracks: z.array(TrackSchema),
});

export type Catalog = z.infer<typeof CatalogSchema>;

export function encode(catalog: Catalog): Uint8Array {
const encoder = new TextEncoder();
return encoder.encode(JSON.stringify(catalog));
}

export function decode(raw: Uint8Array): Catalog {
const decoder = new TextDecoder();
const str = decoder.decode(raw);
try {
const json = JSON.parse(str);
return CatalogSchema.parse(json);
} catch (error) {
console.warn("invalid MSF catalog", str);
throw error;
}
}

export async function fetch(track: Moq.Track): Promise<Catalog | undefined> {
const frame = await track.readFrame();
if (!frame) return undefined;
return decode(frame);
}
1 change: 1 addition & 0 deletions js/msf/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./catalog";
8 changes: 8 additions & 0 deletions js/msf/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"extends": "../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "./src"
},
"include": ["src"]
}
4 changes: 3 additions & 1 deletion js/watch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
"dependencies": {
"@moq/hang": "workspace:^",
"@moq/lite": "workspace:^",
"@moq/msf": "workspace:^",
"@moq/signals": "workspace:^",
"@moq/ui-core": "workspace:^"
"@moq/ui-core": "workspace:^",
"zod": "^4.1.5"
},
"devDependencies": {
"@types/audioworklet": "^0.0.77",
Expand Down
89 changes: 81 additions & 8 deletions js/watch/src/broadcast.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
import * as Catalog from "@moq/hang/catalog";
import type * as Moq from "@moq/lite";
import { Path } from "@moq/lite";
import * as Msf from "@moq/msf";
import { Effect, type Getter, Signal } from "@moq/signals";
import { z } from "zod";

import { toHang } from "./msf";

export const catalogFormatSchema = z.enum(["hang", "msf"]);
export type CatalogFormat = z.infer<typeof catalogFormatSchema>;

export const catalogAttrSchema = z.enum(["hang", "msf", "auto"]);
export type CatalogAttr = z.infer<typeof catalogAttrSchema>;

/** Delay (ms) before attempting MSF catalog fetch, giving hang format a headstart. */
const HANG_HEADSTART_MS = 100;

export interface BroadcastProps {
connection?: Moq.Connection.Established | Signal<Moq.Connection.Established | undefined>;
Expand All @@ -19,6 +32,9 @@ export interface BroadcastProps {
// Whether to reload the broadcast when it goes offline.
// Defaults to false; pass true to wait for an announcement before subscribing.
reload?: boolean | Signal<boolean>;

// Which catalog formats to try. Default: ["hang"]
catalog?: CatalogFormat[] | Signal<CatalogFormat[]>;
}

// A catalog source that (optionally) reloads automatically when live/offline.
Expand All @@ -30,6 +46,8 @@ export class Broadcast {
status = new Signal<"offline" | "loading" | "live">("offline");
reload: Signal<boolean>;

catalogFormats: Signal<CatalogFormat[]>;

#active = new Signal<Moq.Broadcast | undefined>(undefined);
readonly active: Getter<Moq.Broadcast | undefined> = this.#active;

Expand All @@ -46,6 +64,7 @@ export class Broadcast {
this.name = Signal.from(props?.name ?? Path.empty());
this.enabled = Signal.from(props?.enabled ?? false);
this.reload = Signal.from(props?.reload ?? false);
this.catalogFormats = Signal.from(props?.catalog ?? (["hang"] as CatalogFormat[]));

this.#announced = props?.announced ?? new Signal(new Set());

Expand Down Expand Up @@ -84,21 +103,75 @@ export class Broadcast {
if (!values) return;
const [_, broadcast] = values;

const formats = effect.get(this.catalogFormats);
this.status.set("loading");

const catalog = broadcast.subscribe("catalog.json", Catalog.PRIORITY.catalog);
effect.cleanup(() => catalog.close());
const hangTrack = formats.includes("hang")
? broadcast.subscribe("catalog.json", Catalog.PRIORITY.catalog)
: undefined;
const msfTrack = formats.includes("msf") ? broadcast.subscribe("catalog", Catalog.PRIORITY.catalog) : undefined;

if (hangTrack) effect.cleanup(() => hangTrack.close());
if (msfTrack) effect.cleanup(() => msfTrack.close());

effect.spawn(async () => {
try {
for (;;) {
const update = await Promise.race([effect.cancel, Catalog.fetch(catalog)]);
if (!update) break;
// Race the first catalog fetch, giving hang a headstart.
// Wrap each fetch so undefined results reject, ensuring only
// successful fetches compete (via Promise.any).
const hangFetch = hangTrack
? Catalog.fetch(hangTrack).then((r) => {
if (r) return { kind: "hang" as const, root: r };
throw new Error("hang catalog empty");
})
: undefined;

const msfFetch = msfTrack
? new Promise((r) => setTimeout(r, HANG_HEADSTART_MS))
.then(() => Msf.fetch(msfTrack))
.then((c) => {
if (c) return { kind: "msf" as const, root: toHang(c) };
throw new Error("msf catalog empty");
})
: undefined;

const candidates = [hangFetch, msfFetch].filter((c): c is NonNullable<typeof c> => c != null);
if (candidates.length === 0) return;

const first = await Promise.race([effect.cancel.then(() => undefined), Promise.any(candidates)]);
if (!first) return;

// Close the loser
if (first.kind === "hang") {
msfTrack?.close();
} else {
hangTrack?.close();
}

console.debug("received catalog", this.name.peek(), update);
console.debug("received catalog", first.kind, this.name.peek(), first.root);
this.#catalog.set(first.root);
this.status.set("live");

// Continue reading updates from the winner
const fetchNext =
first.kind === "hang"
? async () => {
const update = await Promise.race([
effect.cancel,
Catalog.fetch(hangTrack as Moq.Track),
]);
return update;
}
: async () => {
const update = await Promise.race([effect.cancel, Msf.fetch(msfTrack as Moq.Track)]);
return update ? toHang(update) : undefined;
};

this.#catalog.set(update);
this.status.set("live");
for (;;) {
const root = await fetchNext();
if (!root) break;
console.debug("received catalog", first.kind, this.name.peek(), root);
this.#catalog.set(root);
}
} catch (err) {
console.warn("error fetching catalog", this.name.peek(), err);
Expand Down
25 changes: 23 additions & 2 deletions js/watch/src/element.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ import type { Time } from "@moq/lite";
import * as Moq from "@moq/lite";
import { Effect, Signal } from "@moq/signals";
import { MultiBackend } from "./backend";
import { Broadcast } from "./broadcast";
import { Broadcast, type CatalogAttr, type CatalogFormat, catalogAttrSchema } from "./broadcast";
import { Sync } from "./sync";

const OBSERVED = ["url", "name", "paused", "volume", "muted", "reload", "jitter"] as const;
function parseCatalogAttr(value: string | null): CatalogFormat[] {
const parsed = catalogAttrSchema.safeParse(value);
if (!parsed.success) return ["hang"];
if (parsed.data === "auto") return ["hang", "msf"];
return [parsed.data];
}

const OBSERVED = ["url", "name", "paused", "volume", "muted", "reload", "jitter", "catalog"] as const;
type Observed = (typeof OBSERVED)[number];

// Close everything when this element is garbage collected.
Expand Down Expand Up @@ -152,6 +159,8 @@ export default class MoqWatch extends HTMLElement {
this.broadcast.reload.set(newValue !== null);
} else if (name === "jitter") {
this.backend.jitter.set((newValue ? Number.parseFloat(newValue) : 100) as Time.Milli);
} else if (name === "catalog") {
this.broadcast.catalogFormats.set(parseCatalogAttr(newValue));
} else {
const exhaustive: never = name;
throw new Error(`Invalid attribute: ${exhaustive}`);
Expand Down Expand Up @@ -213,6 +222,18 @@ export default class MoqWatch extends HTMLElement {
set jitter(value: number) {
this.backend.jitter.set(value as Time.Milli);
}

get catalog(): CatalogFormat[] {
return this.broadcast.catalogFormats.peek();
}

set catalog(value: CatalogAttr | CatalogFormat[]) {
if (typeof value === "string") {
this.broadcast.catalogFormats.set(parseCatalogAttr(value));
} else {
this.broadcast.catalogFormats.set(value);
}
}
}

customElements.define("moq-watch", MoqWatch);
Expand Down
Loading