diff --git a/lib/schematic/datastream/client.rb b/lib/schematic/datastream/client.rb index 8ebc1eb..390a7e0 100644 --- a/lib/schematic/datastream/client.rb +++ b/lib/schematic/datastream/client.rb @@ -295,6 +295,7 @@ def handle_ready def handle_message(message) entity_type = message[:entity_type] || message["entity_type"] message_type = message[:message_type] || message["message_type"] + entity_id = message[:entity_id] || message["entity_id"] data = message[:data] || message["data"] @logger.debug("Processing DataStream message: EntityType=#{entity_type}, MessageType=#{message_type}") @@ -305,9 +306,9 @@ def handle_message(message) when ENTITY_TYPE_FLAG handle_flag_message(data, message_type) when ENTITY_TYPE_COMPANY, ENTITY_TYPE_COMPANIES - handle_company_message(data, message_type) + handle_company_message(data, message_type, entity_id) when ENTITY_TYPE_USER, ENTITY_TYPE_USERS - handle_user_message(data, message_type) + handle_user_message(data, message_type, entity_id) else if message[:error] || message["error"] handle_error_message(message) @@ -357,25 +358,28 @@ def handle_flag_message(data, message_type) end end - def handle_company_message(data, message_type) - id = data[:id] || data["id"] - + def handle_company_message(data, message_type, entity_id = nil) case message_type when MESSAGE_TYPE_DELETE - @company_cache.delete_entity(data) if id + delete_id = data[:id] || data["id"] if data.is_a?(Hash) + @company_cache.delete_entity(data) if delete_id when MESSAGE_TYPE_PARTIAL - if id - existing = @company_cache.get_by_id(id) + # Cache lookup uses envelope entity_id; data is the wrapped partial + # payload (e.g. {"credit_balances": {...}}), with no top-level id. + if entity_id + existing = @company_cache.get_by_id(entity_id) if existing merged = Merge.partial_company(existing, data) @company_cache.cache_entity(merged, ttl: @cache_ttl) else - @logger.warn("Cache miss for partial company '#{id}', caching as new") - @company_cache.cache_entity(data, ttl: @cache_ttl) + @logger.warn("Cache miss for partial company '#{entity_id}', skipping") end + else + @logger.warn("Partial company message missing entity_id") end when MESSAGE_TYPE_FULL - @company_cache.cache_entity(data, ttl: @cache_ttl) if id + full_id = data[:id] || data["id"] if data.is_a?(Hash) + @company_cache.cache_entity(data, ttl: @cache_ttl) if full_id end notify_pending(:company, data) @@ -383,25 +387,28 @@ def handle_company_message(data, message_type) @logger.error("Failed to process company message: #{e.message}") end - def handle_user_message(data, message_type) - id = data[:id] || data["id"] - + def handle_user_message(data, message_type, entity_id = nil) case message_type when MESSAGE_TYPE_DELETE - @user_cache.delete_entity(data) if id + delete_id = data[:id] || data["id"] if data.is_a?(Hash) + @user_cache.delete_entity(data) if delete_id when MESSAGE_TYPE_PARTIAL - if id - existing = @user_cache.get_by_id(id) + # Cache lookup uses envelope entity_id; data is the wrapped partial + # payload with no top-level id. + if entity_id + existing = @user_cache.get_by_id(entity_id) if existing merged = Merge.partial_user(existing, data) @user_cache.cache_entity(merged, ttl: @cache_ttl) else - @logger.warn("Cache miss for partial user '#{id}', caching as new") - @user_cache.cache_entity(data, ttl: @cache_ttl) + @logger.warn("Cache miss for partial user '#{entity_id}', skipping") end + else + @logger.warn("Partial user message missing entity_id") end when MESSAGE_TYPE_FULL - @user_cache.cache_entity(data, ttl: @cache_ttl) if id + full_id = data[:id] || data["id"] if data.is_a?(Hash) + @user_cache.cache_entity(data, ttl: @cache_ttl) if full_id end notify_pending(:user, data) diff --git a/test/custom.test.rb b/test/custom.test.rb index 6ecfd64..c1c2bc9 100644 --- a/test/custom.test.rb +++ b/test/custom.test.rb @@ -2416,11 +2416,13 @@ def build_ds_client(flag_cache: nil, company_cache: nil, user_cache: nil, rules_ data: existing }) - # Send a partial update + # Send a partial update. The cache lookup uses entity_id from the envelope; + # data is the wrapped partial payload with no top-level id. ds.send(:handle_message, { entity_type: Schematic::DataStream::ENTITY_TYPE_COMPANY, + entity_id: "comp_1", message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL, - data: { id: "comp_1", traits: { "seats" => "25" } } + data: { traits: { "seats" => "25" } } }) cached = ds.instance_variable_get(:@company_cache).get_by_id("comp_1") @@ -2445,10 +2447,13 @@ def build_ds_client(flag_cache: nil, company_cache: nil, user_cache: nil, rules_ data: existing }) + # Send a partial update. Cache lookup uses entity_id from the envelope; + # data is the wrapped partial payload with no top-level id. ds.send(:handle_message, { entity_type: Schematic::DataStream::ENTITY_TYPE_USER, + entity_id: "user_1", message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL, - data: { id: "user_1", keys: { "user_id" => "u123" } } + data: { keys: { "user_id" => "u123" } } }) cached = ds.instance_variable_get(:@user_cache).get_by_id("user_1") @@ -2460,36 +2465,39 @@ def build_ds_client(flag_cache: nil, company_cache: nil, user_cache: nil, rules_ ds.close end - it "partial company for non-existing entity caches as new" do + it "partial company for non-existing entity is skipped" do ctx = build_ds_client ds = ctx[:client] + # No FULL message for new_comp; partial should be dropped on cache miss. ds.send(:handle_message, { entity_type: Schematic::DataStream::ENTITY_TYPE_COMPANY, + entity_id: "new_comp", message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL, - data: { id: "new_comp", keys: { "org_id" => "xyz" }, traits: { "tier" => "free" } } + data: { traits: { "tier" => "free" } } }) cached = ds.instance_variable_get(:@company_cache).get_by_id("new_comp") - refute_nil cached, "partial message for non-existing entity should create a new cache entry" + assert_nil cached, "partial message for non-existing entity should be dropped" ds.close end - it "partial user for non-existing entity caches as new" do + it "partial user for non-existing entity is skipped" do ctx = build_ds_client ds = ctx[:client] ds.send(:handle_message, { entity_type: Schematic::DataStream::ENTITY_TYPE_USER, + entity_id: "new_user", message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL, - data: { id: "new_user", keys: { "email" => "new@test.com" } } + data: { keys: { "email" => "new@test.com" } } }) cached = ds.instance_variable_get(:@user_cache).get_by_id("new_user") - refute_nil cached + assert_nil cached, "partial message for non-existing entity should be dropped" ds.close end