Skip to content

Commit c0069f2

Browse files
Better observer state handling.
1 parent b3024a7 commit c0069f2

5 files changed

Lines changed: 65 additions & 111 deletions

File tree

lib/async/utilization/metric.rb

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@ class Metric
1414
# Initialize a new metric.
1515
#
1616
# @parameter name [Symbol] The field name for this metric.
17-
# @parameter registry [Registry] The registry instance to use.
18-
def initialize(name, registry)
17+
def initialize(name)
1918
@name = name.to_sym
20-
@registry = registry
2119
@value = 0
22-
@cache_valid = false
23-
@cached_field_info = nil
20+
21+
@observer = nil
22+
@cached_field = nil
2423
@cached_buffer = nil
2524
@guard = Mutex.new
2625
end
@@ -34,19 +33,43 @@ def initialize(name, registry)
3433
# @attribute [Mutex] The mutex for thread safety.
3534
attr :guard
3635

37-
# Invalidate the cached field information.
36+
# Set the observer and rebuild cache.
3837
#
39-
# Called when the observer changes to force cache rebuild.
40-
def invalidate
41-
@cache_valid = false
42-
@cached_field_info = nil
43-
@cached_buffer = nil
38+
# This is called when the registry assigns a new observer (or removes it).
39+
# The cache is invalidated and then immediately recomputed so that the
40+
# fast write path doesn't need to re-check the observer on the first write.
41+
#
42+
# @parameter observer [#set] The new observer (or nil).
43+
def observer=(observer)
44+
@guard.synchronize do
45+
@observer = observer
46+
47+
# Eagerly validate so the first write is fast.
48+
outcome = :no_observer
49+
if @observer
50+
if field = @observer.schema[@name]
51+
if buffer = @observer.buffer
52+
@cached_field = field
53+
@cached_buffer = buffer
54+
outcome = :cached
55+
else
56+
outcome = :no_buffer
57+
end
58+
else
59+
outcome = :missing_field_in_schema
60+
end
61+
else
62+
outcome = :unsupported_observer
63+
end
64+
65+
# Console.info(self, "Cache validation", metric: @name, outcome: outcome)
66+
67+
write_direct(@value)
68+
end
4469
end
4570

4671
# Increment the metric value.
4772
#
48-
# Uses the fast path (direct buffer write) when cache is valid and observer is available.
49-
#
5073
# @returns [Integer] The new value of the field.
5174
def increment
5275
@guard.synchronize do
@@ -103,28 +126,6 @@ def set(value)
103126

104127
protected
105128

106-
# Check if the cache is valid and rebuild if necessary.
107-
#
108-
# Always attempts to build the cache if it's invalid. Returns true if cache
109-
# is now valid (observer exists, field is in schema, and buffer is available), false otherwise.
110-
#
111-
# @returns [bool] True if cache is valid, false otherwise.
112-
def ensure_cache_valid!
113-
unless @cache_valid
114-
if observer = @registry.observer
115-
if field = observer.schema[@name]
116-
if buffer = observer.buffer
117-
@cached_field_info = field
118-
@cached_buffer = buffer
119-
end
120-
end
121-
end
122-
123-
# Once we've validated the cache, even if there was no observer or buffer, we mark it as valid, so that we don't try to revalidate it again:
124-
@cache_valid = true
125-
end
126-
end
127-
128129
# Write directly to the cached buffer if available.
129130
#
130131
# This is the fast path that avoids hash lookups. Always ensures cache is valid
@@ -133,16 +134,12 @@ def ensure_cache_valid!
133134
# @parameter value [Numeric] The value to write.
134135
# @returns [Boolean] Whether the write succeeded.
135136
def write_direct(value)
136-
self.ensure_cache_valid!
137-
138137
if @cached_buffer
139-
@cached_buffer.set_value(@cached_field_info.type, @cached_field_info.offset, value)
138+
@cached_buffer.set_value(@cached_field.type, @cached_field.offset, value)
140139
end
141140

142141
return true
143142
rescue => error
144-
# If write fails, log warning but don't invalidate cache
145-
# The error might be transient, and invalidating would force hash lookups
146143
Console.warn(self, "Failed to write metric value!", metric: {name: @name, value: value}, exception: error)
147144

148145
return false

lib/async/utilization/observer.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,9 @@ def initialize(schema, buffer)
8080
# @parameter field [Symbol] The field name to set.
8181
# @parameter value [Numeric] The value to set.
8282
def set(field, value)
83-
if field = @schema[field]
84-
@buffer.set_value(field.type, field.offset, value)
83+
if entry = @schema[field]
84+
@buffer.set_value(entry.type, entry.offset, value)
85+
# Console.info(self, "Wrote utilization metric", field: field, value: value, offset: entry.offset)
8586
end
8687
rescue => error
8788
Console.warn(self, "Failed to set field in shared memory!", field: field, exception: error)

lib/async/utilization/registry.rb

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# Released under the MIT License.
44
# Copyright, 2026, by Samuel Williams.
55

6+
require "console"
7+
68
module Async
79
module Utilization
810
# Registry for emitting utilization metrics.
@@ -68,19 +70,16 @@ def values
6870
# @parameter observer [#set] The observer to set.
6971
def observer=(observer)
7072
@guard.synchronize do
71-
# Invalidate all cached metrics
73+
@observer = observer
74+
75+
# Invalidate all cached metrics with new observer (or nil)
7276
@metrics.each_value do |metric|
73-
metric.invalidate
77+
metric.observer = observer
7478
end
7579

76-
@observer = observer
80+
# Console.info(self, "Observer assigned", observer: observer, metric_count: @metrics.size)
7781
end
7882

79-
# Notify observer of all current metric values (outside guard to avoid deadlock)
80-
@metrics.each do |name, metric|
81-
value = metric.guard.synchronize{metric.value}
82-
observer.set(name, value)
83-
end
8483
end
8584

8685
# Set a field value.
@@ -135,7 +134,7 @@ def metric(field)
135134
field = field.to_sym
136135

137136
@guard.synchronize do
138-
@metrics[field] ||= Metric.new(field, self)
137+
@metrics[field] ||= Metric.new(field)
139138
end
140139
end
141140
end

test/async/utilization/metric.rb

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,9 @@
154154
end
155155

156156
it "writes directly to shared memory when observer is set" do
157-
registry.observer = observer
158157
metric = registry.metric(:total_requests)
159-
158+
registry.observer = observer
159+
160160
metric.set(42)
161161

162162
# Read back from file to verify
@@ -216,33 +216,13 @@
216216
expect(metric1).to be == metric2
217217
end
218218

219-
it "falls back to observer.set when write_direct fails" do
220-
registry.observer = observer
221-
metric = registry.metric(:total_requests)
222-
223-
# Force cache to be invalid by invalidating it
224-
metric.invalidate
225-
226-
# Set a value - should fall back to observer.set
227-
metric.set(42)
228-
expect(metric.value).to be == 42
229-
230-
# Verify it was written to shared memory
231-
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
232-
expect(buffer.get_value(:u64, 0)).to be == 42
233-
end
234-
235219
it "handles write errors gracefully" do
236220
registry.observer = observer
237221
metric = registry.metric(:total_requests)
238222

239223
# Set a value first to build the cache
240224
metric.set(10)
241-
242-
# Verify cache is built
243-
expect(metric.instance_variable_get(:@cache_valid)).to be == true
244-
cached_buffer = metric.instance_variable_get(:@cached_buffer)
245-
225+
246226
# Create an invalid buffer that will raise an error
247227
invalid_buffer = Object.new
248228
def invalid_buffer.set_value(type, offset, value)
@@ -251,13 +231,10 @@ def invalid_buffer.set_value(type, offset, value)
251231

252232
metric.instance_variable_set(:@cached_buffer, invalid_buffer)
253233

254-
# Should not raise, but log warning and keep cache valid
234+
# Should not raise, but log warning
255235
metric.set(42)
256236
expect(metric.value).to be == 42
257-
258-
# Cache should remain valid (not invalidated on error)
259-
expect(metric.instance_variable_get(:@cache_valid)).to be == true
260-
237+
261238
# Assert that a warning was logged
262239
expect_console.to have_logged(
263240
severity: be == :warn,

test/async/utilization/registry.rb

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -56,44 +56,24 @@
5656
expect(registry.values[:test_field]).to be == 0
5757
end
5858

59-
it "can set an observer" do
60-
observer = Object.new
61-
def observer.set(field, value); end
62-
63-
registry.set(:test_field, 10)
64-
registry.observer = observer
65-
66-
expect(registry.observer).to be == observer
67-
end
68-
6959
it "notifies observer when values change" do
70-
values_set = []
71-
72-
# Create a proper observer with schema
7360
schema = Async::Utilization::Schema.build(test_field: :u64)
61+
buffer = IO::Buffer.new(8)
62+
7463
observer = Object.new
75-
76-
# Define methods on observer
77-
observer.define_singleton_method(:set) do |field, value|
78-
values_set << [field, value]
79-
end
80-
observer.define_singleton_method(:schema){schema}
81-
observer.define_singleton_method(:buffer){nil} # No buffer, so write_direct will return false
82-
64+
observer.define_singleton_method(:schema) { schema }
65+
observer.define_singleton_method(:buffer) { buffer }
66+
8367
registry.set(:test_field, 5)
8468
registry.observer = observer
85-
86-
# Observer should be notified of existing values
87-
expect(values_set).to be(:include?, [:test_field, 5])
88-
89-
# Clear and test new changes
90-
# Note: Since observer has no buffer, write_direct will return false
91-
# and no notification will occur (as per new design)
92-
values_set.clear
69+
70+
# Buffer should be synced with the existing value on observer assignment
71+
expect(buffer.get_value(:u64, 0)).to be == 5
72+
9373
registry.increment(:test_field)
94-
95-
# With no buffer, write_direct fails silently, so no notification
96-
expect(values_set).to be == []
74+
75+
# Buffer should reflect the incremented value
76+
expect(buffer.get_value(:u64, 0)).to be == 6
9777
end
9878

9979
it "uses metric method for fast path" do

0 commit comments

Comments
 (0)