-
Notifications
You must be signed in to change notification settings - Fork 102
Sample code for Nexus messaging #290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Evanthx
wants to merge
14
commits into
main
Choose a base branch
from
signals-nexus-python
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
7db991f
Working on adding Nexus messaging sample code
Evanthx 41d3aae
Fix to the tests
Evanthx 83386cf
Updates from a code review
Evanthx 274ea2a
Working on adding Nexus messaging sample code
Evanthx 8ed3721
Fix to the tests
Evanthx ca866f1
Updates from a code review
Evanthx 0c5e214
Merge remote-tracking branch 'refs/remotes/origin/signals-nexus-pytho…
Evanthx ea466a2
Bump pandas to 2.3.3 for Python 3.14 wheel support
Evanthx 48a2ab6
removed some unneeded code
Evanthx ed11a26
Removing workflow.unsafe.imports_passed_through
Evanthx 4206de0
Bringing in changes from main
Evanthx 9bb1166
Merge branch 'main' into signals-nexus-python
Evanthx 5add8fb
Linting
Evanthx b2bfdba
Changes from code reviews
Evanthx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| This sample shows how to expose a long-running Workflow's queries, updates, and signals as Nexus | ||
| operations. There are two self-contained examples, each in its own directory: | ||
|
|
||
| | | `callerpattern/` | `ondemandpattern/` | | ||
| |--------------------------------|--------------------------------------|--------------------------------------------------------------| | ||
| | **Pattern** | Signal an existing Workflow | Create and run Workflows on demand, and send signals to them | | ||
| | **Who creates the Workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation | | ||
| | **Who knows the Workflow ID?** | Only the handler | The caller chooses and passes it in every operation | | ||
| | **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` | | ||
|
|
||
| Each directory is fully self-contained for clarity. The `GreetingWorkflow`, activity, and | ||
| `Language` enum are **identical** between the two -- only the Nexus service definition and its | ||
| handler implementation differ. This highlights that the same Workflow can be exposed through | ||
| Nexus in different ways depending on whether the caller needs lifecycle control. | ||
|
|
||
| See each directory's README for running instructions. |
File renamed without changes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| ## Caller pattern | ||
|
|
||
| The handler worker starts a `GreetingWorkflow` for a User ID. | ||
| `NexusGreetingServiceHandler` holds that ID and routes every Nexus operation to it. | ||
| The caller's input does not have that Workflow ID as the caller doesn't know it -- but the caller | ||
| sends in the User ID, and `NexusGreetingServiceHandler` knows how to get the desired Workflow ID | ||
| from that User ID (see the `get_workflow_id` call). | ||
|
|
||
| The handler worker uses the same `get_workflow_id` call to generate a Workflow ID from a Wser ID | ||
| when it launches the Workflow. | ||
|
|
||
| The caller Workflow: | ||
| 1. Queries for supported languages (`get_languages` -- backed by a `@workflow.query`) | ||
| 2. Changes the language to Arabic (`set_language` -- backed by a `@workflow.update` that calls an activity) | ||
| 3. Confirms the change via a second query (`get_language`) | ||
| 4. Approves the Workflow (`approve` -- backed by a `@workflow.signal`) | ||
|
|
||
| ### Running | ||
|
|
||
| Start a Temporal server: | ||
|
|
||
| ```bash | ||
| temporal server start-dev | ||
| ``` | ||
|
|
||
| Create the namespaces and Nexus endpoint: | ||
|
|
||
| ```bash | ||
| temporal operator namespace create --namespace nexus-messaging-handler-namespace | ||
| temporal operator namespace create --namespace nexus-messaging-caller-namespace | ||
|
|
||
| temporal operator nexus endpoint create \ | ||
| --name nexus-messaging-nexus-endpoint \ | ||
| --target-namespace nexus-messaging-handler-namespace \ | ||
| --target-task-queue nexus-messaging-handler-task-queue | ||
| ``` | ||
|
|
||
| In one terminal, start the handler worker: | ||
|
|
||
| ```bash | ||
| uv run python -m nexus_messaging.callerpattern.handler.worker | ||
| ``` | ||
|
|
||
| In another terminal, run the following command to start the example: | ||
|
|
||
| ```bash | ||
| uv run python -m nexus_messaging.callerpattern.caller.app | ||
| ``` | ||
|
|
||
| Expected output: | ||
|
|
||
| ``` | ||
| Supported languages: [<Language.CHINESE: 2>, <Language.ENGLISH: 3>] | ||
| Language changed: ENGLISH -> ARABIC | ||
| Workflow approved | ||
| ``` |
File renamed without changes.
File renamed without changes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| """ | ||
| A caller workflow that executes Nexus operations. The caller does not have information | ||
| about how these operations are implemented by the Nexus service. | ||
| """ | ||
|
|
||
| from temporalio import workflow | ||
| from temporalio.exceptions import ApplicationError | ||
|
|
||
| from nexus_messaging.callerpattern.service import ( | ||
| ApproveInput, | ||
| GetLanguageInput, | ||
| GetLanguagesInput, | ||
| Language, | ||
| NexusGreetingService, | ||
| SetLanguageInput, | ||
| ) | ||
|
|
||
| NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" | ||
|
|
||
|
|
||
| @workflow.defn | ||
| class CallerWorkflow: | ||
| @workflow.run | ||
| async def run(self, user_id: str) -> list[str]: | ||
| log: list[str] = [] | ||
| nexus_client = workflow.create_nexus_client( | ||
| service=NexusGreetingService, | ||
| endpoint=NEXUS_ENDPOINT, | ||
| ) | ||
|
|
||
| # Call a Nexus operation backed by a query against the entity workflow. | ||
| # The workflow must already be running on the handler, otherwise you will | ||
| # get an error saying the workflow has already terminated. | ||
| languages_output = await nexus_client.execute_operation( | ||
| NexusGreetingService.get_languages, | ||
| GetLanguagesInput(include_unsupported=False, user_id=user_id), | ||
| ) | ||
| log.append(f"Supported languages: {languages_output.languages}") | ||
| workflow.logger.info("Supported languages: %s", languages_output.languages) | ||
|
|
||
| # Following are examples for each of the three messaging types - | ||
| # update, query, then signal. | ||
|
|
||
| # Call a Nexus operation backed by an update against the entity workflow. | ||
| previous_language = await nexus_client.execute_operation( | ||
| NexusGreetingService.set_language, | ||
| SetLanguageInput(language=Language.ARABIC, user_id=user_id), | ||
| ) | ||
|
|
||
| # Call a Nexus operation backed by a query to confirm the language change. | ||
| current_language = await nexus_client.execute_operation( | ||
| NexusGreetingService.get_language, | ||
| GetLanguageInput(user_id=user_id), | ||
| ) | ||
| if current_language != Language.ARABIC: | ||
| raise ApplicationError(f"Expected language ARABIC, got {current_language}") | ||
|
|
||
| log.append( | ||
| f"Language changed: {previous_language.name} -> {Language.ARABIC.name}" | ||
| ) | ||
| workflow.logger.info( | ||
| "Language changed from %s to %s", previous_language, Language.ARABIC | ||
| ) | ||
|
|
||
| # Call a Nexus operation backed by a signal against the entity workflow. | ||
| await nexus_client.execute_operation( | ||
| NexusGreetingService.approve, | ||
| ApproveInput(name="caller", user_id=user_id), | ||
| ) | ||
| log.append("Workflow approved") | ||
| workflow.logger.info("Workflow approved") | ||
|
|
||
| return log |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| import asyncio | ||
| from typing import Optional | ||
|
|
||
| from temporalio import activity | ||
|
|
||
| from nexus_messaging.callerpattern.service import Language | ||
|
|
||
|
|
||
| @activity.defn | ||
| async def call_greeting_service(language: Language) -> Optional[str]: | ||
| """Simulates a call to a remote greeting service. Returns None if unsupported.""" | ||
| greetings = { | ||
| Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645", | ||
| Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", | ||
| Language.ENGLISH: "Hello, world", | ||
| Language.FRENCH: "Bonjour, monde", | ||
| Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e", | ||
| Language.PORTUGUESE: "Ol\u00e1 mundo", | ||
| Language.SPANISH: "Hola mundo", | ||
| } | ||
| await asyncio.sleep(0.2) | ||
| return greetings.get(language) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| """ | ||
| Nexus operation handler implementation for the entity pattern. Each operation receives a | ||
| user_id, which is mapped to a workflow ID. The operations are synchronous because queries | ||
| and updates against a running workflow complete quickly. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import nexusrpc | ||
| from temporalio import nexus | ||
| from temporalio.client import WorkflowHandle | ||
|
|
||
| from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow | ||
| from nexus_messaging.callerpattern.service import ( | ||
| ApproveInput, | ||
| ApproveOutput, | ||
| GetLanguageInput, | ||
| GetLanguagesInput, | ||
| GetLanguagesOutput, | ||
| Language, | ||
| NexusGreetingService, | ||
| SetLanguageInput, | ||
| ) | ||
|
|
||
| WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_" | ||
|
|
||
|
|
||
| def get_workflow_id(user_id: str) -> str: | ||
| """Map a user ID to a workflow ID. | ||
|
|
||
| This example assumes you might have multiple workflows, one for each user. | ||
| If you had a single workflow for all users, you could remove this function, | ||
| remove the user_id from each input, and just use a single workflow ID. | ||
| """ | ||
| return f"{WORKFLOW_ID_PREFIX}{user_id}" | ||
|
|
||
|
|
||
| @nexusrpc.handler.service_handler(service=NexusGreetingService) | ||
| class NexusGreetingServiceHandler: | ||
| def _get_workflow_handle( | ||
| self, user_id: str | ||
| ) -> WorkflowHandle[GreetingWorkflow, str]: | ||
| return nexus.client().get_workflow_handle_for( | ||
| GreetingWorkflow.run, get_workflow_id(user_id) | ||
| ) | ||
|
|
||
| @nexusrpc.handler.sync_operation | ||
| async def get_languages( | ||
| self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput | ||
| ) -> GetLanguagesOutput: | ||
| return await self._get_workflow_handle(input.user_id).query( | ||
| GreetingWorkflow.get_languages, input | ||
| ) | ||
|
|
||
| @nexusrpc.handler.sync_operation | ||
| async def get_language( | ||
| self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguageInput | ||
| ) -> Language: | ||
| return await self._get_workflow_handle(input.user_id).query( | ||
| GreetingWorkflow.get_language | ||
| ) | ||
|
|
||
| # Routes to set_language_using_activity (not set_language) so that new languages not | ||
| # already in the greetings map can be fetched via an activity. | ||
| @nexusrpc.handler.sync_operation | ||
| async def set_language( | ||
| self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput | ||
| ) -> Language: | ||
| return await self._get_workflow_handle(input.user_id).execute_update( | ||
| GreetingWorkflow.set_language_using_activity, input | ||
| ) | ||
|
|
||
| @nexusrpc.handler.sync_operation | ||
| async def approve( | ||
| self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput | ||
| ) -> ApproveOutput: | ||
| await self._get_workflow_handle(input.user_id).signal( | ||
| GreetingWorkflow.approve, input | ||
| ) | ||
| return ApproveOutput() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| import asyncio | ||
| import logging | ||
| from typing import Optional | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.common import WorkflowIDConflictPolicy | ||
| from temporalio.envconfig import ClientConfig | ||
| from temporalio.worker import Worker | ||
|
|
||
| from nexus_messaging.callerpattern.handler.activities import call_greeting_service | ||
| from nexus_messaging.callerpattern.handler.service_handler import ( | ||
| NexusGreetingServiceHandler, | ||
| get_workflow_id, | ||
| ) | ||
| from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow | ||
|
|
||
| interrupt_event = asyncio.Event() | ||
|
|
||
| NAMESPACE = "nexus-messaging-handler-namespace" | ||
| TASK_QUEUE = "nexus-messaging-handler-task-queue" | ||
| USER_ID = "user-1" | ||
|
|
||
|
|
||
| async def main(client: Optional[Client] = None): | ||
| logging.basicConfig(level=logging.INFO) | ||
|
|
||
| if client is None: | ||
| config = ClientConfig.load_client_connect_config() | ||
| config.setdefault("target_host", "localhost:7233") | ||
| config.setdefault("namespace", NAMESPACE) | ||
| client = await Client.connect(**config) | ||
|
|
||
| # Start the long-running entity workflow that backs the Nexus service, | ||
| # if not already running. | ||
| workflow_id = get_workflow_id(USER_ID) | ||
| await client.start_workflow( | ||
| GreetingWorkflow.run, | ||
| id=workflow_id, | ||
| task_queue=TASK_QUEUE, | ||
| id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, | ||
| ) | ||
| logging.info("Started greeting workflow: %s", workflow_id) | ||
|
Evanthx marked this conversation as resolved.
|
||
|
|
||
| async with Worker( | ||
| client, | ||
| task_queue=TASK_QUEUE, | ||
| workflows=[GreetingWorkflow], | ||
| activities=[call_greeting_service], | ||
| nexus_service_handlers=[NexusGreetingServiceHandler()], | ||
| ): | ||
| logging.info("Handler worker started, ctrl+c to exit") | ||
| await interrupt_event.wait() | ||
| logging.info("Shutting down") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| loop = asyncio.new_event_loop() | ||
| try: | ||
| loop.run_until_complete(main()) | ||
| except KeyboardInterrupt: | ||
| interrupt_event.set() | ||
| loop.run_until_complete(loop.shutdown_asyncgens()) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.