proposal: temporal endpoint type for Temporal activity workers (MLI-6425)#815
Conversation
RFC for natively managing Temporal activity workers via Launch, with phased implementation plan (fixed-replica MVP + KEDA autoscaling). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Recommendation: KEDA Temporal trigger. Annotation format:

```yaml
temporal.keda.sh/task-queue: "${TEMPORAL_TASK_QUEUE}"
temporal.keda.sh/namespace: "default"
temporal.keda.sh/targetQueueSize: "${PER_WORKER}"
```
**KEDA annotation format does not match KEDA's actual Temporal scaler API**

The `temporal.keda.sh/` annotation keys used here aren't part of KEDA's Temporal scaler — KEDA uses a `ScaledObject` CRD with `spec.triggers[].type: temporal` and structured metadata, not pod annotations. Using the wrong format in the Phase 2 implementation will silently produce a non-functional autoscaler.
Correct KEDA Temporal trigger format (from [keda.sh docs](https://keda.sh/docs/2.19/scalers/temporal/)):
```yaml
triggers:
  - type: temporal
    metadata:
      namespace: "default"
      taskQueue: "${TEMPORAL_TASK_QUEUE}"
      targetQueueSize: "${PER_WORKER}"
      endpoint: "${TEMPORAL_SERVER_HOSTNAME}:${TEMPORAL_SERVER_PORT}"
      queueTypes: activity
```

This needs to be a separate `ScaledObject` resource (not a Deployment annotation), managed alongside the Deployment template.
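For the Phase 2 implementation, that trigger would live inside a `ScaledObject` targeting the generated Deployment. A sketch of what such a resource might look like — the resource names and replica bounds here are illustrative placeholders, not output this proposal has verified:

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: launch-endpoint-temporal-scaler   # illustrative name
spec:
  scaleTargetRef:
    name: launch-endpoint-temporal        # the generated Deployment (illustrative)
  minReplicaCount: 0                      # temporal endpoints allow min_workers=0
  maxReplicaCount: 4                      # would come from max_workers
  triggers:
    - type: temporal
      metadata:
        namespace: "default"
        taskQueue: "${TEMPORAL_TASK_QUEUE}"
        targetQueueSize: "${PER_WORKER}"
        endpoint: "${TEMPORAL_SERVER_HOSTNAME}:${TEMPORAL_SERVER_PORT}"
        queueTypes: activity
```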
Adds TEMPORAL as a fourth endpoint type in Launch so teams can run Temporal activity worker pods through the standard Launch API instead of maintaining custom raw K8s Deployments.

- domain/entities: add TEMPORAL to ModelEndpointType enum
- domain/entities: add temporal_task_queue field to ModelEndpointRecord
- common/dtos: add temporal_task_queue to Create/Update request DTOs with validator requiring it for temporal endpoints
- domain/use_cases: allow min_workers=0 for temporal (like async)
- domain/services + infra/services: thread temporal_task_queue through create/update endpoint call chain
- db/models: add temporal_task_queue column to ORM model
- db/migrations: alembic migration to add the column
- infra/repositories: persist and read temporal_task_queue
- k8s_resource_types: add DeploymentRunnableImageTemporalCpuArguments and DeploymentRunnableImageTemporalGpuArguments TypedDicts; add temporal branch in get_endpoint_resource_arguments_from_request
- k8s_endpoint_resource_delegate: add temporal resource name handling in _create_or_update_resources, _get_resources_from_deployment_type, _get_temporal_autoscaling_params (reads annotations); skip SQS for temporal in delete_resources
- live_endpoint_resource_gateway: skip SQS queue deletion for temporal
- service_template_config_map_circleci.yaml: add cpu+gpu temporal deployment templates (fixed replicas=MAX_WORKERS, no forwarder sidecar, temporal annotations)
- service_template_config_map.yaml (Helm): same templates for prod

MVP uses fixed replicas = max_workers; KEDA autoscaling is phase 2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The temporal deployment template used `$security_context | nindent`
directly on a map value, causing a Helm type error. Use the same
`{{- with $security_context }} securityContext: {{- toYaml . | nindent }}
{{- end }}` pattern as the other endpoint type templates.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
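A sketch of the fix described in that commit — the indentation level and the `$security_context` variable are assumptions based on the commit message, not verified against the actual template:

```yaml
# Broken: piping a map value straight through nindent raises a Helm type error
#   securityContext: {{ $security_context | nindent 10 }}
#
# Fixed: guard with `with` and serialize via toYaml, matching the other
# endpoint type templates:
{{- with $security_context }}
securityContext:
  {{- toYaml . | nindent 10 }}
{{- end }}
```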
- temporal-endpoint-type-proposal.md: update status to Implemented, document actual design decisions (annotations, no readiness probe, no forwarder sidecar, fixed replicas MVP), resolve open questions
- temporal-endpoint-e2e-test.md: step-by-step test procedure for ml-serving-new including image build, configmap patch, DB migration, endpoint creation, and K8s Deployment verification
- patch_temporal_configmap.py: runnable script to patch production service-template ConfigMap with temporal templates during e2e testing
- e2e test results from 2026-04-25: all checks passed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
```python
        ge=1,
        description="For async endpoints, how long a task can wait in queue before expiring (in seconds). Default: 86400 (24 hours).",
    )
    temporal_task_queue: Optional[str] = Field(
        default=None,
```
**Missing `temporal_task_queue` validation on update**

`CreateModelEndpointV1Request` validates that `temporal_task_queue` is non-empty for temporal endpoints, but `UpdateModelEndpointV1Request` has no equivalent check. Because `dict_not_none` only filters `None` (not `""`), a client that sends `temporal_task_queue=""` on an update will silently overwrite the stored task queue with an empty string, causing the next deployment to inject `TEMPORAL_TASK_QUEUE=""` into workers and making them poll no queue.
```python
@model_validator(mode="after")
def validate_temporal_task_queue(self) -> "UpdateModelEndpointV1Request":
    if self.temporal_task_queue is not None and not self.temporal_task_queue:
        raise ValueError("temporal_task_queue must be a non-empty string if provided")
    return self
```
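A quick way to see why the update path is unsafe: `dict_not_none` (sketched here with its assumed behavior, since only its name appears in the review) drops `None` values but lets `""` through.

```python
from typing import Any, Dict

def dict_not_none(**kwargs: Any) -> Dict[str, Any]:
    # Assumed behavior of the helper named in the review:
    # drop only keys whose value is None.
    return {k: v for k, v in kwargs.items() if v is not None}

# An omitted field (None) is filtered out, so the stored task queue survives:
assert dict_not_none(temporal_task_queue=None) == {}

# But an explicit empty string passes through and would overwrite it:
assert dict_not_none(temporal_task_queue="") == {"temporal_task_queue": ""}
```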
Summary

RFC for adding `temporal` as a fourth endpoint type in Launch, enabling teams to manage Temporal activity worker pods through the standard Launch API instead of maintaining custom K8s Deployments. Adds `endpoint_type: temporal` + a `temporal_task_queue` field to the endpoint API.

See `docs/temporal-endpoint-type-proposal.md` for the full design, API changes, implementation plan, and alternatives considered.

Motivation: the robotics ego pipeline (SVO → Dyn-HaMR → hand annotations) needs durable multi-step orchestration across T4/H100 workers. Temporal gives crash recovery per-phase; this PR proposes Launch-native worker management so teams don't bypass Launch to use it.
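Under this proposal, a create request would carry the new fields alongside the existing ones. A hedged sketch — only `endpoint_type` and `temporal_task_queue` are new, and the endpoint name and worker counts here are made up for illustration:

```python
# Illustrative create-endpoint payload for the proposed type. The other
# fields follow the existing Launch create-endpoint API.
payload = {
    "name": "ego-pipeline-worker",
    "endpoint_type": "temporal",
    "temporal_task_queue": "my-queue",
    "min_workers": 0,  # permitted for temporal endpoints, as for async
    "max_workers": 4,  # MVP runs a fixed replica count equal to max_workers
}

# The create-side validator requires a non-empty task queue for this type:
assert payload["endpoint_type"] == "temporal" and payload["temporal_task_queue"]
```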
Open questions for reviewers

- `temporal_namespace` — configurable field or always default?

Test plan
🤖 Generated with Claude Code
Greptile Summary

This PR proposes a `temporal` endpoint type for Launch, enabling teams to deploy Temporal activity worker pods via the standard Launch API rather than maintaining custom K8s Deployments. The implementation covers the full stack: a new `ModelEndpointType.TEMPORAL` enum value, a `temporal_task_queue` DB column + Alembic migration, Helm/CircleCI deployment templates, and wiring through the service/repository/use-case layers.

Confidence Score: 5/5
Safe to merge; all remaining findings are P2 style/cleanup suggestions with no correctness impact.
The core data-path changes (DB migration, entity, DTOs, repositories, use-cases, K8s templates) are structurally sound. The two remaining comments are minor style issues: a non-hex migration revision ID and an unnecessary autoscaling-metrics call for temporal endpoints. Both are harmless in production.
No files require special attention beyond the two P2 comments.
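The non-hex revision ID mentioned above is easy to catch mechanically. A small sketch, assuming the repo's convention of 12-character hexadecimal Alembic revision IDs:

```python
import string

def is_hex_revision(rev: str) -> bool:
    # Convention assumed from the review: Alembic revision IDs in this
    # repo are 12 hexadecimal characters.
    return len(rev) == 12 and all(c in string.hexdigits for c in rev)

assert is_hex_revision("a1b2c3d4e5f6")      # parent revision: fine
assert not is_hex_revision("b2c3d4e5f6g7")  # flagged: 'g' is not a hex digit
```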
Important Files Changed
- `temporal_task_queue` field added to create/update request DTOs; create-side validator enforces non-empty for temporal endpoints, but update-side lacks equivalent validation (already flagged in previous review)
- `temporal_task_queue` column added to the `endpoints` table; `down_revision` correctly chains to `a1b2c3d4e5f6`, but the revision ID `b2c3d4e5f6g7` is non-hexadecimal (contains `g`), breaking the hex convention used by all other migrations
- `_TemporalDeploymentArguments`, `_TemporalRunnableImageDeploymentArguments`, and the CPU/GPU TypedDict classes added; template argument construction correctly injects `TEMPORAL_TASK_QUEUE`, hostname, and port into the env var list
- `inference_autoscaling_metrics_gateway` is still called unnecessarily during create/update for TEMPORAL endpoints
- `temporal_task_queue` threaded through create and update paths; `dict_not_none` ensures `None` values are excluded from UPDATE queries, preserving existing values
- `min_workers=0` allowed for TEMPORAL endpoints in resource validation, and `temporal_task_queue` passed through create/update use cases; clean changes

Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant API as Launch API
    participant UC as CreateEndpointUseCase
    participant Svc as LiveModelEndpointService
    participant Repo as DbEndpointRecordRepo
    participant GW as LiveEndpointResourceGateway
    participant K8s as K8sEndpointResourceDelegate
    participant Temporal as Temporal Server
    Client->>API: POST /endpoints {endpoint_type: temporal, temporal_task_queue: my-queue}
    API->>UC: execute(request)
    UC->>UC: validate_temporal_task_queue()
    UC->>Svc: create_model_endpoint(temporal_task_queue=...)
    Svc->>Repo: create_model_endpoint_record(temporal_task_queue=...)
    Repo-->>Svc: ModelEndpointRecord
    Svc->>GW: create_or_update_resources(request)
    GW->>GW: skip SQS queue creation (non-ASYNC)
    GW->>K8s: create_or_update_resources(sqs_queue_name='')
    K8s->>K8s: _get_deployment_resource_name() -> deployment-runnable-image-temporal-cpu|gpu
    K8s->>K8s: load_k8s_yaml(template, args with TEMPORAL_TASK_QUEUE, TEMPORAL_SERVER_HOSTNAME)
    K8s->>K8s: deploy Deployment with replicas=MAX_WORKERS
    K8s-->>GW: k8s_service_name (destination)
    GW-->>Svc: destination
    Svc-->>UC: ModelEndpointRecord
    UC-->>API: CreateModelEndpointV1Response
    API-->>Client: 200 OK
    Note over K8s,Temporal: Workers poll Temporal independently - Launch is NOT in the task execution path
    K8s-->>Temporal: workers poll task queue via TEMPORAL_SERVER_HOSTNAME:PORT
```
Reviews (5): Last reviewed commit: "docs: remove Key Design Decisions sectio..."