Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,7 @@ To build the SDK from source for use as a dependency, the following prerequisite
* [uv](https://docs.astral.sh/uv/)
* [Rust](https://www.rust-lang.org/)
* [Protobuf Compiler](https://protobuf.dev/)
* [Node.js](https://nodejs.org/)

Use `uv` to install `poe`:

Expand Down Expand Up @@ -2074,6 +2075,12 @@ back from this downgrade, restore both of those files and run `uv sync --all-ext
run for protobuf version 3 by setting the `TEMPORAL_TEST_PROTO3` env var to `1` prior to running
tests.

The local build and lint flows also regenerate Temporal system Nexus models. By default this pulls
in `[email protected]` via `npx`. To use an existing checkout instead, set
`TEMPORAL_NEXUS_RPC_GEN_DIR` to the `nexus-rpc-gen` repo root or its `src` directory before
running `poe build-develop`, `poe lint`, or `poe gen-protos`. The local checkout override path
also requires [`pnpm`](https://pnpm.io/) to be installed.

### Style

* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:
Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ gen-protos = [
{ cmd = "uv run scripts/gen_protos.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
gen-protos-docker = [
{ cmd = "uv run scripts/gen_protos_docker.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
lint = [
Expand All @@ -102,6 +104,7 @@ lint-types = [
{ cmd = "uv run mypy --namespace-packages --check-untyped-defs ." },
{ cmd = "uv run basedpyright" },
]
gen-nexus-system-models = "uv run scripts/gen_nexus_system_models.py"
run-bench = "uv run python scripts/run_bench.py"
test = "uv run pytest"

Expand Down Expand Up @@ -139,14 +142,16 @@ environment = { PATH = "$PATH:$HOME/.cargo/bin", CARGO_NET_GIT_FETCH_WITH_CLI =
ignore_missing_imports = true
exclude = [
# Ignore generated code
'build/tool-cache',
'temporalio/api',
'temporalio/bridge/proto',
]

[tool.pydocstyle]
convention = "google"
# https://github.com/PyCQA/pydocstyle/issues/363#issuecomment-625563088
match_dir = "^(?!(docs|scripts|tests|api|proto|\\.)).*"
match_dir = "^(?!(build|docs|scripts|tests|api|proto|\\.)).*"
match = "^(?!_workflow_service_generated\\.py$).*\\.py"
add_ignore = [
# We like to wrap at a certain number of chars, even long summary sentences.
# https://github.com/PyCQA/pydocstyle/issues/184
Expand Down Expand Up @@ -211,7 +216,6 @@ exclude = [
# Exclude auto generated files
"temporalio/api",
"temporalio/bridge/proto",
"temporalio/bridge/_visitor.py",
"tests/worker/workflow_sandbox/testmodules/proto",
]

Expand Down
143 changes: 143 additions & 0 deletions scripts/gen_nexus_system_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from __future__ import annotations

import importlib
import subprocess
import sys
from pathlib import Path

NEXUS_RPC_GEN_VERSION = "0.1.0-alpha.4"


def main() -> None:
repo_root = Path(__file__).resolve().parent.parent
input_schema = (
repo_root
/ "temporalio"
/ "bridge"
/ "sdk-core"
/ "crates"
/ "common"
/ "protos"
/ "api_upstream"
/ "nexus"
/ "temporal-proto-models-nexusrpc.yaml"
)
output_file = (
repo_root / "temporalio" / "nexus" / "system" / "_workflow_service_generated.py"
)

if not input_schema.is_file():
raise RuntimeError(f"Expected Nexus schema at {input_schema}")

run_nexus_rpc_gen(
output_file=output_file,
input_schema=input_schema,
)
add_operation_registry(repo_root, output_file)
subprocess.run(
[
"uv",
"run",
"ruff",
"check",
"--select",
"I",
"--fix",
str(output_file),
],
cwd=repo_root,
check=True,
)
subprocess.run(
[
"uv",
"run",
"ruff",
"format",
str(output_file),
],
cwd=repo_root,
check=True,
)


def add_operation_registry(repo_root: Path, output_file: Path) -> None:
source = output_file.read_text()
source = ensure_typing_import(source)
services = discover_services(repo_root)
if not services:
output_file.write_text(source)
return
output_file.write_text(source.rstrip() + "\n\n" + emit_operation_registry(services))


def ensure_typing_import(source: str) -> str:
if "\nimport typing\n" in source:
return source
marker = "from __future__ import annotations\n\n"
if marker not in source:
raise RuntimeError("Expected future-annotations import in generated output")
return source.replace(marker, marker + "import typing\n", 1)


def discover_services(repo_root: Path) -> list[tuple[str, str, list[tuple[str, str]]]]:
module_name = "temporalio.nexus.system._workflow_service_generated"
sys.path.insert(0, str(repo_root))
try:
sys.modules.pop(module_name, None)
importlib.invalidate_caches()
module = importlib.import_module(module_name)
finally:
sys.path.pop(0)
services: list[tuple[str, str, list[tuple[str, str]]]] = []
for value in vars(module).values():
if not isinstance(value, type):
continue
definition = getattr(value, "__nexus_service_definition__", None)
if definition is None:
continue
operations = [
(operation_definition.method_name, operation_definition.name)
for operation_definition in definition.operation_definitions.values()
]
services.append((value.__name__, definition.name, operations))
return services


def emit_operation_registry(
services: list[tuple[str, str, list[tuple[str, str]]]],
) -> str:
lines = [
"__nexus_operation_registry__: dict[",
" tuple[str, str], Operation[typing.Any, typing.Any]",
"] = {",
]
for class_name, service_name, operations in services:
for attr_name, operation_name in operations:
lines.append(
f" ({service_name!r}, {operation_name!r}): {class_name}.{attr_name},"
)
lines.append("}")
return "\n".join(lines).rstrip() + "\n"


def run_nexus_rpc_gen(*, output_file: Path, input_schema: Path) -> None:
common_args = [
"--lang",
"py",
"--out-file",
str(output_file),
str(input_schema),
]
subprocess.run(
["npx", "--yes", f"nexus-rpc-gen@{NEXUS_RPC_GEN_VERSION}", *common_args],
check=True,
)


if __name__ == "__main__":
try:
main()
except Exception as err:
print(f"Failed to generate Nexus system models: {err}", file=sys.stderr)
raise
Loading