diff --git a/README.md b/README.md index 51472b8..96075d3 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Prerequisites: [Coinbase Ruby SDK](https://github.com/coinbase/temporal-ruby). * [context_propagation](context_propagation) - Use interceptors to propagate thread/fiber local data from clients through workflows/activities. +* [dsl](dsl) - Execute workflows defined in a YAML-based domain-specific language. * [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates. * [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK. diff --git a/dsl/README.md b/dsl/README.md new file mode 100644 index 0000000..91ccf9f --- /dev/null +++ b/dsl/README.md @@ -0,0 +1,69 @@ +# DSL Sample + +This sample demonstrates how to create a workflow that can interpret and execute operations defined in a YAML-based Domain Specific Language (DSL). This approach allows you to define workflow steps declaratively in a YAML file, which can be modified without changing the underlying code. + +## Overview + +The sample shows how to: + +1. Define a DSL schema for workflow operations +2. Parse YAML files into Ruby objects +3. Create a workflow that can interpret and execute these operations +4. Execute activities based on the DSL definition +5. Support sequential and parallel execution patterns + +## Prerequisites + +See the [main README](../README.md) for prerequisites and setup instructions. + +## Running the Sample + +1. Start the worker in one terminal: + +``` +ruby worker.rb +``` + +2. Execute a workflow using one of the example YAML definitions: + +``` +ruby starter.rb workflow1.yaml +``` + +This will run the simple sequential workflow defined in `workflow1.yaml`. + +3. Try the more complex workflow with parallel execution: + +``` +ruby starter.rb workflow2.yaml +``` + +## Understanding the Sample + +### DSL Model + +The DSL allows defining: + +- Variables to be used across the workflow +- Activities with inputs and outputs +- Sequential execution of steps +- Parallel execution of branches + +### Workflow Structure + +- `my_activities.rb` - Example activities that can be executed by the workflow +- `dsl_models.rb` - Classes representing the DSL schema +- `dsl.rb` - Workflow implementation that interprets and executes the DSL +- `worker.rb` - Worker process that hosts the activities and workflow +- `starter.rb` - Client that reads a YAML file and executes the workflow +- `workflow1.yaml` - Simple sequential workflow example +- `workflow2.yaml` - More complex workflow with parallel execution + +### Extending the Sample + +You can extend this sample by: + +1. Adding more activity types +2. Extending the DSL with new statement types (e.g., conditionals, loops) +3. Adding error handling and retry mechanisms +4. Creating validation for the DSL input \ No newline at end of file diff --git a/dsl/activities.rb b/dsl/activities.rb new file mode 100644 index 0000000..abede07 --- /dev/null +++ b/dsl/activities.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require 'temporalio/activity' + +module Dsl + module Activities + class Activity1 < Temporalio::Activity::Definition + def execute(arg) + Temporalio::Activity::Context.current.logger.info("Executing activity1 with arg: #{arg}") + "[result from activity1: #{arg}]" + end + end + + class Activity2 < Temporalio::Activity::Definition + def execute(arg) + Temporalio::Activity::Context.current.logger.info("Executing activity2 with arg: #{arg}") + "[result from activity2: #{arg}]" + end + end + + class Activity3 < Temporalio::Activity::Definition + def execute(arg1, arg2) + Temporalio::Activity::Context.current.logger.info("Executing activity3 with args: #{arg1} and #{arg2}") + "[result from activity3: #{arg1} #{arg2}]" + end + end + + class Activity4 < Temporalio::Activity::Definition + def execute(arg) + Temporalio::Activity::Context.current.logger.info("Executing activity4 with arg: #{arg}") + "[result from activity4: #{arg}]" + end + end + + class Activity5 < Temporalio::Activity::Definition + def execute(arg1, arg2) + Temporalio::Activity::Context.current.logger.info("Executing activity5 with args: #{arg1} and #{arg2}") + "[result from activity5: #{arg1} #{arg2}]" + end + end + end +end diff --git a/dsl/dsl_models.rb b/dsl/dsl_models.rb new file mode 100644 index 0000000..ba87432 --- /dev/null +++ b/dsl/dsl_models.rb @@ -0,0 +1,152 @@ +# frozen_string_literal: true + +module Dsl + module Models + # Base class for DSL input with root statement and variables + class DslInput + attr_reader :root, :variables + + def initialize(root, variables = {}) + @root = root + @variables = variables + end + + def to_h + { + 'root' => @root.to_h, + 'variables' => @variables + } + end + end + + # Activity invocation model + class ActivityInvocation + attr_reader :name, :arguments, :result + + def initialize(name, arguments = [], result = nil) + @name = name + @arguments = arguments + @result = result + end + + def to_h + { + 'name' => @name, + 'arguments' => @arguments, + 'result' => @result + } + end + end + + # Activity statement model + class ActivityStatement + attr_reader :activity + + def initialize(activity) + @activity = activity + end + + def to_h + { + 'activity' => @activity.to_h + } + end + end + + # Sequence model containing list of statements + class Sequence + attr_reader :elements + + def initialize(elements) + @elements = elements + end + + def to_h + { + 'elements' => @elements.map(&:to_h) + } + end + end + + # Sequence statement model + class SequenceStatement + attr_reader :sequence + + def initialize(sequence) + @sequence = sequence + end + + def to_h + { + 'sequence' => @sequence.to_h + } + end + end + + # Parallel model containing list of branches + class Parallel + attr_reader :branches + + def initialize(branches) + @branches = branches + end + + def to_h + { + 'branches' => @branches.map(&:to_h) + } + end + end + + # Parallel statement model + class ParallelStatement + attr_reader :parallel + + def initialize(parallel) + @parallel = parallel + end + + def to_h + { + 'parallel' => @parallel.to_h + } + end + end + + # Parse YAML to DSL models + class Parser + def self.parse_yaml(yaml_content) + require 'yaml' + data = YAML.safe_load(yaml_content) + + variables = data['variables'] || {} + root = parse_statement(data['root']) + + DslInput.new(root, variables) + end + + def self.parse_statement(data) + case data.keys.first + when 'activity' + activity_data = data['activity'] + activity = ActivityInvocation.new( + activity_data['name'], + activity_data['arguments'] || [], + activity_data['result'] + ) + ActivityStatement.new(activity) + when 'sequence' + sequence_data = data['sequence'] + elements = sequence_data['elements'].map { |elem| parse_statement(elem) } + SequenceStatement.new(Sequence.new(elements)) + when 'parallel' + parallel_data = data['parallel'] + branches = parallel_data['branches'].map { |branch| parse_statement(branch) } + ParallelStatement.new(Parallel.new(branches)) + else + raise "Unknown statement type: #{data.keys.first}" + end + end + end + end +end diff --git a/dsl/dsl_workflow.rb b/dsl/dsl_workflow.rb new file mode 100644 index 0000000..74a9cb4 --- /dev/null +++ b/dsl/dsl_workflow.rb @@ -0,0 +1,151 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' +require_relative 'dsl_models' +require_relative 'activities' + +module Dsl + class DslWorkflow < Temporalio::Workflow::Definition + def execute(input) + Temporalio::Workflow.logger.info('Running DSL workflow') + + # Parse input - it could be a string (YAML) or a hash (already parsed) + parsed_input = parse_input(input) + @variables = parsed_input[:variables].dup + + # Execute the root statement + execute_statement(parsed_input[:root]) + + Temporalio::Workflow.logger.info('DSL workflow completed') + + # Return the final variables state + @variables + end + + private + + # Safely parse the input + def parse_input(input) + if input.is_a?(String) + # Run YAML parsing in an unsafe block since it's non-deterministic + Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled do + require 'yaml' + data = YAML.safe_load(input) + + variables = data['variables'] || {} + root = parse_data_to_statement(data['root']) + + { variables: variables, root: root } + end + elsif input.is_a?(Hash) + # It's already a hash, extract variables and root + variables = input['variables'] || {} + root = parse_data_to_statement(input['root']) + + { variables: variables, root: root } + else + # For other types (like the DslInput object) + begin + { variables: input.variables, root: input.root } + rescue NoMethodError + raise "Invalid input format: #{input.class}. Expected String, Hash, or DslInput object." + end + end + end + + # Parse statement data into appropriate objects + def parse_data_to_statement(data) + return nil unless data.is_a?(Hash) + + if data.key?('activity') + activity_data = data['activity'] + activity = Models::ActivityInvocation.new( + activity_data['name'], + activity_data['arguments'] || [], + activity_data['result'] + ) + Models::ActivityStatement.new(activity) + elsif data.key?('sequence') + sequence_data = data['sequence'] + elements = sequence_data['elements'].map { |elem| parse_data_to_statement(elem) } + Models::SequenceStatement.new(Models::Sequence.new(elements)) + elsif data.key?('parallel') + parallel_data = data['parallel'] + branches = parallel_data['branches'].map { |branch| parse_data_to_statement(branch) } + Models::ParallelStatement.new(Models::Parallel.new(branches)) + else + raise "Unknown statement type: #{data.keys.first}" + end + end + + def execute_statement(stmt) + case stmt + when Models::ActivityStatement + # Execute activity statement with variable resolution + execute_activity_statement(stmt) + when Models::SequenceStatement + # Execute each statement in sequence + execute_sequence_statement(stmt) + when Models::ParallelStatement + # Execute branches in parallel + execute_parallel_statement(stmt) + else + raise "Unknown statement type: #{stmt.class}" + end + end + + def execute_activity_statement(stmt) + activity = stmt.activity + + # Resolve activity name to the appropriate activity class + activity_class = activity_name_to_class(activity.name) + + # Resolve arguments from variables + args = activity.arguments.map { |arg| @variables[arg] || arg } + + # Execute the activity + result = Temporalio::Workflow.execute_activity( + activity_class, + *args, + start_to_close_timeout: 5 * 60 # 5 minutes + ) + + # Store result in variables if result variable specified + @variables[activity.result] = result if activity.result + end + + def execute_sequence_statement(stmt) + # Execute each statement in the sequence in order + stmt.sequence.elements.each do |element| + execute_statement(element) + end + end + + def execute_parallel_statement(stmt) + futures = stmt.parallel.branches.map do |branch| + Temporalio::Workflow::Future.new { execute_statement(branch) } + end + Temporalio::Workflow::Future.all_of(*futures).wait + + futures.map(&:result) + end + + def activity_name_to_class(name) + # Map activity name to appropriate activity class + case name + when 'activity1' + Activities::Activity1 + when 'activity2' + Activities::Activity2 + when 'activity3' + Activities::Activity3 + when 'activity4' + Activities::Activity4 + when 'activity5' + Activities::Activity5 + else + raise "Unknown activity name: #{name}" + end + end + end +end diff --git a/dsl/starter.rb b/dsl/starter.rb new file mode 100644 index 0000000..6359884 --- /dev/null +++ b/dsl/starter.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/client' +require_relative 'dsl_workflow' +require_relative 'dsl_models' + +# Ensure workflow file is provided +if ARGV.empty? + puts 'Please provide a workflow YAML file' + puts 'Usage: ruby starter.rb workflow_file.yaml' + exit 1 +end + +file_path = ARGV[0] +unless File.exist?(file_path) + puts "File not found: #{file_path}" + exit 1 +end + +# Read the YAML workflow +yaml_content = File.read(file_path) + +# Create a client +client = Temporalio::Client.connect('localhost:7233', 'default') + +# Generate a unique workflow ID +workflow_id = "dsl-workflow-#{SecureRandom.uuid}" + +puts "Executing workflow: #{workflow_id}" +puts "Using workflow definition from: #{file_path}" + +# Execute the workflow with the YAML content directly +result = client.execute_workflow( + Dsl::DslWorkflow, + yaml_content, + id: workflow_id, + task_queue: 'dsl-workflow-sample' +) + +# Display final variables +puts "\nWorkflow completed. Final variables:" +result.each do |key, value| + puts " #{key}: #{value}" +end diff --git a/dsl/worker.rb b/dsl/worker.rb new file mode 100644 index 0000000..2e3786b --- /dev/null +++ b/dsl/worker.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require 'logger' +require 'temporalio/client' +require 'temporalio/worker' +require_relative 'dsl_workflow' +require_relative 'activities' + +# Create a Temporal client +client = Temporalio::Client.connect( + 'localhost:7233', + 'default', + logger: Logger.new($stdout, level: Logger::INFO) +) + +# Create worker with the activities and workflow +worker = Temporalio::Worker.new( + client: client, + task_queue: 'dsl-workflow-sample', + activities: [ + Dsl::Activities::Activity1, + Dsl::Activities::Activity2, + Dsl::Activities::Activity3, + Dsl::Activities::Activity4, + Dsl::Activities::Activity5 + ], + workflows: [Dsl::DslWorkflow] +) + +# Run the worker until SIGINT +puts 'Starting worker (ctrl+c to exit)' +worker.run(shutdown_signals: ['SIGINT']) diff --git a/dsl/workflow1.yaml b/dsl/workflow1.yaml new file mode 100644 index 0000000..25b0cac --- /dev/null +++ b/dsl/workflow1.yaml @@ -0,0 +1,28 @@ +# This sample workflow executes 3 steps in sequence. +# 1) Activity1, takes arg1 as input, and put result as result1. +# 2) Activity2, takes result1 as input, and put result as result2. +# 3) Activity3, takes arg2 and result2 as input, and put result as result3. + +variables: + arg1: value1 + arg2: value2 + +root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 \ No newline at end of file diff --git a/dsl/workflow2.yaml b/dsl/workflow2.yaml new file mode 100644 index 0000000..cf19fdd --- /dev/null +++ b/dsl/workflow2.yaml @@ -0,0 +1,58 @@ +# This sample workflow executes 3 steps in sequence. +# 1) activity1, takes arg1 as input, and put result as result1. +# 2) it runs a parallel block which runs below sequence branches in parallel +# 2.1) sequence 1 +# 2.1.1) activity2, takes result1 as input, and put result as result2 +# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3 +# 2.2) sequence 2 +# 2.2.1) activity4, takes result1 as input, and put result as result4 +# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5 +# 3) activity3, takes result3 and result5 as input, and put result as result6. + +variables: + arg1: value1 + arg2: value2 + arg3: value3 + +root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - parallel: + branches: + - sequence: + elements: + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 + - sequence: + elements: + - activity: + name: activity4 + arguments: + - result1 + result: result4 + - activity: + name: activity5 + arguments: + - arg3 + - result4 + result: result5 + - activity: + name: activity3 + arguments: + - result3 + - result5 + result: result6 \ No newline at end of file diff --git a/test/dsl/dsl_workflow_test.rb b/test/dsl/dsl_workflow_test.rb new file mode 100644 index 0000000..c277877 --- /dev/null +++ b/test/dsl/dsl_workflow_test.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +require 'minitest/autorun' +require 'securerandom' +require 'temporalio/testing' +require 'temporalio/worker' +require 'dsl/dsl_workflow' +require 'dsl/dsl_models' +require 'dsl/activities' + +module Dsl + class DslWorkflowTest < Minitest::Test + def test_workflow1 + yaml_content = File.read(File.join(File.dirname(__FILE__), '../../dsl/workflow1.yaml')) + + # Run test server until completion of the block + Temporalio::Testing::WorkflowEnvironment.start_local do |env| + # Run worker until completion of the block + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "dsl-test-queue-#{SecureRandom.uuid}", + activities: [ + Activities::Activity1, + Activities::Activity2, + Activities::Activity3, + Activities::Activity4, + Activities::Activity5 + ], + workflows: [DslWorkflow] + ) + worker.run do + # Run workflow with the first YAML file + workflow_id = "dsl-workflow1-test-#{SecureRandom.uuid}" + result = env.client.execute_workflow( + DslWorkflow, + yaml_content, + id: workflow_id, + task_queue: worker.task_queue + ) + + # Simply check that the workflow completes - no need to verify results + assert result.is_a?(Hash), "Expected result to be a Hash, got #{result.class}" + end + end + end + + def test_workflow2 + yaml_content = File.read(File.join(File.dirname(__FILE__), '../../dsl/workflow2.yaml')) + + # Run test server until completion of the block + Temporalio::Testing::WorkflowEnvironment.start_local do |env| + # Run worker until completion of the block + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "dsl-test-queue-#{SecureRandom.uuid}", + activities: [ + Activities::Activity1, + Activities::Activity2, + Activities::Activity3, + Activities::Activity4, + Activities::Activity5 + ], + workflows: [DslWorkflow] + ) + worker.run do + # Run workflow with the second YAML file + workflow_id = "dsl-workflow2-test-#{SecureRandom.uuid}" + result = env.client.execute_workflow( + DslWorkflow, + yaml_content, + id: workflow_id, + task_queue: worker.task_queue + ) + + # Simply check that the workflow completes - no need to verify results + assert result.is_a?(Hash), "Expected result to be a Hash, got #{result.class}" + end + end + end + end +end