Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,7 @@ To build the SDK from source for use as a dependency, the following prerequisite
* [uv](https://docs.astral.sh/uv/)
* [Rust](https://www.rust-lang.org/)
* [Protobuf Compiler](https://protobuf.dev/)
* [Node.js](https://nodejs.org/)

Use `uv` to install `poe`:

Expand Down Expand Up @@ -2074,6 +2075,12 @@ back from this downgrade, restore both of those files and run `uv sync --all-ext
run for protobuf version 3 by setting the `TEMPORAL_TEST_PROTO3` env var to `1` prior to running
tests.

The local build and lint flows also regenerate Temporal system Nexus models. By default this pulls
in `nexus-rpc-gen@0.1.0-alpha.4` via `npx`. To use an existing checkout instead, set
`TEMPORAL_NEXUS_RPC_GEN_DIR` to the `nexus-rpc-gen` repo root or its `src` directory before
running `poe build-develop`, `poe lint`, or `poe gen-protos`. The local checkout override path
also requires [`pnpm`](https://pnpm.io/) to be installed.

### Style

* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:
Expand Down
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ gen-protos = [
{ cmd = "uv run scripts/gen_protos.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
gen-protos-docker = [
{ cmd = "uv run scripts/gen_protos_docker.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
lint = [
Expand All @@ -102,6 +104,7 @@ lint-types = [
{ cmd = "uv run mypy --namespace-packages --check-untyped-defs ." },
{ cmd = "uv run basedpyright" },
]
gen-nexus-system-models = "uv run scripts/gen_nexus_system_models.py"
run-bench = "uv run python scripts/run_bench.py"
test = "uv run pytest"

Expand Down Expand Up @@ -139,14 +142,17 @@ environment = { PATH = "$PATH:$HOME/.cargo/bin", CARGO_NET_GIT_FETCH_WITH_CLI =
ignore_missing_imports = true
exclude = [
# Ignore generated code
'build',
'temporalio/api',
'temporalio/bridge/proto',
'temporalio/nexus/system/_workflow_service_generated.py',
]

[tool.pydocstyle]
convention = "google"
# https://github.com/PyCQA/pydocstyle/issues/363#issuecomment-625563088
match_dir = "^(?!(docs|scripts|tests|api|proto|\\.)).*"
match_dir = "^(?!(build|docs|scripts|tests|api|proto|\\.)).*"
match = "^(?!_workflow_service_generated\\.py$).*\\.py"
add_ignore = [
# We like to wrap at a certain number of chars, even long summary sentences.
# https://github.com/PyCQA/pydocstyle/issues/184
Expand Down
120 changes: 120 additions & 0 deletions scripts/gen_nexus_system_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import annotations

import os
import subprocess
import sys
from pathlib import Path

NEXUS_RPC_GEN_ENV_VAR = "TEMPORAL_NEXUS_RPC_GEN_DIR"
NEXUS_RPC_GEN_VERSION = "0.1.0-alpha.4"


def main() -> None:
repo_root = Path(__file__).resolve().parent.parent
# TODO: Remove the local .nexusrpc.yaml shim once the upstream API repo
# checks in the Nexus definition we can consume directly.
override_root = normalize_nexus_rpc_gen_root(
Path.cwd(), env_value=NEXUS_RPC_GEN_ENV_VAR
)
input_schema = (
repo_root
/ "temporalio"
/ "nexus"
/ "system"
/ "_workflow_service.nexusrpc.yaml"
)
output_file = (
repo_root / "temporalio" / "nexus" / "system" / "_workflow_service_generated.py"
)

if not input_schema.is_file():
raise RuntimeError(f"Expected Nexus schema at {input_schema}")

run_nexus_rpc_gen(
override_root=override_root,
output_file=output_file,
input_schema=input_schema,
)
subprocess.run(
[
"uv",
"run",
"ruff",
"check",
"--select",
"I",
"--fix",
str(output_file),
],
cwd=repo_root,
check=True,
)
subprocess.run(
[
"uv",
"run",
"ruff",
"format",
str(output_file),
],
cwd=repo_root,
check=True,
)


def run_nexus_rpc_gen(
*, override_root: Path | None, output_file: Path, input_schema: Path
) -> None:
common_args = [
"--lang",
"py",
"--out-file",
str(output_file),
"--temporal-nexus-payload-codec-support",
str(input_schema),
]
if override_root is None:
subprocess.run(
["npx", "--yes", f"nexus-rpc-gen@{NEXUS_RPC_GEN_VERSION}", *common_args],
check=True,
)
return

subprocess.run(
[
"node",
"packages/nexus-rpc-gen/dist/index.js",
*common_args,
],
cwd=override_root,
check=True,
)


def normalize_nexus_rpc_gen_root(base_dir: Path, env_value: str) -> Path | None:
raw_root = env_get(env_value)
if raw_root is None:
return None
candidate = Path(raw_root)
if not candidate.is_absolute():
candidate = base_dir / candidate
candidate = candidate.resolve()
if (candidate / "package.json").is_file() and (candidate / "packages").is_dir():
return candidate
if (candidate / "src" / "package.json").is_file():
return candidate / "src"
raise RuntimeError(
f"{NEXUS_RPC_GEN_ENV_VAR} must point to the nexus-rpc-gen repo root or its src directory"
)


def env_get(name: str) -> str | None:
return os.environ.get(name)


if __name__ == "__main__":
try:
main()
except Exception as err:
print(f"Failed to generate Nexus system models: {err}", file=sys.stderr)
raise
51 changes: 49 additions & 2 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge
import temporalio.converter
import temporalio.nexus.system
from temporalio.api.common.v1.message_pb2 import Payload
from temporalio.api.enums.v1.command_type_pb2 import CommandType
from temporalio.bridge._visitor import VisitorFunctions
from temporalio.bridge.temporal_sdk_bridge import (
CustomSlotSupplier as BridgeCustomSlotSupplier,
)
from temporalio.bridge.temporal_sdk_bridge import (
PollShutdownError, # type: ignore # noqa: F401
)
from temporalio.worker import _command_aware_visitor
from temporalio.worker._command_aware_visitor import CommandAwarePayloadVisitor


Expand Down Expand Up @@ -279,7 +282,10 @@ async def finalize_shutdown(self) -> None:


class _Visitor(VisitorFunctions):
def __init__(self, f: Callable[[Sequence[Payload]], Awaitable[list[Payload]]]):
def __init__(
self,
f: Callable[[Sequence[Payload]], Awaitable[list[Payload]]],
):
self._f = f

async def visit_payload(self, payload: Payload) -> None:
Expand All @@ -297,6 +303,42 @@ async def visit_payloads(self, payloads: MutableSequence[Payload]) -> None:
payloads.extend(new_payloads)


async def _encode_completion_payloads(
data_converter: temporalio.converter.DataConverter,
payloads: Sequence[Payload],
) -> list[Payload]:
if len(payloads) != 1:
return await data_converter._encode_payload_sequence(payloads)

# A single payload may be the outer envelope for a system Nexus operation.
# In that case we leave the envelope itself unencoded so the server can read
# it, but still route any nested Temporal payloads through normal payload
# processing via the generated operation-specific rewriter.
payload = payloads[0]
command_info = _command_aware_visitor.current_command_info.get()
if (
command_info is None
or command_info.command_type
!= CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION
or not command_info.nexus_service
or not command_info.nexus_operation
):
return await data_converter._encode_payload_sequence(payloads)

rewrite = temporalio.nexus.system.get_payload_rewriter(
command_info.nexus_service, command_info.nexus_operation
)
if rewrite is None:
return await data_converter._encode_payload_sequence(payloads)

new_payload = await rewrite(
payload,
data_converter._encode_payload_sequence,
False,
)
return [new_payload]


async def decode_activation(
activation: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
data_converter: temporalio.converter.DataConverter,
Expand All @@ -316,4 +358,9 @@ async def encode_completion(
"""Encode all payloads in the completion."""
await CommandAwarePayloadVisitor(
skip_search_attributes=True, skip_headers=not encode_headers
).visit(_Visitor(data_converter._encode_payload_sequence), completion)
).visit(
_Visitor(
lambda payloads: _encode_completion_payloads(data_converter, payloads)
),
completion,
)
53 changes: 53 additions & 0 deletions temporalio/nexus/system/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Generated system Nexus service models.

This package contains code generated from Temporal's system Nexus schemas.
Higher-level ergonomic APIs may wrap these generated types.
"""

from collections.abc import Awaitable, Callable, Sequence

import temporalio.api.common.v1
import temporalio.converter

from . import _workflow_service_generated as generated
from ._workflow_service_generated import __temporal_nexus_payload_rewriters__

TemporalNexusPayloadRewriter = Callable[
[
temporalio.api.common.v1.Payload,
Callable[
[Sequence[temporalio.api.common.v1.Payload]],
Awaitable[list[temporalio.api.common.v1.Payload]],
],
bool,
],
Awaitable[temporalio.api.common.v1.Payload],
]

_SYSTEM_NEXUS_PAYLOAD_CONVERTER = temporalio.converter.default().payload_converter


def get_payload_rewriter(
service: str,
operation: str,
) -> TemporalNexusPayloadRewriter | None:
"""Return the generated nested-payload rewriter for a system Nexus operation."""
return __temporal_nexus_payload_rewriters__.get((service, operation))


def is_system_operation(service: str, operation: str) -> bool:
"""Return whether a Nexus operation uses the generated system envelope."""
return get_payload_rewriter(service, operation) is not None


def get_payload_converter() -> temporalio.converter.PayloadConverter:
"""Return the fixed payload converter for system Nexus outer envelopes."""
return _SYSTEM_NEXUS_PAYLOAD_CONVERTER


__all__ = (
"generated",
"get_payload_converter",
"get_payload_rewriter",
"is_system_operation",
)
11 changes: 11 additions & 0 deletions temporalio/nexus/system/_workflow_service.nexusrpc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# TODO: Remove this local shim once the upstream API repo checks in the Nexus
# definition and the generator can consume it directly.
nexusrpc: 1.0.0
services:
WorkflowService:
operations:
SignalWithStartWorkflowExecution:
input:
$ref: ../../bridge/sdk-core/crates/common/protos/api_upstream/openapi/openapiv3.yaml#/components/schemas/SignalWithStartWorkflowExecutionRequest
output:
$ref: ../../bridge/sdk-core/crates/common/protos/api_upstream/openapi/openapiv3.yaml#/components/schemas/SignalWithStartWorkflowExecutionResponse
Loading