From 7d85c0e59473edda55e8eaf6c2fa51bece545bc1 Mon Sep 17 00:00:00 2001 From: Michael Go Date: Wed, 12 Nov 2025 19:08:04 -0400 Subject: [PATCH 1/7] refactor connection to use MessagePack instead of JSON --- bake/async/container/supervisor.rb | 2 +- examples/simple/simple.rb | 2 +- lib/async/container/supervisor/connection.rb | 34 ++++++- .../container/supervisor/message_wrapper.rb | 90 +++++++++++++++++++ test/async/container/connection.rb | 61 ++++++++----- test/async/container/server.rb | 61 ++++++++----- 6 files changed, 197 insertions(+), 53 deletions(-) create mode 100644 lib/async/container/supervisor/message_wrapper.rb diff --git a/bake/async/container/supervisor.rb b/bake/async/container/supervisor.rb index 3fa462d..17e59ff 100644 --- a/bake/async/container/supervisor.rb +++ b/bake/async/container/supervisor.rb @@ -44,7 +44,7 @@ def memory_sample(duration: 10, connection_id:) operation = {do: :memory_sample, duration: duration} # Use the forward operation to proxy the request to a worker: - return connection.call(do: :forward, operation: operation, connection_id: connection_id) + connection.call(do: :forward, operation: operation, connection_id: connection_id) end end diff --git a/examples/simple/simple.rb b/examples/simple/simple.rb index 5c8f385..cdee009 100755 --- a/examples/simple/simple.rb +++ b/examples/simple/simple.rb @@ -33,7 +33,7 @@ def setup(container) Console.info(self, "Exiting...") end end - end + end end service "sleep" do diff --git a/lib/async/container/supervisor/connection.rb b/lib/async/container/supervisor/connection.rb index 49e9e28..1f12d5e 100644 --- a/lib/async/container/supervisor/connection.rb +++ b/lib/async/container/supervisor/connection.rb @@ -5,6 +5,7 @@ require "json" require "async" +require_relative "message_wrapper" module Async module Container @@ -13,6 +14,8 @@ module Supervisor # # Handles message passing, call/response patterns, and connection lifecycle. class Connection + MAX_MESSAGE_SIZE = 2 ** 32 - 1 + # Represents a remote procedure call over a connection. # # Manages the call lifecycle, response queueing, and completion signaling. @@ -231,7 +234,7 @@ def initialize(stream, id = 0, **state) @stream = stream @id = id @state = state - + @message_wrapper = MessageWrapper.new @reader = nil @calls = {} end @@ -251,9 +254,15 @@ def next_id # Write a message to the connection stream. # + # Uses a length-prefixed protocol: 2-byte length header (big-endian) followed by data. + # This allows MessagePack data to contain newlines without breaking message boundaries. + # # @parameter message [Hash] The message to write. def write(**message) - @stream.write(JSON.dump(message) << "\n") + data = @message_wrapper.pack(message) + # Write 4-byte length prefix + data_length = [data.bytesize].pack("N") + @stream.write(data_length + data) @stream.flush end @@ -261,9 +270,26 @@ def write(**message) # # @returns [Hash, nil] The parsed message or nil if stream is closed. def read - if line = @stream&.gets - JSON.parse(line, symbolize_names: true) + length_data = @stream&.read(4) + return nil unless length_data && length_data.bytesize == 4 + + # Unpack 32-bit integer + length = length_data.unpack1("N") + + # Validate message size + if length > MAX_MESSAGE_SIZE + Console.error(self, "Message too large: #{length} bytes (max: #{MAX_MESSAGE_SIZE})") + return nil end + + # Read the exact amount of data specified + data = @stream.read(length) + + unless data && data.bytesize == length + raise EOFError, "Failed to read complete message" + end + + @message_wrapper.unpack(data) end # Iterate over all messages from the connection. diff --git a/lib/async/container/supervisor/message_wrapper.rb b/lib/async/container/supervisor/message_wrapper.rb new file mode 100644 index 0000000..814b094 --- /dev/null +++ b/lib/async/container/supervisor/message_wrapper.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "msgpack" + +module Async + module Container + module Supervisor + class MessageWrapper + def initialize + @factory = MessagePack::Factory.new + + register_types + + @packer = @factory.packer + end + + def pack(message) + @packer.clear + normalized_message = normalize(message) + @packer.pack(normalized_message) + @packer.full_pack + end + + def unpack(data) + @factory.unpack(data) + end + + private + + def normalize(obj) + case obj + when Hash + obj.transform_values{|v| normalize(v)} + when Array + obj.map{|v| normalize(v)} + else + if obj.respond_to?(:as_json) + normalize(obj.as_json) + else + obj + end + end + end + + def register_types + @factory.register_type(0x00, Symbol) + + @factory.register_type( + 0x01, + Exception, + packer: self.method(:pack_exception), + unpacker: self.method(:unpack_exception), + recursive: true, + ) + + @factory.register_type( + 0x02, + Class, + packer: ->(klass) {klass.name}, + unpacker: ->(name) {name}, + ) + + @factory.register_type( + MessagePack::Timestamp::TYPE, + Time, + packer: MessagePack::Time::Packer, + unpacker: MessagePack::Time::Unpacker + ) + end + + def pack_exception(exception) + [exception.class.name, exception.message, exception.backtrace].pack("A*") + end + + def unpack_exception(data) + klass, message, backtrace = data.unpack("A*A*A*") + klass = Object.const_get(klass) + + exception = klass.new(message) + exception.set_backtrace(backtrace) + + return exception + end + end + end + end +end diff --git a/test/async/container/connection.rb b/test/async/container/connection.rb index 673b754..3bbc1d3 100644 --- a/test/async/container/connection.rb +++ b/test/async/container/connection.rb @@ -6,6 +6,7 @@ require "async/container/supervisor/connection" require "sus/fixtures/async/scheduler_context" require "stringio" +require "msgpack" class TestTarget def initialize(&block) @@ -20,6 +21,7 @@ def dispatch(call) describe Async::Container::Supervisor::Connection do let(:stream) {StringIO.new} let(:connection) {Async::Container::Supervisor::Connection.new(stream)} + let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new} with "dispatch" do it "handles failed writes when dispatching a call" do @@ -88,16 +90,16 @@ def dispatch(call) parsed = JSON.parse(json) expect(parsed).to have_keys( - "do" => be == "test", - "data" => be == "value" - ) + "do" => be == "test", + "data" => be == "value" + ) end it "can get call message via as_json" do expect(test_call.as_json).to have_keys( - do: be == :test, - data: be == "value" - ) + do: be == :test, + data: be == "value" + ) end it "can iterate over call responses with each" do @@ -121,11 +123,11 @@ def dispatch(call) response = test_call.pop expect(response).to have_keys( - id: be == 1, - finished: be == true, - failed: be == true, - error: be == "Something went wrong" - ) + id: be == 1, + finished: be == true, + failed: be == true, + error: be == "Something went wrong" + ) expect(test_call.closed?).to be == true end @@ -144,30 +146,43 @@ def dispatch(call) end end - it "writes JSON with newline" do + it "writes length-prefixed MessagePack data" do connection.write(id: 1, do: :test) stream.rewind - output = stream.read - # Check it's valid JSON with a newline - expect(output[-1]).to be == "\n" + # Read 2-byte length prefix + length_data = stream.read(4) + expect(length_data.bytesize).to be == 4 - parsed = JSON.parse(output.chomp) + length = length_data.unpack1("N") + expect(length).to be > 0 + + # Read MessagePack data + data = stream.read(length) + expect(data.bytesize).to be == length + + # Parse MessagePack + parsed = message_wrapper.unpack(data) expect(parsed).to have_keys( - "id" => be == 1, - "do" => be == "test" + id: be == 1, + do: be == :test ) end - it "parses JSON lines" do - stream.string = JSON.dump({id: 1, do: "test"}) << "\n" + it "reads length-prefixed MessagePack data" do + # Create MessagePack data + message = {id: 1, do: "test"} + data = message_wrapper.pack(message) + + # Write with length prefix + stream.string = [data.bytesize].pack("N") + data stream.rewind - message = connection.read + parsed = connection.read - # Connection.read uses symbolize_names: true (keys are symbols, values are as-is) - expect(message).to have_keys( + # Keys are symbols + expect(parsed).to have_keys( id: be == 1, do: be == "test" ) diff --git a/test/async/container/server.rb b/test/async/container/server.rb index 905a13e..3d1c8d8 100644 --- a/test/async/container/server.rb +++ b/test/async/container/server.rb @@ -10,11 +10,32 @@ include Async::Container::Supervisor::AServer include Sus::Fixtures::Console::CapturedLogger + let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new} + + # Helper to write length-prefixed MessagePack data + def write_message(stream, message) + data = message_wrapper.pack(message) + stream.write([data.bytesize].pack("N") + data) + stream.flush + end + + # Helper to read length-prefixed MessagePack data + def read_message(stream) + length_data = stream.read(4) + return nil unless length_data && length_data.bytesize == 4 + + length = length_data.unpack1("N") + data = stream.read(length) + return nil unless data && data.bytesize == length + + message_wrapper.unpack(data) + end + it "can handle unexpected failures" do # First, send invalid JSON to trigger the error: endpoint.connect do |stream| - # Send malformed JSON that will cause parsing errors: - stream.write("not valid json\n") + # Send malformed data (just 2 bytes claiming huge size, but no actual data): + stream.write([999999].pack("N")) stream.flush end @@ -23,11 +44,10 @@ # Send a valid register message: message = {id: 1, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Read the response: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should respond with a finished message: expect(response).to have_keys( @@ -69,11 +89,10 @@ def status(call) stream = endpoint.connect message = {id: 1, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Read the response: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should still finish despite the monitor error: expect(response).to have_keys( @@ -93,11 +112,10 @@ def status(call) stream = endpoint.connect message = {id: 1, do: :status} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Read the response: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should still respond with a finished message despite the monitor error: expect(response).to have_keys( @@ -128,10 +146,9 @@ def status(call) # Verify server is still working by sending a new request: stream = endpoint.connect message = {id: 1, do: :status} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) expect(response).to have_keys( id: be == 1, finished: be == true @@ -152,8 +169,7 @@ def status(call) # Simulate what happens when a timed-out response arrives: # The response only has id and finished (no 'do' key) because it's a response, not a request message = {id: 1, finished: true} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Wait for the message to be processed reactor.sleep(0.01) @@ -164,11 +180,10 @@ def status(call) # Send a valid message to confirm the server is still working: valid_message = {id: 3, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(valid_message)) - stream.flush + write_message(stream, valid_message) # Read the response to the valid message: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should have ignored the stale response and processed the valid one: expect(response).to have_keys( @@ -186,17 +201,15 @@ def status(call) # Send a stale response: stale_message = {id: 5, finished: true} - stream.puts(JSON.dump(stale_message)) - stream.flush + write_message(stream, stale_message) # Send a valid message: valid_message = {id: 7, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(valid_message)) - stream.flush + write_message(stream, valid_message) # We should only get ONE response - for the valid message. # Not an error response for the stale message. - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) expect(response).to have_keys( id: be == 7, From e9e3fb6f95d6f492888bf37a294e1b5835536e00 Mon Sep 17 00:00:00 2001 From: Michael Go Date: Thu, 13 Nov 2025 11:18:58 -0400 Subject: [PATCH 2/7] refactor connection to use MessagePacker's internal stream --- lib/async/container/supervisor/connection.rb | 31 ++--------- .../container/supervisor/message_wrapper.rb | 17 +++++- test/async/container/connection.rb | 55 +++---------------- test/async/container/server.rb | 17 +----- 4 files changed, 30 insertions(+), 90 deletions(-) diff --git a/lib/async/container/supervisor/connection.rb b/lib/async/container/supervisor/connection.rb index 1f12d5e..ff14e5a 100644 --- a/lib/async/container/supervisor/connection.rb +++ b/lib/async/container/supervisor/connection.rb @@ -234,7 +234,7 @@ def initialize(stream, id = 0, **state) @stream = stream @id = id @state = state - @message_wrapper = MessageWrapper.new + @message_wrapper = MessageWrapper.new(@stream) @reader = nil @calls = {} end @@ -259,37 +259,16 @@ def next_id # # @parameter message [Hash] The message to write. def write(**message) - data = @message_wrapper.pack(message) - # Write 4-byte length prefix - data_length = [data.bytesize].pack("N") - @stream.write(data_length + data) - @stream.flush + @message_wrapper.write(message) end # Read a message from the connection stream. # # @returns [Hash, nil] The parsed message or nil if stream is closed. def read - length_data = @stream&.read(4) - return nil unless length_data && length_data.bytesize == 4 - - # Unpack 32-bit integer - length = length_data.unpack1("N") - - # Validate message size - if length > MAX_MESSAGE_SIZE - Console.error(self, "Message too large: #{length} bytes (max: #{MAX_MESSAGE_SIZE})") - return nil - end - - # Read the exact amount of data specified - data = @stream.read(length) - - unless data && data.bytesize == length - raise EOFError, "Failed to read complete message" - end - - @message_wrapper.unpack(data) + @message_wrapper.read + rescue EOFError, IOError + nil end # Iterate over all messages from the connection. diff --git a/lib/async/container/supervisor/message_wrapper.rb b/lib/async/container/supervisor/message_wrapper.rb index 814b094..dc30922 100644 --- a/lib/async/container/supervisor/message_wrapper.rb +++ b/lib/async/container/supervisor/message_wrapper.rb @@ -9,12 +9,25 @@ module Async module Container module Supervisor class MessageWrapper - def initialize + def initialize(stream) @factory = MessagePack::Factory.new register_types - @packer = @factory.packer + @packer = @factory.packer(stream) + @unpacker = @factory.unpacker(stream) + end + + def write(message) + data = pack(message) + # Console.logger.info("Sending data: #{message.inspect}") + @packer.write(data) + end + + def read + data = @unpacker.read + # Console.logger.info("Received data: #{data.inspect}") + data end def pack(message) diff --git a/test/async/container/connection.rb b/test/async/container/connection.rb index 3bbc1d3..4d631a3 100644 --- a/test/async/container/connection.rb +++ b/test/async/container/connection.rb @@ -21,12 +21,16 @@ def dispatch(call) describe Async::Container::Supervisor::Connection do let(:stream) {StringIO.new} let(:connection) {Async::Container::Supervisor::Connection.new(stream)} - let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new} + let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new(stream)} + + def write_message(message) + message_wrapper.write(message) + stream.rewind + end with "dispatch" do it "handles failed writes when dispatching a call" do - stream.write(JSON.dump({id: 1, do: :test}) << "\n") - stream.rewind + write_message({id: 1, do: :test}) expect(stream).to receive(:write).and_raise(IOError, "Write error") @@ -44,8 +48,7 @@ def dispatch(call) end it "closes the queue when the connection fails" do - stream.write(JSON.dump({id: 1, do: :test}) << "\n") - stream.rewind + write_message({id: 1, do: :test}) expect(stream).to receive(:write).and_raise(IOError, "Write error") @@ -146,48 +149,6 @@ def dispatch(call) end end - it "writes length-prefixed MessagePack data" do - connection.write(id: 1, do: :test) - - stream.rewind - - # Read 2-byte length prefix - length_data = stream.read(4) - expect(length_data.bytesize).to be == 4 - - length = length_data.unpack1("N") - expect(length).to be > 0 - - # Read MessagePack data - data = stream.read(length) - expect(data.bytesize).to be == length - - # Parse MessagePack - parsed = message_wrapper.unpack(data) - expect(parsed).to have_keys( - id: be == 1, - do: be == :test - ) - end - - it "reads length-prefixed MessagePack data" do - # Create MessagePack data - message = {id: 1, do: "test"} - data = message_wrapper.pack(message) - - # Write with length prefix - stream.string = [data.bytesize].pack("N") + data - stream.rewind - - parsed = connection.read - - # Keys are symbols - expect(parsed).to have_keys( - id: be == 1, - do: be == "test" - ) - end - it "returns nil when stream is closed" do stream.string = "" stream.rewind diff --git a/test/async/container/server.rb b/test/async/container/server.rb index 3d1c8d8..2027ec8 100644 --- a/test/async/container/server.rb +++ b/test/async/container/server.rb @@ -10,25 +10,12 @@ include Async::Container::Supervisor::AServer include Sus::Fixtures::Console::CapturedLogger - let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new} - - # Helper to write length-prefixed MessagePack data def write_message(stream, message) - data = message_wrapper.pack(message) - stream.write([data.bytesize].pack("N") + data) - stream.flush + Async::Container::Supervisor::MessageWrapper.new(stream).write(message) end - # Helper to read length-prefixed MessagePack data def read_message(stream) - length_data = stream.read(4) - return nil unless length_data && length_data.bytesize == 4 - - length = length_data.unpack1("N") - data = stream.read(length) - return nil unless data && data.bytesize == length - - message_wrapper.unpack(data) + Async::Container::Supervisor::MessageWrapper.new(stream).read end it "can handle unexpected failures" do From 5a4951127910bcfc19e11a1fba234776abbe5c8a Mon Sep 17 00:00:00 2001 From: Michael Go Date: Thu, 13 Nov 2025 11:36:24 -0400 Subject: [PATCH 3/7] remove to_json from connection --- lib/async/container/supervisor/connection.rb | 10 ---------- lib/async/container/supervisor/message_wrapper.rb | 5 +---- test/async/container/connection.rb | 10 ---------- 3 files changed, 1 insertion(+), 24 deletions(-) diff --git a/lib/async/container/supervisor/connection.rb b/lib/async/container/supervisor/connection.rb index ff14e5a..789029b 100644 --- a/lib/async/container/supervisor/connection.rb +++ b/lib/async/container/supervisor/connection.rb @@ -3,7 +3,6 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. -require "json" require "async" require_relative "message_wrapper" @@ -14,8 +13,6 @@ module Supervisor # # Handles message passing, call/response patterns, and connection lifecycle. class Connection - MAX_MESSAGE_SIZE = 2 ** 32 - 1 - # Represents a remote procedure call over a connection. # # Manages the call lifecycle, response queueing, and completion signaling. @@ -40,13 +37,6 @@ def as_json(...) @message end - # Convert the call to a JSON string. - # - # @returns [String] The JSON representation. - def to_json(...) - as_json.to_json(...) - end - # @attribute [Connection] The connection that initiated the call. attr :connection diff --git a/lib/async/container/supervisor/message_wrapper.rb b/lib/async/container/supervisor/message_wrapper.rb index dc30922..969b727 100644 --- a/lib/async/container/supervisor/message_wrapper.rb +++ b/lib/async/container/supervisor/message_wrapper.rb @@ -20,14 +20,11 @@ def initialize(stream) def write(message) data = pack(message) - # Console.logger.info("Sending data: #{message.inspect}") @packer.write(data) end def read - data = @unpacker.read - # Console.logger.info("Received data: #{data.inspect}") - data + @unpacker.read end def pack(message) diff --git a/test/async/container/connection.rb b/test/async/container/connection.rb index 4d631a3..8c3168c 100644 --- a/test/async/container/connection.rb +++ b/test/async/container/connection.rb @@ -88,16 +88,6 @@ def write_message(message) with subject::Call do let(:test_call) {Async::Container::Supervisor::Connection::Call.new(connection, 1, {do: :test, data: "value"})} - it "can serialize call to JSON" do - json = test_call.to_json - parsed = JSON.parse(json) - - expect(parsed).to have_keys( - "do" => be == "test", - "data" => be == "value" - ) - end - it "can get call message via as_json" do expect(test_call.as_json).to have_keys( do: be == :test, From 7e7f7e6918283d8fbf8cf7e030169a34bb9de2a6 Mon Sep 17 00:00:00 2001 From: Michael Go Date: Fri, 14 Nov 2025 12:10:00 -0400 Subject: [PATCH 4/7] fix packing exceptions --- lib/async/container/supervisor/message_wrapper.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/async/container/supervisor/message_wrapper.rb b/lib/async/container/supervisor/message_wrapper.rb index 969b727..fdb1d6e 100644 --- a/lib/async/container/supervisor/message_wrapper.rb +++ b/lib/async/container/supervisor/message_wrapper.rb @@ -81,12 +81,13 @@ def register_types ) end - def pack_exception(exception) - [exception.class.name, exception.message, exception.backtrace].pack("A*") + def pack_exception(exception, packer) + message = [exception.class.name, exception.message, exception.backtrace] + packer.write(message) end - def unpack_exception(data) - klass, message, backtrace = data.unpack("A*A*A*") + def unpack_exception(unpacker) + klass, message, backtrace = unpacker.read klass = Object.const_get(klass) exception = klass.new(message) From 0e8c58889c74776456e9ac562f5f919ae3197d5d Mon Sep 17 00:00:00 2001 From: Michael Go Date: Fri, 14 Nov 2025 12:19:57 -0400 Subject: [PATCH 5/7] fix infinite loop from inifinite self-returning as_json objects --- .../container/supervisor/message_wrapper.rb | 2 +- test/async/container/connection.rb | 1 - test/async/container/message_wrapper.rb | 311 ++++++++++++++++++ 3 files changed, 312 insertions(+), 2 deletions(-) create mode 100644 test/async/container/message_wrapper.rb diff --git a/lib/async/container/supervisor/message_wrapper.rb b/lib/async/container/supervisor/message_wrapper.rb index fdb1d6e..8d77020 100644 --- a/lib/async/container/supervisor/message_wrapper.rb +++ b/lib/async/container/supervisor/message_wrapper.rb @@ -47,7 +47,7 @@ def normalize(obj) when Array obj.map{|v| normalize(v)} else - if obj.respond_to?(:as_json) + if obj.respond_to?(:as_json) && (as_json = obj.as_json) && as_json != obj normalize(obj.as_json) else obj diff --git a/test/async/container/connection.rb b/test/async/container/connection.rb index 8c3168c..1972cce 100644 --- a/test/async/container/connection.rb +++ b/test/async/container/connection.rb @@ -6,7 +6,6 @@ require "async/container/supervisor/connection" require "sus/fixtures/async/scheduler_context" require "stringio" -require "msgpack" class TestTarget def initialize(&block) diff --git a/test/async/container/message_wrapper.rb b/test/async/container/message_wrapper.rb new file mode 100644 index 0000000..be48679 --- /dev/null +++ b/test/async/container/message_wrapper.rb @@ -0,0 +1,311 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "async/container/supervisor/connection" +require "sus/fixtures/async/scheduler_context" +require "stringio" +require "msgpack" + +class TrueObject + def as_json + true + end +end + +describe Async::Container::Supervisor::MessageWrapper do + let(:stream) {StringIO.new} + let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new(stream)} + + def write_message(message) + message_wrapper.write(message) + stream.rewind + end + + def read_message + message_wrapper.read + end + + with "write and read" do + it "normalizes without infinite loop" do + Integer.define_method(:as_json) do + self + end + + write_message({id: 1, do: :test}) + + message = read_message + expect(message[:id]).to be == 1 + ensure + Integer.send(:remove_method, :as_json) + end + + it "handles simple strings" do + write_message({message: "hello world"}) + + result = read_message + expect(result[:message]).to be == "hello world" + end + + it "handles integers" do + write_message({count: 42, negative: -10}) + + result = read_message + expect(result[:count]).to be == 42 + expect(result[:negative]).to be == -10 + end + + it "handles floats" do + write_message({pi: 3.14159, negative: -2.5}) + + result = read_message + expect(result[:pi]).to be == 3.14159 + expect(result[:negative]).to be == -2.5 + end + + it "handles boolean values" do + write_message({success: true, failed: false}) + + result = read_message + expect(result[:success]).to be == true + expect(result[:failed]).to be == false + end + + it "handles nil values" do + write_message({empty: nil}) + + result = read_message + expect(result[:empty]).to be_nil + end + + it "handles arrays" do + write_message({items: [1, 2, 3, "four", true]}) + + result = read_message + expect(result[:items]).to be == [1, 2, 3, "four", true] + end + + it "handles nested hashes" do + write_message({ + user: { + name: "Alice", + details: { + age: 30, + active: true + } + } + }) + + result = read_message + expect(result[:user][:name]).to be == "Alice" + expect(result[:user][:details][:age]).to be == 30 + expect(result[:user][:details][:active]).to be == true + end + + it "handles nested arrays" do + write_message({matrix: [[1, 2], [3, 4], [5, 6]]}) + + result = read_message + expect(result[:matrix]).to be == [[1, 2], [3, 4], [5, 6]] + end + + it "handles symbols" do + write_message({action: :start, status: :success}) + + result = read_message + expect(result[:action]).to be == :start + expect(result[:status]).to be == :success + end + + it "handles empty hash" do + write_message({}) + + result = read_message + expect(result).to be == {} + end + + it "handles empty array" do + write_message({items: []}) + + result = read_message + expect(result[:items]).to be == [] + end + end + + with "Time handling" do + it "serializes and deserializes Time objects" do + time = Time.now + write_message({timestamp: time}) + + result = read_message + expect(result[:timestamp]).to be_within(0.001).of(time) + end + + it "handles Time in nested structures" do + time = Time.now + write_message({ + event: { + name: "test", + occurred_at: time + } + }) + + result = read_message + expect(result[:event][:occurred_at]).to be_within(0.001).of(time) + end + end + + with "Class handling" do + it "serializes class names" do + write_message({type: String}) + + result = read_message + expect(result[:type]).to be == "String" + end + + it "handles multiple classes" do + write_message({types: [String, Integer, Array]}) + + result = read_message + expect(result[:types]).to be == ["String", "Integer", "Array"] + end + end + + with "Exception handling" do + it "serializes and deserializes RuntimeError" do + error = RuntimeError.new("Something went wrong") + error.set_backtrace(["line1", "line2"]) + + write_message({error: error}) + + result = read_message + expect(result[:error]).to be_a(RuntimeError) + expect(result[:error].message).to be == "Something went wrong" + expect(result[:error].backtrace).to be == ["line1", "line2"] + end + + it "handles StandardError" do + error = StandardError.new("Standard error") + write_message({error: error}) + + result = read_message + expect(result[:error]).to be_a(StandardError) + expect(result[:error].message).to be == "Standard error" + end + + it "handles ArgumentError" do + error = ArgumentError.new("Invalid argument") + write_message({error: error}) + + result = read_message + expect(result[:error]).to be_a(ArgumentError) + expect(result[:error].message).to be == "Invalid argument" + end + end + + with "normalize method" do + it "normalizes objects with as_json method" do + obj = TrueObject.new + write_message({custom: obj}) + + result = read_message + expect(result[:custom]).to be == true + end + + it "normalizes arrays of objects with as_json" do + write_message({items: [TrueObject.new, TrueObject.new]}) + + result = read_message + expect(result[:items]).to be == [true, true] + end + + it "normalizes nested objects with as_json" do + write_message({ + data: { + flag: TrueObject.new, + nested: { + another: TrueObject.new + } + } + }) + + result = read_message + expect(result[:data][:flag]).to be == true + expect(result[:data][:nested][:another]).to be == true + end + end + + with "complex messages" do + it "handles complex nested structures" do + write_message({ + id: 123, + action: :process, + data: { + items: [1, 2, 3], + metadata: { + timestamp: Time.now, + type: String + } + }, + flags: { + active: true, + debug: false + } + }) + + result = read_message + expect(result[:id]).to be == 123 + expect(result[:action]).to be == :process + expect(result[:data][:items]).to be == [1, 2, 3] + expect(result[:flags][:active]).to be == true + end + + it "handles large arrays" do + large_array = (1..1000).to_a + write_message({data: large_array}) + + result = read_message + expect(result[:data]).to be == large_array + end + + it "handles deeply nested structures" do + deep = {level1: {level2: {level3: {level4: {level5: "deep"}}}}} + write_message(deep) + + result = read_message + expect(result[:level1][:level2][:level3][:level4][:level5]).to be == "deep" + end + end + + with "edge cases" do + it "handles empty string" do + write_message({text: ""}) + + result = read_message + expect(result[:text]).to be == "" + end + + it "handles zero values" do + write_message({count: 0, amount: 0.0}) + + result = read_message + expect(result[:count]).to be == 0 + expect(result[:amount]).to be == 0.0 + end + + it "handles unicode strings" do + write_message({text: "Hello δΈ–η•Œ 🌍"}) + + result = read_message + expect(result[:text]).to be == "Hello δΈ–η•Œ 🌍" + end + + it "handles special characters" do + write_message({text: "Line1\nLine2\tTabbed"}) + + result = read_message + expect(result[:text]).to be == "Line1\nLine2\tTabbed" + end + end +end From 6b19d54a382fe36535adcb4ec318aaa6779ad169 Mon Sep 17 00:00:00 2001 From: Michael Go Date: Fri, 14 Nov 2025 12:23:34 -0400 Subject: [PATCH 6/7] prevent infinite loop from packing circular refernce objects --- .../container/supervisor/message_wrapper.rb | 23 +++++++++++++++---- test/async/container/message_wrapper.rb | 10 ++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/lib/async/container/supervisor/message_wrapper.rb b/lib/async/container/supervisor/message_wrapper.rb index 8d77020..3d30234 100644 --- a/lib/async/container/supervisor/message_wrapper.rb +++ b/lib/async/container/supervisor/message_wrapper.rb @@ -4,6 +4,7 @@ # Copyright, 2025, by Samuel Williams. require "msgpack" +require "set" module Async module Container @@ -29,7 +30,7 @@ def read def pack(message) @packer.clear - normalized_message = normalize(message) + normalized_message = normalize(message, Set.new) @packer.pack(normalized_message) @packer.full_pack end @@ -40,15 +41,27 @@ def unpack(data) private - def normalize(obj) + def normalize(obj, visited = Set.new.compare_by_identity) + # Check for circular references + return "..." if visited.include?(obj) + case obj when Hash - obj.transform_values{|v| normalize(v)} + visited.add(obj) + result = obj.transform_values{|v| normalize(v, visited)} + visited.delete(obj) + result when Array - obj.map{|v| normalize(v)} + visited.add(obj) + result = obj.map{|v| normalize(v, visited)} + visited.delete(obj) + result else if obj.respond_to?(:as_json) && (as_json = obj.as_json) && as_json != obj - normalize(obj.as_json) + visited.add(obj) + result = normalize(as_json, visited) + visited.delete(obj) + result else obj end diff --git a/test/async/container/message_wrapper.rb b/test/async/container/message_wrapper.rb index be48679..c05c878 100644 --- a/test/async/container/message_wrapper.rb +++ b/test/async/container/message_wrapper.rb @@ -41,6 +41,16 @@ def read_message Integer.send(:remove_method, :as_json) end + it "normalizes circular references" do + array = [] + array << array + + write_message({data: array}) + + result = read_message + expect(result[:data]).to be == ["..."] + end + it "handles simple strings" do write_message({message: "hello world"}) From 248f83f40e8d334174a0731445a81478b0fa4323 Mon Sep 17 00:00:00 2001 From: Michael Go Date: Thu, 20 Nov 2025 12:13:17 -0400 Subject: [PATCH 7/7] remove comment --- lib/async/container/supervisor/connection.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/async/container/supervisor/connection.rb b/lib/async/container/supervisor/connection.rb index 789029b..90ec267 100644 --- a/lib/async/container/supervisor/connection.rb +++ b/lib/async/container/supervisor/connection.rb @@ -244,9 +244,6 @@ def next_id # Write a message to the connection stream. # - # Uses a length-prefixed protocol: 2-byte length header (big-endian) followed by data. - # This allows MessagePack data to contain newlines without breaking message boundaries. - # # @parameter message [Hash] The message to write. def write(**message) @message_wrapper.write(message)