Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# frozen_string_literal: true

require "elastic_graph/admin/cluster_configurator/action_reporter"
require "elastic_graph/admin/index_definition_configurator/mapping_update"
require "elastic_graph/datastore_core/index_config_normalizer"
require "elastic_graph/indexer/hash_differ"
require "elastic_graph/support/hash_util"
Expand All @@ -32,9 +33,8 @@ def initialize(datastore_client, index, env_agnostic_index_config, output)
# and the state of the index in the datastore, does one of the following:
#
# - If the index did not already exist: creates the index with the desired mappings and settings.
# - If the desired mapping has fewer fields than what is in the index: raises an exception,
# because the datastore provides no way to remove fields from a mapping and it would be confusing
# for this method to silently ignore the issue.
# - If the desired mapping has fewer fields than what is in the index: leaves the existing fields
# alone, because the datastore provides no way to remove fields from a mapping.
# - If the settings have desired changes: updates the settings, restoring any setting that
# no longer has a desired value to its default.
# - If the mapping has desired changes: updates the mappings.
Expand Down Expand Up @@ -75,14 +75,9 @@ def create_new_index
end

def update_mapping
@datastore_client.put_index_mapping(index: @index.name, body: desired_mapping)
@datastore_client.put_index_mapping(index: @index.name, body: desired_mapping_for_update)
action_description = "Updated mappings for index `#{@index.name}`:\n#{mapping_diff}"

if mapping_removals.any?
action_description += "\n\nNote: the extra fields listed here will not actually get removed. " \
"Mapping removals are unsupported (but ElasticGraph will leave them alone and they'll cause no problems)."
end

report_action action_description
end

Expand All @@ -100,10 +95,6 @@ def index_exists?
!current_config.empty?
end

def mapping_removals
@mapping_removals ||= mapping_fields_from(current_mapping) - mapping_fields_from(desired_mapping)
end

def mapping_type_changes
@mapping_type_changes ||= begin
flattened_current = Support::HashUtil.flatten_and_stringify_keys(current_mapping)
Expand All @@ -116,7 +107,7 @@ def mapping_type_changes
end

def has_mapping_updates?
current_mapping != desired_mapping
current_mapping != desired_mapping_for_update
end

def settings_updates
Expand All @@ -127,15 +118,8 @@ def settings_updates
end
end

def mapping_fields_from(mapping_hash, prefix = "")
(mapping_hash["properties"] || []).flat_map do |key, params|
field = prefix + key
if params.key?("properties")
[field] + mapping_fields_from(params, "#{field}.")
else
[field]
end
end
def desired_mapping_for_update
@desired_mapping_for_update ||= MappingUpdate.merge_existing_fields_into(desired_mapping, current_mapping)
end

def desired_mapping
Expand Down Expand Up @@ -178,7 +162,7 @@ def current_config
end

def mapping_diff
@mapping_diff ||= Indexer::HashDiffer.diff(current_mapping, desired_mapping) || "(no diff)"
@mapping_diff ||= Indexer::HashDiffer.diff(current_mapping, desired_mapping_for_update) || "(no diff)"
end

def settings_diff
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

require "elastic_graph/admin/cluster_configurator/action_reporter"
require "elastic_graph/admin/index_definition_configurator/for_index"
require "elastic_graph/admin/index_definition_configurator/mapping_update"
require "elastic_graph/datastore_core/index_config_normalizer"
require "elastic_graph/indexer/hash_differ"
require "elastic_graph/support/hash_util"
Expand Down Expand Up @@ -36,9 +37,8 @@ def initialize(datastore_client, index_template, env_agnostic_index_config_paren
# and the state of the index in the datastore, does one of the following:
#
# - If the index did not already exist: creates the index with the desired mappings and settings.
# - If the desired mapping has fewer fields than what is in the index: raises an exception,
# because the datastore provides no way to remove fields from a mapping and it would be confusing
# for this method to silently ignore the issue.
# - If the desired mapping has fewer fields than what is in the index: leaves the existing fields
# alone, because the datastore provides no way to remove fields from a mapping.
# - If the settings have desired changes: updates the settings, restoring any setting that
# no longer has a desired value to its default.
# - If the mapping has desired changes: updates the mappings.
Expand Down Expand Up @@ -67,19 +67,13 @@ def validate
private

def put_index_template
desired_template_config_payload = Support::HashUtil.deep_merge(
desired_config_parent,
{"template" => {"mappings" => merge_properties(desired_mapping, current_mapping)}}
)

action_description = "Updated index template: `#{@index_template.name}`:\n#{config_diff}"

if mapping_removals.any?
action_description += "\n\nNote: the extra fields listed here will not actually get removed. " \
"Mapping removals are unsupported (but ElasticGraph will leave them alone and they'll cause no problems)."
action_description = if index_template_exists?
"Updated index template: `#{@index_template.name}`:\n#{config_diff}"
else
"Created index template: `#{@index_template.name}`"
end

@datastore_client.put_index_template(name: @index_template.name, body: desired_template_config_payload)
@datastore_client.put_index_template(name: @index_template.name, body: desired_config_parent_for_update)
report_action action_description
end

Expand All @@ -92,10 +86,6 @@ def index_template_exists?
!current_config_parent.empty?
end

def mapping_removals
@mapping_removals ||= mapping_fields_from(current_mapping) - mapping_fields_from(desired_mapping)
end

def mapping_type_changes
@mapping_type_changes ||= begin
flattened_current = Support::HashUtil.flatten_and_stringify_keys(current_mapping)
Expand All @@ -108,7 +98,7 @@ def mapping_type_changes
end

def has_mapping_updates?
current_mapping != desired_mapping
current_mapping != desired_mapping_for_update
end

def settings_updates
Expand All @@ -119,15 +109,8 @@ def settings_updates
end
end

def mapping_fields_from(mapping_hash, prefix = "")
(mapping_hash["properties"] || []).flat_map do |key, params|
field = prefix + key
if params.key?("properties")
[field] + mapping_fields_from(params, "#{field}.")
else
[field]
end
end
def desired_mapping_for_update
@desired_mapping_for_update ||= MappingUpdate.merge_existing_fields_into(desired_mapping, current_mapping)
end

def desired_mapping
Expand Down Expand Up @@ -158,6 +141,13 @@ def desired_config_parent
end
end

def desired_config_parent_for_update
@desired_config_parent_for_update ||= Support::HashUtil.deep_merge(
desired_config_parent,
{"template" => {"mappings" => desired_mapping_for_update}}
)
end

def current_mapping
current_config_parent.dig("template", "mappings") || {}
end
Expand All @@ -178,43 +168,13 @@ def current_config_parent
end

def config_diff
@config_diff ||= Indexer::HashDiffer.diff(current_config_parent, desired_config_parent) || "(no diff)"
@config_diff ||= Indexer::HashDiffer.diff(current_config_parent, desired_config_parent_for_update) || "(no diff)"
end

def report_action(message)
@reporter.report_action(message)
end

# Helper method used to merge properties between a _desired_ configuration and a _current_ configuration.
# This is used when we are figuring out how to update an index template. We do not want to delete existing
# fields from a template--while the datastore would allow it, our schema evolution strategy depends upon
# us not dropping old unused fields. The datastore doesn't allow it on indices, anyway (though it does allow
# it on index templates). We've run into trouble (a near SEV) when allowing the logic here to delete an unused
# field from an index template. The indexer "mapping completeness" check started failing because an old version
# of the code (from back when the field in question was still used) noticed the expected field was missing and
# started failing on every event.
#
# This helps us avoid that problem by retaining any currently existing fields.
#
# Long term, if we want to support fully "garbage collecting" these old fields on templates, we will need
# to have them get dropped in a follow up step. We could have our `update_datastore_config` script notice that
# the deployed prod indexers are at a version that will tolerate the fields being dropped, or support it
# via an opt-in flag or something.
def merge_properties(desired_object, current_object)
  # A side that has no "properties" key is treated as having no fields at all.
  wanted_fields = desired_object.fetch("properties") { _ = {} }
  existing_fields = current_object.fetch("properties") { _ = {} }

  # Desired fields come first; fields that only exist in the current mapping are appended as-is.
  # The block only runs for fields present on both sides.
  combined_fields = wanted_fields.merge(existing_fields) do |_field_name, wanted, existing|
    # Recurse only when both sides are objects with subfields; otherwise the desired definition wins.
    both_have_subfields = existing.is_a?(::Hash) && existing.key?("properties") && wanted.key?("properties")
    both_have_subfields ? merge_properties(wanted, existing) : wanted
  end

  # Returns a new hash; `desired_object` itself is not mutated.
  desired_object.merge("properties" => combined_fields)
end

def related_index_configurators
# Here we fan out and get a configurator for each related index. These are generally concrete
# index that are based on a template, either via being specified in our config YAML, or via
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

module ElasticGraph
  class Admin
    module IndexDefinitionConfigurator
      # Helpers for building mapping update payloads (and diffs) that never drop existing fields.
      module MappingUpdate
        # Elasticsearch/OpenSearch cannot remove mapping fields from an index. Index templates do
        # allow removal, but removing a field there can break old indexer versions that still expect it.
        #
        # Returns a copy of `desired_object` whose "properties" also include every field found only in
        # `current_object`. For a field present on both sides the desired definition wins, except that
        # when both definitions have their own "properties" the merge is applied recursively.
        # Neither argument is mutated.
        def self.merge_existing_fields_into(desired_object, current_object)
          # A side that has no "properties" key is treated as having no fields at all.
          wanted_fields = desired_object.fetch("properties") { _ = {} }
          existing_fields = current_object.fetch("properties") { _ = {} }

          # The block only runs for fields present on both sides; fields that exist only in the
          # current mapping are appended unchanged after the desired ones.
          combined_fields = wanted_fields.merge(existing_fields) do |_field_name, wanted, existing|
            both_have_subfields = existing.is_a?(::Hash) && existing.key?("properties") && wanted.key?("properties")
            next wanted unless both_have_subfields

            merge_existing_fields_into(wanted, existing)
          end

          desired_object.merge("properties" => combined_fields)
        end
      end
    end
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ module ElasticGraph
def cannot_modify_mapping_field_type_error: () -> ::String
def index_exists?: () -> bool

@mapping_removals: ::Array[::String]?
def mapping_removals: () -> ::Array[::String]

@mapping_type_changes: ::Array[::String]?
def mapping_type_changes: () -> ::Array[::String]

Expand All @@ -39,7 +36,8 @@ module ElasticGraph
@settings_updates: DatastoreCore::indexSettingsHash?
def settings_updates: () -> DatastoreCore::indexSettingsHash

def mapping_fields_from: (DatastoreCore::indexMappingHash, ?::String) -> ::Array[::String]
@desired_mapping_for_update: DatastoreCore::indexMappingHash?
def desired_mapping_for_update: () -> DatastoreCore::indexMappingHash

def desired_mapping: () -> DatastoreCore::indexMappingHash

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ module ElasticGraph
def cannot_modify_mapping_field_type_error: () -> ::String
def index_template_exists?: () -> bool

@mapping_removals: ::Array[::String]?
def mapping_removals: () -> ::Array[::String]

@mapping_type_changes: ::Array[::String]?
def mapping_type_changes: () -> ::Array[::String]

Expand All @@ -40,7 +37,8 @@ module ElasticGraph
@settings_updates: DatastoreCore::indexSettingsHash?
def settings_updates: () -> DatastoreCore::indexSettingsHash

def mapping_fields_from: (DatastoreCore::indexMappingHash, ?::String) -> ::Array[::String]
@desired_mapping_for_update: DatastoreCore::indexMappingHash?
def desired_mapping_for_update: () -> DatastoreCore::indexMappingHash

def desired_mapping: () -> DatastoreCore::indexMappingHash

Expand All @@ -50,6 +48,9 @@ module ElasticGraph
@desired_config_parent: ::Hash[::String, untyped]
def desired_config_parent: () -> ::Hash[::String, untyped]

@desired_config_parent_for_update: ::Hash[::String, untyped]
def desired_config_parent_for_update: () -> ::Hash[::String, untyped]

def current_mapping: () -> DatastoreCore::indexMappingHash

@current_settings: DatastoreCore::indexSettingsHash?
Expand All @@ -62,7 +63,6 @@ module ElasticGraph
def config_diff: () -> ::String

def report_action: (::String) -> void
def merge_properties: (::Hash[::String, untyped], ::Hash[::String, untyped]) -> ::Hash[::String, untyped]

@related_index_configurators: ::Array[ForIndex]?
def related_index_configurators: () -> ::Array[ForIndex]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module ElasticGraph
class Admin
module IndexDefinitionConfigurator
# Type signatures for the `MappingUpdate` helper module.
module MappingUpdate
# Takes two string-keyed hashes (the desired mapping first, then the current mapping)
# and returns a string-keyed hash.
def self.merge_existing_fields_into: (::Hash[::String, untyped], ::Hash[::String, untyped]) -> ::Hash[::String, untyped]
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def simulate_presence_of_extra_setting(admin, index_definition_name, name, value
RSpec.shared_examples_for IndexDefinitionConfigurator, :uses_datastore, :builds_indexer do
let(:output_io) { StringIO.new }
let(:clock) { class_double(::Time, now: ::Time.utc(2024, 3, 20, 12, 0, 0)) }
let(:mapping_removal_note_snippet) { "extra fields listed here will not actually get removed" }
let(:index_meta_fields) { ["__sources", "__typename", "__versions"] }

it "idempotently creates an index or index template, avoiding unneeded datastore write calls" do
Expand All @@ -60,7 +59,7 @@ def simulate_presence_of_extra_setting(admin, index_definition_name, name, value
}.to maintain { get_index_definition_configuration(unique_index_name) }
.and make_no_datastore_write_calls("main")

expect(output_io.string).not_to include(mapping_removal_note_snippet)
expect(output_io.string).to exclude("+ mappings", "+ settings")
end

it "allows new top-level fields to be added to an existing index or index template" do
Expand All @@ -77,7 +76,6 @@ def simulate_presence_of_extra_setting(admin, index_definition_name, name, value
# The printed description of what was changed should not mention settings that are not being updated.
# (Requires us to normalize settings properly in the logic for this to be the case).
expect(output_io.string).to include("properties.amount_cents").and exclude("coerce", "ignore_malformed", "number_of_replicas", "number_of_shards")
expect(output_io.string).not_to include(mapping_removal_note_snippet)
end

it "handles both `object` lists and `nested` lists" do
Expand Down Expand Up @@ -153,13 +151,16 @@ def simulate_presence_of_extra_setting(admin, index_definition_name, name, value
end
}
))
output_io.string = +"" # use `+` so it is not a frozen string literal.

expect {
configure_index_definition(schema_def)
}.to change { get_index_definition_configuration(unique_index_name).dig("mappings", "properties", "name") }
.from({"type" => "keyword", "meta" => {"foo" => "1"}})
.to({"type" => "keyword"})
.and make_datastore_calls_to_configure_index_def(unique_index_name, :mappings)

expect(output_io.string).to include("properties.name.meta")
end

it "allows some previously unset settings to be changed on an existing index or index template" do
Expand Down Expand Up @@ -204,6 +205,7 @@ def simulate_presence_of_extra_setting(admin, index_definition_name, name, value

it "is a no-op when we attempt to drop a field because the datastore doesn't support dropping mapping fields" do
configure_index_definition(schema_def)
output_io.string = +"" # use `+` so it is not a frozen string literal.

expect {
# Here we remove the `name` field and the `options.size` field to verify it works for both root and nested fields.
Expand All @@ -215,9 +217,9 @@ def simulate_presence_of_extra_setting(admin, index_definition_name, name, value
props = get_index_definition_configuration(unique_index_name).dig("mappings", "properties")
[props.keys.sort, props.dig("options", "properties").keys.sort]
}.from([[*index_meta_fields, "created_at", "id", "name", "options"], ["color", "size"]])
.and make_datastore_calls_to_configure_index_def(unique_index_name, :mappings)
.and make_no_datastore_write_calls("main")

expect(output_io.string).to include(mapping_removal_note_snippet)
expect(output_io.string).to exclude("Updated mappings", "properties.name", "properties.options.properties.size")
end

it "maintains `_meta.ElasticGraph.sources` as a stateful append-only-set that remembers sources that were once active but we no longer have" do
Expand Down
Loading
Loading