feat(model-engine): add GcpPubSubQueueEndpointResourceDelegate for async endpoints on GCP#827

Open
postevanus-scale wants to merge 2 commits into main from feat/gcp-pubsub-queue-delegate

Conversation

@postevanus-scale
Collaborator

@postevanus-scale postevanus-scale commented May 11, 2026

Summary

Adds a Google Cloud Pub/Sub-backed implementation of QueueEndpointResourceDelegate so that cloud_provider=gcp clusters can deploy async inference endpoints.

Before this PR, async deploys on GCP fail at live_endpoint_resource_gateway.create_queue because the only wired delegate is SQS (AWS), which raises NoCredentialsError on a GCP node. We see this consistently on gke_scale-dev-mofa_asia-northeast3_sgp-pmodev-kubernetes-cluster.

Repro

Temporal workflow IDs from a recent attempt on the GCP cluster:

  • 1020f4b2-e55f-4b6e-acf3-7c0090a4c5c2 (qwen3-e-4b-7, sync — fails at workflow create_activity, separate bug)
  • 9f86bd97-4804-4945-b2e1-29e0dd2c794a (qwen3-e-4b-9-async, async — fails at sqs_queue_endpoint_resource_delegate.create_queue_if_not_exists with NoCredentialsError: Unable to locate credentials)

Changes

  • New: infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py mirrors the ASB shape. Creates a topic + pull subscription per endpoint, idempotent (catches AlreadyExists/NotFound).
  • Wires the new delegate into the 3 factory sites: api/dependencies.py, entrypoints/start_batch_job_orchestration.py, entrypoints/k8s_cache.py.
  • Adds an `elif "num_undelivered_messages" in sqs_attributes` branch in live_endpoint_resource_gateway.py so the gateway recognizes the new attribute shape.
  • Adds google-cloud-pubsub to requirements.in.
  • Unit tests mirror the ASB suite.
  • Helm chart: surfaces gcp.project_id, gcp.pubsub_topic_prefix, gcp.pubsub_subscription_prefix and pipes them as env vars to the deployments that build/manage queues.
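
One small piece of the new delegate worth calling out is the ack-deadline clamp visible in the diff. As a minimal sketch (the constant's value of 600 s is an assumption based on Pub/Sub's documented maximum ack deadline; the PR defines its own constant):

```python
# Sketch of the ack-deadline clamp used by create_queue_if_not_exists.
# 600 s is Pub/Sub's documented ceiling for a subscription ack deadline.
GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS = 600


def compute_ack_deadline(queue_message_timeout_seconds=None):
    # Default to 60 s when no timeout is configured, and never exceed
    # what the Pub/Sub API will accept for a subscription.
    return min(queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS)
```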

Out of scope (deliberately)

  • Cloud Tasks as an alternative: Pub/Sub was picked here because it's the closest semantic match to SQS/ASB (a queue with a pull subscription). The launch team may prefer Cloud Tasks for at-least-once HTTP push delivery — happy to switch direction.
  • IAM / Workload Identity wiring: deployer-side. Bind the GCP service account referenced in the chart's serviceAccount.annotations to roles/pubsub.editor on the target project.
  • Observability for num_undelivered_messages: Pub/Sub doesn't expose an undelivered-count attribute synchronously. The delegate currently returns -1 — wiring Cloud Monitoring's pubsub.googleapis.com/subscription/num_undelivered_messages metric belongs in a follow-up.
  • Migration of existing endpoints: this PR only affects newly-created async endpoints. Existing failed-state SGP records aren't backfilled.
  • Benchmarks: no perf comparison yet against SQS/ASB. The Pub/Sub path is API-symmetric so behavior should be similar; will measure once a real deploy lands.

Test plan

  • Unit tests for the new delegate
  • python -m py_compile on every Python file touched
  • helm template charts/model-engine renders without new errors
  • Manual integration test on GCP cluster (deferred — needs IAM wiring on the cluster side first)

Verification commands

# Verify factory wiring import-resolves
python -c "from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import GcpPubSubQueueEndpointResourceDelegate"

# Verify chart still renders
helm template charts/model-engine -f charts/model-engine/values_sample.yaml > /dev/null

Greptile Summary

This PR adds GcpPubSubQueueEndpointResourceDelegate, a Pub/Sub-backed implementation of QueueEndpointResourceDelegate, and wires it into the three factory sites that select the queue delegate based on cloud_provider. It also extends the Helm chart with GCP-specific env vars and adds a new attribute-shape branch in LiveEndpointResourceGateway.

  • P1 — dead prefix parameters: topic_prefix and subscription_prefix are accepted in __init__ and stored as instance attributes, but every method constructs Pub/Sub resource paths via the static endpoint_id_to_queue_name helper which hardcodes the prefix. Custom prefixes (including the PUBSUB_TOPIC_PREFIX/PUBSUB_SUBSCRIPTION_PREFIX env vars injected by Helm) are silently ignored.
  • P2 — num_queued_items = -1: get_queue_attributes always returns -1 for num_undelivered_messages; the gateway then sets resources.num_queued_items = -1, which may produce unexpected behaviour in autoscaling consumers that don't guard against negative values.

Confidence Score: 3/5

Mergeable with caveats — the P1 prefix bug means custom PUBSUB_TOPIC_PREFIX/PUBSUB_SUBSCRIPTION_PREFIX Helm values are silently ignored, but since the hardcoded default matches the chart defaults the immediate impact is limited.

One P1 (dead prefix parameters that cause Helm config knobs to be silently no-ops) and three P2s (per-call client construction, -1 queue depth, unguarded gcp_project_id). The P1 ceiling is 4, pulled down to 3 because the bug also makes the exposed Helm configuration surface misleading.

Files most in need of attention: gcp_pubsub_queue_endpoint_resource_delegate.py (dead prefix params, per-call client construction) and live_endpoint_resource_gateway.py (num_queued_items = -1 propagation).

Important Files Changed

  • model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py: New Pub/Sub delegate; topic_prefix/subscription_prefix params are stored but never applied (P1); clients re-created per call (P2)
  • model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py: New branch for GCP queue attributes; sets num_queued_items = -1, which may confuse downstream autoscaling
  • model-engine/model_engine_server/api/dependencies.py: GCP delegate wired correctly; gcp_project_id is Optional[str] and may silently pass None to the delegate
  • charts/model-engine/templates/_helpers.tpl: Injects GCP_PROJECT_ID, PUBSUB_TOPIC_PREFIX, and PUBSUB_SUBSCRIPTION_PREFIX env vars; the prefix vars are never consumed by Python code
  • model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py: Good coverage of happy-path and AlreadyExists/NotFound error cases; doesn't test custom prefix behavior (which is broken)
  • model-engine/requirements.in: Adds google-cloud-pubsub>=2.18; the loose lower-bound pin is appropriate given existing GCP deps


Prompt To Fix All With AI
Fix the following 4 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 4
model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py:21-45
**`topic_prefix` / `subscription_prefix` are stored but never used**

`self.topic_prefix` and `self.subscription_prefix` are saved in `__init__` but every method that builds a Pub/Sub resource path (`create_queue_if_not_exists`, `delete_queue`, `get_queue_attributes`) calls the static `endpoint_id_to_queue_name` helper instead, which hardcodes the `"launch-endpoint-id-"` prefix. Any caller that passes a custom prefix (or any operator who tunes `PUBSUB_TOPIC_PREFIX` / `PUBSUB_SUBSCRIPTION_PREFIX` via Helm values) will have those values silently dropped — topic and subscription paths will always use the static default regardless of the configured prefix.
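A concise fix is to build resource paths from the stored prefixes. Sketched here with hypothetical helper names (`_topic_path` / `_subscription_path` are not the PR's actual methods):

```python
class GcpPubSubQueueEndpointResourceDelegate:
    """Trimmed-down sketch: only the path-building logic is shown."""

    def __init__(
        self,
        project_id: str,
        topic_prefix: str = "launch-endpoint-id-",
        subscription_prefix: str = "launch-endpoint-id-",
    ) -> None:
        self.project_id = project_id
        self.topic_prefix = topic_prefix
        self.subscription_prefix = subscription_prefix

    def _topic_path(self, endpoint_id: str) -> str:
        # Use the configured prefix instead of the static helper's
        # hardcoded "launch-endpoint-id-".
        return f"projects/{self.project_id}/topics/{self.topic_prefix}{endpoint_id}"

    def _subscription_path(self, endpoint_id: str) -> str:
        return f"projects/{self.project_id}/subscriptions/{self.subscription_prefix}{endpoint_id}"
```

Every method that currently calls `endpoint_id_to_queue_name` would route through these helpers instead, so custom Helm prefixes take effect.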

### Issue 2 of 4
model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py:44-45
**Pub/Sub clients re-instantiated on every call**

`pubsub_v1.PublisherClient()` and `pubsub_v1.SubscriberClient()` are constructed inside both `create_queue_if_not_exists` and `delete_queue`, meaning a new gRPC channel and auth handshake occur on every endpoint lifecycle operation. Both clients are goroutine-safe and designed to be shared; they should be created once in `__init__` and stored as `self._publisher` / `self._subscriber`.
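One way to get construct-once semantics without touching call sites is `functools.cached_property`. Sketched here with a stand-in object, since constructing a real `pubsub_v1.PublisherClient` requires credentials:

```python
from functools import cached_property


class Delegate:
    """Sketch: the client is built on first access and cached thereafter."""

    constructions = 0  # instrumentation for this sketch only

    @cached_property
    def _publisher(self):
        Delegate.constructions += 1
        # In the real delegate this would be pubsub_v1.PublisherClient(),
        # which is thread-safe and intended to be shared across calls.
        return object()
```

Eagerly creating both clients in `__init__`, as the review suggests, is equally valid; the lazy variant just avoids paying the gRPC/auth setup cost in processes that never touch the queue.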

### Issue 3 of 4
model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py:104-105
**`num_queued_items = -1` may confuse autoscaler**

`get_queue_attributes` unconditionally returns `"num_undelivered_messages": -1` because Pub/Sub doesn't expose a synchronous count. This code then sets `resources.num_queued_items = int(-1)`. If any downstream autoscaling or routing logic uses a comparison that doesn't guard against negative values (e.g. `>= 0` to decide whether to scale up), async endpoints on GCP will observe permanently stale or incorrect queue-depth metrics. Consider either omitting the key so the attribute-presence check simply doesn't match, or guarding the assignment with `if sqs_attributes["num_undelivered_messages"] >= 0`.
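A guarded version of the gateway branch could look like this (`apply_queue_attributes` and the `SimpleNamespace` resources object are illustrative stand-ins, not the gateway's real API):

```python
from types import SimpleNamespace


def apply_queue_attributes(resources, attributes: dict) -> None:
    # Only propagate a queue depth the delegate could actually measure;
    # the Pub/Sub delegate's -1 sentinel leaves num_queued_items untouched.
    if "num_undelivered_messages" in attributes:
        depth = int(attributes["num_undelivered_messages"])
        if depth >= 0:
            resources.num_queued_items = depth
```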

### Issue 4 of 4
model-engine/model_engine_server/api/dependencies.py:253-256
**`gcp_project_id` is `Optional[str]`; `None` produces silently-invalid resource paths**

`infra_config().gcp_project_id` is typed `Optional[str]` and defaults to `None` if not set in the YAML config. Passing `None` to `GcpPubSubQueueEndpointResourceDelegate(project_id=None)` won't raise at construction time; it will silently produce paths like `projects/None/topics/launch-endpoint-id-<id>`, causing every Pub/Sub API call to fail at runtime with a cryptic `NotFound` error. An explicit guard (`if not infra_config().gcp_project_id: raise ...`) or an assertion at construction time would surface the misconfiguration early. The same pattern is repeated in `k8s_cache.py` and `start_batch_job_orchestration.py`.
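A small fail-fast helper (name hypothetical) that each of the three factory sites could call before constructing the delegate:

```python
def require_gcp_project_id(gcp_project_id):
    # Surface a missing gcp_project_id at wiring time instead of as a
    # cryptic NotFound on the first Pub/Sub call against projects/None/...
    if not gcp_project_id:
        raise ValueError(
            "infra_config().gcp_project_id must be set when cloud_provider=gcp"
        )
    return gcp_project_id
```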

Reviews (1): Last reviewed commit: "feat(model-engine): add GcpPubSubQueueEn..." | Re-trigger Greptile

Greptile also left 4 inline comments on this PR.

Three small fixes uncovered while diagnosing a stuck endpoint build on
GKE running model-engine 798747b…:

1. service_template_config_map.yaml: HPA `type: Pods` was paired with
   `target.type: Value`, which is invalid for HPA v2. Kubernetes rejects
   the metric (status: `InvalidMetricSourceType`) and falls back to
   `minReplicas`. Switch to the correct `AverageValue`.

2. service_template_config_map.yaml: `virtual-service.yaml` and
   `destination-rule.yaml` are gated on `Values.virtualservice.enabled`
   and `Values.destinationrule.enabled` respectively, but the runtime
   code (`k8s_endpoint_resource_delegate.py:_create_or_update_resources`)
   reads `config.values.launch.istio_enabled` and unconditionally calls
   `load_k8s_yaml("virtual-service.yaml", …)` / `destination-rule.yaml`
   whenever that flag is true. When the two flags disagree, the build
   task crashes with `FileNotFoundError` and the endpoint never reaches
   READY (SGP reports `deployment_timeout`).

   Couple the chart gating to the runtime flag — the templates are now
   emitted when *either* `virtualservice.enabled` / `destinationrule.enabled`
   *or* `config.values.launch.istio_enabled` is true. Existing operators
   who explicitly set the chart flags see no change.

3. endpoint_builder_deployment.yaml: `readinessProbe` hardcoded
   `bash -c 'test -f /tmp/readyz'`. Minimal-base images (e.g. the
   798747b… build) no longer ship `bash`, so the probe permanently
   errors with `executable file not found in $PATH` and the pod stays
   `0/1` Ready, which times out `helm --wait` at 1200s and stalls the
   whole HelmRelease. Make the probe overridable via
   `endpointBuilder.readinessProbe`; default behavior unchanged.

Render-verified with `helm template`:
- HPA target.type renders as `AverageValue`.
- VS / DR templates appear when either flag is true; absent otherwise.
- Default readinessProbe still uses `bash`; override via values works.
feat(model-engine): add GcpPubSubQueueEndpointResourceDelegate for async endpoints on GCP

Async inference endpoints fail on GCP clusters with NoCredentialsError
because the codebase only supports SQS / ASB / OnPrem queue delegates.
This wires a Pub/Sub-based delegate so cloud_provider=gcp can create and
manage the queues that async workers consume from.

Affects: launch namespace queue resource creation for async deploys.
@postevanus-scale postevanus-scale requested a review from a team May 11, 2026 19:06
@postevanus-scale postevanus-scale marked this pull request as ready for review May 11, 2026 19:06
Comment on lines +21 to +45
def __init__(
    self,
    project_id: str,
    topic_prefix: str = "launch-endpoint-id-",
    subscription_prefix: str = "launch-endpoint-id-",
) -> None:
    self.project_id = project_id
    self.topic_prefix = topic_prefix
    self.subscription_prefix = subscription_prefix

async def create_queue_if_not_exists(
    self,
    endpoint_id: str,
    endpoint_name: str,
    endpoint_created_by: str,
    endpoint_labels: Dict[str, Any],
    queue_message_timeout_seconds: Optional[int] = None,
) -> QueueInfo:
    queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
    topic_path = f"projects/{self.project_id}/topics/{queue_name}"
    subscription_path = f"projects/{self.project_id}/subscriptions/{queue_name}"
    ack_deadline = min(queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS)

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

P1 topic_prefix / subscription_prefix are stored but never used


Comment on lines +44 to +45
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

P2 Pub/Sub clients re-instantiated on every call


Comment on lines +104 to +105
elif "num_undelivered_messages" in sqs_attributes:  # from GcpPubSubQueueEndpointResourceDelegate
    resources.num_queued_items = int(sqs_attributes["num_undelivered_messages"])

P2 num_queued_items = -1 may confuse autoscaler


Comment on lines 253 to +256
 elif infra_config().cloud_provider == "gcp":
-    # GCP uses Redis (Memorystore) for Celery, so use Redis-based queue delegate
-    queue_delegate = RedisQueueEndpointResourceDelegate(redis_client=redis_client)
+    queue_delegate = GcpPubSubQueueEndpointResourceDelegate(
+        project_id=infra_config().gcp_project_id,
+    )

P2 gcp_project_id is Optional[str]; None produces silently-invalid resource paths


Collaborator

@lilyz-ai lilyz-ai left a comment


Please address greptile comments and fix ci tests.
