-
Notifications
You must be signed in to change notification settings - Fork 74
feat(model-engine): add GcpPubSubQueueEndpointResourceDelegate for async endpoints on GCP #827
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5ad5016
e5d3050
5afce44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,6 +89,9 @@ | |
| from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import ( | ||
| ASBQueueEndpointResourceDelegate, | ||
| ) | ||
| from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( | ||
| GcpPubSubQueueEndpointResourceDelegate, | ||
| ) | ||
| from model_engine_server.infra.gateways.resources.endpoint_resource_gateway import ( | ||
| EndpointResourceGateway, | ||
| ) | ||
|
|
@@ -248,8 +251,9 @@ def _get_external_interfaces( | |
| elif infra_config().cloud_provider == "azure": | ||
| queue_delegate = ASBQueueEndpointResourceDelegate() | ||
| elif infra_config().cloud_provider == "gcp": | ||
| # GCP uses Redis (Memorystore) for Celery, so use Redis-based queue delegate | ||
| queue_delegate = RedisQueueEndpointResourceDelegate(redis_client=redis_client) | ||
| queue_delegate = GcpPubSubQueueEndpointResourceDelegate( | ||
| project_id=infra_config().gcp_project_id, | ||
| ) | ||
|
Comment on lines
253
to
+256
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Prompt To Fix With AI — This is a comment left during a code review.
Path: model-engine/model_engine_server/api/dependencies.py
Line: 253-256
Comment:
**`gcp_project_id` is `Optional[str]` — `None` produces silently-invalid resource paths**
`infra_config().gcp_project_id` is typed `Optional[str]` and defaults to `None` if not set in the YAML config. Passing `None` to `GcpPubSubQueueEndpointResourceDelegate(project_id=None)` won't raise at construction time; it will silently produce paths like `projects/None/topics/launch-endpoint-id-<id>`, causing every Pub/Sub API call to fail at runtime with a cryptic `NotFound` error. An explicit guard (`if not infra_config().gcp_project_id: raise ...`) or an assertion at construction time would surface the misconfiguration early. The same pattern is repeated in `k8s_cache.py` and `start_batch_job_orchestration.py`.
How can I resolve this? If you propose a fix, please make it concise. |
||
| else: | ||
| queue_delegate = SQSQueueEndpointResourceDelegate( | ||
| sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| from typing import Any, Dict, Optional | ||
|
|
||
| from google.api_core import exceptions as gcp_exceptions | ||
| from google.cloud import pubsub_v1 | ||
| from google.protobuf import field_mask_pb2 | ||
| from model_engine_server.core.loggers import logger_name, make_logger | ||
| from model_engine_server.domain.exceptions import EndpointResourceInfraException | ||
| from model_engine_server.infra.gateways.resources.queue_endpoint_resource_delegate import ( | ||
| QueueEndpointResourceDelegate, | ||
| QueueInfo, | ||
| ) | ||
|
|
||
| logger = make_logger(logger_name()) | ||
|
|
||
| GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS = 600 # Pub/Sub hard limit | ||
|
|
||
|
|
||
| class GcpPubSubQueueEndpointResourceDelegate(QueueEndpointResourceDelegate): | ||
| """ | ||
| Using GCP Pub/Sub (topic + subscription per endpoint). | ||
|
|
||
| topic_prefix and subscription_prefix control the GCP resource name prefix. | ||
| The logical queue_name returned to callers always uses the canonical | ||
| QueueEndpointResourceDelegate.endpoint_id_to_queue_name format, independent | ||
| of these prefixes. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| project_id: str, | ||
| topic_prefix: str = "launch-endpoint-id-", | ||
| subscription_prefix: str = "launch-endpoint-id-", | ||
| ) -> None: | ||
| if not project_id: | ||
| raise ValueError( | ||
| "GcpPubSubQueueEndpointResourceDelegate requires a non-empty project_id; " | ||
| "set infra.gcp_project_id in the service config." | ||
| ) | ||
| self.project_id = project_id | ||
| self.topic_prefix = topic_prefix | ||
| self.subscription_prefix = subscription_prefix | ||
| self._publisher = pubsub_v1.PublisherClient() | ||
| self._subscriber = pubsub_v1.SubscriberClient() | ||
|
|
||
| def _topic_id(self, endpoint_id: str) -> str: | ||
| return f"{self.topic_prefix}{endpoint_id}" | ||
|
|
||
| def _subscription_id(self, endpoint_id: str) -> str: | ||
| return f"{self.subscription_prefix}{endpoint_id}" | ||
|
|
||
| async def create_queue_if_not_exists( | ||
| self, | ||
| endpoint_id: str, | ||
| endpoint_name: str, | ||
| endpoint_created_by: str, | ||
| endpoint_labels: Dict[str, Any], | ||
| queue_message_timeout_seconds: Optional[int] = None, | ||
| ) -> QueueInfo: | ||
| queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name( | ||
| endpoint_id | ||
| ) | ||
| topic_path = f"projects/{self.project_id}/topics/{self._topic_id(endpoint_id)}" | ||
| subscription_path = f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}" | ||
| ack_deadline = min( | ||
| queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS | ||
| ) | ||
|
|
||
| try: | ||
| self._publisher.create_topic(name=topic_path) | ||
| except gcp_exceptions.AlreadyExists: | ||
| pass | ||
|
|
||
| try: | ||
| self._subscriber.create_subscription( | ||
| name=subscription_path, | ||
| topic=topic_path, | ||
| ack_deadline_seconds=ack_deadline, | ||
| ) | ||
| except gcp_exceptions.AlreadyExists: | ||
| try: | ||
| self._subscriber.update_subscription( | ||
| subscription=pubsub_v1.types.Subscription( | ||
| name=subscription_path, | ||
| ack_deadline_seconds=ack_deadline, | ||
| ), | ||
| update_mask=field_mask_pb2.FieldMask( | ||
| paths=["ack_deadline_seconds"] | ||
| ), | ||
| ) | ||
| except gcp_exceptions.GoogleAPIError as e: | ||
| logger.warning( | ||
| f"Failed to update ack_deadline for Pub/Sub subscription {subscription_path}: {e}" | ||
| ) | ||
|
|
||
| # Pub/Sub has no URL concept analogous to SQS queue URLs | ||
| return QueueInfo(queue_name, queue_url=None) | ||
|
|
||
| async def delete_queue(self, endpoint_id: str) -> None: | ||
| subscription_path = f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}" | ||
| topic_path = f"projects/{self.project_id}/topics/{self._topic_id(endpoint_id)}" | ||
|
|
||
| try: | ||
| self._subscriber.delete_subscription(subscription=subscription_path) | ||
| except gcp_exceptions.NotFound: | ||
| logger.info( | ||
| f"Could not find Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}" | ||
| ) | ||
| except gcp_exceptions.GoogleAPIError as e: | ||
| raise EndpointResourceInfraException( | ||
| f"Failed to delete Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}: {e}" | ||
| ) from e | ||
|
|
||
| try: | ||
| self._publisher.delete_topic(topic=topic_path) | ||
| except gcp_exceptions.NotFound: | ||
| logger.info( | ||
| f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}" | ||
| ) | ||
| except gcp_exceptions.GoogleAPIError as e: | ||
| raise EndpointResourceInfraException( | ||
| f"Failed to delete Pub/Sub topic {topic_path} for endpoint {endpoint_id}: {e}" | ||
| ) from e | ||
|
|
||
| async def get_queue_attributes(self, endpoint_id: str) -> Dict[str, Any]: | ||
| queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name( | ||
| endpoint_id | ||
| ) | ||
| return { | ||
| "name": queue_name, | ||
| # Pub/Sub does not expose a synchronous undelivered message count; | ||
| # real observability requires the Cloud Monitoring API as a separate concern. | ||
| "num_undelivered_messages": -1, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
**`GCP_PROJECT_ID` env var is dead config — delegate will always raise `ValueError` on GCP clusters**

The Helm chart injects `GCP_PROJECT_ID` from `.Values.gcp.project_id`, but the delegate is instantiated with `project_id=infra_config().gcp_project_id`, which is loaded from the `infra_service_config` YAML ConfigMap. That ConfigMap is rendered from `.Values.config.values.infra` in `service_config_map.yaml` — not from `.Values.gcp`. Since the new `gcp.project_id` key is in a separate Helm section and `gcp_project_id` is never injected into `config.values.infra`, `infra_config().gcp_project_id` will always be `None` at runtime. The delegate's own `if not project_id: raise ValueError(...)` guard will fire on every startup on any GCP cluster that follows the sample values. The same broken path is repeated in `k8s_cache.py` and `start_batch_job_orchestration.py`. Either read `os.getenv("GCP_PROJECT_ID")` directly in the delegate (consistent with how `ASBQueueEndpointResourceDelegate` reads `os.getenv("SERVICEBUS_NAMESPACE")`), or add `gcp_project_id` to `config.values.infra` in the chart.

Prompt To Fix With AI