diff --git a/README.md b/README.md index 5df5ec4..52a8b1c 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ Prerequisites: * [rails_app](rails_app) - Basic Rails API application using Temporal workflows and activities. * [saga](saga) - Using undo/compensation using a very simplistic Saga pattern. * [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK. +* [updatable_timer](updatable_timer) - Demonstrates a blocking sleep that can be updated. * [worker_specific_task_queues](worker_specific_task_queues) - Use a unique Task Queue for each Worker to run a sequence of Activities on the same Worker. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. @@ -37,4 +38,5 @@ Prerequisites: To check format and test this repository, run: - bundle exec rake \ No newline at end of file + bundle exec rake + diff --git a/test/updatable_timer/updatable_timer_workflow_test.rb b/test/updatable_timer/updatable_timer_workflow_test.rb new file mode 100644 index 0000000..c7c688e --- /dev/null +++ b/test/updatable_timer/updatable_timer_workflow_test.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +require 'test' +require 'updatable_timer/updatable_timer_workflow' +require 'securerandom' +require 'temporalio/client' +require 'temporalio/testing' +require 'temporalio/worker' + +module UpdatableTimer + class UpdatableTimerWorkflowTest < Test + def test_workflow + Temporalio::Testing::WorkflowEnvironment.start_time_skipping do |env| + # Run workflow in a worker + worker = Temporalio::Worker.new( + client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + workflows: [UpdatableTimerWorkflow] + ) + handle = worker.run do + day_from_now = (Time.now(in: 'utc') + (24 * 60 * 60)).to_r + hour_from_now = (Time.now(in: 'utc') + (60 * 60)).to_r + handle = env.client.start_workflow( + UpdatableTimerWorkflow, day_from_now, + id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue + ) + assert_equal day_from_now, Rational(handle.query(UpdatableTimerWorkflow.wake_up_time)) + env.sleep(10) + handle.signal(UpdatableTimerWorkflow.update_wake_up_time, hour_from_now) + assert_equal hour_from_now, Rational(handle.query(UpdatableTimerWorkflow.wake_up_time)) + + handle.result + handle + end + timer_events = handle.fetch_history_events.filter_map do |e| + timer_id = (e.timer_started_event_attributes || + e.timer_canceled_event_attributes || + e.timer_fired_event_attributes)&.timer_id + [e.event_type, timer_id] if timer_id + end.compact + assert_equal( + [[:EVENT_TYPE_TIMER_STARTED, '1'], [:EVENT_TYPE_TIMER_CANCELED, '1'], [:EVENT_TYPE_TIMER_STARTED, '2'], + [:EVENT_TYPE_TIMER_FIRED, '2']], + timer_events + ) + end + end + end +end diff --git a/updatable_timer/README.md b/updatable_timer/README.md new file mode 100644 index 0000000..00a3c21 --- /dev/null +++ b/updatable_timer/README.md @@ -0,0 +1,25 @@ +# Updatable Timer Sample + +Demonstrates a helper class which relies on `Temporalio::Workflow.wait_condition` to implement a blocking sleep that can be updated at any moment. + +To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker from this directory: + +```bash + bundle exec ruby worker.rb +``` + +Then in another terminal, use the Ruby client to the workflow from this directory: + +```bash + bundle exec ruby starter.rb +``` + +The Ruby code will invoke the workflow which will create a timer that will resolve in a day. + +Finally in a third terminal, run the updater to change the timer to 10 seconds from now: + +```bash + bundle exec ruby wake_up_timer_updater.rb +``` + +There is also a [test](../test/updatable_timer/updatable_timer_workflow_test.rb) that demonstrates querying the wake up time. diff --git a/updatable_timer/starter.rb b/updatable_timer/starter.rb new file mode 100644 index 0000000..86ae27c --- /dev/null +++ b/updatable_timer/starter.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +require 'temporalio/client' +require_relative 'updatable_timer_workflow' + +# Create a Temporal client +logger = Logger.new($stdout, level: Logger::INFO) +client = Temporalio::Client.connect('localhost:7233', 'default', logger:) + +# Run workflow +logger.info('Starting workflow') +client.execute_workflow( + UpdatableTimer::UpdatableTimerWorkflow, (Time.now(in: 'utc') + (24 * 60 * 60)).to_r, + id: 'updatable-timer-sample-workflow-id', task_queue: 'updatable-timer-sample' +) +logger.info('Workflow complete') diff --git a/updatable_timer/updatable_timer.rb b/updatable_timer/updatable_timer.rb new file mode 100644 index 0000000..ce430c6 --- /dev/null +++ b/updatable_timer/updatable_timer.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' + +module UpdatableTimer + class UpdatableTimer + def initialize(wake_up_time) + @wake_up_time = wake_up_time + end + + attr_reader :wake_up_time + + def wake_up_time=(wake_up_time) + Temporalio::Workflow.logger.info("update_wake_up_time: #{wake_up_time}") + @wake_up_time = wake_up_time + @wake_up_time_updated = true + end + + def sleep + Temporalio::Workflow.logger.info("sleep until: #{@wake_up_time}") + loop do + now = Temporalio::Workflow.now + sleep_interval = @wake_up_time - now + + break if sleep_interval.negative? + + Temporalio::Workflow.logger.info("going to sleep for: #{sleep_interval}") + + begin + @wake_up_time_updated = false + Temporalio::Workflow.timeout(sleep_interval) do + Temporalio::Workflow.wait_condition { @wake_up_time_updated } + end + rescue Timeout::Error + break + end + end + Temporalio::Workflow.logger.info('sleep_until completed') + end + end +end diff --git a/updatable_timer/updatable_timer_workflow.rb b/updatable_timer/updatable_timer_workflow.rb new file mode 100644 index 0000000..520d7ce --- /dev/null +++ b/updatable_timer/updatable_timer_workflow.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' +require_relative 'updatable_timer' + +module UpdatableTimer + class UpdatableTimerWorkflow < Temporalio::Workflow::Definition + workflow_init + def initialize(wake_up_time) + @timer = UpdatableTimer.new(Time.at(Rational(wake_up_time))) + end + + def execute(_wake_up_time) + @timer.sleep + end + + workflow_query + def wake_up_time + Temporalio::Workflow.logger.info('get_wake_up_time') + @timer.wake_up_time.to_r + end + + workflow_signal + def update_wake_up_time(wake_up_time) + wake_up_time = Time.at(Rational(wake_up_time)) + Temporalio::Workflow.logger.info("update_wake_up_time: #{wake_up_time}") + @timer.wake_up_time = wake_up_time + end + end +end diff --git a/updatable_timer/wakeup_time_updater.rb b/updatable_timer/wakeup_time_updater.rb new file mode 100644 index 0000000..f8a00f0 --- /dev/null +++ b/updatable_timer/wakeup_time_updater.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +require 'temporalio/client' +require_relative 'updatable_timer_workflow' + +# Create a Temporal client +logger = Logger.new($stdout, level: Logger::INFO) +client = Temporalio::Client.connect('localhost:7233', 'default', logger:) +handle = client.workflow_handle('updatable-timer-sample-workflow-id') + +handle.signal(UpdatableTimer::UpdatableTimerWorkflow.update_wake_up_time, (Time.now(in: 'utc') + 10).to_r) +logger.info('Updated wake up time to 10 seconds from now') diff --git a/updatable_timer/worker.rb b/updatable_timer/worker.rb new file mode 100644 index 0000000..00f69fa --- /dev/null +++ b/updatable_timer/worker.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +require_relative 'updatable_timer_workflow' +require 'logger' +require 'temporalio/client' +require 'temporalio/worker' + +# Create a Temporal client +client = Temporalio::Client.connect( + 'localhost:7233', + 'default', + logger: Logger.new($stdout, level: Logger::INFO) +) + +# Create worker with the activities and workflow +worker = Temporalio::Worker.new( + client:, + task_queue: 'updatable-timer-sample', + workflows: [UpdatableTimer::UpdatableTimerWorkflow] +) + +# Run the worker until SIGINT +puts 'Starting worker (ctrl+c to exit)' +worker.run(shutdown_signals: ['SIGINT'])