Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Prerequisites:
[Coinbase Ruby SDK](https://github.com/coinbase/temporal-ruby).
* [context_propagation](context_propagation) - Use interceptors to propagate thread/fiber local data from clients
through workflows/activities.
* [dsl](dsl) - Demonstrates having a workflow interpret/invoke arbitrary steps defined in a DSL.
* [encryption](encryption) - Demonstrates how to make a codec for end-to-end encryption.
* [env_config](env_config) - Load client configuration from TOML files with programmatic overrides.
* [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates.
Expand Down
26 changes: 26 additions & 0 deletions dsl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# DSL

This sample demonstrates having a workflow interpret/invoke arbitrary steps defined in a DSL. It is similar to the DSL
samples in [TypeScript](https://github.com/temporalio/samples-typescript/tree/main/dsl-interpreter), in
[Go](https://github.com/temporalio/samples-go/tree/main/dsl), and in
[Python](https://github.com/temporalio/samples-python/tree/main/dsl).

To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker
from this directory:

bundle exec ruby worker.rb

Now, in another terminal, run the following from this directory to execute a workflow of steps defined in
[workflow1.yaml](workflow1.yaml):

bundle exec ruby starter.rb workflow1.yaml

This will run the workflow and show the final variables that the workflow returns. Looking in the worker terminal, each
step executed will be visible.

Similarly we can do the same for the more advanced [workflow2.yaml](workflow2.yaml) file:

bundle exec ruby starter.rb workflow2.yaml

This sample gives a guide of how one can write a workflow to interpret arbitrary steps from a user-provided DSL. Many
DSL models are more advanced and are more specific to conform to business logic needs.
53 changes: 53 additions & 0 deletions dsl/activities.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

require 'temporalio/activity'
require_relative 'models'

module Dsl
module Activities
class Activity1 < Temporalio::Activity::Definition
activity_name :activity1

def execute(arg)
Temporalio::Activity::Context.current.logger.info("Executing activity1 with arg: #{arg}")
"[result from activity1: #{arg}]"
end
end

class Activity2 < Temporalio::Activity::Definition
activity_name :activity2

def execute(arg)
Temporalio::Activity::Context.current.logger.info("Executing activity2 with arg: #{arg}")
"[result from activity2: #{arg}]"
end
end

class Activity3 < Temporalio::Activity::Definition
activity_name :activity3

def execute(arg1, arg2)
Temporalio::Activity::Context.current.logger.info("Executing activity3 with args: #{arg1} and #{arg2}")
"[result from activity3: #{arg1} #{arg2}]"
end
end

class Activity4 < Temporalio::Activity::Definition
activity_name :activity4

def execute(arg)
Temporalio::Activity::Context.current.logger.info("Executing activity4 with arg #{arg}")
"[result from activity4: #{arg}]"
end
end

class Activity5 < Temporalio::Activity::Definition
activity_name :activity5

def execute(arg1, arg2)
Temporalio::Activity::Context.current.logger.info("Executing activity5 with args: #{arg1} and #{arg2}")
"[result from activity5: #{arg1} #{arg2}]"
end
end
end
end
39 changes: 39 additions & 0 deletions dsl/dsl_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

require 'temporalio/workflow'
require_relative 'models'

module Dsl
class DslWorkflow < Temporalio::Workflow::Definition
def execute(input)
# Run and return the final variable set
Temporalio::Workflow.logger.info('Running DSL workflow')
@variables = input.variables
execute_statement(input.root)
Temporalio::Workflow.logger.info('DSL workflow completed')
@variables
end

def execute_statement(stmt)
case stmt
when Models::Statement::Activity
# Invoke activity loading arguments from variables and optionally storing result as a variable
result = Temporalio::Workflow.execute_activity(
stmt.name,
*stmt.arguments.map { |a| @variables[a] },
start_to_close_timeout: 60
)
@variables[stmt.result] = result if stmt.result
when Models::Statement::Sequence
# Execute each statement in order
stmt.elements.each { |s| execute_statement(s) }
when Models::Statement::Parallel
# Execute all in parallel. Note, this will raise an exception when the first activity fails and will not cancel
# the others. We could provide a linked Cancellation to each and cancel it on error if we wanted.
Temporalio::Workflow::Future.all_of(
*stmt.branches.map { |s| Temporalio::Workflow::Future.new { execute_statement(s) } }
).wait
end
end
end
end
53 changes: 53 additions & 0 deletions dsl/models.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

require 'json/add/struct'
require 'yaml'

module Dsl
module Models
Input = Struct.new(:root, :variables) do
def self.from_yaml(yaml_str)
from_h(YAML.load(yaml_str))
end

def self.from_h(hash)
new(
root: Statement.from_h(hash['root'] || raise('Missing root')),
variables: hash['variables'] || {}
)
end
end

module Statement
def self.from_h(hash)
raise 'Expected single activity, sequence, or parallel field' unless hash.one?

type, sub_hash = hash.first
case type
when 'activity' then Activity.from_h(sub_hash)
when 'sequence' then Sequence.from_h(sub_hash)
when 'parallel' then Parallel.from_h(sub_hash)
else raise 'Expected single activity, sequence, or parallel field'
end
end

Activity = Struct.new(:name, :arguments, :result) do
def self.from_h(hash)
new(name: hash['name'], arguments: hash['arguments'], result: hash['result'])
end
end

Sequence = Struct.new(:elements) do
def self.from_h(hash)
new(elements: hash['elements'].map { |e| Statement.from_h(e) })
end
end

Parallel = Struct.new(:branches) do
def self.from_h(hash)
new(branches: hash['branches'].map { |e| Statement.from_h(e) })
end
end
end
end
end
21 changes: 21 additions & 0 deletions dsl/starter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

require 'temporalio/client'
require_relative 'activities'
require_relative 'models'
require_relative 'dsl_workflow'

# Create a Temporal client
logger = Logger.new($stdout, level: Logger::INFO)
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)

# Load YAML file
yaml_str = File.read(ARGV.first || raise('Missing argument for YAML file'))
input = Dsl::Models::Input.from_yaml(yaml_str)

# Run workflow
result = client.execute_workflow(
Dsl::DslWorkflow, input,
id: 'dsl-sample-workflow-id', task_queue: 'dsl-sample'
)
logger.info("Final variables: #{result}")
24 changes: 24 additions & 0 deletions dsl/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

require 'logger'
require 'temporalio/client'
require 'temporalio/worker'
require_relative 'activities'
require_relative 'dsl_workflow'

# Create a Temporal client
logger = Logger.new($stdout, level: Logger::INFO)
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)

# Create worker with the activities and workflow
worker = Temporalio::Worker.new(
client:,
task_queue: 'dsl-sample',
activities: [Dsl::Activities::Activity1, Dsl::Activities::Activity2, Dsl::Activities::Activity3,
Dsl::Activities::Activity4, Dsl::Activities::Activity5],
workflows: [Dsl::DslWorkflow]
)

# Run the worker until SIGINT
logger.info('Starting worker (ctrl+c to exit)')
worker.run(shutdown_signals: ['SIGINT'])
28 changes: 28 additions & 0 deletions dsl/workflow1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This sample workflows execute 3 steps in sequence.
# 1) Activity1, takes arg1 as input, and put result as result1.
# 2) Activity2, takes result1 as input, and put result as result2.
# 3) Activity3, takes args2 and result2 as input, and put result as result3.

variables:
arg1: value1
arg2: value2

root:
sequence:
elements:
- activity:
name: activity1
arguments:
- arg1
result: result1
- activity:
name: activity2
arguments:
- result1
result: result2
- activity:
name: activity3
arguments:
- arg2
- result2
result: result3
58 changes: 58 additions & 0 deletions dsl/workflow2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# This sample workflow executes 3 steps in sequence.
# 1) activity1, takes arg1 as input, and put result as result1.
# 2) it runs a parallel block which runs below sequence branches in parallel
# 2.1) sequence 1
# 2.1.1) activity2, takes result1 as input, and put result as result2
# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3
# 2.2) sequence 2
# 2.2.1) activity4, takes result1 as input, and put result as result4
# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5
# 3) activity3, takes result3 and result5 as input, and put result as result6.

variables:
arg1: value1
arg2: value2
arg3: value3

root:
sequence:
elements:
- activity:
name: activity1
arguments:
- arg1
result: result1
- parallel:
branches:
- sequence:
elements:
- activity:
name: activity2
arguments:
- result1
result: result2
- activity:
name: activity3
arguments:
- arg2
- result2
result: result3
- sequence:
elements:
- activity:
name: activity4
arguments:
- result1
result: result4
- activity:
name: activity5
arguments:
- arg3
- result4
result: result5
- activity:
name: activity3
arguments:
- result3
- result5
result: result6
60 changes: 60 additions & 0 deletions test/dsl/dsl_workflow_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

require 'test'
require 'dsl/activities'
require 'dsl/models'
require 'dsl/dsl_workflow'
require 'securerandom'
require 'temporalio/client'
require 'temporalio/testing'
require 'temporalio/worker'

module Dsl
class DslWorkflowTest < Test
def test_workflow
Temporalio::Testing::WorkflowEnvironment.start_local do |env|
# Load the YAML from workflow2.yaml
yaml_str = File.read(File.join(File.dirname(__FILE__), '../../dsl/workflow2.yaml'))
# Run workflow in a worker
worker = Temporalio::Worker.new(
client: env.client,
task_queue: "tq-#{SecureRandom.uuid}",
activities: [Activities::Activity1, Activities::Activity2, Activities::Activity3,
Activities::Activity4, Activities::Activity5],
workflows: [DslWorkflow]
)
handle, result = worker.run do
handle = env.client.start_workflow(
DslWorkflow, Models::Input.from_yaml(yaml_str),
id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue
)
[handle, handle.result]
end
# Confirm expected variable results
assert_equal(
{
'arg1' => 'value1',
'arg2' => 'value2',
'arg3' => 'value3',
'result1' => '[result from activity1: value1]',
'result2' => '[result from activity2: [result from activity1: value1]]',
'result3' => '[result from activity3: value2 [result from activity2: [result from activity1: value1]]]',
'result4' => '[result from activity4: [result from activity1: value1]]',
'result5' => '[result from activity5: value3 [result from activity4: [result from activity1: value1]]]',
'result6' => '[result from activity3: [result from activity3: value2 [result from activity2: ' \
'[result from activity1: value1]]] [result from activity5: ' \
'value3 [result from activity4: [result from activity1: value1]]]]'
},
result
)
# Collect all activity events and confirm they are in order expected
activity_names = handle.fetch_history_events
.map { |e| e.activity_task_scheduled_event_attributes&.activity_type&.name }
.compact
assert_equal 'activity1', activity_names[0]
assert_equal %w[activity2 activity3 activity4 activity5], activity_names[1, 4].sort
assert_equal 'activity3', activity_names[5]
end
end
end
end