diff --git a/bake/async/container/supervisor.rb b/bake/async/container/supervisor.rb index 3fa462d..17e59ff 100644 --- a/bake/async/container/supervisor.rb +++ b/bake/async/container/supervisor.rb @@ -44,7 +44,7 @@ def memory_sample(duration: 10, connection_id:) operation = {do: :memory_sample, duration: duration} # Use the forward operation to proxy the request to a worker: - return connection.call(do: :forward, operation: operation, connection_id: connection_id) + connection.call(do: :forward, operation: operation, connection_id: connection_id) end end diff --git a/examples/simple/simple.rb b/examples/simple/simple.rb index 5c8f385..cdee009 100755 --- a/examples/simple/simple.rb +++ b/examples/simple/simple.rb @@ -33,7 +33,7 @@ def setup(container) Console.info(self, "Exiting...") end end - end + end end service "sleep" do diff --git a/lib/async/container/supervisor/connection.rb b/lib/async/container/supervisor/connection.rb index 49e9e28..90ec267 100644 --- a/lib/async/container/supervisor/connection.rb +++ b/lib/async/container/supervisor/connection.rb @@ -3,8 +3,8 @@ # Released under the MIT License. # Copyright, 2025, by Samuel Williams. -require "json" require "async" +require_relative "message_wrapper" module Async module Container @@ -37,13 +37,6 @@ def as_json(...) @message end - # Convert the call to a JSON string. - # - # @returns [String] The JSON representation. - def to_json(...) - as_json.to_json(...) - end - # @attribute [Connection] The connection that initiated the call. attr :connection @@ -231,7 +224,7 @@ def initialize(stream, id = 0, **state) @stream = stream @id = id @state = state - + @message_wrapper = MessageWrapper.new(@stream) @reader = nil @calls = {} end @@ -253,17 +246,16 @@ def next_id # # @parameter message [Hash] The message to write. def write(**message) - @stream.write(JSON.dump(message) << "\n") - @stream.flush + @message_wrapper.write(message) end # Read a message from the connection stream. # # @returns [Hash, nil] The parsed message or nil if stream is closed. def read - if line = @stream&.gets - JSON.parse(line, symbolize_names: true) - end + @message_wrapper.read + rescue EOFError, IOError + nil end # Iterate over all messages from the connection. diff --git a/lib/async/container/supervisor/message_wrapper.rb b/lib/async/container/supervisor/message_wrapper.rb new file mode 100644 index 0000000..3d30234 --- /dev/null +++ b/lib/async/container/supervisor/message_wrapper.rb @@ -0,0 +1,114 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "msgpack" +require "set" + +module Async + module Container + module Supervisor + class MessageWrapper + def initialize(stream) + @factory = MessagePack::Factory.new + + register_types + + @packer = @factory.packer(stream) + @unpacker = @factory.unpacker(stream) + end + + def write(message) + data = pack(message) + @packer.write(data) + end + + def read + @unpacker.read + end + + def pack(message) + @packer.clear + normalized_message = normalize(message, Set.new) + @packer.pack(normalized_message) + @packer.full_pack + end + + def unpack(data) + @factory.unpack(data) + end + + private + + def normalize(obj, visited = Set.new.compare_by_identity) + # Check for circular references + return "..." if visited.include?(obj) + + case obj + when Hash + visited.add(obj) + result = obj.transform_values{|v| normalize(v, visited)} + visited.delete(obj) + result + when Array + visited.add(obj) + result = obj.map{|v| normalize(v, visited)} + visited.delete(obj) + result + else + if obj.respond_to?(:as_json) && (as_json = obj.as_json) && as_json != obj + visited.add(obj) + result = normalize(as_json, visited) + visited.delete(obj) + result + else + obj + end + end + end + + def register_types + @factory.register_type(0x00, Symbol) + + @factory.register_type( + 0x01, + Exception, + packer: self.method(:pack_exception), + unpacker: self.method(:unpack_exception), + recursive: true, + ) + + @factory.register_type( + 0x02, + Class, + packer: ->(klass) {klass.name}, + unpacker: ->(name) {name}, + ) + + @factory.register_type( + MessagePack::Timestamp::TYPE, + Time, + packer: MessagePack::Time::Packer, + unpacker: MessagePack::Time::Unpacker + ) + end + + def pack_exception(exception, packer) + message = [exception.class.name, exception.message, exception.backtrace] + packer.write(message) + end + + def unpack_exception(unpacker) + klass, message, backtrace = unpacker.read + klass = Object.const_get(klass) + + exception = klass.new(message) + exception.set_backtrace(backtrace) + + return exception + end + end + end + end +end diff --git a/test/async/container/connection.rb b/test/async/container/connection.rb index 673b754..1972cce 100644 --- a/test/async/container/connection.rb +++ b/test/async/container/connection.rb @@ -20,11 +20,16 @@ def dispatch(call) describe Async::Container::Supervisor::Connection do let(:stream) {StringIO.new} let(:connection) {Async::Container::Supervisor::Connection.new(stream)} + let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new(stream)} + + def write_message(message) + message_wrapper.write(message) + stream.rewind + end with "dispatch" do it "handles failed writes when dispatching a call" do - stream.write(JSON.dump({id: 1, do: :test}) << "\n") - stream.rewind + write_message({id: 1, do: :test}) expect(stream).to receive(:write).and_raise(IOError, "Write error") @@ -42,8 +47,7 @@ def dispatch(call) end it "closes the queue when the connection fails" do - stream.write(JSON.dump({id: 1, do: :test}) << "\n") - stream.rewind + write_message({id: 1, do: :test}) expect(stream).to receive(:write).and_raise(IOError, "Write error") @@ -83,21 +87,11 @@ def dispatch(call) with subject::Call do let(:test_call) {Async::Container::Supervisor::Connection::Call.new(connection, 1, {do: :test, data: "value"})} - it "can serialize call to JSON" do - json = test_call.to_json - parsed = JSON.parse(json) - - expect(parsed).to have_keys( - "do" => be == "test", - "data" => be == "value" - ) - end - it "can get call message via as_json" do expect(test_call.as_json).to have_keys( - do: be == :test, - data: be == "value" - ) + do: be == :test, + data: be == "value" + ) end it "can iterate over call responses with each" do @@ -121,11 +115,11 @@ def dispatch(call) response = test_call.pop expect(response).to have_keys( - id: be == 1, - finished: be == true, - failed: be == true, - error: be == "Something went wrong" - ) + id: be == 1, + finished: be == true, + failed: be == true, + error: be == "Something went wrong" + ) expect(test_call.closed?).to be == true end @@ -144,35 +138,6 @@ def dispatch(call) end end - it "writes JSON with newline" do - connection.write(id: 1, do: :test) - - stream.rewind - output = stream.read - - # Check it's valid JSON with a newline - expect(output[-1]).to be == "\n" - - parsed = JSON.parse(output.chomp) - expect(parsed).to have_keys( - "id" => be == 1, - "do" => be == "test" - ) - end - - it "parses JSON lines" do - stream.string = JSON.dump({id: 1, do: "test"}) << "\n" - stream.rewind - - message = connection.read - - # Connection.read uses symbolize_names: true (keys are symbols, values are as-is) - expect(message).to have_keys( - id: be == 1, - do: be == "test" - ) - end - it "returns nil when stream is closed" do stream.string = "" stream.rewind diff --git a/test/async/container/message_wrapper.rb b/test/async/container/message_wrapper.rb new file mode 100644 index 0000000..c05c878 --- /dev/null +++ b/test/async/container/message_wrapper.rb @@ -0,0 +1,321 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "async/container/supervisor/connection" +require "sus/fixtures/async/scheduler_context" +require "stringio" +require "msgpack" + +class TrueObject + def as_json + true + end +end + +describe Async::Container::Supervisor::MessageWrapper do + let(:stream) {StringIO.new} + let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new(stream)} + + def write_message(message) + message_wrapper.write(message) + stream.rewind + end + + def read_message + message_wrapper.read + end + + with "write and read" do + it "normalizes without infinite loop" do + Integer.define_method(:as_json) do + self + end + + write_message({id: 1, do: :test}) + + message = read_message + expect(message[:id]).to be == 1 + ensure + Integer.send(:remove_method, :as_json) + end + + it "normalizes circular references" do + array = [] + array << array + + write_message({data: array}) + + result = read_message + expect(result[:data]).to be == ["..."] + end + + it "handles simple strings" do + write_message({message: "hello world"}) + + result = read_message + expect(result[:message]).to be == "hello world" + end + + it "handles integers" do + write_message({count: 42, negative: -10}) + + result = read_message + expect(result[:count]).to be == 42 + expect(result[:negative]).to be == -10 + end + + it "handles floats" do + write_message({pi: 3.14159, negative: -2.5}) + + result = read_message + expect(result[:pi]).to be == 3.14159 + expect(result[:negative]).to be == -2.5 + end + + it "handles boolean values" do + write_message({success: true, failed: false}) + + result = read_message + expect(result[:success]).to be == true + expect(result[:failed]).to be == false + end + + it "handles nil values" do + write_message({empty: nil}) + + result = read_message + expect(result[:empty]).to be_nil + end + + it "handles arrays" do + write_message({items: [1, 2, 3, "four", true]}) + + result = read_message + expect(result[:items]).to be == [1, 2, 3, "four", true] + end + + it "handles nested hashes" do + write_message({ + user: { + name: "Alice", + details: { + age: 30, + active: true + } + } + }) + + result = read_message + expect(result[:user][:name]).to be == "Alice" + expect(result[:user][:details][:age]).to be == 30 + expect(result[:user][:details][:active]).to be == true + end + + it "handles nested arrays" do + write_message({matrix: [[1, 2], [3, 4], [5, 6]]}) + + result = read_message + expect(result[:matrix]).to be == [[1, 2], [3, 4], [5, 6]] + end + + it "handles symbols" do + write_message({action: :start, status: :success}) + + result = read_message + expect(result[:action]).to be == :start + expect(result[:status]).to be == :success + end + + it "handles empty hash" do + write_message({}) + + result = read_message + expect(result).to be == {} + end + + it "handles empty array" do + write_message({items: []}) + + result = read_message + expect(result[:items]).to be == [] + end + end + + with "Time handling" do + it "serializes and deserializes Time objects" do + time = Time.now + write_message({timestamp: time}) + + result = read_message + expect(result[:timestamp]).to be_within(0.001).of(time) + end + + it "handles Time in nested structures" do + time = Time.now + write_message({ + event: { + name: "test", + occurred_at: time + } + }) + + result = read_message + expect(result[:event][:occurred_at]).to be_within(0.001).of(time) + end + end + + with "Class handling" do + it "serializes class names" do + write_message({type: String}) + + result = read_message + expect(result[:type]).to be == "String" + end + + it "handles multiple classes" do + write_message({types: [String, Integer, Array]}) + + result = read_message + expect(result[:types]).to be == ["String", "Integer", "Array"] + end + end + + with "Exception handling" do + it "serializes and deserializes RuntimeError" do + error = RuntimeError.new("Something went wrong") + error.set_backtrace(["line1", "line2"]) + + write_message({error: error}) + + result = read_message + expect(result[:error]).to be_a(RuntimeError) + expect(result[:error].message).to be == "Something went wrong" + expect(result[:error].backtrace).to be == ["line1", "line2"] + end + + it "handles StandardError" do + error = StandardError.new("Standard error") + write_message({error: error}) + + result = read_message + expect(result[:error]).to be_a(StandardError) + expect(result[:error].message).to be == "Standard error" + end + + it "handles ArgumentError" do + error = ArgumentError.new("Invalid argument") + write_message({error: error}) + + result = read_message + expect(result[:error]).to be_a(ArgumentError) + expect(result[:error].message).to be == "Invalid argument" + end + end + + with "normalize method" do + it "normalizes objects with as_json method" do + obj = TrueObject.new + write_message({custom: obj}) + + result = read_message + expect(result[:custom]).to be == true + end + + it "normalizes arrays of objects with as_json" do + write_message({items: [TrueObject.new, TrueObject.new]}) + + result = read_message + expect(result[:items]).to be == [true, true] + end + + it "normalizes nested objects with as_json" do + write_message({ + data: { + flag: TrueObject.new, + nested: { + another: TrueObject.new + } + } + }) + + result = read_message + expect(result[:data][:flag]).to be == true + expect(result[:data][:nested][:another]).to be == true + end + end + + with "complex messages" do + it "handles complex nested structures" do + write_message({ + id: 123, + action: :process, + data: { + items: [1, 2, 3], + metadata: { + timestamp: Time.now, + type: String + } + }, + flags: { + active: true, + debug: false + } + }) + + result = read_message + expect(result[:id]).to be == 123 + expect(result[:action]).to be == :process + expect(result[:data][:items]).to be == [1, 2, 3] + expect(result[:flags][:active]).to be == true + end + + it "handles large arrays" do + large_array = (1..1000).to_a + write_message({data: large_array}) + + result = read_message + expect(result[:data]).to be == large_array + end + + it "handles deeply nested structures" do + deep = {level1: {level2: {level3: {level4: {level5: "deep"}}}}} + write_message(deep) + + result = read_message + expect(result[:level1][:level2][:level3][:level4][:level5]).to be == "deep" + end + end + + with "edge cases" do + it "handles empty string" do + write_message({text: ""}) + + result = read_message + expect(result[:text]).to be == "" + end + + it "handles zero values" do + write_message({count: 0, amount: 0.0}) + + result = read_message + expect(result[:count]).to be == 0 + expect(result[:amount]).to be == 0.0 + end + + it "handles unicode strings" do + write_message({text: "Hello δΈ–η•Œ 🌍"}) + + result = read_message + expect(result[:text]).to be == "Hello δΈ–η•Œ 🌍" + end + + it "handles special characters" do + write_message({text: "Line1\nLine2\tTabbed"}) + + result = read_message + expect(result[:text]).to be == "Line1\nLine2\tTabbed" + end + end +end diff --git a/test/async/container/server.rb b/test/async/container/server.rb index 905a13e..2027ec8 100644 --- a/test/async/container/server.rb +++ b/test/async/container/server.rb @@ -10,11 +10,19 @@ include Async::Container::Supervisor::AServer include Sus::Fixtures::Console::CapturedLogger + def write_message(stream, message) + Async::Container::Supervisor::MessageWrapper.new(stream).write(message) + end + + def read_message(stream) + Async::Container::Supervisor::MessageWrapper.new(stream).read + end + it "can handle unexpected failures" do # First, send invalid JSON to trigger the error: endpoint.connect do |stream| - # Send malformed JSON that will cause parsing errors: - stream.write("not valid json\n") + # Send malformed data (just 2 bytes claiming huge size, but no actual data): + stream.write([999999].pack("N")) stream.flush end @@ -23,11 +31,10 @@ # Send a valid register message: message = {id: 1, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Read the response: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should respond with a finished message: expect(response).to have_keys( @@ -69,11 +76,10 @@ def status(call) stream = endpoint.connect message = {id: 1, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Read the response: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should still finish despite the monitor error: expect(response).to have_keys( @@ -93,11 +99,10 @@ def status(call) stream = endpoint.connect message = {id: 1, do: :status} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Read the response: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should still respond with a finished message despite the monitor error: expect(response).to have_keys( @@ -128,10 +133,9 @@ def status(call) # Verify server is still working by sending a new request: stream = endpoint.connect message = {id: 1, do: :status} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) expect(response).to have_keys( id: be == 1, finished: be == true @@ -152,8 +156,7 @@ def status(call) # Simulate what happens when a timed-out response arrives: # The response only has id and finished (no 'do' key) because it's a response, not a request message = {id: 1, finished: true} - stream.puts(JSON.dump(message)) - stream.flush + write_message(stream, message) # Wait for the message to be processed reactor.sleep(0.01) @@ -164,11 +167,10 @@ def status(call) # Send a valid message to confirm the server is still working: valid_message = {id: 3, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(valid_message)) - stream.flush + write_message(stream, valid_message) # Read the response to the valid message: - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) # The server should have ignored the stale response and processed the valid one: expect(response).to have_keys( @@ -186,17 +188,15 @@ def status(call) # Send a stale response: stale_message = {id: 5, finished: true} - stream.puts(JSON.dump(stale_message)) - stream.flush + write_message(stream, stale_message) # Send a valid message: valid_message = {id: 7, do: :register, state: {process_id: ::Process.pid}} - stream.puts(JSON.dump(valid_message)) - stream.flush + write_message(stream, valid_message) # We should only get ONE response - for the valid message. # Not an error response for the stale message. - response = JSON.parse(stream.gets, symbolize_names: true) + response = read_message(stream) expect(response).to have_keys( id: be == 7,