Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions lib/schematic/datastream/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def handle_ready
def handle_message(message)
entity_type = message[:entity_type] || message["entity_type"]
message_type = message[:message_type] || message["message_type"]
entity_id = message[:entity_id] || message["entity_id"]
data = message[:data] || message["data"]

@logger.debug("Processing DataStream message: EntityType=#{entity_type}, MessageType=#{message_type}")
Expand All @@ -305,9 +306,9 @@ def handle_message(message)
when ENTITY_TYPE_FLAG
handle_flag_message(data, message_type)
when ENTITY_TYPE_COMPANY, ENTITY_TYPE_COMPANIES
handle_company_message(data, message_type)
handle_company_message(data, message_type, entity_id)
when ENTITY_TYPE_USER, ENTITY_TYPE_USERS
handle_user_message(data, message_type)
handle_user_message(data, message_type, entity_id)
else
if message[:error] || message["error"]
handle_error_message(message)
Expand Down Expand Up @@ -357,51 +358,57 @@ def handle_flag_message(data, message_type)
end
end

def handle_company_message(data, message_type)
id = data[:id] || data["id"]

def handle_company_message(data, message_type, entity_id = nil)
case message_type
when MESSAGE_TYPE_DELETE
@company_cache.delete_entity(data) if id
delete_id = data[:id] || data["id"] if data.is_a?(Hash)
@company_cache.delete_entity(data) if delete_id
when MESSAGE_TYPE_PARTIAL
if id
existing = @company_cache.get_by_id(id)
# Cache lookup uses envelope entity_id; data is the wrapped partial
# payload (e.g. {"credit_balances": {...}}), with no top-level id.
if entity_id
existing = @company_cache.get_by_id(entity_id)
if existing
merged = Merge.partial_company(existing, data)
@company_cache.cache_entity(merged, ttl: @cache_ttl)
else
@logger.warn("Cache miss for partial company '#{id}', caching as new")
@company_cache.cache_entity(data, ttl: @cache_ttl)
@logger.warn("Cache miss for partial company '#{entity_id}', skipping")
end
else
@logger.warn("Partial company message missing entity_id")
end
when MESSAGE_TYPE_FULL
@company_cache.cache_entity(data, ttl: @cache_ttl) if id
full_id = data[:id] || data["id"] if data.is_a?(Hash)
@company_cache.cache_entity(data, ttl: @cache_ttl) if full_id
end

notify_pending(:company, data)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should have notify_pending accept the entity_id directly, since with a partial message there may not be guaranteed to be an ID or keys in the message - eg a partial update that contains only a metric update

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there shouldn't be any pending requests for partial messages, this really only applies to full messages because when we make requests to the websocket we're expecting full messages back.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok that makes sense

rescue StandardError => e
@logger.error("Failed to process company message: #{e.message}")
end

def handle_user_message(data, message_type)
id = data[:id] || data["id"]

def handle_user_message(data, message_type, entity_id = nil)
case message_type
when MESSAGE_TYPE_DELETE
@user_cache.delete_entity(data) if id
delete_id = data[:id] || data["id"] if data.is_a?(Hash)
@user_cache.delete_entity(data) if delete_id
when MESSAGE_TYPE_PARTIAL
if id
existing = @user_cache.get_by_id(id)
# Cache lookup uses envelope entity_id; data is the wrapped partial
# payload with no top-level id.
if entity_id
existing = @user_cache.get_by_id(entity_id)
if existing
merged = Merge.partial_user(existing, data)
@user_cache.cache_entity(merged, ttl: @cache_ttl)
else
@logger.warn("Cache miss for partial user '#{id}', caching as new")
@user_cache.cache_entity(data, ttl: @cache_ttl)
@logger.warn("Cache miss for partial user '#{entity_id}', skipping")
end
else
@logger.warn("Partial user message missing entity_id")
end
when MESSAGE_TYPE_FULL
@user_cache.cache_entity(data, ttl: @cache_ttl) if id
full_id = data[:id] || data["id"] if data.is_a?(Hash)
@user_cache.cache_entity(data, ttl: @cache_ttl) if full_id
end

notify_pending(:user, data)
Expand Down
26 changes: 17 additions & 9 deletions test/custom.test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2416,11 +2416,13 @@ def build_ds_client(flag_cache: nil, company_cache: nil, user_cache: nil, rules_
data: existing
})

# Send a partial update
# Send a partial update. The cache lookup uses entity_id from the envelope;
# data is the wrapped partial payload with no top-level id.
ds.send(:handle_message, {
entity_type: Schematic::DataStream::ENTITY_TYPE_COMPANY,
entity_id: "comp_1",
message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL,
data: { id: "comp_1", traits: { "seats" => "25" } }
data: { traits: { "seats" => "25" } }
})

cached = ds.instance_variable_get(:@company_cache).get_by_id("comp_1")
Expand All @@ -2445,10 +2447,13 @@ def build_ds_client(flag_cache: nil, company_cache: nil, user_cache: nil, rules_
data: existing
})

# Send a partial update. Cache lookup uses entity_id from the envelope;
# data is the wrapped partial payload with no top-level id.
ds.send(:handle_message, {
entity_type: Schematic::DataStream::ENTITY_TYPE_USER,
entity_id: "user_1",
message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL,
data: { id: "user_1", keys: { "user_id" => "u123" } }
data: { keys: { "user_id" => "u123" } }
})

cached = ds.instance_variable_get(:@user_cache).get_by_id("user_1")
Expand All @@ -2460,36 +2465,39 @@ def build_ds_client(flag_cache: nil, company_cache: nil, user_cache: nil, rules_
ds.close
end

it "partial company for non-existing entity caches as new" do
it "partial company for non-existing entity is skipped" do
ctx = build_ds_client
ds = ctx[:client]

# No FULL message for new_comp; partial should be dropped on cache miss.
ds.send(:handle_message, {
entity_type: Schematic::DataStream::ENTITY_TYPE_COMPANY,
entity_id: "new_comp",
message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL,
data: { id: "new_comp", keys: { "org_id" => "xyz" }, traits: { "tier" => "free" } }
data: { traits: { "tier" => "free" } }
})

cached = ds.instance_variable_get(:@company_cache).get_by_id("new_comp")

refute_nil cached, "partial message for non-existing entity should create a new cache entry"
assert_nil cached, "partial message for non-existing entity should be dropped"

ds.close
end

it "partial user for non-existing entity caches as new" do
it "partial user for non-existing entity is skipped" do
ctx = build_ds_client
ds = ctx[:client]

ds.send(:handle_message, {
entity_type: Schematic::DataStream::ENTITY_TYPE_USER,
entity_id: "new_user",
message_type: Schematic::DataStream::MESSAGE_TYPE_PARTIAL,
data: { id: "new_user", keys: { "email" => "new@test.com" } }
data: { keys: { "email" => "new@test.com" } }
})

cached = ds.instance_variable_get(:@user_cache).get_by_id("new_user")

refute_nil cached
assert_nil cached, "partial message for non-existing entity should be dropped"

ds.close
end
Expand Down