From bec18ed32a4dfcd98540736dfa9a33a505e9d67a Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 1 Oct 2025 13:30:00 -0500 Subject: [PATCH] DSL sample Closes #32 --- README.md | 1 + dsl/README.md | 26 +++++++++++++++ dsl/activities.rb | 53 +++++++++++++++++++++++++++++++ dsl/dsl_workflow.rb | 39 +++++++++++++++++++++++ dsl/models.rb | 53 +++++++++++++++++++++++++++++++ dsl/starter.rb | 21 ++++++++++++ dsl/worker.rb | 24 ++++++++++++++ dsl/workflow1.yaml | 28 ++++++++++++++++ dsl/workflow2.yaml | 58 +++++++++++++++++++++++++++++++++ test/dsl/dsl_workflow_test.rb | 60 +++++++++++++++++++++++++++++++++++ 10 files changed, 363 insertions(+) create mode 100644 dsl/README.md create mode 100644 dsl/activities.rb create mode 100644 dsl/dsl_workflow.rb create mode 100644 dsl/models.rb create mode 100644 dsl/starter.rb create mode 100644 dsl/worker.rb create mode 100644 dsl/workflow1.yaml create mode 100644 dsl/workflow2.yaml create mode 100644 test/dsl/dsl_workflow_test.rb diff --git a/README.md b/README.md index 3d3e8f0..3c95489 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Prerequisites: [Coinbase Ruby SDK](https://github.com/coinbase/temporal-ruby). * [context_propagation](context_propagation) - Use interceptors to propagate thread/fiber local data from clients through workflows/activities. +* [dsl](dsl) - Demonstrates having a workflow interpret/invoke arbitrary steps defined in a DSL. * [encryption](encryption) - Demonstrates how to make a codec for end-to-end encryption. * [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates. * [polling/infrequent](polling/infrequent) - Implement an infrequent polling mechanism using Temporal's automatic Activity Retry feature. diff --git a/dsl/README.md b/dsl/README.md new file mode 100644 index 0000000..cffa0af --- /dev/null +++ b/dsl/README.md @@ -0,0 +1,26 @@ +# DSL + +This sample demonstrates having a workflow interpret/invoke arbitrary steps defined in a DSL. It is similar to the DSL +samples in [TypeScript](https://github.com/temporalio/samples-typescript/tree/main/dsl-interpreter), in +[Go](https://github.com/temporalio/samples-go/tree/main/dsl), and in +[Python](https://github.com/temporalio/samples-python/tree/main/dsl). + +To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker +from this directory: + + bundle exec ruby worker.rb + +Now, in another terminal, run the following from this directory to execute a workflow of steps defined in +[workflow1.yaml](workflow1.yaml): + + bundle exec ruby starter.rb workflow1.yaml + +This will run the workflow and show the final variables that the workflow returns. Looking in the worker terminal, each +step executed will be visible. + +Similarly we can do the same for the more advanced [workflow2.yaml](workflow2.yaml) file: + + bundle exec ruby starter.rb workflow2.yaml + +This sample gives a guide of how one can write a workflow to interpret arbitrary steps from a user-provided DSL. Many +DSL models are more advanced and are more specific to conform to business logic needs. \ No newline at end of file diff --git a/dsl/activities.rb b/dsl/activities.rb new file mode 100644 index 0000000..4c520cd --- /dev/null +++ b/dsl/activities.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +require 'temporalio/activity' +require_relative 'models' + +module Dsl + module Activities + class Activity1 < Temporalio::Activity::Definition + activity_name :activity1 + + def execute(arg) + Temporalio::Activity::Context.current.logger.info("Executing activity1 with arg: #{arg}") + "[result from activity1: #{arg}]" + end + end + + class Activity2 < Temporalio::Activity::Definition + activity_name :activity2 + + def execute(arg) + Temporalio::Activity::Context.current.logger.info("Executing activity2 with arg: #{arg}") + "[result from activity2: #{arg}]" + end + end + + class Activity3 < Temporalio::Activity::Definition + activity_name :activity3 + + def execute(arg1, arg2) + Temporalio::Activity::Context.current.logger.info("Executing activity3 with args: #{arg1} and #{arg2}") + "[result from activity3: #{arg1} #{arg2}]" + end + end + + class Activity4 < Temporalio::Activity::Definition + activity_name :activity4 + + def execute(arg) + Temporalio::Activity::Context.current.logger.info("Executing activity4 with arg #{arg}") + "[result from activity4: #{arg}]" + end + end + + class Activity5 < Temporalio::Activity::Definition + activity_name :activity5 + + def execute(arg1, arg2) + Temporalio::Activity::Context.current.logger.info("Executing activity5 with args: #{arg1} and #{arg2}") + "[result from activity5: #{arg1} #{arg2}]" + end + end + end +end diff --git a/dsl/dsl_workflow.rb b/dsl/dsl_workflow.rb new file mode 100644 index 0000000..34d01dd --- /dev/null +++ b/dsl/dsl_workflow.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' +require_relative 'models' + +module Dsl + class DslWorkflow < Temporalio::Workflow::Definition + def execute(input) + # Run and return the final variable set + Temporalio::Workflow.logger.info('Running DSL workflow') + @variables = input.variables + execute_statement(input.root) + Temporalio::Workflow.logger.info('DSL workflow completed') + @variables + end + + def execute_statement(stmt) + case stmt + when Models::Statement::Activity + # Invoke activity loading arguments from variables and optionally storing result as a variable + result = Temporalio::Workflow.execute_activity( + stmt.name, + *stmt.arguments.map { |a| @variables[a] }, + start_to_close_timeout: 60 + ) + @variables[stmt.result] = result if stmt.result + when Models::Statement::Sequence + # Execute each statement in order + stmt.elements.each { |s| execute_statement(s) } + when Models::Statement::Parallel + # Execute all in parallel. Note, this will raise an exception when the first activity fails and will not cancel + # the others. We could provide a linked Cancellation to each and cancel it on error if we wanted. + Temporalio::Workflow::Future.all_of( + *stmt.branches.map { |s| Temporalio::Workflow::Future.new { execute_statement(s) } } + ).wait + end + end + end +end diff --git a/dsl/models.rb b/dsl/models.rb new file mode 100644 index 0000000..cc33367 --- /dev/null +++ b/dsl/models.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +require 'json/add/struct' +require 'yaml' + +module Dsl + module Models + Input = Struct.new(:root, :variables) do + def self.from_yaml(yaml_str) + from_h(YAML.load(yaml_str)) + end + + def self.from_h(hash) + new( + root: Statement.from_h(hash['root'] || raise('Missing root')), + variables: hash['variables'] || {} + ) + end + end + + module Statement + def self.from_h(hash) + raise 'Expected single activity, sequence, or parallel field' unless hash.one? + + type, sub_hash = hash.first + case type + when 'activity' then Activity.from_h(sub_hash) + when 'sequence' then Sequence.from_h(sub_hash) + when 'parallel' then Parallel.from_h(sub_hash) + else raise 'Expected single activity, sequence, or parallel field' + end + end + + Activity = Struct.new(:name, :arguments, :result) do + def self.from_h(hash) + new(name: hash['name'], arguments: hash['arguments'], result: hash['result']) + end + end + + Sequence = Struct.new(:elements) do + def self.from_h(hash) + new(elements: hash['elements'].map { |e| Statement.from_h(e) }) + end + end + + Parallel = Struct.new(:branches) do + def self.from_h(hash) + new(branches: hash['branches'].map { |e| Statement.from_h(e) }) + end + end + end + end +end diff --git a/dsl/starter.rb b/dsl/starter.rb new file mode 100644 index 0000000..03d9043 --- /dev/null +++ b/dsl/starter.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +require 'temporalio/client' +require_relative 'activities' +require_relative 'models' +require_relative 'dsl_workflow' + +# Create a Temporal client +logger = Logger.new($stdout, level: Logger::INFO) +client = Temporalio::Client.connect('localhost:7233', 'default', logger:) + +# Load YAML file +yaml_str = File.read(ARGV.first || raise('Missing argument for YAML file')) +input = Dsl::Models::Input.from_yaml(yaml_str) + +# Run workflow +result = client.execute_workflow( + Dsl::DslWorkflow, input, + id: 'dsl-sample-workflow-id', task_queue: 'dsl-sample' +) +logger.info("Final variables: #{result}") diff --git a/dsl/worker.rb b/dsl/worker.rb new file mode 100644 index 0000000..a203b76 --- /dev/null +++ b/dsl/worker.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +require 'logger' +require 'temporalio/client' +require 'temporalio/worker' +require_relative 'activities' +require_relative 'dsl_workflow' + +# Create a Temporal client +logger = Logger.new($stdout, level: Logger::INFO) +client = Temporalio::Client.connect('localhost:7233', 'default', logger:) + +# Create worker with the activities and workflow +worker = Temporalio::Worker.new( + client:, + task_queue: 'dsl-sample', + activities: [Dsl::Activities::Activity1, Dsl::Activities::Activity2, Dsl::Activities::Activity3, + Dsl::Activities::Activity4, Dsl::Activities::Activity5], + workflows: [Dsl::DslWorkflow] +) + +# Run the worker until SIGINT +logger.info('Starting worker (ctrl+c to exit)') +worker.run(shutdown_signals: ['SIGINT']) diff --git a/dsl/workflow1.yaml b/dsl/workflow1.yaml new file mode 100644 index 0000000..85da523 --- /dev/null +++ b/dsl/workflow1.yaml @@ -0,0 +1,28 @@ +# This sample workflows execute 3 steps in sequence. +# 1) Activity1, takes arg1 as input, and put result as result1. +# 2) Activity2, takes result1 as input, and put result as result2. +# 3) Activity3, takes args2 and result2 as input, and put result as result3. + +variables: + arg1: value1 + arg2: value2 + +root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 \ No newline at end of file diff --git a/dsl/workflow2.yaml b/dsl/workflow2.yaml new file mode 100644 index 0000000..cf19fdd --- /dev/null +++ b/dsl/workflow2.yaml @@ -0,0 +1,58 @@ +# This sample workflow executes 3 steps in sequence. +# 1) activity1, takes arg1 as input, and put result as result1. +# 2) it runs a parallel block which runs below sequence branches in parallel +# 2.1) sequence 1 +# 2.1.1) activity2, takes result1 as input, and put result as result2 +# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3 +# 2.2) sequence 2 +# 2.2.1) activity4, takes result1 as input, and put result as result4 +# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5 +# 3) activity3, takes result3 and result5 as input, and put result as result6. + +variables: + arg1: value1 + arg2: value2 + arg3: value3 + +root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - parallel: + branches: + - sequence: + elements: + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 + - sequence: + elements: + - activity: + name: activity4 + arguments: + - result1 + result: result4 + - activity: + name: activity5 + arguments: + - arg3 + - result4 + result: result5 + - activity: + name: activity3 + arguments: + - result3 + - result5 + result: result6 \ No newline at end of file diff --git a/test/dsl/dsl_workflow_test.rb b/test/dsl/dsl_workflow_test.rb new file mode 100644 index 0000000..f3cc312 --- /dev/null +++ b/test/dsl/dsl_workflow_test.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +require 'test' +require 'dsl/activities' +require 'dsl/models' +require 'dsl/dsl_workflow' +require 'securerandom' +require 'temporalio/client' +require 'temporalio/testing' +require 'temporalio/worker' + +module Dsl + class DslWorkflowTest < Test + def test_workflow + Temporalio::Testing::WorkflowEnvironment.start_local do |env| + # Load the YAML from workflow2.yaml + yaml_str = File.read(File.join(File.dirname(__FILE__), '../../dsl/workflow2.yaml')) + # Run workflow in a worker + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + activities: [Activities::Activity1, Activities::Activity2, Activities::Activity3, + Activities::Activity4, Activities::Activity5], + workflows: [DslWorkflow] + ) + handle, result = worker.run do + handle = env.client.start_workflow( + DslWorkflow, Models::Input.from_yaml(yaml_str), + id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue + ) + [handle, handle.result] + end + # Confirm expected variable results + assert_equal( + { + 'arg1' => 'value1', + 'arg2' => 'value2', + 'arg3' => 'value3', + 'result1' => '[result from activity1: value1]', + 'result2' => '[result from activity2: [result from activity1: value1]]', + 'result3' => '[result from activity3: value2 [result from activity2: [result from activity1: value1]]]', + 'result4' => '[result from activity4: [result from activity1: value1]]', + 'result5' => '[result from activity5: value3 [result from activity4: [result from activity1: value1]]]', + 'result6' => '[result from activity3: [result from activity3: value2 [result from activity2: ' \ + '[result from activity1: value1]]] [result from activity5: ' \ + 'value3 [result from activity4: [result from activity1: value1]]]]' + }, + result + ) + # Collect all activity events and confirm they are in order expected + activity_names = handle.fetch_history_events + .map { |e| e.activity_task_scheduled_event_attributes&.activity_type&.name } + .compact + assert_equal 'activity1', activity_names[0] + assert_equal %w[activity2 activity3 activity4 activity5], activity_names[1, 4].sort + assert_equal 'activity3', activity_names[5] + end + end + end +end