diff --git a/runtimes/edge/core/backend_state.py b/runtimes/edge/core/backend_state.py new file mode 100644 index 000000000..43a34ddc5 --- /dev/null +++ b/runtimes/edge/core/backend_state.py @@ -0,0 +1,64 @@ +"""Process-wide backend readiness state for the edge runtime. + +The edge runtime serves both HTTP clients and a Zenoh IPC bus +(local/llm/{request,response,status}) used by drone flight-control. +Flight-control needs an honest signal on local/llm/status to decide +whether to issue LLM-dependent commands. Without this state, init or +preload failures are swallowed and the heartbeat publishes "ready" +forever — flight-control then issues commands that are silently +dropped at the inference layer. + +This module provides a single process-wide BACKEND_STATE object that +the lifespan code mutates as initialization progresses, and that the +Zenoh heartbeat reads to publish honest status. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from threading import Lock +import time + + +class Readiness(str, Enum): + INITIALIZING = "initializing" + READY = "ready" + DEGRADED = "degraded" + UNAVAILABLE = "unavailable" + + +@dataclass +class BackendState: + readiness: Readiness = Readiness.INITIALIZING + reason: str = "" + backend_initialized: bool = False + last_transition_ms: int = field(default_factory=lambda: int(time.time() * 1000)) + _lock: Lock = field(default_factory=Lock) + + def set(self, readiness: Readiness, reason: str = "") -> None: + with self._lock: + self.readiness = readiness + self.reason = reason + self.last_transition_ms = int(time.time() * 1000) + + def mark_backend_initialized(self) -> None: + with self._lock: + self.backend_initialized = True + # Bump last_transition_ms so heartbeat consumers diffing on the + # timestamp can detect this transition. Without this, the + # backend_initialized flip is invisible in the published snapshot + # to anyone watching last_transition_ms as a change marker. + self.last_transition_ms = int(time.time() * 1000) + + def snapshot(self) -> dict: + with self._lock: + return { + "readiness": self.readiness.value, + "reason": self.reason, + "backend_initialized": self.backend_initialized, + "last_transition_ms": self.last_transition_ms, + } + + +BACKEND_STATE = BackendState() diff --git a/runtimes/edge/server.py b/runtimes/edge/server.py index 764fda23a..91e81e38c 100644 --- a/runtimes/edge/server.py +++ b/runtimes/edge/server.py @@ -58,6 +58,7 @@ from fastapi.middleware.cors import CORSMiddleware from llamafarm_common import offline_mode as _offline_mode_bootstrap # noqa: F401 +from core.backend_state import BACKEND_STATE, Readiness from core.logging import UniversalRuntimeLogger, setup_logging from models import ( BaseModel, @@ -110,18 +111,54 @@ def _init_llama_backend(): """Initialize llama.cpp backend in the main thread. Critical for Jetson/Tegra CUDA stability on unified memory architectures. + + Records outcome on BACKEND_STATE so the Zenoh heartbeat and request + handler can refuse traffic when the backend is dead, instead of + silently dropping requests at the inference layer. """ try: from llamafarm_llama._bindings import ensure_backend logger.info("Initializing llama.cpp backend in main thread...") ensure_backend() + BACKEND_STATE.mark_backend_initialized() logger.info("llama.cpp backend initialized successfully") except ImportError: - logger.debug("llamafarm_llama not installed, skipping backend init") + BACKEND_STATE.set(Readiness.UNAVAILABLE, "llamafarm_llama not installed") + logger.warning("llamafarm_llama not installed — LLM inference unavailable") except Exception as e: + BACKEND_STATE.set(Readiness.UNAVAILABLE, f"backend init failed: {e}") logger.warning(f"Failed to initialize llama.cpp backend: {e}") +def _finalize_backend_readiness( + preload_csv: str, + preload_succeeded: list[str], + preload_failed: list[str], +) -> None: + """Compute final backend readiness from preload outcomes. + + Backend init may have already set UNAVAILABLE; if so, leave it alone + so the original failure reason survives. Otherwise project preload + outcomes onto readiness so the Zenoh heartbeat publishes honest state. + """ + if not BACKEND_STATE.backend_initialized: + return + if not preload_csv: + BACKEND_STATE.set(Readiness.READY) + elif preload_failed and not preload_succeeded: + BACKEND_STATE.set( + Readiness.UNAVAILABLE, + f"all preloads failed: {', '.join(preload_failed)}", + ) + elif preload_failed: + BACKEND_STATE.set( + Readiness.DEGRADED, + f"preload failed: {', '.join(preload_failed)}", + ) + else: + BACKEND_STATE.set(Readiness.READY) + + _init_llama_backend() @@ -463,17 +500,27 @@ async def lifespan(app: FastAPI): start_session_cleanup() logger.info("startup-step END: session-cleanup-start") - # Start Zenoh IPC interface (non-blocking — falls back to HTTP-only on failure) - logger.info("startup-step BEGIN: zenoh-ipc-init") - _zenoh_ipc = ZenohIPC(inference_fn=_zenoh_inference) - logger.info("startup-step END: zenoh-ipc-init") + # Start Zenoh IPC interface (opt-in via LLAMAFARM_ZENOH_ENABLED). + # Off by default: most edge-runtime adopters don't need a pub/sub bus. + # Drone/flight-control deployments (e.g. Arc) set the flag in their compose/env. + if os.getenv("LLAMAFARM_ZENOH_ENABLED", "").strip().lower() in ("1", "true", "yes"): + logger.info("startup-step BEGIN: zenoh-ipc-init") + _zenoh_ipc = ZenohIPC( + inference_fn=_zenoh_inference, + state_provider=BACKEND_STATE.snapshot, + ) + logger.info("startup-step END: zenoh-ipc-init") - logger.info("startup-step BEGIN: zenoh-ipc-start") - await _zenoh_ipc.start() - logger.info("startup-step END: zenoh-ipc-start") + logger.info("startup-step BEGIN: zenoh-ipc-start") + await _zenoh_ipc.start() + logger.info("startup-step END: zenoh-ipc-start") + else: + logger.info("Zenoh IPC disabled (set LLAMAFARM_ZENOH_ENABLED=1 to enable)") # Preload and pin models if configured preload_csv = os.getenv("PRELOAD_MODELS", "").strip() + preload_succeeded: list[str] = [] + preload_failed: list[str] = [] if preload_csv: logger.info("startup-step BEGIN: preload-models-loop") preload_n_ctx_str = os.getenv("PRELOAD_N_CTX", "").strip() @@ -503,10 +550,14 @@ async def lifespan(app: FastAPI): _models.pin(cache_key) logger.info(f"Preloaded and pinned model: {model_id} ({cache_key})") logger.info(f"startup-step END: pin:{model_id}") + preload_succeeded.append(model_id) except Exception as e: logger.warning(f"Failed to preload model '{model_id}': {e}") + preload_failed.append(model_id) logger.info("startup-step END: preload-models-loop") + _finalize_backend_readiness(preload_csv, preload_succeeded, preload_failed) + yield # Shutdown diff --git a/runtimes/edge/services/zenoh_ipc.py b/runtimes/edge/services/zenoh_ipc.py index 99da43a87..57b3bf86f 100644 --- a/runtimes/edge/services/zenoh_ipc.py +++ b/runtimes/edge/services/zenoh_ipc.py @@ -19,13 +19,13 @@ import logging import os import time +from typing import Callable logger = logging.getLogger("edge-runtime.zenoh") ZENOH_ENDPOINT = os.getenv( "ZENOH_ENDPOINT", "unixsock-stream//run/arc/zenoh.sock" ) -ZENOH_ENABLED = os.getenv("ZENOH_ENABLED", "true").lower() in ("true", "1", "yes") TOPIC_REQUEST = "local/llm/request" TOPIC_RESPONSE = "local/llm/response" @@ -37,13 +37,28 @@ class ZenohIPC: """Manages a Zenoh session for LLM inference over IPC.""" - def __init__(self, inference_fn): + def __init__( + self, + inference_fn, + state_provider: Callable[[], dict] | None = None, + ): """ Args: inference_fn: async callable(request_dict) -> response content string. Called for each incoming inference request. + state_provider: optional callable returning a backend-state snapshot + dict with at least {"readiness": str, "reason": str}. + When provided, the heartbeat publishes the snapshot + and the request handler refuses non-ready traffic. + When None, legacy "always ready" behavior is kept. + + Per-request inference failures are intentionally request-scoped: they + log and publish an error response but never mutate global readiness. + Otherwise a single un-cached model would permanently block inference + for every other model on the bus until process restart. """ self._inference_fn = inference_fn + self._state_provider = state_provider self._session = None self._subscriber = None self._loop: asyncio.AbstractEventLoop | None = None @@ -54,11 +69,9 @@ async def start(self) -> bool: """Open Zenoh session and start subscriber + heartbeat tasks. Returns True if started successfully, False on failure (graceful degradation). + Callers gate construction with LLAMAFARM_ZENOH_ENABLED upstream; this + method assumes Zenoh is wanted by the time it is invoked. """ - if not ZENOH_ENABLED: - logger.info("Zenoh IPC disabled (ZENOH_ENABLED=false)") - return False - logger.info("startup-step BEGIN: %s", "zenoh-import") try: import zenoh @@ -72,6 +85,11 @@ async def start(self) -> bool: try: logger.info("startup-step BEGIN: %s", "zenoh-config") config = zenoh.Config() + # Connect as a client to the comms router. Without explicit + # client mode, zenoh.open() returns a peer-mode session that + # silently fails to attach to the router — every put() becomes + # a no-op and the heartbeat never reaches the bus. + config.insert_json5("mode", '"client"') config.insert_json5( "connect/endpoints", json.dumps([ZENOH_ENDPOINT]), @@ -157,11 +175,41 @@ def _remove_future(f): logger.error("Error dispatching Zenoh request", exc_info=True) async def _handle_request(self, request: dict): - """Process a single inference request and publish the response.""" + """Process a single inference request and publish the response. + + If a state_provider is configured and the backend is not READY, + refuse immediately with an explicit error response instead of + passing the request to the inference layer (which would either + silently drop it or raise after a long timeout). + """ request_id = request.get("request_id", "unknown") model = request.get("model", "unknown") - t0 = time.monotonic() + # Admission control: refuse requests when backend isn't READY. + if self._state_provider is not None: + state = self._state_provider() + if state.get("readiness") != "ready": + response = { + "request_id": request_id, + "model": model, + "content": "", + "error": "backend_unavailable", + "reason": state.get("reason", ""), + "readiness": state.get("readiness"), + "timestamp_ms": int(time.time() * 1000), + } + self._session.put( + TOPIC_RESPONSE, json.dumps(response).encode() + ) + logger.warning( + "Refused request %s: backend readiness=%s reason=%s", + request_id, + state.get("readiness"), + state.get("reason"), + ) + return + + t0 = time.monotonic() try: content = await self._inference_fn(request) inference_ms = int((time.monotonic() - t0) * 1000) @@ -194,7 +242,15 @@ async def _handle_request(self, request: dict): # ------------------------------------------------------------------ async def _heartbeat_loop(self): - """Publish periodic status to local/llm/status.""" + """Publish periodic status to local/llm/status. + + When a state_provider is wired in, the heartbeat reflects real + backend readiness ("ready" | "degraded" | "unavailable" | "initializing") + so flight-control can refuse LLM-dependent commands instead of + issuing them and seeing them silently dropped at the inference + layer. The legacy `status` field is preserved (mirrors readiness) + for clients that haven't migrated to `readiness`. + """ logger.info( "Status heartbeat started (interval=%.1fs, topic=%s)", STATUS_INTERVAL_S, @@ -202,11 +258,20 @@ async def _heartbeat_loop(self): ) try: while True: - status = { - "service": "edge-runtime", - "status": "ready", - "timestamp_ms": int(time.time() * 1000), - } + if self._state_provider is not None: + snapshot = self._state_provider() + status = { + "service": "edge-runtime", + "status": snapshot.get("readiness", "unknown"), + "timestamp_ms": int(time.time() * 1000), + **snapshot, + } + else: + status = { + "service": "edge-runtime", + "status": "ready", + "timestamp_ms": int(time.time() * 1000), + } self._session.put( TOPIC_STATUS, json.dumps(status).encode() ) diff --git a/runtimes/edge/tests/test_backend_state.py b/runtimes/edge/tests/test_backend_state.py new file mode 100644 index 000000000..e7c632d1a --- /dev/null +++ b/runtimes/edge/tests/test_backend_state.py @@ -0,0 +1,83 @@ +"""Tests for the BackendState process-wide readiness object.""" + +from __future__ import annotations + +from core.backend_state import BackendState, Readiness + + +class TestBackendStateInitial: + def test_initial_readiness_is_initializing(self): + state = BackendState() + snap = state.snapshot() + assert snap["readiness"] == "initializing" + assert snap["backend_initialized"] is False + assert snap["reason"] == "" + assert isinstance(snap["last_transition_ms"], int) + + +class TestBackendStateTransitions: + def test_set_updates_readiness_and_reason(self): + state = BackendState() + state.set(Readiness.READY) + snap = state.snapshot() + assert snap["readiness"] == "ready" + assert snap["reason"] == "" + + def test_set_unavailable_with_reason(self): + state = BackendState() + state.set(Readiness.UNAVAILABLE, "backend init failed: foo") + snap = state.snapshot() + assert snap["readiness"] == "unavailable" + assert snap["reason"] == "backend init failed: foo" + + def test_set_degraded_with_reason(self): + state = BackendState() + state.set(Readiness.DEGRADED, "preload failed: m1") + snap = state.snapshot() + assert snap["readiness"] == "degraded" + assert snap["reason"] == "preload failed: m1" + + def test_mark_backend_initialized(self): + state = BackendState() + assert state.snapshot()["backend_initialized"] is False + state.mark_backend_initialized() + assert state.snapshot()["backend_initialized"] is True + + def test_mark_backend_initialized_advances_transition_ms(self): + """Heartbeat consumers diff on last_transition_ms to detect changes; + without bumping it, the backend_initialized flip is invisible.""" + import time + + state = BackendState() + first = state.snapshot()["last_transition_ms"] + # 5 ms is enough that int(time.time()*1000) reliably advances even + # on jittery CI runners. + time.sleep(0.005) + state.mark_backend_initialized() + second = state.snapshot()["last_transition_ms"] + assert second > first + + def test_last_transition_ms_advances(self): + import time + + state = BackendState() + first = state.snapshot()["last_transition_ms"] + time.sleep(0.002) + state.set(Readiness.READY) + second = state.snapshot()["last_transition_ms"] + assert second >= first + + +class TestBackendStateSnapshotShape: + def test_snapshot_is_a_plain_dict(self): + """Heartbeat publishes JSON; snapshot must be JSON-serializable.""" + import json + + state = BackendState() + state.set(Readiness.DEGRADED, "x") + state.mark_backend_initialized() + # Round-trip through json to confirm serializability. + round_tripped = json.loads(json.dumps(state.snapshot())) + assert round_tripped["readiness"] == "degraded" + assert round_tripped["reason"] == "x" + assert round_tripped["backend_initialized"] is True diff --git a/runtimes/edge/tests/test_server_lifecycle.py b/runtimes/edge/tests/test_server_lifecycle.py new file mode 100644 index 000000000..4998d3771 --- /dev/null +++ b/runtimes/edge/tests/test_server_lifecycle.py @@ -0,0 +1,157 @@ +"""Tests for server.py backend-state wiring. + +Closes the unit-coverage gap left by the original PR: + +1. `_init_llama_backend()` outcome recording — success marks the backend + as initialized and leaves readiness at INITIALIZING (the lifespan + finalize step decides READY/DEGRADED later); ImportError and generic + Exception both flip readiness to UNAVAILABLE with a reason string. + +2. `_finalize_backend_readiness()` — projects preload outcomes onto + readiness without ever overwriting an UNAVAILABLE set by backend init, + so the original failure reason survives. + +The finalize logic is split out of `lifespan()` so it can be tested +without spinning up the full FastAPI app, KV cache manager, Zenoh +session, etc. +""" + +from __future__ import annotations + +import sys +import types + +import pytest + +import server +from core.backend_state import BackendState, Readiness + + +@pytest.fixture +def fresh_state(monkeypatch): + """Replace server.BACKEND_STATE with a clean instance for the test. + + The module-level `_init_llama_backend()` call at import time has + already mutated the real singleton; swapping in a fresh instance + here gives each test a known starting point and prevents cross-test + pollution. + """ + state = BackendState() + monkeypatch.setattr(server, "BACKEND_STATE", state) + return state + + +def _install_fake_bindings(monkeypatch, ensure_backend): + """Install a fake `llamafarm_llama._bindings` module that exposes + `ensure_backend`. Both the parent and submodule must be in sys.modules + for `from llamafarm_llama._bindings import ensure_backend` to resolve.""" + parent = types.ModuleType("llamafarm_llama") + submodule = types.ModuleType("llamafarm_llama._bindings") + submodule.ensure_backend = ensure_backend + parent._bindings = submodule + monkeypatch.setitem(sys.modules, "llamafarm_llama", parent) + monkeypatch.setitem(sys.modules, "llamafarm_llama._bindings", submodule) + + +class TestInitLlamaBackend: + def test_success_marks_backend_initialized(self, fresh_state, monkeypatch): + calls = {"n": 0} + + def ensure_backend(): + calls["n"] += 1 + + _install_fake_bindings(monkeypatch, ensure_backend) + + server._init_llama_backend() + + assert calls["n"] == 1 + snap = fresh_state.snapshot() + assert snap["backend_initialized"] is True + # Readiness stays INITIALIZING — finalize step decides READY/DEGRADED. + assert snap["readiness"] == "initializing" + assert snap["reason"] == "" + + def test_import_error_sets_unavailable(self, fresh_state, monkeypatch): + # sys.modules[name] = None forces `import name` to raise ImportError. + monkeypatch.setitem(sys.modules, "llamafarm_llama", None) + monkeypatch.setitem(sys.modules, "llamafarm_llama._bindings", None) + + server._init_llama_backend() + + snap = fresh_state.snapshot() + assert snap["readiness"] == "unavailable" + assert "llamafarm_llama not installed" in snap["reason"] + assert snap["backend_initialized"] is False + + def test_runtime_failure_sets_unavailable_with_message( + self, fresh_state, monkeypatch + ): + def ensure_backend(): + raise RuntimeError("CUDA driver mismatch") + + _install_fake_bindings(monkeypatch, ensure_backend) + + server._init_llama_backend() + + snap = fresh_state.snapshot() + assert snap["readiness"] == "unavailable" + assert "backend init failed" in snap["reason"] + # Underlying error message must propagate so operators can diagnose. + assert "CUDA driver mismatch" in snap["reason"] + assert snap["backend_initialized"] is False + + +class TestFinalizeBackendReadiness: + def test_uninitialized_backend_is_not_overwritten(self, fresh_state): + """If backend init failed (UNAVAILABLE + initialized=False), the + finalize step must preserve the original reason — not mask it + with a preload-derived state.""" + fresh_state.set(Readiness.UNAVAILABLE, "backend init failed: missing lib") + + server._finalize_backend_readiness("model-a", ["model-a"], []) + + snap = fresh_state.snapshot() + assert snap["readiness"] == "unavailable" + assert snap["reason"] == "backend init failed: missing lib" + + def test_no_preload_configured_is_ready(self, fresh_state): + fresh_state.mark_backend_initialized() + + server._finalize_backend_readiness("", [], []) + + snap = fresh_state.snapshot() + assert snap["readiness"] == "ready" + assert snap["reason"] == "" + + def test_all_preloads_succeeded_is_ready(self, fresh_state): + fresh_state.mark_backend_initialized() + + server._finalize_backend_readiness("a,b", ["a", "b"], []) + + snap = fresh_state.snapshot() + assert snap["readiness"] == "ready" + assert snap["reason"] == "" + + def test_partial_preload_failure_is_degraded(self, fresh_state): + fresh_state.mark_backend_initialized() + + server._finalize_backend_readiness("a,b", ["a"], ["b"]) + + snap = fresh_state.snapshot() + assert snap["readiness"] == "degraded" + assert "preload failed" in snap["reason"] + assert "b" in snap["reason"] + # Successful model name must not appear in the degraded reason — + # operators should see only what failed. + assert "a" not in snap["reason"].split(":")[-1] + + def test_all_preloads_failed_is_unavailable(self, fresh_state): + fresh_state.mark_backend_initialized() + + server._finalize_backend_readiness("a,b", [], ["a", "b"]) + + snap = fresh_state.snapshot() + assert snap["readiness"] == "unavailable" + assert "all preloads failed" in snap["reason"] + assert "a" in snap["reason"] + assert "b" in snap["reason"] diff --git a/runtimes/edge/tests/test_zenoh_ipc_state.py b/runtimes/edge/tests/test_zenoh_ipc_state.py new file mode 100644 index 000000000..e580b492a --- /dev/null +++ b/runtimes/edge/tests/test_zenoh_ipc_state.py @@ -0,0 +1,202 @@ +"""Tests for ZenohIPC admission control and heartbeat state propagation. + +These tests stub out the Zenoh session and event loop wiring — they +validate the *logic* in _handle_request and _heartbeat_loop, not the +zenoh transport itself. +""" + +from __future__ import annotations + +import asyncio +import json + +import pytest + +from core.backend_state import BackendState, Readiness +from services.zenoh_ipc import ZenohIPC + + +class FakeSession: + def __init__(self): + self.puts: list[tuple[str, dict]] = [] + + def put(self, topic, payload): + self.puts.append((topic, json.loads(bytes(payload)))) + + +@pytest.fixture +def fake_session(): + return FakeSession() + + +@pytest.fixture +def state(): + return BackendState() + + +@pytest.fixture +def inference_calls(): + return [] + + +@pytest.fixture +def ipc(fake_session, state, inference_calls): + async def fake_inference(request): + inference_calls.append(request) + return "ok" + + ipc = ZenohIPC( + inference_fn=fake_inference, + state_provider=state.snapshot, + ) + ipc._session = fake_session + return ipc + + +class TestRequestAdmissionControl: + @pytest.mark.asyncio + async def test_refuses_when_initializing(self, ipc, fake_session, inference_calls): + await ipc._handle_request({"request_id": "r1", "model": "m"}) + assert inference_calls == [] + assert len(fake_session.puts) == 1 + topic, payload = fake_session.puts[0] + assert topic == "local/llm/response" + assert payload["error"] == "backend_unavailable" + assert payload["readiness"] == "initializing" + assert payload["request_id"] == "r1" + + @pytest.mark.asyncio + async def test_refuses_when_unavailable(self, ipc, state, fake_session, inference_calls): + state.set(Readiness.UNAVAILABLE, "backend init failed: missing lib") + await ipc._handle_request({"request_id": "r2", "model": "m"}) + assert inference_calls == [] + topic, payload = fake_session.puts[0] + assert payload["error"] == "backend_unavailable" + assert payload["reason"] == "backend init failed: missing lib" + assert payload["readiness"] == "unavailable" + + @pytest.mark.asyncio + async def test_refuses_when_degraded(self, ipc, state, fake_session, inference_calls): + state.set(Readiness.DEGRADED, "preload failed: m1") + await ipc._handle_request({"request_id": "r3", "model": "m"}) + assert inference_calls == [] + topic, payload = fake_session.puts[0] + assert payload["error"] == "backend_unavailable" + assert payload["readiness"] == "degraded" + + @pytest.mark.asyncio + async def test_passes_through_when_ready(self, ipc, state, fake_session, inference_calls): + state.mark_backend_initialized() + state.set(Readiness.READY) + await ipc._handle_request({"request_id": "r4", "model": "m"}) + assert len(inference_calls) == 1 + topic, payload = fake_session.puts[0] + assert payload["content"] == "ok" + assert "error" not in payload + + +class TestRuntimeFailuresStayRequestScoped: + """Per-request inference failures must NOT mutate global readiness. + + Scoping rule: a model-resolution miss (HF offline, missing local entry) + or a decode/prompt error is a problem with *that* request, not with the + backend as a whole. Flipping global state would mean a single un-cached + model permanently blocks every other model on the bus until restart — + the HTTP path treats the same exceptions as request-scoped 404s and + Zenoh follows the same scope. + """ + + @pytest.mark.asyncio + async def test_offline_mode_error_does_not_mutate_state( + self, fake_session, state + ): + # Skip if huggingface_hub isn't installed in this environment. + pytest.importorskip("huggingface_hub") + from huggingface_hub.errors import OfflineModeIsEnabled + + async def boom(request): + raise OfflineModeIsEnabled("offline") + + ipc = ZenohIPC(inference_fn=boom, state_provider=state.snapshot) + ipc._session = fake_session + state.mark_backend_initialized() + state.set(Readiness.READY) + + await ipc._handle_request({"request_id": "r5", "model": "m1"}) + + # Backend stays READY — the next request to a *different* (cached) + # model would still be admitted. + assert state.snapshot()["readiness"] == "ready" + topic, payload = fake_session.puts[0] + assert payload["error"] == "inference failed" + + @pytest.mark.asyncio + async def test_generic_exception_does_not_mutate_state( + self, fake_session, state + ): + async def boom(request): + raise RuntimeError("decode error") + + ipc = ZenohIPC(inference_fn=boom, state_provider=state.snapshot) + ipc._session = fake_session + state.mark_backend_initialized() + state.set(Readiness.READY) + + await ipc._handle_request({"request_id": "r6", "model": "m1"}) + + assert state.snapshot()["readiness"] == "ready" + + +class TestHeartbeatPublishesSnapshot: + @pytest.mark.asyncio + async def test_heartbeat_publishes_current_state(self, fake_session, state): + ipc = ZenohIPC(inference_fn=None, state_provider=state.snapshot) + ipc._session = fake_session + state.set(Readiness.UNAVAILABLE, "backend init failed: foo") + + # Run one heartbeat iteration by patching sleep to immediately cancel. + async def fake_sleep(_): + raise asyncio.CancelledError + + import services.zenoh_ipc as mod + + original_sleep = mod.asyncio.sleep + mod.asyncio.sleep = fake_sleep + try: + with pytest.raises(asyncio.CancelledError): + await ipc._heartbeat_loop() + finally: + mod.asyncio.sleep = original_sleep + + # Exactly one heartbeat published before the cancel. + assert len(fake_session.puts) == 1 + topic, payload = fake_session.puts[0] + assert topic == "local/llm/status" + assert payload["status"] == "unavailable" + assert payload["readiness"] == "unavailable" + assert payload["reason"] == "backend init failed: foo" + assert payload["service"] == "edge-runtime" + + @pytest.mark.asyncio + async def test_heartbeat_legacy_mode_without_state_provider(self, fake_session): + """Without a state_provider, fall back to the old hardcoded `ready` + so existing deployments that haven't wired in BACKEND_STATE still + work.""" + ipc = ZenohIPC(inference_fn=None) + ipc._session = fake_session + + async def fake_sleep(_): + raise asyncio.CancelledError + + import services.zenoh_ipc as mod + + original_sleep = mod.asyncio.sleep + mod.asyncio.sleep = fake_sleep + try: + with pytest.raises(asyncio.CancelledError): + await ipc._heartbeat_loop() + finally: + mod.asyncio.sleep = original_sleep + + topic, payload = fake_session.puts[0] + assert payload["status"] == "ready"