diff --git a/README.md b/README.md index ff834d64..0bf11de3 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,9 @@ Also see: - [Activity Worker Shutdown](#activity-worker-shutdown) - [Activity Concurrency and Executors](#activity-concurrency-and-executors) - [Activity Testing](#activity-testing) + - [Nexus](#nexus) + - [Nexus in Workflows](#nexus-in-workflows) + - [Standalone Nexus Operations](#standalone-nexus-operations) - [Telemetry](#telemetry) - [Metrics](#metrics) - [OpenTelemetry Tracing](#opentelemetry-tracing) @@ -1092,6 +1095,101 @@ it will raise the error raised in the activity. The constructor of the environment has multiple keyword arguments that can be set to affect the activity context for the activity. +### Nexus + +[Nexus](https://docs.temporal.io/nexus) is a feature of Temporal that allows calling across namespace and cluster +boundaries. Nexus operations can be invoked from within workflows or as standalone operations from client code. + +**WARNING: Nexus support is experimental.** + +#### Nexus in Workflows + +Within a workflow, a Nexus client can be created and used to start or execute operations: + +```ruby +class MyWorkflow < Temporalio::Workflow::Definition + def execute + client = Temporalio::Workflow.create_nexus_client( + endpoint: 'my-endpoint', + service: 'my-service' + ) + + # Execute a synchronous operation and wait for the result + result = client.execute_operation('my-operation', 'some-input', + schedule_to_close_timeout: 30) + + # Or start an async operation and get a handle + handle = client.start_operation('my-async-operation', { key: 'value' }, + schedule_to_close_timeout: 60) + result = handle.result + end +end +``` + +#### Standalone Nexus Operations + +Nexus operations can also be started directly from client code, outside of workflows. This mirrors the workflow Nexus +API but operates via direct gRPC calls to the Temporal server. + +Create a standalone Nexus client: + +```ruby +nexus_client = client.create_nexus_client('my-endpoint', 'my-service') +``` + +Start an operation and wait for the result: + +```ruby +result = nexus_client.execute_operation('my-operation', 'some-input', + id: SecureRandom.uuid, + schedule_to_close_timeout: 30) +``` + +Or start an operation and get a handle for async interaction: + +```ruby +handle = nexus_client.start_operation('my-operation', 'some-input', + id: 'my-operation-id', + schedule_to_close_timeout: 60) + +# Or with a generated ID +handle = nexus_client.start_operation('my-operation', 'some-input', + id: SecureRandom.uuid, + schedule_to_close_timeout: 60) + +# Wait for the result +result = handle.result + +# Describe the operation +description = handle.describe + +# Cancel the operation +handle.cancel(reason: 'no longer needed') + +# Terminate the operation +handle.terminate(reason: 'force stop') +``` + +Get a handle to an existing operation by ID: + +```ruby +handle = client.nexus_operation_handle('my-operation-id') +handle.describe +``` + +List and count operations: + +```ruby +# List all operations (auto-paginates) +client.list_nexus_operations.each { |op| puts op } + +# List one page at a time +page = client.list_nexus_operation_page(page_size: 10) + +# Count operations +count = client.count_nexus_operations +``` + ### Telemetry #### Metrics diff --git a/temporalio/lib/temporalio/client.rb b/temporalio/lib/temporalio/client.rb index 5c8f148e..d8785584 100644 --- a/temporalio/lib/temporalio/client.rb +++ b/temporalio/lib/temporalio/client.rb @@ -6,6 +6,9 @@ require 'temporalio/client/async_activity_handle' require 'temporalio/client/connection' require 'temporalio/client/interceptor' +require 'temporalio/client/nexus_client' +require 'temporalio/client/nexus_operation_execution_count' +require 'temporalio/client/nexus_operation_handle' require 'temporalio/client/plugin' require 'temporalio/client/schedule' require 'temporalio/client/schedule_handle' @@ -52,6 +55,18 @@ class Client # Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details. class Options; end # rubocop:disable Lint/EmptyClass + ListNexusOperationPage = Data.define(:operations, :next_page_token) + + # A page of Nexus operations returned by {Client#list_nexus_operation_page}. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @!attribute operations + # @return [Array] List of Nexus operation executions in this page. + # @!attribute next_page_token + # @return [String, nil] Token for the next page of results. nil if there are no more results. + class ListNexusOperationPage; end # rubocop:disable Lint/EmptyClass + ListWorkflowPage = Data.define(:executions, :next_page_token) # A page of workflow executions returned by {Client#list_workflow_page}. @@ -717,6 +732,85 @@ def list_schedules(query = nil, rpc_options: nil) @impl.list_schedules(Interceptor::ListSchedulesInput.new(query:, rpc_options:)) end + # Create a Nexus client for the given endpoint and service. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param endpoint [String] Endpoint name. + # @param service [String] Service name. + # @return [NexusClient] Nexus client. + def create_nexus_client(endpoint, service) + NexusClient.new(client: self, endpoint:, service:) + end + + # List Nexus operations. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param query [String, nil] A Temporal visibility list filter. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [Enumerator] Enumerable Nexus operations. + # @raise [Error::RPCError] RPC error from call. + def list_nexus_operations(query = nil, rpc_options: nil) + next_page_token = nil + Enumerator.new do |yielder| + loop do + page = @impl.list_nexus_operation_page(Interceptor::ListNexusOperationPageInput.new( + query:, + rpc_options:, + next_page_token:, + page_size: nil + )) + page.operations.each { |op| yielder << op } + next_page_token = page.next_page_token + break if (next_page_token || '').empty? + end + end + end + + # List Nexus operations one page at a time. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param query [String, nil] A Temporal visibility list filter. + # @param page_size [Integer, nil] Maximum number of results to return. + # @param next_page_token [String, nil] Token for the next page of results. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [ListNexusOperationPage] Page of Nexus operations. + # @raise [Error::RPCError] RPC error from call. + def list_nexus_operation_page(query = nil, page_size: nil, next_page_token: nil, rpc_options: nil) + @impl.list_nexus_operation_page(Interceptor::ListNexusOperationPageInput.new(query:, + next_page_token:, + page_size:, + rpc_options:)) + end + + # Count Nexus operations. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param query [String, nil] A Temporal visibility list filter. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [NexusOperationExecutionCount] Count of Nexus operations. + # @raise [Error::RPCError] RPC error from call. + def count_nexus_operations(query = nil, rpc_options: nil) + @impl.count_nexus_operations(Interceptor::CountNexusOperationsInput.new(query:, rpc_options:)) + end + + # Get a handle to an existing standalone Nexus operation. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param operation_id [String] Operation ID. + # @param run_id [String, nil] Operation run ID. + # @return [NexusOperationHandle] The operation handle. + def nexus_operation_handle(operation_id, run_id: nil) + NexusOperationHandle.new(client: self, operation_id:, run_id:) + end + # Get an async activity handle. # # @param task_token_or_id_reference [String, ActivityIDReference] Task token string or activity ID reference. diff --git a/temporalio/lib/temporalio/client/interceptor.rb b/temporalio/lib/temporalio/client/interceptor.rb index 361acd1c..0a3d3599 100644 --- a/temporalio/lib/temporalio/client/interceptor.rb +++ b/temporalio/lib/temporalio/client/interceptor.rb @@ -259,6 +259,83 @@ def intercept_client(next_interceptor) :rpc_options ) + # Input for {Outbound.start_nexus_operation}. + # + # WARNING: Standalone Nexus operations are experimental. + StartNexusOperationInput = Data.define( + :endpoint, + :service, + :operation, + :input, + :operation_id, + :schedule_to_close_timeout, + :schedule_to_start_timeout, + :start_to_close_timeout, + :id_reuse_policy, + :id_conflict_policy, + :search_attributes, + :nexus_header, + :static_summary, + :headers, + :rpc_options + ) + + # Input for {Outbound.describe_nexus_operation}. + # + # WARNING: Standalone Nexus operations are experimental. + DescribeNexusOperationInput = Data.define( + :operation_id, + :run_id, + :rpc_options + ) + + # Input for {Outbound.cancel_nexus_operation}. + # + # WARNING: Standalone Nexus operations are experimental. + CancelNexusOperationInput = Data.define( + :operation_id, + :run_id, + :reason, + :rpc_options + ) + + # Input for {Outbound.terminate_nexus_operation}. + # + # WARNING: Standalone Nexus operations are experimental. + TerminateNexusOperationInput = Data.define( + :operation_id, + :run_id, + :reason, + :rpc_options + ) + + # Input for {Outbound.poll_nexus_operation}. + # + # WARNING: Standalone Nexus operations are experimental. + PollNexusOperationInput = Data.define( + :operation_id, + :run_id, + :rpc_options + ) + + # Input for {Outbound.list_nexus_operation_page}. + # + # WARNING: Standalone Nexus operations are experimental. + ListNexusOperationPageInput = Data.define( + :query, + :next_page_token, + :page_size, + :rpc_options + ) + + # Input for {Outbound.count_nexus_operations}. + # + # WARNING: Standalone Nexus operations are experimental. + CountNexusOperationsInput = Data.define( + :query, + :rpc_options + ) + # Outbound interceptor for intercepting client calls. This should be extended by users needing to intercept client # actions. class Outbound @@ -467,6 +544,73 @@ def fail_async_activity(input) def report_cancellation_async_activity(input) next_interceptor.report_cancellation_async_activity(input) end + + # Called for every {NexusClient.start_operation} and {NexusClient.execute_operation} call. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param input [StartNexusOperationInput] Input. + # @return [NexusOperationHandle] Nexus operation handle. + def start_nexus_operation(input) + next_interceptor.start_nexus_operation(input) + end + + # Called for every {NexusOperationHandle.describe} call. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param input [DescribeNexusOperationInput] Input. + def describe_nexus_operation(input) + next_interceptor.describe_nexus_operation(input) + end + + # Called for every {NexusOperationHandle.cancel} call. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param input [CancelNexusOperationInput] Input. + def cancel_nexus_operation(input) + next_interceptor.cancel_nexus_operation(input) + end + + # Called for every {NexusOperationHandle.terminate} call. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param input [TerminateNexusOperationInput] Input. + def terminate_nexus_operation(input) + next_interceptor.terminate_nexus_operation(input) + end + + # Called for every {Client.list_nexus_operation_page} call. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param input [ListNexusOperationPageInput] Input. + # @return [Client::ListNexusOperationPage] Page of Nexus operations. + def list_nexus_operation_page(input) + next_interceptor.list_nexus_operation_page(input) + end + + # Called for every {Client.count_nexus_operations} call. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param input [CountNexusOperationsInput] Input. + # @return [NexusOperationExecutionCount] Count. + def count_nexus_operations(input) + next_interceptor.count_nexus_operations(input) + end + + # Called when polling for Nexus operation result. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param input [PollNexusOperationInput] Input. + # @return [Object, nil] Result of the operation. + def poll_nexus_operation(input) + next_interceptor.poll_nexus_operation(input) + end end end end diff --git a/temporalio/lib/temporalio/client/nexus_client.rb b/temporalio/lib/temporalio/client/nexus_client.rb new file mode 100644 index 00000000..5d73cd95 --- /dev/null +++ b/temporalio/lib/temporalio/client/nexus_client.rb @@ -0,0 +1,138 @@ +# frozen_string_literal: true + +require 'securerandom' + +module Temporalio + class Client + # Client for starting and managing standalone Nexus operations. + # + # This is created via {Client.create_nexus_client}, it is never instantiated directly. + # + # WARNING: Standalone Nexus operations are experimental. + class NexusClient + # @return [String] Endpoint name for this client. + attr_reader :endpoint + + # @return [String] Service name for this client. + attr_reader :service + + # @!visibility private + def initialize(client:, endpoint:, service:) + @client = client + @endpoint = endpoint + @service = service + end + + # Start a Nexus operation and return a handle. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param operation [String, Symbol] Operation name. + # @param input [Object, nil] Input for the operation. + # @param id [String] Unique identifier for the operation. + # @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds. + # @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. If the + # operation has not started within this window, a SCHEDULE_TO_START timeout error is raised. + # @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after it has + # started. If the operation does not complete within this window, a START_TO_CLOSE timeout error is raised. + # @param id_reuse_policy [NexusOperationIDReusePolicy] How already-existing IDs are treated. + # @param id_conflict_policy [NexusOperationIDConflictPolicy] How already-running operations of the same ID are + # treated. + # @param search_attributes [SearchAttributes, nil] Search attributes for the operation. + # @param nexus_header [Hash{String => String}] Headers to attach to the Nexus request. + # @param static_summary [String, nil] Summary for the operation (appears in UI/CLI). + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [NexusOperationHandle] Handle to the started operation. + # @raise [Error::NexusOperationAlreadyStartedError] Operation already exists. + # @raise [Error::RPCError] RPC error from call. + def start_operation( + operation, + input = nil, + id:, + schedule_to_close_timeout: nil, + schedule_to_start_timeout: nil, + start_to_close_timeout: nil, + id_reuse_policy: NexusOperationIDReusePolicy::ALLOW_DUPLICATE, + id_conflict_policy: NexusOperationIDConflictPolicy::FAIL, + search_attributes: nil, + nexus_header: {}, + static_summary: nil, + rpc_options: nil + ) + @client._impl.start_nexus_operation( + Interceptor::StartNexusOperationInput.new( + endpoint:, + service:, + operation: operation.to_s, + input:, + operation_id: id, + schedule_to_close_timeout:, + schedule_to_start_timeout:, + start_to_close_timeout:, + id_reuse_policy:, + id_conflict_policy:, + search_attributes:, + nexus_header:, + static_summary:, + headers: {}, + rpc_options: + ) + ) + end + + # Start a Nexus operation and wait for its result. This is a shortcut for {start_operation} + + # {NexusOperationHandle.result}. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param operation [String, Symbol] Operation name. + # @param input [Object, nil] Input for the operation. + # @param id [String] Unique identifier for the operation. + # @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds. + # @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. + # @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after started. + # @param id_reuse_policy [NexusOperationIDReusePolicy] How already-existing IDs are treated. + # @param id_conflict_policy [NexusOperationIDConflictPolicy] How already-running operations of the same ID are + # treated. + # @param search_attributes [SearchAttributes, nil] Search attributes for the operation. + # @param nexus_header [Hash{String => String}] Headers to attach to the Nexus request. + # @param static_summary [String, nil] Summary for the operation (appears in UI/CLI). + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # + # @return [Object, nil] Result of the operation. + # @raise [Error::NexusOperationAlreadyStartedError] Operation already exists. + # @raise [Error::NexusOperationFailedError] Operation failed. + # @raise [Error::RPCError] RPC error from call. + def execute_operation( + operation, + input = nil, + id:, + schedule_to_close_timeout: nil, + schedule_to_start_timeout: nil, + start_to_close_timeout: nil, + id_reuse_policy: NexusOperationIDReusePolicy::ALLOW_DUPLICATE, + id_conflict_policy: NexusOperationIDConflictPolicy::FAIL, + search_attributes: nil, + nexus_header: {}, + static_summary: nil, + rpc_options: nil + ) + start_operation( + operation, + input, + id:, + schedule_to_close_timeout:, + schedule_to_start_timeout:, + start_to_close_timeout:, + id_reuse_policy:, + id_conflict_policy:, + search_attributes:, + nexus_header:, + static_summary:, + rpc_options: + ).result(rpc_options:) + end + end + end +end diff --git a/temporalio/lib/temporalio/client/nexus_operation_execution_count.rb b/temporalio/lib/temporalio/client/nexus_operation_execution_count.rb new file mode 100644 index 00000000..6debb55d --- /dev/null +++ b/temporalio/lib/temporalio/client/nexus_operation_execution_count.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module Temporalio + class Client + # Representation of a count from a count Nexus operations call. + # + # WARNING: Standalone Nexus operations are experimental. + class NexusOperationExecutionCount + # @return [Integer] Approximate number of operations matching the original query. If the query had a group-by + # clause, this is simply the sum of all the counts in {groups}. + attr_reader :count + + # @return [Array] Groups if the query had a group-by clause, or empty if not. + attr_reader :groups + + # @!visibility private + def initialize(count, groups) + @count = count + @groups = groups + end + + # Aggregation group if the operation count query had a group-by clause. + class AggregationGroup + # @return [Integer] Approximate number of operations matching the original query for this group. + attr_reader :count + + # @return [Array] Search attribute values for this group. + attr_reader :group_values + + # @!visibility private + def initialize(count, group_values) + @count = count + @group_values = group_values + end + end + end + end +end diff --git a/temporalio/lib/temporalio/client/nexus_operation_handle.rb b/temporalio/lib/temporalio/client/nexus_operation_handle.rb new file mode 100644 index 00000000..fbb8fd57 --- /dev/null +++ b/temporalio/lib/temporalio/client/nexus_operation_handle.rb @@ -0,0 +1,97 @@ +# frozen_string_literal: true + +module Temporalio + class Client + # Handle for a standalone Nexus operation to perform actions on. + # + # This is created via {NexusClient#start_operation} or {Client#nexus_operation_handle}, it is never instantiated + # directly. + # + # WARNING: Standalone Nexus operations are experimental. + class NexusOperationHandle + # @return [String] Operation ID. + attr_reader :operation_id + + # @return [String, nil] Operation run ID if known. + attr_reader :run_id + + # @!visibility private + def initialize(client:, operation_id:, run_id:) + @client = client + @operation_id = operation_id + @run_id = run_id + end + + # Wait for the result of the operation. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # @return [Object, nil] Result of the operation. + # @raise [Error::NexusOperationFailedError] Operation failed. + # @raise [Error::RPCError] RPC error from call. + def result(rpc_options: nil) + @client._impl.poll_nexus_operation( + Interceptor::PollNexusOperationInput.new( + operation_id:, + run_id:, + rpc_options: + ) + ) + end + + # Describe this operation. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # @return [Api::WorkflowService::V1::DescribeNexusOperationExecutionResponse] Operation description. + # @raise [Error::RPCError] RPC error from call. + def describe(rpc_options: nil) + @client._impl.describe_nexus_operation( + Interceptor::DescribeNexusOperationInput.new( + operation_id:, + run_id:, + rpc_options: + ) + ) + end + + # Request cancellation of this operation. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param reason [String, nil] Reason for cancellation. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # @raise [Error::RPCError] RPC error from call. + def cancel(reason: nil, rpc_options: nil) + @client._impl.cancel_nexus_operation( + Interceptor::CancelNexusOperationInput.new( + operation_id:, + run_id:, + reason:, + rpc_options: + ) + ) + end + + # Terminate this operation. + # + # WARNING: Standalone Nexus operations are experimental. + # + # @param reason [String, nil] Reason for termination. + # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # @raise [Error::RPCError] RPC error from call. + def terminate(reason: nil, rpc_options: nil) + @client._impl.terminate_nexus_operation( + Interceptor::TerminateNexusOperationInput.new( + operation_id:, + run_id:, + reason:, + rpc_options: + ) + ) + end + end + end +end diff --git a/temporalio/lib/temporalio/common_enums.rb b/temporalio/lib/temporalio/common_enums.rb index e6bdd6a2..01d97cf3 100644 --- a/temporalio/lib/temporalio/common_enums.rb +++ b/temporalio/lib/temporalio/common_enums.rb @@ -84,6 +84,36 @@ module SuggestContinueAsNewReason Api::Enums::V1::SuggestContinueAsNewReason::SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES end + # How already-in-use Nexus operation IDs are handled on start. + # + # WARNING: Standalone Nexus operations are experimental. + module NexusOperationIDReusePolicy + # Allow starting a Nexus operation using the same operation ID. + ALLOW_DUPLICATE = + Api::Enums::V1::NexusOperationIdReusePolicy::NEXUS_OPERATION_ID_REUSE_POLICY_ALLOW_DUPLICATE + # Allow starting a Nexus operation using the same operation ID, only when the last operation's final state is one + # of failed, canceled, terminated, or timed out. + ALLOW_DUPLICATE_FAILED_ONLY = + Api::Enums::V1::NexusOperationIdReusePolicy::NEXUS_OPERATION_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY + # Do not permit re-use of the operation ID. + REJECT_DUPLICATE = + Api::Enums::V1::NexusOperationIdReusePolicy::NEXUS_OPERATION_ID_REUSE_POLICY_REJECT_DUPLICATE + end + + # How already-running Nexus operations of the same ID are handled on start. + # + # WARNING: Standalone Nexus operations are experimental. + module NexusOperationIDConflictPolicy + # Unset. + UNSPECIFIED = + Api::Enums::V1::NexusOperationIdConflictPolicy::NEXUS_OPERATION_ID_CONFLICT_POLICY_UNSPECIFIED + # Don't start a new operation, instead fail with already-started error. + FAIL = Api::Enums::V1::NexusOperationIdConflictPolicy::NEXUS_OPERATION_ID_CONFLICT_POLICY_FAIL + # Don't start a new operation, instead return a handle for the running operation. + USE_EXISTING = + Api::Enums::V1::NexusOperationIdConflictPolicy::NEXUS_OPERATION_ID_CONFLICT_POLICY_USE_EXISTING + end + # Specifies when a workflow might move from a worker of one Build Id to another. module VersioningBehavior # Unspecified versioning behavior. By default, workers opting into worker versioning will diff --git a/temporalio/lib/temporalio/error.rb b/temporalio/lib/temporalio/error.rb index 0170990a..1e12d571 100644 --- a/temporalio/lib/temporalio/error.rb +++ b/temporalio/lib/temporalio/error.rb @@ -107,6 +107,42 @@ def initialize(details) end end + # Error when a Nexus operation has already been started with the same ID. + # + # WARNING: Standalone Nexus operations are experimental. + class NexusOperationAlreadyStartedError < Error + # @return [String] Operation ID that was already started. + attr_reader :operation_id + + # @return [String] Run ID of the already-started operation. + attr_reader :run_id + + # @!visibility private + def initialize(operation_id:, run_id:) + super("Nexus operation already started: #{operation_id}") + @operation_id = operation_id + @run_id = run_id + end + end + + # Error when a standalone Nexus operation fails. + # + # WARNING: Standalone Nexus operations are experimental. + class NexusOperationFailedError < Error + # @return [String] Operation ID. + attr_reader :operation_id + + # @return [String, nil] Operation run ID. + attr_reader :run_id + + # @!visibility private + def initialize(operation_id:, run_id:) + super('Nexus operation failed') + @operation_id = operation_id + @run_id = run_id + end + end + # Error raised by a client for a general RPC failure. class RPCError < Error # @return [Code] Status code for the error. diff --git a/temporalio/lib/temporalio/internal/client/implementation.rb b/temporalio/lib/temporalio/internal/client/implementation.rb index dcc0ae08..c94e33c4 100644 --- a/temporalio/lib/temporalio/internal/client/implementation.rb +++ b/temporalio/lib/temporalio/internal/client/implementation.rb @@ -21,6 +21,7 @@ require 'temporalio/internal/proto_utils' require 'temporalio/runtime' require 'temporalio/search_attributes' +require 'temporalio/client/nexus_operation_handle' require 'temporalio/workflow/definition' module Temporalio @@ -929,6 +930,169 @@ def report_cancellation_async_activity(input) end nil end + + def start_nexus_operation(input) + req = Api::WorkflowService::V1::StartNexusOperationExecutionRequest.new( + namespace: @client.namespace, + identity: @client.connection.identity, + request_id: SecureRandom.uuid, + operation_id: input.operation_id, + endpoint: input.endpoint, + service: input.service, + operation: input.operation, + schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout), + schedule_to_start_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_start_timeout), + start_to_close_timeout: ProtoUtils.seconds_to_duration(input.start_to_close_timeout), + id_reuse_policy: input.id_reuse_policy, + id_conflict_policy: input.id_conflict_policy, + search_attributes: input.search_attributes&._to_proto, + nexus_header: input.nexus_header, + user_metadata: ProtoUtils.to_user_metadata(input.static_summary, nil, @client.data_converter) + ) + req.input = @client.data_converter.to_payload(input.input) unless input.input.nil? + + begin + resp = @client.workflow_service.start_nexus_operation_execution( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + rescue Error::RPCError => e + if e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first + details = e.grpc_status.details.first.unpack( + Api::ErrorDetails::V1::NexusOperationExecutionAlreadyStartedFailure + ) + if details + raise Error::NexusOperationAlreadyStartedError.new( + operation_id: input.operation_id, + run_id: details.run_id + ) + end + end + raise + end + + Temporalio::Client::NexusOperationHandle.new( + client: @client, + operation_id: input.operation_id, + run_id: resp.run_id + ) + end + + def describe_nexus_operation(input) + req = Api::WorkflowService::V1::DescribeNexusOperationExecutionRequest.new( + namespace: @client.namespace, + operation_id: input.operation_id, + run_id: input.run_id || '' + ) + @client.workflow_service.describe_nexus_operation_execution( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + end + + def cancel_nexus_operation(input) + @client.workflow_service.request_cancel_nexus_operation_execution( + Api::WorkflowService::V1::RequestCancelNexusOperationExecutionRequest.new( + namespace: @client.namespace, + operation_id: input.operation_id, + run_id: input.run_id || '', + identity: @client.connection.identity, + request_id: SecureRandom.uuid, + reason: input.reason || '' + ), + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + nil + end + + def terminate_nexus_operation(input) + @client.workflow_service.terminate_nexus_operation_execution( + Api::WorkflowService::V1::TerminateNexusOperationExecutionRequest.new( + namespace: @client.namespace, + operation_id: input.operation_id, + run_id: input.run_id || '', + identity: @client.connection.identity, + request_id: SecureRandom.uuid, + reason: input.reason || '' + ), + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + nil + end + + def list_nexus_operation_page(input) + req = Api::WorkflowService::V1::ListNexusOperationExecutionsRequest.new( + namespace: @client.namespace, + query: input.query || '', + next_page_token: input.next_page_token, + page_size: input.page_size + ) + resp = @client.workflow_service.list_nexus_operation_executions( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + Temporalio::Client::ListNexusOperationPage.new( + operations: resp.operations.to_a, + next_page_token: resp.next_page_token + ) + end + + def count_nexus_operations(input) + resp = @client.workflow_service.count_nexus_operation_executions( + Api::WorkflowService::V1::CountNexusOperationExecutionsRequest.new( + namespace: @client.namespace, + query: input.query || '' + ), + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + Temporalio::Client::NexusOperationExecutionCount.new( + resp.count, + resp.groups.map do |group| + Temporalio::Client::NexusOperationExecutionCount::AggregationGroup.new( + group.count, + group.group_values.map { |payload| SearchAttributes._value_from_payload(payload) } + ) + end + ) + end + + def poll_nexus_operation(input) + req = Api::WorkflowService::V1::PollNexusOperationExecutionRequest.new( + namespace: @client.namespace, + operation_id: input.operation_id, + run_id: input.run_id || '', + wait_stage: Api::Enums::V1::NexusOperationWaitStage::NEXUS_OPERATION_WAIT_STAGE_CLOSED + ) + + loop do + resp = @client.workflow_service.poll_nexus_operation_execution( + req, + rpc_options: Implementation.with_default_rpc_options(input.rpc_options) + ) + + # Check which outcome field is set via the oneof + case resp.outcome + when :result + return @client.data_converter.from_payload(resp.result) + when :failure + cause = @client.data_converter.failure_converter.from_failure(resp.failure, @client.data_converter) + err = Error::NexusOperationFailedError.new( + operation_id: input.operation_id, + run_id: resp.run_id + ) + raise Error._with_backtrace_and_cause(err, backtrace: nil, cause: cause) + when nil + # No outcome yet, continue polling + else + raise "Unknown Nexus operation outcome: #{resp.outcome}" + end + rescue Error::RPCError => e + # Retry on deadline exceeded (long-poll timeout), propagate all others including cancellation + raise unless e.code == Error::RPCError::Code::DEADLINE_EXCEEDED + rescue Error::CanceledError + raise + end + end end end end diff --git a/temporalio/sig/temporalio/client.rbs b/temporalio/sig/temporalio/client.rbs index fd7e6761..6e1c6063 100644 --- a/temporalio/sig/temporalio/client.rbs +++ b/temporalio/sig/temporalio/client.rbs @@ -24,6 +24,30 @@ module Temporalio def with: (**untyped) -> Options end + class ListNexusOperationPage + attr_reader operations: Array[untyped] + attr_reader next_page_token: String? + + def initialize: ( + operations: Array[untyped], + next_page_token: String? + ) -> void + end + + class NexusOperationExecutionCount + attr_reader count: Integer + attr_reader groups: Array[NexusOperationExecutionCount::AggregationGroup] + + def initialize: (Integer count, Array[NexusOperationExecutionCount::AggregationGroup] groups) -> void + + class AggregationGroup + attr_reader count: Integer + attr_reader group_values: Array[Object?] + + def initialize: (Integer count, Array[Object?] group_values) -> void + end + end + class ListWorkflowPage attr_reader executions: Array[WorkflowExecution] attr_reader next_page_token: String? @@ -193,6 +217,33 @@ module Temporalio ?rpc_options: RPCOptions? ) -> Enumerator[Schedule::List::Description, Schedule::List::Description] + def create_nexus_client: ( + String endpoint, + String service + ) -> NexusClient + + def nexus_operation_handle: ( + String operation_id, + ?run_id: String? + ) -> NexusOperationHandle + + def list_nexus_operations: ( + ?String? query, + ?rpc_options: RPCOptions? + ) -> Enumerator[untyped, untyped] + + def list_nexus_operation_page: ( + ?String? query, + ?next_page_token: String?, + ?page_size: Integer?, + ?rpc_options: RPCOptions? + ) -> ListNexusOperationPage + + def count_nexus_operations: ( + ?String? query, + ?rpc_options: RPCOptions? + ) -> NexusOperationExecutionCount + def async_activity_handle: ( String | ActivityIDReference task_token_or_id_reference ) -> AsyncActivityHandle diff --git a/temporalio/sig/temporalio/client/interceptor.rbs b/temporalio/sig/temporalio/client/interceptor.rbs index 2d0aee02..136e9f22 100644 --- a/temporalio/sig/temporalio/client/interceptor.rbs +++ b/temporalio/sig/temporalio/client/interceptor.rbs @@ -432,6 +432,118 @@ module Temporalio ) -> void end + class StartNexusOperationInput + attr_reader endpoint: String + attr_reader service: String + attr_reader operation: String + attr_reader input: Object? + attr_reader operation_id: String + attr_reader schedule_to_close_timeout: Float? + attr_reader schedule_to_start_timeout: Float? + attr_reader start_to_close_timeout: Float? + attr_reader id_reuse_policy: Integer + attr_reader id_conflict_policy: Integer + attr_reader search_attributes: SearchAttributes? + attr_reader nexus_header: Hash[String, String] + attr_reader static_summary: String? + attr_reader headers: Hash[String, Object?] + attr_reader rpc_options: RPCOptions? + + def initialize: ( + endpoint: String, + service: String, + operation: String, + input: Object?, + operation_id: String, + schedule_to_close_timeout: Float?, + schedule_to_start_timeout: Float?, + start_to_close_timeout: Float?, + id_reuse_policy: Integer, + id_conflict_policy: Integer, + search_attributes: SearchAttributes?, + nexus_header: Hash[String, String], + static_summary: String?, + headers: Hash[String, Object?], + rpc_options: RPCOptions? + ) -> void + end + + class DescribeNexusOperationInput + attr_reader operation_id: String + attr_reader run_id: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + operation_id: String, + run_id: String?, + rpc_options: RPCOptions? + ) -> void + end + + class CancelNexusOperationInput + attr_reader operation_id: String + attr_reader run_id: String? + attr_reader reason: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + operation_id: String, + run_id: String?, + reason: String?, + rpc_options: RPCOptions? + ) -> void + end + + class TerminateNexusOperationInput + attr_reader operation_id: String + attr_reader run_id: String? + attr_reader reason: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + operation_id: String, + run_id: String?, + reason: String?, + rpc_options: RPCOptions? + ) -> void + end + + class PollNexusOperationInput + attr_reader operation_id: String + attr_reader run_id: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + operation_id: String, + run_id: String?, + rpc_options: RPCOptions? + ) -> void + end + + class ListNexusOperationPageInput + attr_reader query: String? + attr_reader next_page_token: String? + attr_reader page_size: Integer? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + query: String?, + next_page_token: String?, + page_size: Integer?, + rpc_options: RPCOptions? + ) -> void + end + + class CountNexusOperationsInput + attr_reader query: String? + attr_reader rpc_options: RPCOptions? + + def initialize: ( + query: String?, + rpc_options: RPCOptions? + ) -> void + end + class Outbound attr_reader next_interceptor: Outbound @@ -488,6 +600,20 @@ module Temporalio def fail_async_activity: (FailAsyncActivityInput input) -> void def report_cancellation_async_activity: (ReportCancellationAsyncActivityInput input) -> void + + def start_nexus_operation: (StartNexusOperationInput input) -> NexusOperationHandle + + def describe_nexus_operation: (DescribeNexusOperationInput input) -> untyped + + def cancel_nexus_operation: (CancelNexusOperationInput input) -> void + + def terminate_nexus_operation: (TerminateNexusOperationInput input) -> void + + def list_nexus_operation_page: (ListNexusOperationPageInput input) -> Client::ListNexusOperationPage + + def count_nexus_operations: (CountNexusOperationsInput input) -> NexusOperationExecutionCount + + def poll_nexus_operation: (PollNexusOperationInput input) -> Object? end end end diff --git a/temporalio/sig/temporalio/client/nexus_client.rbs b/temporalio/sig/temporalio/client/nexus_client.rbs new file mode 100644 index 00000000..d227780b --- /dev/null +++ b/temporalio/sig/temporalio/client/nexus_client.rbs @@ -0,0 +1,44 @@ +module Temporalio + class Client + class NexusClient + attr_reader endpoint: String + attr_reader service: String + + def initialize: ( + client: Client, + endpoint: String, + service: String + ) -> void + + def start_operation: ( + String | Symbol operation, + ?Object? input, + ?id: String, + ?schedule_to_close_timeout: Float?, + ?schedule_to_start_timeout: Float?, + ?start_to_close_timeout: Float?, + ?id_reuse_policy: Integer, + ?id_conflict_policy: Integer, + ?search_attributes: SearchAttributes?, + ?nexus_header: Hash[String, String], + ?static_summary: String?, + ?rpc_options: RPCOptions? + ) -> NexusOperationHandle + + def execute_operation: ( + String | Symbol operation, + ?Object? input, + ?id: String, + ?schedule_to_close_timeout: Float?, + ?schedule_to_start_timeout: Float?, + ?start_to_close_timeout: Float?, + ?id_reuse_policy: Integer, + ?id_conflict_policy: Integer, + ?search_attributes: SearchAttributes?, + ?nexus_header: Hash[String, String], + ?static_summary: String?, + ?rpc_options: RPCOptions? + ) -> Object? + end + end +end diff --git a/temporalio/sig/temporalio/client/nexus_operation_handle.rbs b/temporalio/sig/temporalio/client/nexus_operation_handle.rbs new file mode 100644 index 00000000..d3907935 --- /dev/null +++ b/temporalio/sig/temporalio/client/nexus_operation_handle.rbs @@ -0,0 +1,32 @@ +module Temporalio + class Client + class NexusOperationHandle + attr_reader operation_id: String + attr_reader run_id: String? + + def initialize: ( + client: Client, + operation_id: String, + run_id: String? + ) -> void + + def result: ( + ?rpc_options: RPCOptions? + ) -> Object? + + def describe: ( + ?rpc_options: RPCOptions? + ) -> untyped + + def cancel: ( + ?reason: String?, + ?rpc_options: RPCOptions? + ) -> void + + def terminate: ( + ?reason: String?, + ?rpc_options: RPCOptions? + ) -> void + end + end +end diff --git a/temporalio/sig/temporalio/common_enums.rbs b/temporalio/sig/temporalio/common_enums.rbs index 9c16555e..1ee8ea30 100644 --- a/temporalio/sig/temporalio/common_enums.rbs +++ b/temporalio/sig/temporalio/common_enums.rbs @@ -34,6 +34,22 @@ module Temporalio TOO_MANY_UPDATES: enum end + module NexusOperationIDReusePolicy + type enum = Integer + + ALLOW_DUPLICATE: enum + ALLOW_DUPLICATE_FAILED_ONLY: enum + REJECT_DUPLICATE: enum + end + + module NexusOperationIDConflictPolicy + type enum = Integer + + UNSPECIFIED: enum + FAIL: enum + USE_EXISTING: enum + end + module VersioningBehavior type enum = Integer diff --git a/temporalio/sig/temporalio/error.rbs b/temporalio/sig/temporalio/error.rbs index 6ad0cd57..39212755 100644 --- a/temporalio/sig/temporalio/error.rbs +++ b/temporalio/sig/temporalio/error.rbs @@ -39,6 +39,20 @@ module Temporalio def initialize: -> void end + class NexusOperationAlreadyStartedError < Error + attr_reader operation_id: String + attr_reader run_id: String + + def initialize: (operation_id: String, run_id: String) -> void + end + + class NexusOperationFailedError < Error + attr_reader operation_id: String + attr_reader run_id: String? + + def initialize: (operation_id: String, run_id: String?) -> void + end + class AsyncActivityCanceledError < Error attr_reader details: Activity::CancellationDetails diff --git a/temporalio/test/client_nexus_test.rb b/temporalio/test/client_nexus_test.rb new file mode 100644 index 00000000..f3d5fc47 --- /dev/null +++ b/temporalio/test/client_nexus_test.rb @@ -0,0 +1,173 @@ +# frozen_string_literal: true + +require 'temporalio/client' +require 'temporalio/error' +require 'temporalio/testing' +require 'test' + +class ClientNexusTest < Test + def test_nexus_standalone_sync_execute_operation + with_nexus_client do |nexus_client, endpoint| + result = nexus_client.execute_operation('echo', 'success', id: SecureRandom.uuid, + schedule_to_close_timeout: 30) + assert_equal 'success', result + end + end + + def test_nexus_standalone_sync_start_and_result + with_nexus_client do |nexus_client, endpoint| + op_id = SecureRandom.uuid + handle = nexus_client.start_operation('echo', 'success', id: op_id, schedule_to_close_timeout: 30) + + assert_instance_of Temporalio::Client::NexusOperationHandle, handle + assert_equal op_id, handle.operation_id + refute_nil handle.run_id + + result = handle.result + assert_equal 'success', result + end + end + + def test_nexus_standalone_async_operation + with_nexus_client do |nexus_client, endpoint| + op_id = SecureRandom.uuid + handle = nexus_client.start_operation('workflow-operation', { 'action' => 'success' }, + id: op_id, schedule_to_close_timeout: 30) + + assert_instance_of Temporalio::Client::NexusOperationHandle, handle + assert_equal op_id, handle.operation_id + refute_nil handle.run_id + + result = handle.result + assert_instance_of Hash, result + assert_equal 'success', result['result'] + end + end + + def test_nexus_standalone_cancel_operation + with_nexus_client do |nexus_client, endpoint| + op_id = SecureRandom.uuid + handle = nexus_client.start_operation('workflow-operation', { 'action' => 'wait-for-cancel' }, + id: op_id, schedule_to_close_timeout: 30) + assert_equal op_id, handle.operation_id + + # Wait for the operation to be running + assert_eventually do + desc = handle.describe + refute_nil desc.info + assert_equal op_id, desc.info.operation_id + assert_equal 'test-service', desc.info.service + assert_equal 'workflow-operation', desc.info.operation + assert_equal endpoint, desc.info.endpoint + end + + handle.cancel(reason: 'test cancel') + + # Waiting for result should raise with cancellation cause + err = assert_raises(Temporalio::Error::NexusOperationFailedError) { handle.result } + assert_equal op_id, err.operation_id + assert_instance_of Temporalio::Error::CanceledError, err.cause + end + end + + def test_nexus_standalone_terminate_operation + with_nexus_client do |nexus_client, endpoint| + op_id = SecureRandom.uuid + handle = nexus_client.start_operation('workflow-operation', { 'action' => 'wait-for-cancel' }, + id: op_id, schedule_to_close_timeout: 30) + assert_equal op_id, handle.operation_id + + assert_eventually do + desc = handle.describe + refute_nil desc.info + end + + handle.terminate(reason: 'test terminate') + end + end + + def test_nexus_standalone_describe_operation + with_nexus_client do |nexus_client, endpoint| + op_id = SecureRandom.uuid + handle = nexus_client.start_operation('workflow-operation', { 'action' => 'success' }, + id: op_id, schedule_to_close_timeout: 30) + + desc = handle.describe + refute_nil desc + refute_nil desc.info + assert_equal op_id, desc.info.operation_id + assert_equal endpoint, desc.info.endpoint + assert_equal 'test-service', desc.info.service + assert_equal 'workflow-operation', desc.info.operation + refute_nil desc.run_id + end + end + + def test_nexus_standalone_list_operations + with_nexus_client do |nexus_client, _endpoint| + op_id = SecureRandom.uuid + nexus_client.start_operation('echo', 'success', id: op_id, schedule_to_close_timeout: 30) + + ops = env.client.list_nexus_operations.to_a + refute_empty ops + + # Find our operation in the list + our_op = ops.find { |op| op.operation_id == op_id } + refute_nil our_op, "Expected to find operation #{op_id} in list" + assert_equal 'test-service', our_op.service + assert_equal 'echo', our_op.operation + end + end + + def test_nexus_standalone_count_operations + with_nexus_client do |nexus_client, _endpoint| + nexus_client.start_operation('echo', 'success', id: SecureRandom.uuid, schedule_to_close_timeout: 30) + + count = env.client.count_nexus_operations + assert_instance_of Temporalio::Client::NexusOperationExecutionCount, count + assert_operator count.count, :>=, 1 + assert_instance_of Array, count.groups + end + end + + def test_nexus_standalone_operation_handle_factory + with_nexus_client do |nexus_client, endpoint| + op_id = SecureRandom.uuid + original = nexus_client.start_operation('workflow-operation', { 'action' => 'success' }, + id: op_id, schedule_to_close_timeout: 30) + + # Get a new handle via the factory method + handle = env.client.nexus_operation_handle(original.operation_id) + assert_instance_of Temporalio::Client::NexusOperationHandle, handle + assert_equal op_id, handle.operation_id + assert_nil handle.run_id + + # Describe should work and return the same operation + desc = handle.describe + refute_nil desc.info + assert_equal op_id, desc.info.operation_id + assert_equal endpoint, desc.info.endpoint + assert_equal 'test-service', desc.info.service + end + end + + def test_nexus_standalone_operation_failure + with_nexus_client do |nexus_client, _endpoint| + op_id = SecureRandom.uuid + err = assert_raises(Temporalio::Error::NexusOperationFailedError) do + nexus_client.execute_operation('echo', 'operation-error', id: op_id, schedule_to_close_timeout: 30) + end + assert_equal op_id, err.operation_id + refute_nil err.cause + end + end + + private + + def with_nexus_client + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + yield env.client.create_nexus_client(endpoint, 'test-service'), endpoint + end + end +end diff --git a/temporalio/test/test.rb b/temporalio/test/test.rb index 8ad2c8d5..3a1bc614 100644 --- a/temporalio/test/test.rb +++ b/temporalio/test/test.rb @@ -147,7 +147,7 @@ def initialize if target_host.empty? @server = Temporalio::Testing::WorkflowEnvironment.start_local( logger: Logger.new($stdout), - dev_server_download_version: 'v1.7.0', + dev_server_download_version: 'v1.7.1-standalone-nexus-operations', dev_server_extra_args: [ # Allow continue as new to be immediate '--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"', @@ -155,7 +155,21 @@ def initialize '--dynamic-config-value', 'frontend.enableVersioningDataAPIs=true', '--dynamic-config-value', 'system.enableDeploymentVersions=true', # Enable activity pause - '--dynamic-config-value', 'frontend.activityAPIsEnabled=true' + '--dynamic-config-value', 'frontend.activityAPIsEnabled=true', + # Enable standalone Nexus operations + '--http-port', '7243', + '--dynamic-config-value', 'nexusoperation.enableStandalone=true', + '--dynamic-config-value', 'component.nexusoperations.useSystemCallbackURL=false', + '--dynamic-config-value', + 'component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"', + '--dynamic-config-value', 'system.refreshNexusEndpointsMinWait="0s"', + '--dynamic-config-value', 'component.nexusoperations.recordCancelRequestCompletionEvents=true', + '--dynamic-config-value', 'activity.enableStandalone=true', + '--dynamic-config-value', 'component.callbacks.allowedAddresses=[{"Pattern":"*","AllowInsecure":true}]', + '--dynamic-config-value', 'history.enableChasm=true', + '--dynamic-config-value', 'history.enableTransitionHistory=true', + '--dynamic-config-value', 'history.enableChasmCallbacks=true', + '--dynamic-config-value', 'history.enableRequestIdRefLinks=true' ] ) Minitest.after_run do