Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ Also see:
- [Activity Worker Shutdown](#activity-worker-shutdown)
- [Activity Concurrency and Executors](#activity-concurrency-and-executors)
- [Activity Testing](#activity-testing)
- [Nexus](#nexus)
- [Nexus in Workflows](#nexus-in-workflows)
- [Standalone Nexus Operations](#standalone-nexus-operations)
- [Telemetry](#telemetry)
- [Metrics](#metrics)
- [OpenTelemetry Tracing](#opentelemetry-tracing)
Expand Down Expand Up @@ -1092,6 +1095,101 @@ it will raise the error raised in the activity.
The constructor of the environment has multiple keyword arguments that can be set to affect the activity context for the
activity.

### Nexus

[Nexus](https://docs.temporal.io/nexus) is a feature of Temporal that allows calling across namespace and cluster
boundaries. Nexus operations can be invoked from within workflows or as standalone operations from client code.

**WARNING: Nexus support is experimental.**

#### Nexus in Workflows

Within a workflow, a Nexus client can be created and used to start or execute operations:

```ruby
class MyWorkflow < Temporalio::Workflow::Definition
def execute
client = Temporalio::Workflow.create_nexus_client(
endpoint: 'my-endpoint',
service: 'my-service'
)

# Execute a synchronous operation and wait for the result
result = client.execute_operation('my-operation', 'some-input',
schedule_to_close_timeout: 30)

# Or start an async operation and get a handle
handle = client.start_operation('my-async-operation', { key: 'value' },
schedule_to_close_timeout: 60)
result = handle.result
end
end
```

#### Standalone Nexus Operations

Nexus operations can also be started directly from client code, outside of workflows. This mirrors the workflow Nexus
API but operates via direct gRPC calls to the Temporal server.

Create a standalone Nexus client:

```ruby
nexus_client = client.create_nexus_client('my-endpoint', 'my-service')
```

Start an operation and wait for the result:

```ruby
result = nexus_client.execute_operation('my-operation', 'some-input',
id: SecureRandom.uuid,
schedule_to_close_timeout: 30)
```

Or start an operation and get a handle for async interaction:

```ruby
handle = nexus_client.start_operation('my-operation', 'some-input',
id: 'my-operation-id',
schedule_to_close_timeout: 60)

# Or with a generated ID
handle = nexus_client.start_operation('my-operation', 'some-input',
id: SecureRandom.uuid,
schedule_to_close_timeout: 60)

# Wait for the result
result = handle.result

# Describe the operation
description = handle.describe

# Cancel the operation
handle.cancel(reason: 'no longer needed')

# Terminate the operation
handle.terminate(reason: 'force stop')
```

Get a handle to an existing operation by ID:

```ruby
handle = client.nexus_operation_handle('my-operation-id')
handle.describe
```

List and count operations:

```ruby
# List all operations (auto-paginates)
client.list_nexus_operations.each { |op| puts op }

# List one page at a time
page = client.list_nexus_operation_page(page_size: 10)

# Count operations
count = client.count_nexus_operations
```

### Telemetry

#### Metrics
Expand Down
94 changes: 94 additions & 0 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
require 'temporalio/client/async_activity_handle'
require 'temporalio/client/connection'
require 'temporalio/client/interceptor'
require 'temporalio/client/nexus_client'
require 'temporalio/client/nexus_operation_execution_count'
require 'temporalio/client/nexus_operation_handle'
require 'temporalio/client/plugin'
require 'temporalio/client/schedule'
require 'temporalio/client/schedule_handle'
Expand Down Expand Up @@ -52,6 +55,18 @@ class Client
# Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
class Options; end # rubocop:disable Lint/EmptyClass

ListNexusOperationPage = Data.define(:operations, :next_page_token)

# A page of Nexus operations returned by {Client#list_nexus_operation_page}.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @!attribute operations
# @return [Array] List of Nexus operation executions in this page.
# @!attribute next_page_token
# @return [String, nil] Token for the next page of results. nil if there are no more results.
class ListNexusOperationPage; end # rubocop:disable Lint/EmptyClass

ListWorkflowPage = Data.define(:executions, :next_page_token)

# A page of workflow executions returned by {Client#list_workflow_page}.
Expand Down Expand Up @@ -717,6 +732,85 @@ def list_schedules(query = nil, rpc_options: nil)
@impl.list_schedules(Interceptor::ListSchedulesInput.new(query:, rpc_options:))
end

# Create a Nexus client for the given endpoint and service.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param endpoint [String] Endpoint name.
# @param service [String] Service name.
# @return [NexusClient] Nexus client.
def create_nexus_client(endpoint, service)
NexusClient.new(client: self, endpoint:, service:)
end

# List Nexus operations.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Enumerator] Enumerable Nexus operations.
# @raise [Error::RPCError] RPC error from call.
def list_nexus_operations(query = nil, rpc_options: nil)
next_page_token = nil
Enumerator.new do |yielder|
loop do
page = @impl.list_nexus_operation_page(Interceptor::ListNexusOperationPageInput.new(
query:,
rpc_options:,
next_page_token:,
page_size: nil
))
page.operations.each { |op| yielder << op }
next_page_token = page.next_page_token
break if (next_page_token || '').empty?
end
end
end

# List Nexus operations one page at a time.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param page_size [Integer, nil] Maximum number of results to return.
# @param next_page_token [String, nil] Token for the next page of results.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [ListNexusOperationPage] Page of Nexus operations.
# @raise [Error::RPCError] RPC error from call.
def list_nexus_operation_page(query = nil, page_size: nil, next_page_token: nil, rpc_options: nil)
@impl.list_nexus_operation_page(Interceptor::ListNexusOperationPageInput.new(query:,
next_page_token:,
page_size:,
rpc_options:))
end

# Count Nexus operations.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [NexusOperationExecutionCount] Count of Nexus operations.
# @raise [Error::RPCError] RPC error from call.
def count_nexus_operations(query = nil, rpc_options: nil)
@impl.count_nexus_operations(Interceptor::CountNexusOperationsInput.new(query:, rpc_options:))
end

# Get a handle to an existing standalone Nexus operation.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param operation_id [String] Operation ID.
# @param run_id [String, nil] Operation run ID.
# @return [NexusOperationHandle] The operation handle.
def nexus_operation_handle(operation_id, run_id: nil)
NexusOperationHandle.new(client: self, operation_id:, run_id:)
end

# Get an async activity handle.
#
# @param task_token_or_id_reference [String, ActivityIDReference] Task token string or activity ID reference.
Expand Down
144 changes: 144 additions & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,83 @@ def intercept_client(next_interceptor)
:rpc_options
)

# Input for {Outbound.start_nexus_operation}.
#
# WARNING: Standalone Nexus operations are experimental.
StartNexusOperationInput = Data.define(
:endpoint,
:service,
:operation,
:input,
:operation_id,
:schedule_to_close_timeout,
:schedule_to_start_timeout,
:start_to_close_timeout,
:id_reuse_policy,
:id_conflict_policy,
:search_attributes,
:nexus_header,
:static_summary,
:headers,
:rpc_options
)

# Input for {Outbound.describe_nexus_operation}.
#
# WARNING: Standalone Nexus operations are experimental.
DescribeNexusOperationInput = Data.define(
:operation_id,
:run_id,
:rpc_options
)

# Input for {Outbound.cancel_nexus_operation}.
#
# WARNING: Standalone Nexus operations are experimental.
CancelNexusOperationInput = Data.define(
:operation_id,
:run_id,
:reason,
:rpc_options
)

# Input for {Outbound.terminate_nexus_operation}.
#
# WARNING: Standalone Nexus operations are experimental.
TerminateNexusOperationInput = Data.define(
:operation_id,
:run_id,
:reason,
:rpc_options
)

# Input for {Outbound.poll_nexus_operation}.
#
# WARNING: Standalone Nexus operations are experimental.
PollNexusOperationInput = Data.define(
:operation_id,
:run_id,
:rpc_options
)

# Input for {Outbound.list_nexus_operation_page}.
#
# WARNING: Standalone Nexus operations are experimental.
ListNexusOperationPageInput = Data.define(
:query,
:next_page_token,
:page_size,
:rpc_options
)

# Input for {Outbound.count_nexus_operations}.
#
# WARNING: Standalone Nexus operations are experimental.
CountNexusOperationsInput = Data.define(
:query,
:rpc_options
)

# Outbound interceptor for intercepting client calls. This should be extended by users needing to intercept client
# actions.
class Outbound
Expand Down Expand Up @@ -467,6 +544,73 @@ def fail_async_activity(input)
def report_cancellation_async_activity(input)
next_interceptor.report_cancellation_async_activity(input)
end

# Called for every {NexusClient.start_operation} and {NexusClient.execute_operation} call.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param input [StartNexusOperationInput] Input.
# @return [NexusOperationHandle] Nexus operation handle.
def start_nexus_operation(input)
next_interceptor.start_nexus_operation(input)
end

# Called for every {NexusOperationHandle.describe} call.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param input [DescribeNexusOperationInput] Input.
def describe_nexus_operation(input)
next_interceptor.describe_nexus_operation(input)
end

# Called for every {NexusOperationHandle.cancel} call.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param input [CancelNexusOperationInput] Input.
def cancel_nexus_operation(input)
next_interceptor.cancel_nexus_operation(input)
end

# Called for every {NexusOperationHandle.terminate} call.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param input [TerminateNexusOperationInput] Input.
def terminate_nexus_operation(input)
next_interceptor.terminate_nexus_operation(input)
end

# Called for every {Client.list_nexus_operation_page} call.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param input [ListNexusOperationPageInput] Input.
# @return [Client::ListNexusOperationPage] Page of Nexus operations.
def list_nexus_operation_page(input)
next_interceptor.list_nexus_operation_page(input)
end

# Called for every {Client.count_nexus_operations} call.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param input [CountNexusOperationsInput] Input.
# @return [NexusOperationExecutionCount] Count.
def count_nexus_operations(input)
next_interceptor.count_nexus_operations(input)
end

# Called when polling for Nexus operation result.
#
# WARNING: Standalone Nexus operations are experimental.
#
# @param input [PollNexusOperationInput] Input.
# @return [Object, nil] Result of the operation.
def poll_nexus_operation(input)
next_interceptor.poll_nexus_operation(input)
end
end
end
end
Expand Down
Loading
Loading