
proposal: temporal endpoint type for Temporal activity workers (MLI-6425)#815

Open
lilyz-ai wants to merge 6 commits into main from lilyz-ai/temporal-endpoint-type

Conversation


@lilyz-ai lilyz-ai commented Apr 24, 2026

Summary

RFC for adding temporal as a fourth endpoint type in Launch, enabling teams to manage Temporal activity worker pods through the standard Launch API instead of maintaining custom K8s Deployments.

  • Adds endpoint_type: temporal + temporal_task_queue field to the endpoint API
  • Launch manages pod lifecycle, GPU scheduling, and scaling
  • Workers connect to Temporal server directly — Launch is not in the task path
  • Phase 1: fixed-replica MVP (~2 weeks). Phase 2: KEDA Temporal autoscaler (~3 weeks follow-up)

See docs/temporal-endpoint-type-proposal.md for full design, API changes, implementation plan, and alternatives considered.

Motivation: robotics ego pipeline (SVO → Dyn-HaMR → hand annotations) needs durable multi-step orchestration across T4/H100 workers. Temporal gives crash recovery per-phase; this PR proposes Launch-native worker management so teams don't bypass Launch to use it.

Open questions for reviewers

  1. Readiness probe strategy for workers with no HTTP endpoint
  2. Should Launch eventually expose a "start workflow" API, or stay worker-only?
  3. temporal_namespace — configurable field or always default?

Test plan

  • Proposal review / design sign-off
  • Phase 1 implementation PR (separate)
  • Phase 2 KEDA autoscaling PR (separate)

🤖 Generated with Claude Code

Greptile Summary

This PR proposes a temporal endpoint type for Launch, enabling teams to deploy Temporal activity worker pods via the standard Launch API rather than maintaining custom K8s Deployments. The implementation covers the full stack: a new ModelEndpointType.TEMPORAL enum value, a temporal_task_queue DB column + Alembic migration, Helm/CircleCI deployment templates, and wiring through the service/repository/use-case layers.

Confidence Score: 5/5

Safe to merge; all remaining findings are P2 style/cleanup suggestions with no correctness impact.

The core data-path changes (DB migration, entity, DTOs, repositories, use-cases, K8s templates) are structurally sound. The two remaining comments are minor style issues: a non-hex migration revision ID and an unnecessary autoscaling-metrics call for temporal endpoints. Both are harmless in production.

No files require special attention beyond the two P2 comments.

Important Files Changed

  • model-engine/model_engine_server/common/dtos/model_endpoints.py — Adds temporal_task_queue field to the create/update request DTOs; the create-side validator enforces non-empty for temporal endpoints, but the update side lacks equivalent validation (already flagged in a previous review)
  • model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py — Adds nullable temporal_task_queue column to the endpoints table; down_revision correctly chains to a1b2c3d4e5f6, but the revision ID b2c3d4e5f6g7 is non-hexadecimal (contains g), breaking the hex convention used by all other migrations
  • model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py — Adds _TemporalDeploymentArguments, _TemporalRunnableImageDeploymentArguments, and the CPU/GPU TypedDict classes; template argument construction correctly injects TEMPORAL_TASK_QUEUE, hostname, and port into the env var list
  • model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py — Correctly skips SQS queue deletion for TEMPORAL endpoints on delete, but inference_autoscaling_metrics_gateway is still called unnecessarily during create/update for TEMPORAL endpoints
  • charts/model-engine/templates/service_template_config_map.yaml — Adds temporal CPU/GPU Deployment templates via a Helm range loop; no readiness/liveness probe (acknowledged as an open question in the PR), otherwise structurally consistent with the other deployment templates
  • model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml — Adds fully expanded CPU and GPU temporal deployment templates for CI; matches the Helm chart templates structurally
  • model-engine/model_engine_server/infra/repositories/db_model_endpoint_record_repository.py — Correctly threads temporal_task_queue through the create and update paths; dict_not_none ensures None values are excluded from UPDATE queries, preserving existing values
  • model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py — Allows min_workers=0 for TEMPORAL endpoints in resource validation and passes temporal_task_queue through the create/update use cases; clean changes

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as Launch API
    participant UC as CreateEndpointUseCase
    participant Svc as LiveModelEndpointService
    participant Repo as DbEndpointRecordRepo
    participant GW as LiveEndpointResourceGateway
    participant K8s as K8sEndpointResourceDelegate
    participant Temporal as Temporal Server

    Client->>API: POST /endpoints {endpoint_type: temporal, temporal_task_queue: my-queue}
    API->>UC: execute(request)
    UC->>UC: validate_temporal_task_queue()
    UC->>Svc: create_model_endpoint(temporal_task_queue=...)
    Svc->>Repo: create_model_endpoint_record(temporal_task_queue=...)
    Repo-->>Svc: ModelEndpointRecord
    Svc->>GW: create_or_update_resources(request)
    GW->>GW: skip SQS queue creation (non-ASYNC)
    GW->>K8s: create_or_update_resources(sqs_queue_name='')
    K8s->>K8s: _get_deployment_resource_name() -> deployment-runnable-image-temporal-cpu|gpu
    K8s->>K8s: load_k8s_yaml(template, args with TEMPORAL_TASK_QUEUE, TEMPORAL_SERVER_HOSTNAME)
    K8s->>K8s: deploy Deployment with replicas=MAX_WORKERS
    K8s-->>GW: k8s_service_name (destination)
    GW-->>Svc: destination
    Svc-->>UC: ModelEndpointRecord
    UC-->>API: CreateModelEndpointV1Response
    API-->>Client: 200 OK
    Note over K8s,Temporal: Workers poll Temporal independently - Launch is NOT in the task execution path
    K8s-->>Temporal: workers poll task queue via TEMPORAL_SERVER_HOSTNAME:PORT
```
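The final "workers poll Temporal independently" step can be made concrete with the Temporal Python SDK (temporalio). The sketch below is illustrative only — it consumes the env vars named in the diagram (TEMPORAL_SERVER_HOSTNAME, TEMPORAL_SERVER_PORT, TEMPORAL_TASK_QUEUE), while the `my_activities`/`annotate_hands` names are hypothetical placeholders for whatever the team's worker image actually ships:

```python
import asyncio
import os


def temporal_target(env) -> str:
    # Build "host:port" from the env vars the K8s template injects;
    # 7233 is Temporal's standard frontend port, used here as a fallback.
    host = env["TEMPORAL_SERVER_HOSTNAME"]
    port = env.get("TEMPORAL_SERVER_PORT", "7233")
    return f"{host}:{port}"


async def main():
    # Requires the `temporalio` package; imported lazily so the module
    # can be imported without the SDK installed.
    from temporalio.client import Client
    from temporalio.worker import Worker

    from my_activities import annotate_hands  # hypothetical activity

    client = await Client.connect(temporal_target(os.environ))
    worker = Worker(
        client,
        task_queue=os.environ["TEMPORAL_TASK_QUEUE"],
        activities=[annotate_hands],
    )
    # Polls the task queue until the pod is terminated — Launch is never
    # in this path, matching the note in the diagram.
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```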

Greptile P2 comments
Path: model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py
Line: 12

Comment:
**Non-hexadecimal Alembic revision ID**

`b2c3d4e5f6g7` contains `g`, making it non-hexadecimal. Every other migration in this repo uses valid hex IDs (`fa3267c80731`, `b574e9711e35`, `a1b2c3d4e5f6`, etc.). While Alembic treats revision IDs as plain strings so this won't cause a runtime error, it breaks the consistent hex convention and makes tooling that generates hex-based IDs (e.g. `alembic revision --autogenerate`) less likely to pick up the correct head. Consider replacing `b2c3d4e5f6g7` with a valid 12-char hex string (e.g. `b2c3d4e5f6a7`).

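Since Alembic accepts any opaque string as a revision ID, nothing in the toolchain will catch the slip; a local check like the following sketch (the function name and regex are mine, not repo tooling) flags any ID that breaks the 12-character hex convention:

```python
import re


def is_hex_revision(rev: str) -> bool:
    # Repo convention per this review: 12 lowercase hex characters.
    # Alembic itself treats revision IDs as plain strings.
    return bool(re.fullmatch(r"[0-9a-f]{12}", rev))


print(is_hex_revision("b2c3d4e5f6g7"))  # False: contains 'g'
print(is_hex_revision("b2c3d4e5f6a7"))  # True
```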

---

Path: model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py
Line: 72-75

Comment:
**Inference autoscaling metrics resources created unnecessarily for TEMPORAL endpoints**

The `else` branch runs for all non-ASYNC endpoint types, so `inference_autoscaling_metrics_gateway.create_or_update_resources` is called when creating a TEMPORAL endpoint. These resources are only meaningful for sync/streaming HPA-based autoscaling; temporal workers use their own `temporal.scaleml.io/*` annotations. The create call is a no-op in terms of correctness but creates stale/unused resources.

Consider guarding the call:
```python
if self.inference_autoscaling_metrics_gateway is not None and endpoint_type not in {
    ModelEndpointType.TEMPORAL
}:
    await self.inference_autoscaling_metrics_gateway.create_or_update_resources(
        endpoint_record.id
    )
```


Reviews (5). Last reviewed commit: "docs: remove Key Design Decisions sectio..."

lilyz-ai and others added 2 commits April 21, 2026 01:11
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
RFC for natively managing Temporal activity workers via Launch,
with phased implementation plan (fixed-replica MVP + KEDA autoscaling).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment on lines +132 to +137
Recommendation: KEDA Temporal trigger. Annotation format:
```yaml
temporal.keda.sh/task-queue: "${TEMPORAL_TASK_QUEUE}"
temporal.keda.sh/namespace: "default"
temporal.keda.sh/targetQueueSize: "${PER_WORKER}"
```

P1 KEDA annotation format does not match KEDA's actual Temporal scaler API

The temporal.keda.sh/ annotation keys used here aren't part of KEDA's Temporal scaler — KEDA uses a ScaledObject CRD with spec.triggers[].type: temporal and structured metadata, not pod annotations. Using the wrong format in Phase 2 implementation will silently produce a non-functional autoscaler.

Correct KEDA Temporal trigger format (from keda.sh docs):

```yaml
triggers:
  - type: temporal
    metadata:
      namespace: "default"
      taskQueue: "${TEMPORAL_TASK_QUEUE}"
      targetQueueSize: "${PER_WORKER}"
      endpoint: "${TEMPORAL_SERVER_HOSTNAME}:${TEMPORAL_SERVER_PORT}"
      queueTypes: activity
```

This needs to be a separate ScaledObject resource (not a Deployment annotation), managed alongside the Deployment template.
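Putting that corrected trigger into context, a Phase 2 ScaledObject might look roughly like the following — a sketch only: the resource names and replica bounds are placeholders, and the exact metadata keys should be verified against the KEDA version actually deployed:

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ${ENDPOINT_NAME}-temporal-scaler   # placeholder name
spec:
  scaleTargetRef:
    name: ${DEPLOYMENT_NAME}               # the temporal worker Deployment
  minReplicaCount: ${MIN_WORKERS}
  maxReplicaCount: ${MAX_WORKERS}
  triggers:
    - type: temporal
      metadata:
        endpoint: "${TEMPORAL_SERVER_HOSTNAME}:${TEMPORAL_SERVER_PORT}"
        namespace: "default"
        taskQueue: "${TEMPORAL_TASK_QUEUE}"
        targetQueueSize: "${PER_WORKER}"
        queueTypes: activity
```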


lilyz-ai and others added 3 commits April 24, 2026 18:23
Adds TEMPORAL as a fourth endpoint type in Launch so teams can run
Temporal activity worker pods through the standard Launch API instead
of maintaining custom raw K8s Deployments.

- domain/entities: add TEMPORAL to ModelEndpointType enum
- domain/entities: add temporal_task_queue field to ModelEndpointRecord
- common/dtos: add temporal_task_queue to Create/Update request DTOs
  with validator requiring it for temporal endpoints
- domain/use_cases: allow min_workers=0 for temporal (like async)
- domain/services + infra/services: thread temporal_task_queue through
  create/update endpoint call chain
- db/models: add temporal_task_queue column to ORM model
- db/migrations: alembic migration to add the column
- infra/repositories: persist and read temporal_task_queue
- k8s_resource_types: add DeploymentRunnableImageTemporalCpuArguments
  and DeploymentRunnableImageTemporalGpuArguments TypedDicts; add
  temporal branch in get_endpoint_resource_arguments_from_request
- k8s_endpoint_resource_delegate: add temporal resource name handling
  in _create_or_update_resources, _get_resources_from_deployment_type,
  _get_temporal_autoscaling_params (reads annotations); skip SQS for
  temporal in delete_resources
- live_endpoint_resource_gateway: skip SQS queue deletion for temporal
- service_template_config_map_circleci.yaml: add cpu+gpu temporal
  deployment templates (fixed replicas=MAX_WORKERS, no forwarder
  sidecar, temporal annotations)
- service_template_config_map.yaml (Helm): same templates for prod

MVP uses fixed replicas = max_workers; KEDA autoscaling is phase 2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The temporal deployment template used `$security_context | nindent`
directly on a map value, causing a Helm type error. Use the same
`{{- with $security_context }} securityContext: {{- toYaml . | nindent }}
{{- end }}` pattern as the other endpoint type templates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
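The commit above describes the fix in prose; in template form, the `with`/`toYaml` pattern it refers to looks roughly like this (indent width is illustrative — match the surrounding template):

```yaml
{{- with $security_context }}
securityContext:
  {{- toYaml . | nindent 12 }}
{{- end }}
```

The `with` block skips `securityContext` entirely when the value is empty, and `toYaml` serializes the map before `nindent` re-indents it — which is why piping the map value straight into `nindent` failed with a type error.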
- temporal-endpoint-type-proposal.md: update status to Implemented,
  document actual design decisions (annotations, no readiness probe,
  no forwarder sidecar, fixed replicas MVP), resolve open questions
- temporal-endpoint-e2e-test.md: step-by-step test procedure for
  ml-serving-new including image build, configmap patch, DB migration,
  endpoint creation, and K8s Deployment verification
- patch_temporal_configmap.py: runnable script to patch production
  service-template ConfigMap with temporal templates during e2e testing
- e2e test results from 2026-04-25: all checks passed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment on lines 132 to +136
```python
    ge=1,
    description="For async endpoints, how long a task can wait in queue before expiring (in seconds). Default: 86400 (24 hours).",
)
temporal_task_queue: Optional[str] = Field(
    default=None,
```

P1 Missing temporal_task_queue validation on update

CreateModelEndpointV1Request validates that temporal_task_queue is non-empty for temporal endpoints, but UpdateModelEndpointV1Request has no equivalent check. Because dict_not_none only filters None (not ""), a client that sends temporal_task_queue="" on an update will silently overwrite the stored task queue with an empty string, causing the next deployment to inject TEMPORAL_TASK_QUEUE="" into workers and making them poll no queue.

```python
@model_validator(mode="after")
def validate_temporal_task_queue(self) -> "UpdateModelEndpointV1Request":
    if self.temporal_task_queue is not None and not self.temporal_task_queue:
        raise ValueError("temporal_task_queue must be a non-empty string if provided")
    return self
```
