feat(model-engine): add GcpPubSubQueueEndpointResourceDelegate for async endpoints on GCP #827

postevanus-scale wants to merge 2 commits into
Conversation
Three small fixes uncovered while diagnosing a stuck endpoint build on GKE running model-engine 798747b…:

1. service_template_config_map.yaml: HPA `type: Pods` was paired with `target.type: Value`, which is invalid for HPA v2. Kubernetes rejects the metric (status: `InvalidMetricSourceType`) and falls back to `minReplicas`. Switch to the correct `AverageValue`.
2. service_template_config_map.yaml: `virtual-service.yaml` and `destination-rule.yaml` are gated on `Values.virtualservice.enabled` and `Values.destinationrule.enabled` respectively, but the runtime code (`k8s_endpoint_resource_delegate.py:_create_or_update_resources`) reads `config.values.launch.istio_enabled` and unconditionally calls `load_k8s_yaml("virtual-service.yaml", …)` / `destination-rule.yaml` whenever that flag is true. When the two flags disagree, the build task crashes with `FileNotFoundError` and the endpoint never reaches READY (SGP reports `deployment_timeout`). Couple the chart gating to the runtime flag — the templates are now emitted when *either* `virtualservice.enabled` / `destinationrule.enabled` *or* `config.values.launch.istio_enabled` is true. Existing operators who explicitly set the chart flags see no change.
3. endpoint_builder_deployment.yaml: `readinessProbe` hardcoded `bash -c 'test -f /tmp/readyz'`. Minimal-base images (e.g. the 798747b… build) no longer ship `bash`, so the probe permanently errors with `executable file not found in $PATH` and the pod stays `0/1` Ready, which times out `helm --wait` at 1200s and stalls the whole HelmRelease. Make the probe overridable via `endpointBuilder.readinessProbe`; default behavior unchanged.

Render-verified with `helm template`:
- HPA target.type renders as `AverageValue`.
- VS / DR templates appear when either flag is true; absent otherwise.
- Default readinessProbe still uses `bash`; override via values works.
Async inference endpoints fail on GCP clusters with `NoCredentialsError` because the codebase only supports SQS / ASB / OnPrem queue delegates. This wires a Pub/Sub-based delegate so `cloud_provider=gcp` can create and manage the queues that async workers consume from. Affects: launch namespace queue resource creation for async deploys.
```python
def __init__(
    self,
    project_id: str,
    topic_prefix: str = "launch-endpoint-id-",
    subscription_prefix: str = "launch-endpoint-id-",
) -> None:
    self.project_id = project_id
    self.topic_prefix = topic_prefix
    self.subscription_prefix = subscription_prefix

async def create_queue_if_not_exists(
    self,
    endpoint_id: str,
    endpoint_name: str,
    endpoint_created_by: str,
    endpoint_labels: Dict[str, Any],
    queue_message_timeout_seconds: Optional[int] = None,
) -> QueueInfo:
    queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
    topic_path = f"projects/{self.project_id}/topics/{queue_name}"
    subscription_path = f"projects/{self.project_id}/subscriptions/{queue_name}"
    ack_deadline = min(queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS)

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
```
**`topic_prefix` / `subscription_prefix` are stored but never used**

`self.topic_prefix` and `self.subscription_prefix` are saved in `__init__` but every method that builds a Pub/Sub resource path (`create_queue_if_not_exists`, `delete_queue`, `get_queue_attributes`) calls the static `endpoint_id_to_queue_name` helper instead, which hardcodes the `"launch-endpoint-id-"` prefix. Any caller that passes a custom prefix (or any operator who tunes `PUBSUB_TOPIC_PREFIX` / `PUBSUB_SUBSCRIPTION_PREFIX` via Helm values) will have those values silently dropped — topic and subscription paths will always use the static default regardless of the configured prefix.
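A minimal sketch of one way to resolve this, assuming the constructor shape from the diff above (the `_topic_path` / `_subscription_path` helper names are hypothetical): build resource paths from the stored instance prefixes instead of the static helper.

```python
class GcpPubSubQueueEndpointResourceDelegate:
    """Sketch: honor the configured prefixes when building Pub/Sub paths."""

    def __init__(
        self,
        project_id: str,
        topic_prefix: str = "launch-endpoint-id-",
        subscription_prefix: str = "launch-endpoint-id-",
    ) -> None:
        self.project_id = project_id
        self.topic_prefix = topic_prefix
        self.subscription_prefix = subscription_prefix

    def _topic_path(self, endpoint_id: str) -> str:
        # Uses the instance prefix, not the hardcoded static default.
        return f"projects/{self.project_id}/topics/{self.topic_prefix}{endpoint_id}"

    def _subscription_path(self, endpoint_id: str) -> str:
        return (
            f"projects/{self.project_id}/subscriptions/"
            f"{self.subscription_prefix}{endpoint_id}"
        )
```

With this shape, `create_queue_if_not_exists`, `delete_queue`, and `get_queue_attributes` would all call the instance helpers, so a custom `PUBSUB_TOPIC_PREFIX` Helm value actually takes effect.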
```python
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
```
**Pub/Sub clients re-instantiated on every call**

`pubsub_v1.PublisherClient()` and `pubsub_v1.SubscriberClient()` are constructed inside both `create_queue_if_not_exists` and `delete_queue`, meaning a new gRPC channel and auth handshake occur on every endpoint lifecycle operation. Both clients are thread-safe and designed to be shared; they should be created once in `__init__` and stored as `self._publisher` / `self._subscriber`.
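A sketch of the construct-once pattern being suggested. The `PubSubClientHolder` name and the injectable factories are illustrative only (in the real delegate the factories would simply be `pubsub_v1.PublisherClient` and `pubsub_v1.SubscriberClient`); they are parameterized here so the caching behaviour is visible without a GCP dependency.

```python
from functools import cached_property
from typing import Any, Callable


class PubSubClientHolder:
    """Sketch: build each client exactly once, on first use, then reuse it."""

    def __init__(
        self,
        publisher_factory: Callable[[], Any],
        subscriber_factory: Callable[[], Any],
    ) -> None:
        self._publisher_factory = publisher_factory
        self._subscriber_factory = subscriber_factory

    @cached_property
    def publisher(self) -> Any:
        # Runs once; subsequent accesses return the cached instance,
        # avoiding a fresh gRPC channel and auth handshake per call.
        return self._publisher_factory()

    @cached_property
    def subscriber(self) -> Any:
        return self._subscriber_factory()
```

Eagerly assigning `self._publisher = pubsub_v1.PublisherClient()` in `__init__`, as the comment suggests, is equally valid; the lazy variant just defers the handshake until the first queue operation.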
```python
elif "num_undelivered_messages" in sqs_attributes:  # from GcpPubSubQueueEndpointResourceDelegate
    resources.num_queued_items = int(sqs_attributes["num_undelivered_messages"])
```
**`num_queued_items = -1` may confuse autoscaler**

`get_queue_attributes` unconditionally returns `"num_undelivered_messages": -1` because Pub/Sub doesn't expose a synchronous count. This code then sets `resources.num_queued_items = int(-1)`. If any downstream autoscaling or routing logic uses a comparison that doesn't guard against negative values (e.g. `>= 0` to decide whether to scale up), async endpoints on GCP will observe permanently stale or incorrect queue-depth metrics. Consider either omitting the key so the attribute-presence check simply doesn't match, or guarding the assignment with `if sqs_attributes["num_undelivered_messages"] >= 0`.
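One concise way to implement the suggested guard, sketched as a standalone helper (the function name is hypothetical; in the gateway this would be inlined into the `elif` branch): map the `-1` sentinel to "unknown" instead of propagating it.

```python
from typing import Dict, Optional


def extract_num_queued_items(attributes: Dict[str, str]) -> Optional[int]:
    """Return the queue depth, or None when the delegate reported the
    negative 'not available' sentinel (as the Pub/Sub delegate does)."""
    raw = attributes.get("num_undelivered_messages")
    if raw is None:
        return None
    count = int(raw)
    # Guard: never hand a negative depth to autoscaling consumers.
    return count if count >= 0 else None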
```diff
 elif infra_config().cloud_provider == "gcp":
-    # GCP uses Redis (Memorystore) for Celery, so use Redis-based queue delegate
-    queue_delegate = RedisQueueEndpointResourceDelegate(redis_client=redis_client)
+    queue_delegate = GcpPubSubQueueEndpointResourceDelegate(
+        project_id=infra_config().gcp_project_id,
+    )
```
**`gcp_project_id` is `Optional[str]` — `None` produces silently-invalid resource paths**

`infra_config().gcp_project_id` is typed `Optional[str]` and defaults to `None` if not set in the YAML config. Passing `None` to `GcpPubSubQueueEndpointResourceDelegate(project_id=None)` won't raise at construction time; it will silently produce paths like `projects/None/topics/launch-endpoint-id-<id>`, causing every Pub/Sub API call to fail at runtime with a cryptic `NotFound` error. An explicit guard (`if not infra_config().gcp_project_id: raise ...`) or an assertion at construction time would surface the misconfiguration early. The same pattern is repeated in `k8s_cache.py` and `start_batch_job_orchestration.py`.
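A small fail-fast helper along the lines the comment suggests (the `require_gcp_project_id` name is hypothetical); each of the three factory sites would wrap the config value with it before constructing the delegate.

```python
from typing import Optional


def require_gcp_project_id(gcp_project_id: Optional[str]) -> str:
    """Raise at wiring time instead of emitting 'projects/None/...' paths."""
    if not gcp_project_id:
        raise ValueError(
            "infra config is missing gcp_project_id; it is required when "
            "cloud_provider is 'gcp' so Pub/Sub resource paths can be built"
        )
    return gcp_project_id
```

Usage at the factory site would then be `GcpPubSubQueueEndpointResourceDelegate(project_id=require_gcp_project_id(infra_config().gcp_project_id))`, turning a cryptic runtime `NotFound` into an immediate, explicit configuration error.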
lilyz-ai left a comment:

Please address greptile comments and fix CI tests.
Summary
Adds a Google Cloud Pub/Sub-backed implementation of `QueueEndpointResourceDelegate` so that `cloud_provider=gcp` clusters can deploy async inference endpoints. Before this PR, async deploys on GCP fail at `live_endpoint_resource_gateway.create_queue` because the only wired delegate is SQS (AWS), which raises `NoCredentialsError` on a GCP node. We see this consistently on `gke_scale-dev-mofa_asia-northeast3_sgp-pmodev-kubernetes-cluster`.

Repro

Temporal workflow IDs from a recent attempt on the GCP cluster:
- `1020f4b2-e55f-4b6e-acf3-7c0090a4c5c2` (qwen3-e-4b-7, sync — fails at workflow create_activity, separate bug)
- `9f86bd97-4804-4945-b2e1-29e0dd2c794a` (qwen3-e-4b-9-async, async — fails at `sqs_queue_endpoint_resource_delegate.create_queue_if_not_exists` with `NoCredentialsError: Unable to locate credentials`)

Changes
- `infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py` mirrors the ASB shape. Creates a topic + pull subscription per endpoint, idempotent (catches `AlreadyExists` / `NotFound`).
- Wires the delegate into `api/dependencies.py`, `entrypoints/start_batch_job_orchestration.py`, and `entrypoints/k8s_cache.py`.
- Adds an `elif "num_undelivered_messages" in sqs_attributes` branch in `live_endpoint_resource_gateway.py` so the gateway recognizes the new attribute shape.
- Adds `google-cloud-pubsub` to `requirements.in`.
- Helm chart: adds `gcp.project_id`, `gcp.pubsub_topic_prefix`, `gcp.pubsub_subscription_prefix` and pipes them as env vars to the deployments that build/manage queues.

Out of scope (deliberately)
- IAM: `serviceAccount.annotations` must point at a GCP SA with `roles/pubsub.editor` on the target project.
- `num_undelivered_messages`: Pub/Sub doesn't expose an undelivered-count attribute synchronously. The delegate currently returns `-1` — wiring Cloud Monitoring's `pubsub.googleapis.com/subscription/num_undelivered_messages` metric belongs in a follow-up.

Test plan
- `python -m py_compile` on every Python file touched
- `helm template charts/model-engine` renders without new errors

Verification commands
Greptile Summary
This PR adds `GcpPubSubQueueEndpointResourceDelegate`, a Pub/Sub-backed implementation of `QueueEndpointResourceDelegate`, and wires it into the three factory sites that select the queue delegate based on `cloud_provider`. It also extends the Helm chart with GCP-specific env vars and adds a new attribute-shape branch in `LiveEndpointResourceGateway`.

- `topic_prefix` and `subscription_prefix` are accepted in `__init__` and stored as instance attributes, but every method constructs Pub/Sub resource paths via the static `endpoint_id_to_queue_name` helper, which hardcodes the prefix. Custom prefixes (including the `PUBSUB_TOPIC_PREFIX` / `PUBSUB_SUBSCRIPTION_PREFIX` env vars injected by Helm) are silently ignored.
- `num_queued_items = -1`: `get_queue_attributes` always returns `-1` for `num_undelivered_messages`; the gateway then sets `resources.num_queued_items = -1`, which may produce unexpected behaviour in autoscaling consumers that don't guard against negative values.

Confidence Score: 3/5
Mergeable with caveats — the P1 prefix bug means custom PUBSUB_TOPIC_PREFIX/PUBSUB_SUBSCRIPTION_PREFIX Helm values are silently ignored, but since the hardcoded default matches the chart defaults the immediate impact is limited.
One P1 (dead prefix parameters that cause Helm config knobs to be silently no-ops) and two P2s (client re-instantiation, -1 queue depth). The P1 ceiling is 4, pulled down to 3 because the bug also makes the exposed Helm configuration surface misleading.
Key files: `gcp_pubsub_queue_endpoint_resource_delegate.py` (dead prefix params, per-call client construction) and `live_endpoint_resource_gateway.py` (`num_queued_items = -1` propagation).
Important Files Changed
- `topic_prefix` / `subscription_prefix` params are stored but never applied (P1); clients re-created per call (P2)
- `num_queued_items = -1`, which may confuse downstream autoscaling
- `gcp_project_id` is `Optional[str]` and may silently pass `None` to the delegate
- `google-cloud-pubsub>=2.18`; loose lower-bound pin is appropriate given existing GCP deps
Reviews (1): Last reviewed commit: "feat(model-engine): add GcpPubSubQueueEn..."