64 changes: 64 additions & 0 deletions runtimes/edge/core/backend_state.py
@@ -0,0 +1,64 @@
"""Process-wide backend readiness state for the edge runtime.

The edge runtime serves both HTTP clients and a Zenoh IPC bus
(local/llm/{request,response,status}) used by drone flight-control.
Flight-control needs an honest signal on local/llm/status to decide
whether to issue LLM-dependent commands. Without this state, init or
preload failures are swallowed and the heartbeat publishes "ready"
forever — flight-control then issues commands that are silently
dropped at the inference layer.

This module provides a single process-wide BACKEND_STATE object that
the lifespan code mutates as initialization progresses, and that the
Zenoh heartbeat reads to publish honest status.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from threading import Lock
import time


class Readiness(str, Enum):
INITIALIZING = "initializing"
READY = "ready"
DEGRADED = "degraded"
UNAVAILABLE = "unavailable"


@dataclass
class BackendState:
readiness: Readiness = Readiness.INITIALIZING
reason: str = ""
backend_initialized: bool = False
last_transition_ms: int = field(default_factory=lambda: int(time.time() * 1000))
_lock: Lock = field(default_factory=Lock)

def set(self, readiness: Readiness, reason: str = "") -> None:
with self._lock:
self.readiness = readiness
self.reason = reason
self.last_transition_ms = int(time.time() * 1000)

def mark_backend_initialized(self) -> None:
with self._lock:
self.backend_initialized = True
# Bump last_transition_ms so heartbeat consumers diffing on the
# timestamp can detect this transition. Without this, the
# backend_initialized flip is invisible in the published snapshot
# to anyone watching last_transition_ms as a change marker.
self.last_transition_ms = int(time.time() * 1000)

def snapshot(self) -> dict:
with self._lock:
return {
"readiness": self.readiness.value,
"reason": self.reason,
"backend_initialized": self.backend_initialized,
"last_transition_ms": self.last_transition_ms,
}


BACKEND_STATE = BackendState()
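
A minimal sketch of the intended flow, using only names defined in this file; the reason string and timestamp values are illustrative:

    from core.backend_state import BACKEND_STATE, Readiness

    # Lifespan code mutates the singleton as initialization progresses...
    BACKEND_STATE.mark_backend_initialized()
    BACKEND_STATE.set(Readiness.DEGRADED, "preload failed: some-model")

    # ...and the Zenoh heartbeat reads a lock-protected snapshot to publish.
    BACKEND_STATE.snapshot()
    # {"readiness": "degraded", "reason": "preload failed: some-model",
    #  "backend_initialized": True, "last_transition_ms": 1730000000000}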
67 changes: 59 additions & 8 deletions runtimes/edge/server.py
@@ -58,6 +58,7 @@
from fastapi.middleware.cors import CORSMiddleware
from llamafarm_common import offline_mode as _offline_mode_bootstrap # noqa: F401

from core.backend_state import BACKEND_STATE, Readiness
from core.logging import UniversalRuntimeLogger, setup_logging
from models import (
BaseModel,
@@ -110,18 +111,54 @@
def _init_llama_backend():
"""Initialize llama.cpp backend in the main thread.
Critical for Jetson/Tegra CUDA stability on unified memory architectures.

Records outcome on BACKEND_STATE so the Zenoh heartbeat and request
handler can refuse traffic when the backend is dead, instead of
silently dropping requests at the inference layer.
"""
try:
from llamafarm_llama._bindings import ensure_backend
logger.info("Initializing llama.cpp backend in main thread...")
ensure_backend()
BACKEND_STATE.mark_backend_initialized()
logger.info("llama.cpp backend initialized successfully")
except ImportError:
logger.debug("llamafarm_llama not installed, skipping backend init")
BACKEND_STATE.set(Readiness.UNAVAILABLE, "llamafarm_llama not installed")
logger.warning("llamafarm_llama not installed — LLM inference unavailable")
except Exception as e:
BACKEND_STATE.set(Readiness.UNAVAILABLE, f"backend init failed: {e}")
logger.warning(f"Failed to initialize llama.cpp backend: {e}")


def _finalize_backend_readiness(
preload_csv: str,
preload_succeeded: list[str],
preload_failed: list[str],
) -> None:
"""Compute final backend readiness from preload outcomes.

Backend init may have already set UNAVAILABLE; if so, leave it alone
so the original failure reason survives. Otherwise project preload
outcomes onto readiness so the Zenoh heartbeat publishes honest state.
"""
if not BACKEND_STATE.backend_initialized:
return
if not preload_csv:
BACKEND_STATE.set(Readiness.READY)
elif preload_failed and not preload_succeeded:
BACKEND_STATE.set(
Readiness.UNAVAILABLE,
f"all preloads failed: {', '.join(preload_failed)}",
)
elif preload_failed:
BACKEND_STATE.set(
Readiness.DEGRADED,
f"preload failed: {', '.join(preload_failed)}",
)
else:
BACKEND_STATE.set(Readiness.READY)
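# Illustrative outcomes, assuming backend init succeeded (model names
# are made up):
#   _finalize_backend_readiness("", [], [])             -> READY
#   _finalize_backend_readiness("a,b", ["a", "b"], [])  -> READY
#   _finalize_backend_readiness("a,b", ["a"], ["b"])    -> DEGRADED, "preload failed: b"
#   _finalize_backend_readiness("a,b", [], ["a", "b"])  -> UNAVAILABLE, "all preloads failed: a, b"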


_init_llama_backend()


@@ -463,17 +500,27 @@ async def lifespan(app: FastAPI):
start_session_cleanup()
logger.info("startup-step END: session-cleanup-start")

# Start Zenoh IPC interface (non-blocking — falls back to HTTP-only on failure)
logger.info("startup-step BEGIN: zenoh-ipc-init")
_zenoh_ipc = ZenohIPC(inference_fn=_zenoh_inference)
logger.info("startup-step END: zenoh-ipc-init")
# Start Zenoh IPC interface (opt-in via LLAMAFARM_ZENOH_ENABLED).
# Off by default: most edge-runtime adopters don't need a pub/sub bus.
# Drone/flight-control deployments (e.g. Arc) set the flag in their compose/env.
if os.getenv("LLAMAFARM_ZENOH_ENABLED", "").strip().lower() in ("1", "true", "yes"):
logger.info("startup-step BEGIN: zenoh-ipc-init")
_zenoh_ipc = ZenohIPC(
inference_fn=_zenoh_inference,
state_provider=BACKEND_STATE.snapshot,
)
logger.info("startup-step END: zenoh-ipc-init")

logger.info("startup-step BEGIN: zenoh-ipc-start")
await _zenoh_ipc.start()
logger.info("startup-step END: zenoh-ipc-start")
logger.info("startup-step BEGIN: zenoh-ipc-start")
await _zenoh_ipc.start()
logger.info("startup-step END: zenoh-ipc-start")
else:
logger.info("Zenoh IPC disabled (set LLAMAFARM_ZENOH_ENABLED=1 to enable)")

# Preload and pin models if configured
preload_csv = os.getenv("PRELOAD_MODELS", "").strip()
preload_succeeded: list[str] = []
preload_failed: list[str] = []
if preload_csv:
logger.info("startup-step BEGIN: preload-models-loop")
preload_n_ctx_str = os.getenv("PRELOAD_N_CTX", "").strip()
@@ -503,10 +550,14 @@
_models.pin(cache_key)
logger.info(f"Preloaded and pinned model: {model_id} ({cache_key})")
logger.info(f"startup-step END: pin:{model_id}")
preload_succeeded.append(model_id)
except Exception as e:
logger.warning(f"Failed to preload model '{model_id}': {e}")
preload_failed.append(model_id)
logger.info("startup-step END: preload-models-loop")

_finalize_backend_readiness(preload_csv, preload_succeeded, preload_failed)

yield

# Shutdown
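A hypothetical environment for a flight-control deployment that opts in to Zenoh and preloads models. The model names and context size are made up; the endpoint value is the default from services/zenoh_ipc.py:

    LLAMAFARM_ZENOH_ENABLED=1
    ZENOH_ENDPOINT=unixsock-stream//run/arc/zenoh.sock
    PRELOAD_MODELS=model-a,model-b
    PRELOAD_N_CTX=4096

With no flag set, the runtime stays HTTP-only and logs that Zenoh IPC is disabled.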
93 changes: 79 additions & 14 deletions runtimes/edge/services/zenoh_ipc.py
@@ -19,13 +19,13 @@
import logging
import os
import time
from typing import Callable

logger = logging.getLogger("edge-runtime.zenoh")

ZENOH_ENDPOINT = os.getenv(
"ZENOH_ENDPOINT", "unixsock-stream//run/arc/zenoh.sock"
)
ZENOH_ENABLED = os.getenv("ZENOH_ENABLED", "true").lower() in ("true", "1", "yes")

TOPIC_REQUEST = "local/llm/request"
TOPIC_RESPONSE = "local/llm/response"
@@ -37,13 +37,28 @@
class ZenohIPC:
"""Manages a Zenoh session for LLM inference over IPC."""

def __init__(self, inference_fn):
def __init__(
self,
inference_fn,
state_provider: Callable[[], dict] | None = None,
):
"""
Args:
inference_fn: async callable(request_dict) -> response content string.
Called for each incoming inference request.
state_provider: optional callable returning a backend-state snapshot
dict with at least {"readiness": str, "reason": str}.
When provided, the heartbeat publishes the snapshot
and the request handler refuses non-ready traffic.
When None, legacy "always ready" behavior is kept.

Per-request inference failures are intentionally request-scoped: they
log and publish an error response but never mutate global readiness.
Otherwise a single un-cached model would permanently block inference
for every other model on the bus until process restart.
"""
self._inference_fn = inference_fn
self._state_provider = state_provider
self._session = None
self._subscriber = None
self._loop: asyncio.AbstractEventLoop | None = None
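# Illustrative wiring, mirroring the lifespan change in server.py from
# this PR:
#   ZenohIPC(inference_fn=_zenoh_inference, state_provider=BACKEND_STATE.snapshot)
# Passing the bound snapshot method (not its result) matters: the
# heartbeat calls it on every tick to observe the current state.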
@@ -54,11 +69,9 @@ async def start(self) -> bool:
"""Open Zenoh session and start subscriber + heartbeat tasks.

Returns True if started successfully, False on failure (graceful degradation).
Callers gate construction with LLAMAFARM_ZENOH_ENABLED upstream; this
method assumes Zenoh is wanted by the time it is invoked.
"""
if not ZENOH_ENABLED:
logger.info("Zenoh IPC disabled (ZENOH_ENABLED=false)")
return False

logger.info("startup-step BEGIN: %s", "zenoh-import")
try:
import zenoh
@@ -72,6 +85,11 @@
try:
logger.info("startup-step BEGIN: %s", "zenoh-config")
config = zenoh.Config()
# Connect as a client to the comms router. Without explicit
# client mode, zenoh.open() returns a peer-mode session that
# silently fails to attach to the router — every put() becomes
# a no-op and the heartbeat never reaches the bus.
config.insert_json5("mode", '"client"')
config.insert_json5(
"connect/endpoints",
json.dumps([ZENOH_ENDPOINT]),
@@ -157,11 +175,41 @@ def _remove_future(f):
logger.error("Error dispatching Zenoh request", exc_info=True)

async def _handle_request(self, request: dict):
"""Process a single inference request and publish the response."""
"""Process a single inference request and publish the response.

If a state_provider is configured and the backend is not READY,
refuse immediately with an explicit error response instead of
passing the request to the inference layer (which would either
silently drop it or raise after a long timeout).
"""
request_id = request.get("request_id", "unknown")
model = request.get("model", "unknown")
t0 = time.monotonic()

# Admission control: refuse requests when backend isn't READY.
if self._state_provider is not None:
state = self._state_provider()
if state.get("readiness") != "ready":
response = {
"request_id": request_id,
"model": model,
"content": "",
"error": "backend_unavailable",
"reason": state.get("reason", ""),
"readiness": state.get("readiness"),
"timestamp_ms": int(time.time() * 1000),
}
self._session.put(
TOPIC_RESPONSE, json.dumps(response).encode()
)
logger.warning(
"Refused request %s: backend readiness=%s reason=%s",
request_id,
state.get("readiness"),
state.get("reason"),
)
return
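# The refusal carries readiness/reason copied from the snapshot, so
# consumers can back off and retry once the status heartbeat reports
# "ready" again instead of treating this as a model-level failure.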

t0 = time.monotonic()
try:
content = await self._inference_fn(request)
inference_ms = int((time.monotonic() - t0) * 1000)
@@ -194,19 +242,36 @@ async def _handle_request(self, request: dict):
# ------------------------------------------------------------------

async def _heartbeat_loop(self):
"""Publish periodic status to local/llm/status."""
"""Publish periodic status to local/llm/status.

When a state_provider is wired in, the heartbeat reflects real
backend readiness ("ready" | "degraded" | "unavailable" | "initializing")
so flight-control can refuse LLM-dependent commands instead of
issuing them and seeing them silently dropped at the inference
layer. The legacy `status` field is preserved (mirrors readiness)
for clients that haven't migrated to `readiness`.
"""
logger.info(
"Status heartbeat started (interval=%.1fs, topic=%s)",
STATUS_INTERVAL_S,
TOPIC_STATUS,
)
try:
while True:
status = {
"service": "edge-runtime",
"status": "ready",
"timestamp_ms": int(time.time() * 1000),
}
if self._state_provider is not None:
snapshot = self._state_provider()
status = {
"service": "edge-runtime",
"status": snapshot.get("readiness", "unknown"),
"timestamp_ms": int(time.time() * 1000),
**snapshot,
}
else:
status = {
"service": "edge-runtime",
"status": "ready",
"timestamp_ms": int(time.time() * 1000),
}
self._session.put(
TOPIC_STATUS, json.dumps(status).encode()
)
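
A minimal sketch of the flight-control side of this contract: decode local/llm/status payloads and gate LLM-dependent commands on the published readiness. Subscription plumbing is omitted (the zenoh-python subscriber API isn't shown in this diff), and allow_degraded is a made-up policy knob:

    import json

    def llm_commands_allowed(payload: bytes, allow_degraded: bool = True) -> bool:
        status = json.loads(payload)
        # Prefer the new `readiness` field; fall back to the legacy
        # `status` field for heartbeats published without a state_provider.
        readiness = status.get("readiness") or status.get("status", "unavailable")
        if readiness == "ready":
            return True
        return allow_degraded and readiness == "degraded"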