Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ Prerequisites:
* [rails_app](rails_app) - Basic Rails API application using Temporal workflows and activities.
* [saga](saga) - Using undo/compensation using a very simplistic Saga pattern.
* [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK.
* [updatable_timer](updatable_timer) - Demonstrates a blocking sleep that can be updated.
* [worker_specific_task_queues](worker_specific_task_queues) - Use a unique Task Queue for each Worker to run a sequence of Activities on the same Worker.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

## Development

To check format and test this repository, run:

bundle exec rake
bundle exec rake

49 changes: 49 additions & 0 deletions test/updatable_timer/updatable_timer_workflow_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# frozen_string_literal: true

require 'test'
require 'updatable_timer/updatable_timer_workflow'
require 'securerandom'
require 'temporalio/client'
require 'temporalio/testing'
require 'temporalio/worker'

module UpdatableTimer
class UpdatableTimerWorkflowTest < Test
def test_workflow
Temporalio::Testing::WorkflowEnvironment.start_time_skipping do |env|
# Run workflow in a worker
worker = Temporalio::Worker.new(
client: env.client,
task_queue: "tq-#{SecureRandom.uuid}",
workflows: [UpdatableTimerWorkflow]
)
handle = worker.run do
day_from_now = (Time.now(in: 'utc') + (24 * 60 * 60)).to_r
hour_from_now = (Time.now(in: 'utc') + (60 * 60)).to_r
handle = env.client.start_workflow(
UpdatableTimerWorkflow, day_from_now,
id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue
)
assert_equal day_from_now, Rational(handle.query(UpdatableTimerWorkflow.wake_up_time))
env.sleep(10)
handle.signal(UpdatableTimerWorkflow.update_wake_up_time, hour_from_now)
assert_equal hour_from_now, Rational(handle.query(UpdatableTimerWorkflow.wake_up_time))

handle.result
handle
end
timer_events = handle.fetch_history_events.filter_map do |e|
timer_id = (e.timer_started_event_attributes ||
e.timer_canceled_event_attributes ||
e.timer_fired_event_attributes)&.timer_id
[e.event_type, timer_id] if timer_id
end.compact
assert_equal(
[[:EVENT_TYPE_TIMER_STARTED, '1'], [:EVENT_TYPE_TIMER_CANCELED, '1'], [:EVENT_TYPE_TIMER_STARTED, '2'],
[:EVENT_TYPE_TIMER_FIRED, '2']],
timer_events
)
end
end
end
end
25 changes: 25 additions & 0 deletions updatable_timer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Updatable Timer Sample

Demonstrates a helper class which relies on `Temporalio::Workflow.wait_condition` to implement a blocking sleep that can be updated at any moment.

To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker from this directory:

```bash
bundle exec ruby worker.rb
```

Then in another terminal, use the Ruby client to the workflow from this directory:

```bash
bundle exec ruby starter.rb
```

The Ruby code will invoke the workflow which will create a timer that will resolve in a day.

Finally in a third terminal, run the updater to change the timer to 10 seconds from now:

```bash
bundle exec ruby wake_up_timer_updater.rb
```

There is also a [test](../test/updatable_timer/updatable_timer_workflow_test.rb) that demonstrates querying the wake up time.
16 changes: 16 additions & 0 deletions updatable_timer/starter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

require 'temporalio/client'
require_relative 'updatable_timer_workflow'

# Create a Temporal client
logger = Logger.new($stdout, level: Logger::INFO)
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)

# Run workflow
logger.info('Starting workflow')
client.execute_workflow(
UpdatableTimer::UpdatableTimerWorkflow, (Time.now(in: 'utc') + (24 * 60 * 60)).to_r,
id: 'updatable-timer-sample-workflow-id', task_queue: 'updatable-timer-sample'
)
logger.info('Workflow complete')
41 changes: 41 additions & 0 deletions updatable_timer/updatable_timer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

require 'temporalio/workflow'

module UpdatableTimer
class UpdatableTimer
def initialize(wake_up_time)
@wake_up_time = wake_up_time
end

attr_reader :wake_up_time

def wake_up_time=(wake_up_time)
Temporalio::Workflow.logger.info("update_wake_up_time: #{wake_up_time}")
@wake_up_time = wake_up_time
@wake_up_time_updated = true
end

def sleep
Temporalio::Workflow.logger.info("sleep until: #{@wake_up_time}")
loop do
now = Temporalio::Workflow.now
sleep_interval = @wake_up_time - now

break if sleep_interval.negative?

Temporalio::Workflow.logger.info("going to sleep for: #{sleep_interval}")

begin
@wake_up_time_updated = false
Temporalio::Workflow.timeout(sleep_interval) do
Temporalio::Workflow.wait_condition { @wake_up_time_updated }
end
rescue Timeout::Error
break
end
end
Temporalio::Workflow.logger.info('sleep_until completed')
end
end
end
30 changes: 30 additions & 0 deletions updatable_timer/updatable_timer_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

require 'temporalio/workflow'
require_relative 'updatable_timer'

module UpdatableTimer
class UpdatableTimerWorkflow < Temporalio::Workflow::Definition
workflow_init
def initialize(wake_up_time)
@timer = UpdatableTimer.new(Time.at(Rational(wake_up_time)))
end

def execute(_wake_up_time)
@timer.sleep
end

workflow_query
def wake_up_time
Temporalio::Workflow.logger.info('get_wake_up_time')
@timer.wake_up_time.to_r
end

workflow_signal
def update_wake_up_time(wake_up_time)
wake_up_time = Time.at(Rational(wake_up_time))
Temporalio::Workflow.logger.info("update_wake_up_time: #{wake_up_time}")
@timer.wake_up_time = wake_up_time
end
end
end
12 changes: 12 additions & 0 deletions updatable_timer/wakeup_time_updater.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

require 'temporalio/client'
require_relative 'updatable_timer_workflow'

# Create a Temporal client
logger = Logger.new($stdout, level: Logger::INFO)
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)
handle = client.workflow_handle('updatable-timer-sample-workflow-id')

handle.signal(UpdatableTimer::UpdatableTimerWorkflow.update_wake_up_time, (Time.now(in: 'utc') + 10).to_r)
logger.info('Updated wake up time to 10 seconds from now')
24 changes: 24 additions & 0 deletions updatable_timer/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

require_relative 'updatable_timer_workflow'
require 'logger'
require 'temporalio/client'
require 'temporalio/worker'

# Create a Temporal client
client = Temporalio::Client.connect(
'localhost:7233',
'default',
logger: Logger.new($stdout, level: Logger::INFO)
)

# Create worker with the activities and workflow
worker = Temporalio::Worker.new(
client:,
task_queue: 'updatable-timer-sample',
workflows: [UpdatableTimer::UpdatableTimerWorkflow]
)

# Run the worker until SIGINT
puts 'Starting worker (ctrl+c to exit)'
worker.run(shutdown_signals: ['SIGINT'])