Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ Some examples require extra dependencies. See each sample's directory for specif
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
* [Nexus Messaging](nexus_messaging): Demonstrates how send signal, update and query messages through Nexus.
This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus
and sends messages to it.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
16 changes: 16 additions & 0 deletions nexus_messaging/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
This sample shows how to expose a long-running Workflow's queries, updates, and signals as Nexus
operations. There are two self-contained examples, each in its own directory:

| | `callerpattern/` | `ondemandpattern/` |
|--------------------------------|--------------------------------------|--------------------------------------------------------------|
| **Pattern** | Signal an existing Workflow | Create and run Workflows on demand, and send signals to them |
| **Who creates the Workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation |
| **Who knows the Workflow ID?** | Only the handler | The caller chooses and passes it in every operation |
| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` |

Each directory is fully self-contained for clarity. The `GreetingWorkflow`, activity, and
`Language` enum are **identical** between the two -- only the Nexus service definition and its
handler implementation differ. This highlights that the same Workflow can be exposed through
Nexus in different ways depending on whether the caller needs lifecycle control.

See each directory's README for running instructions.
File renamed without changes.
56 changes: 56 additions & 0 deletions nexus_messaging/callerpattern/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
## Caller pattern

The handler worker starts a `GreetingWorkflow` for a User ID.
`NexusGreetingServiceHandler` holds that ID and routes every Nexus operation to it.
The caller's input does not have that Workflow ID as the caller doesn't know it -- but the caller
sends in the User ID, and `NexusGreetingServiceHandler` knows how to get the desired Workflow ID
from that User ID (see the `get_workflow_id` call).

The handler worker uses the same `get_workflow_id` call to generate a Workflow ID from a Wser ID
when it launches the Workflow.

The caller Workflow:
1. Queries for supported languages (`get_languages` -- backed by a `@workflow.query`)
2. Changes the language to Arabic (`set_language` -- backed by a `@workflow.update` that calls an activity)
3. Confirms the change via a second query (`get_language`)
4. Approves the Workflow (`approve` -- backed by a `@workflow.signal`)

### Running

Start a Temporal server:

```bash
temporal server start-dev
```

Create the namespaces and Nexus endpoint:

```bash
temporal operator namespace create --namespace nexus-messaging-handler-namespace
temporal operator namespace create --namespace nexus-messaging-caller-namespace

temporal operator nexus endpoint create \
--name nexus-messaging-nexus-endpoint \
--target-namespace nexus-messaging-handler-namespace \
--target-task-queue nexus-messaging-handler-task-queue
```

In one terminal, start the handler worker:

```bash
uv run python -m nexus_messaging.callerpattern.handler.worker
```

In another terminal, run the following command to start the example:

```bash
uv run python -m nexus_messaging.callerpattern.caller.app
```

Expected output:

```
Supported languages: [<Language.CHINESE: 2>, <Language.ENGLISH: 3>]
Language changed: ENGLISH -> ARABIC
Workflow approved
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from nexus_sync_operations.caller.workflows import CallerWorkflow
from nexus_messaging.callerpattern.caller.workflows import CallerWorkflow

NAMESPACE = "nexus-sync-operations-caller-namespace"
TASK_QUEUE = "nexus-sync-operations-caller-task-queue"
NAMESPACE = "nexus-messaging-caller-namespace"
TASK_QUEUE = "nexus-messaging-caller-task-queue"


async def execute_caller_workflow(
client: Optional[Client] = None,
) -> None:
async def execute_caller_workflow(client: Optional[Client] = None) -> None:
if client is None:
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
Expand All @@ -28,7 +26,8 @@ async def execute_caller_workflow(
):
log = await client.execute_workflow(
CallerWorkflow.run,
id=str(uuid.uuid4()),
arg="user-1",
id=f"nexus-messaging-caller-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
for line in log:
Expand Down
73 changes: 73 additions & 0 deletions nexus_messaging/callerpattern/caller/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
A caller workflow that executes Nexus operations. The caller does not have information
about how these operations are implemented by the Nexus service.
"""

from temporalio import workflow
from temporalio.exceptions import ApplicationError

from nexus_messaging.callerpattern.service import (
ApproveInput,
GetLanguageInput,
GetLanguagesInput,
Language,
NexusGreetingService,
SetLanguageInput,
)

NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"


@workflow.defn
class CallerWorkflow:
@workflow.run
async def run(self, user_id: str) -> list[str]:
log: list[str] = []
nexus_client = workflow.create_nexus_client(
service=NexusGreetingService,
endpoint=NEXUS_ENDPOINT,
)

# Call a Nexus operation backed by a query against the entity workflow.
# The workflow must already be running on the handler, otherwise you will
# get an error saying the workflow has already terminated.
languages_output = await nexus_client.execute_operation(
NexusGreetingService.get_languages,
GetLanguagesInput(include_unsupported=False, user_id=user_id),
)
log.append(f"Supported languages: {languages_output.languages}")
workflow.logger.info("Supported languages: %s", languages_output.languages)

# Following are examples for each of the three messaging types -
# update, query, then signal.

# Call a Nexus operation backed by an update against the entity workflow.
previous_language = await nexus_client.execute_operation(
NexusGreetingService.set_language,
SetLanguageInput(language=Language.ARABIC, user_id=user_id),
)

# Call a Nexus operation backed by a query to confirm the language change.
current_language = await nexus_client.execute_operation(
NexusGreetingService.get_language,
GetLanguageInput(user_id=user_id),
)
if current_language != Language.ARABIC:
raise ApplicationError(f"Expected language ARABIC, got {current_language}")

log.append(
f"Language changed: {previous_language.name} -> {Language.ARABIC.name}"
)
workflow.logger.info(
"Language changed from %s to %s", previous_language, Language.ARABIC
)

# Call a Nexus operation backed by a signal against the entity workflow.
await nexus_client.execute_operation(
NexusGreetingService.approve,
ApproveInput(name="caller", user_id=user_id),
)
log.append("Workflow approved")
workflow.logger.info("Workflow approved")

return log
Empty file.
22 changes: 22 additions & 0 deletions nexus_messaging/callerpattern/handler/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio
from typing import Optional

from temporalio import activity

from nexus_messaging.callerpattern.service import Language


@activity.defn
async def call_greeting_service(language: Language) -> Optional[str]:
"""Simulates a call to a remote greeting service. Returns None if unsupported."""
greetings = {
Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645",
Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c",
Language.ENGLISH: "Hello, world",
Language.FRENCH: "Bonjour, monde",
Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e",
Language.PORTUGUESE: "Ol\u00e1 mundo",
Language.SPANISH: "Hola mundo",
}
await asyncio.sleep(0.2)
return greetings.get(language)
80 changes: 80 additions & 0 deletions nexus_messaging/callerpattern/handler/service_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Nexus operation handler implementation for the entity pattern. Each operation receives a
user_id, which is mapped to a workflow ID. The operations are synchronous because queries
and updates against a running workflow complete quickly.
"""

from __future__ import annotations

import nexusrpc
from temporalio import nexus
from temporalio.client import WorkflowHandle

from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow
from nexus_messaging.callerpattern.service import (
ApproveInput,
ApproveOutput,
GetLanguageInput,
GetLanguagesInput,
GetLanguagesOutput,
Language,
NexusGreetingService,
SetLanguageInput,
)

WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_"


def get_workflow_id(user_id: str) -> str:
"""Map a user ID to a workflow ID.

This example assumes you might have multiple workflows, one for each user.
If you had a single workflow for all users, you could remove this function,
remove the user_id from each input, and just use a single workflow ID.
"""
return f"{WORKFLOW_ID_PREFIX}{user_id}"


@nexusrpc.handler.service_handler(service=NexusGreetingService)
class NexusGreetingServiceHandler:
def _get_workflow_handle(
self, user_id: str
) -> WorkflowHandle[GreetingWorkflow, str]:
return nexus.client().get_workflow_handle_for(
GreetingWorkflow.run, get_workflow_id(user_id)
)

@nexusrpc.handler.sync_operation
async def get_languages(
self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput
) -> GetLanguagesOutput:
return await self._get_workflow_handle(input.user_id).query(
GreetingWorkflow.get_languages, input
)

@nexusrpc.handler.sync_operation
async def get_language(
self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguageInput
) -> Language:
return await self._get_workflow_handle(input.user_id).query(
GreetingWorkflow.get_language
)

# Routes to set_language_using_activity (not set_language) so that new languages not
# already in the greetings map can be fetched via an activity.
@nexusrpc.handler.sync_operation
async def set_language(
self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput
) -> Language:
return await self._get_workflow_handle(input.user_id).execute_update(
GreetingWorkflow.set_language_using_activity, input
)
Comment thread
Evanthx marked this conversation as resolved.

@nexusrpc.handler.sync_operation
async def approve(
self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput
) -> ApproveOutput:
await self._get_workflow_handle(input.user_id).signal(
GreetingWorkflow.approve, input
)
return ApproveOutput()
62 changes: 62 additions & 0 deletions nexus_messaging/callerpattern/handler/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import asyncio
import logging
from typing import Optional

from temporalio.client import Client
from temporalio.common import WorkflowIDConflictPolicy
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from nexus_messaging.callerpattern.handler.activities import call_greeting_service
from nexus_messaging.callerpattern.handler.service_handler import (
NexusGreetingServiceHandler,
get_workflow_id,
)
from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow

interrupt_event = asyncio.Event()

NAMESPACE = "nexus-messaging-handler-namespace"
TASK_QUEUE = "nexus-messaging-handler-task-queue"
USER_ID = "user-1"


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

if client is None:
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
config.setdefault("namespace", NAMESPACE)
client = await Client.connect(**config)

# Start the long-running entity workflow that backs the Nexus service,
# if not already running.
workflow_id = get_workflow_id(USER_ID)
await client.start_workflow(
GreetingWorkflow.run,
id=workflow_id,
task_queue=TASK_QUEUE,
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
)
logging.info("Started greeting workflow: %s", workflow_id)
Comment thread
Evanthx marked this conversation as resolved.

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[GreetingWorkflow],
activities=[call_greeting_service],
nexus_service_handlers=[NexusGreetingServiceHandler()],
):
logging.info("Handler worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
Loading
Loading