From fd345b45fdd3fa3b94f694284e124512d933b1d8 Mon Sep 17 00:00:00 2001 From: ellisandrews-toast Date: Fri, 22 May 2026 18:39:15 -0400 Subject: [PATCH] Refactor index_data.painless script --- config/schema/artifacts/datastore_config.yaml | 105 ++++++++++-------- config/schema/artifacts/runtime_metadata.yaml | 40 +++---- .../datastore_config.yaml | 105 ++++++++++-------- .../runtime_metadata.yaml | 40 +++---- .../indexer/operation/count_accumulator.rb | 4 +- .../indexer/datastore_indexing_router_spec.rb | 8 +- .../operation/count_accumulator_spec.rb | 8 +- .../indexer/operation/update_spec.rb | 20 ++-- .../runtime_metadata/update_target.rb | 2 +- .../runtime_metadata/update_target_spec.rb | 6 +- .../indexing/derived_indexed_type.rb | 2 +- .../schema_definition/mixins/has_indices.rb | 2 +- .../scripts/update/index_data.painless | 99 ++++++++++------- .../lib/elastic_graph/constants.rb | 2 +- .../warehouse_lambda/warehouse_dumper.rb | 2 +- 15 files changed, 242 insertions(+), 203 deletions(-) diff --git a/config/schema/artifacts/datastore_config.yaml b/config/schema/artifacts/datastore_config.yaml index 7fd1aebc7..5ff90ed68 100644 --- a/config/schema/artifacts/datastore_config.yaml +++ b/config/schema/artifacts/datastore_config.yaml @@ -1852,7 +1852,7 @@ indices: index.number_of_shards: 1 index.max_result_window: 10000 scripts: - update_WidgetCurrency_from_Widget_a63538953d45f84cba67edd655d129a8: + update_WidgetCurrency_from_Widget_e72813bd1a8767343222b77bf53be31c: context: update script: lang: painless @@ -1996,7 +1996,7 @@ scripts: return false; } - Map data = params.data; + Map data = params.topLevelFields; // A variable to accumulate script errors so that we can surface _all_ issues and not just the first. List scriptErrors = new ArrayList(); if (ctx._source.details == null) { @@ -2156,67 +2156,67 @@ scripts: // No timestamp values matched the params, so return `false`. return false; - update_index_data_1fdfaf1c9261c96019decc89b515bd9a: + update_index_data_db769e42fd258f3884d2c5b721bb3c03: context: update script: lang: painless source: |- - Map source = ctx._source; - String sourceId = params.sourceId; - String relationship = params.relationship; - - // Numbers in JSON appear to be parsed as doubles, but we want the version stored as a long, so we need to cast it here. - long eventVersion = (long) params.version; + // --- Helper Functions --- // + void setup(Map source, String relationship, Map counts) { + if (source.__sources == null) { + source.__sources = []; + } - if (source.__sources == null) { - source.__sources = []; - } + if (source.__versions == null) { + source.__versions = [:]; + } - if (source.__versions == null) { - source.__versions = [:]; - } + if (source.__versions[relationship] == null) { + source.__versions[relationship] = [:]; + } - if (source.__versions[relationship] == null) { - source.__versions[relationship] = [:]; + if (counts != null && source.__counts == null) { + source.__counts = [:]; + } } - Map relationshipVersionsMap = source.__versions.get(relationship); - List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(id -> id != sourceId).collect(Collectors.toList()); + void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) { + Map relationshipVersionsMap = source.__versions.get(relationship); + List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); - if (previousSourceIdsForRelationship.size() > 0) { - String previousIdDescription = previousSourceIdsForRelationship.size() == 1 ? previousSourceIdsForRelationship.get(0) : previousSourceIdsForRelationship.toString(); - throw new IllegalArgumentException( - "Cannot update document " + params.id + " " + - "with data from related " + relationship + " " + sourceId + " " + - "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + - "but mutations of relationships used with `sourced_from` are not supported because " + - "allowing it could break ElasticGraph's out-of-order processing guarantees." - ); - } + if (previousSourceIdsForRelationship.size() > 0) { + throw new IllegalArgumentException( + "Cannot update document " + id + " " + + "with data from related " + relationship + " " + sourceId + " " + + "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + + "but mutations of relationships used with `sourced_from` are not supported because " + + "allowing it could break ElasticGraph's out-of-order processing guarantees." + ); + } - Number maybeDocVersion = source.__versions.get(params.relationship)?.get(params.sourceId); + Number maybeDocVersion = relationshipVersionsMap.get(sourceId); - // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. - long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); + // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. + long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); - if (docVersion >= eventVersion) { - throw new IllegalArgumentException("ElasticGraph update was a no-op: [" + - params.id + "]: version conflict, current version [" + - docVersion + "] is higher or equal to the one provided [" + - eventVersion + "]"); - } else { - source.putAll(params.data); - Map __counts = params.__counts; + if (docVersion >= eventVersion) { + throw new IllegalArgumentException("ElasticGraph update was a no-op: [" + + id + "]: version conflict, current version [" + + docVersion + "] is higher or equal to the one provided [" + + eventVersion + "]"); + } + } - if (__counts != null) { - if (source.__counts == null) { - source.__counts = [:]; - } + void applyTopLevelFields(Map source, Map topLevelFields, Map counts) { + source.putAll(topLevelFields); - source.__counts.putAll(__counts); + if (counts != null) { + source.__counts.putAll(counts); } + } - source.id = params.id; + void recordSource(Map source, String id, String relationship, String sourceId, long eventVersion) { + source.id = id; source.__versions[relationship][sourceId] = eventVersion; // Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list. @@ -2233,3 +2233,16 @@ scripts: source.__sources.add(-sourceBinarySearchResult - 1, relationship); } } + + // --- Main script body --- // + Map source = ctx._source; + String id = params.id; + String relationship = params.relationship; + String sourceId = params.sourceId; + long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles + Map counts = params.__counts; + + setup(source, relationship, counts); + validateSource(source, id, relationship, sourceId, eventVersion); + applyTopLevelFields(source, params.topLevelFields, counts); + recordSource(source, id, relationship, sourceId, eventVersion); diff --git a/config/schema/artifacts/runtime_metadata.yaml b/config/schema/artifacts/runtime_metadata.yaml index 4990c3fe9..71fa14689 100644 --- a/config/schema/artifacts/runtime_metadata.yaml +++ b/config/schema/artifacts/runtime_metadata.yaml @@ -3096,7 +3096,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Address AddressAggregatedValues: graphql_fields_by_name: @@ -3280,7 +3280,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: BrokerWholesaler ColorListFilterInput: graphql_fields_by_name: @@ -3321,7 +3321,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Company Component: graphql_fields_by_name: @@ -3429,7 +3429,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Component ComponentAggregatedValues: graphql_fields_by_name: @@ -3729,7 +3729,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: DirectWholesaler DistributionChannel: graphql_fields_by_name: @@ -3906,7 +3906,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: ElectricalPart ElectricalPartAggregatedValues: graphql_fields_by_name: @@ -4389,7 +4389,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Manufacturer ManufacturerAggregatedValues: graphql_fields_by_name: @@ -4549,7 +4549,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: MechanicalPart MechanicalPartAggregatedValues: graphql_fields_by_name: @@ -5510,7 +5510,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: OnlineStore PageInfo: graphql_fields_by_name: @@ -5707,7 +5707,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Person PersonAggregatedValues: graphql_fields_by_name: @@ -5772,7 +5772,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: PhysicalStore PhysicalStoreAggregatedValues: graphql_fields_by_name: @@ -6307,7 +6307,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Sponsor SponsorAggregatedValues: graphql_fields_by_name: @@ -6689,7 +6689,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: formed_on routing_value_source: league - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Team TeamAggregatedValues: graphql_fields_by_name: @@ -7737,7 +7737,7 @@ object_types_by_name: id_source: cost.currency rollover_timestamp_value_source: cost_currency_introduced_on routing_value_source: cost_currency_primary_continent - script_id: update_WidgetCurrency_from_Widget_a63538953d45f84cba67edd655d129a8 + script_id: update_WidgetCurrency_from_Widget_e72813bd1a8767343222b77bf53be31c type: WidgetCurrency - data_params: amount_cents: @@ -7817,7 +7817,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: created_at routing_value_source: workspace_id2 - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Widget - data_params: widget_cost: @@ -7848,7 +7848,7 @@ object_types_by_name: version: cardinality: one relationship: widget - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Component WidgetAggregatedValues: graphql_fields_by_name: @@ -8085,7 +8085,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: introduced_on routing_value_source: primary_continent - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: WidgetCurrency WidgetCurrencyAggregatedValues: graphql_fields_by_name: @@ -9112,7 +9112,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: WidgetWorkspace - data_params: workspace_name: @@ -9133,7 +9133,7 @@ object_types_by_name: relationship: workspace rollover_timestamp_value_source: widget.created_at routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Widget WidgetWorkspaceAggregatedValues: graphql_fields_by_name: @@ -9364,4 +9364,4 @@ static_script_ids_by_scoped_name: field/as_day_of_week: field_as_day_of_week_f2b5c7d9e8f75bf2457b52412bfb6537 field/as_time_of_day: field_as_time_of_day_ed82aba44fc66bff5635bec4305c1c66 filter/by_time_of_day: filter_by_time_of_day_ea12d0561b24961789ab68ed38435612 - update/index_data: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + update/index_data: update_index_data_db769e42fd258f3884d2c5b721bb3c03 diff --git a/config/schema/artifacts_with_apollo/datastore_config.yaml b/config/schema/artifacts_with_apollo/datastore_config.yaml index 7fd1aebc7..5ff90ed68 100644 --- a/config/schema/artifacts_with_apollo/datastore_config.yaml +++ b/config/schema/artifacts_with_apollo/datastore_config.yaml @@ -1852,7 +1852,7 @@ indices: index.number_of_shards: 1 index.max_result_window: 10000 scripts: - update_WidgetCurrency_from_Widget_a63538953d45f84cba67edd655d129a8: + update_WidgetCurrency_from_Widget_e72813bd1a8767343222b77bf53be31c: context: update script: lang: painless @@ -1996,7 +1996,7 @@ scripts: return false; } - Map data = params.data; + Map data = params.topLevelFields; // A variable to accumulate script errors so that we can surface _all_ issues and not just the first. List scriptErrors = new ArrayList(); if (ctx._source.details == null) { @@ -2156,67 +2156,67 @@ scripts: // No timestamp values matched the params, so return `false`. return false; - update_index_data_1fdfaf1c9261c96019decc89b515bd9a: + update_index_data_db769e42fd258f3884d2c5b721bb3c03: context: update script: lang: painless source: |- - Map source = ctx._source; - String sourceId = params.sourceId; - String relationship = params.relationship; - - // Numbers in JSON appear to be parsed as doubles, but we want the version stored as a long, so we need to cast it here. - long eventVersion = (long) params.version; + // --- Helper Functions --- // + void setup(Map source, String relationship, Map counts) { + if (source.__sources == null) { + source.__sources = []; + } - if (source.__sources == null) { - source.__sources = []; - } + if (source.__versions == null) { + source.__versions = [:]; + } - if (source.__versions == null) { - source.__versions = [:]; - } + if (source.__versions[relationship] == null) { + source.__versions[relationship] = [:]; + } - if (source.__versions[relationship] == null) { - source.__versions[relationship] = [:]; + if (counts != null && source.__counts == null) { + source.__counts = [:]; + } } - Map relationshipVersionsMap = source.__versions.get(relationship); - List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(id -> id != sourceId).collect(Collectors.toList()); + void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) { + Map relationshipVersionsMap = source.__versions.get(relationship); + List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); - if (previousSourceIdsForRelationship.size() > 0) { - String previousIdDescription = previousSourceIdsForRelationship.size() == 1 ? previousSourceIdsForRelationship.get(0) : previousSourceIdsForRelationship.toString(); - throw new IllegalArgumentException( - "Cannot update document " + params.id + " " + - "with data from related " + relationship + " " + sourceId + " " + - "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + - "but mutations of relationships used with `sourced_from` are not supported because " + - "allowing it could break ElasticGraph's out-of-order processing guarantees." - ); - } + if (previousSourceIdsForRelationship.size() > 0) { + throw new IllegalArgumentException( + "Cannot update document " + id + " " + + "with data from related " + relationship + " " + sourceId + " " + + "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + + "but mutations of relationships used with `sourced_from` are not supported because " + + "allowing it could break ElasticGraph's out-of-order processing guarantees." + ); + } - Number maybeDocVersion = source.__versions.get(params.relationship)?.get(params.sourceId); + Number maybeDocVersion = relationshipVersionsMap.get(sourceId); - // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. - long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); + // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. + long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); - if (docVersion >= eventVersion) { - throw new IllegalArgumentException("ElasticGraph update was a no-op: [" + - params.id + "]: version conflict, current version [" + - docVersion + "] is higher or equal to the one provided [" + - eventVersion + "]"); - } else { - source.putAll(params.data); - Map __counts = params.__counts; + if (docVersion >= eventVersion) { + throw new IllegalArgumentException("ElasticGraph update was a no-op: [" + + id + "]: version conflict, current version [" + + docVersion + "] is higher or equal to the one provided [" + + eventVersion + "]"); + } + } - if (__counts != null) { - if (source.__counts == null) { - source.__counts = [:]; - } + void applyTopLevelFields(Map source, Map topLevelFields, Map counts) { + source.putAll(topLevelFields); - source.__counts.putAll(__counts); + if (counts != null) { + source.__counts.putAll(counts); } + } - source.id = params.id; + void recordSource(Map source, String id, String relationship, String sourceId, long eventVersion) { + source.id = id; source.__versions[relationship][sourceId] = eventVersion; // Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list. @@ -2233,3 +2233,16 @@ scripts: source.__sources.add(-sourceBinarySearchResult - 1, relationship); } } + + // --- Main script body --- // + Map source = ctx._source; + String id = params.id; + String relationship = params.relationship; + String sourceId = params.sourceId; + long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles + Map counts = params.__counts; + + setup(source, relationship, counts); + validateSource(source, id, relationship, sourceId, eventVersion); + applyTopLevelFields(source, params.topLevelFields, counts); + recordSource(source, id, relationship, sourceId, eventVersion); diff --git a/config/schema/artifacts_with_apollo/runtime_metadata.yaml b/config/schema/artifacts_with_apollo/runtime_metadata.yaml index 9d11bdd4a..80fed6991 100644 --- a/config/schema/artifacts_with_apollo/runtime_metadata.yaml +++ b/config/schema/artifacts_with_apollo/runtime_metadata.yaml @@ -3125,7 +3125,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Address AddressAggregatedValues: graphql_fields_by_name: @@ -3309,7 +3309,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: BrokerWholesaler ColorListFilterInput: graphql_fields_by_name: @@ -3350,7 +3350,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Company Component: graphql_fields_by_name: @@ -3479,7 +3479,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Component ComponentAggregatedValues: graphql_fields_by_name: @@ -3831,7 +3831,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: DirectWholesaler DistributionChannel: graphql_fields_by_name: @@ -4008,7 +4008,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: ElectricalPart ElectricalPartAggregatedValues: graphql_fields_by_name: @@ -4491,7 +4491,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Manufacturer ManufacturerAggregatedValues: graphql_fields_by_name: @@ -4651,7 +4651,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: MechanicalPart MechanicalPartAggregatedValues: graphql_fields_by_name: @@ -5633,7 +5633,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: OnlineStore PageInfo: graphql_fields_by_name: @@ -5830,7 +5830,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Person PersonAggregatedValues: graphql_fields_by_name: @@ -5895,7 +5895,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: PhysicalStore PhysicalStoreAggregatedValues: graphql_fields_by_name: @@ -6436,7 +6436,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Sponsor SponsorAggregatedValues: graphql_fields_by_name: @@ -6818,7 +6818,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: formed_on routing_value_source: league - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Team TeamAggregatedValues: graphql_fields_by_name: @@ -7866,7 +7866,7 @@ object_types_by_name: id_source: cost.currency rollover_timestamp_value_source: cost_currency_introduced_on routing_value_source: cost_currency_primary_continent - script_id: update_WidgetCurrency_from_Widget_a63538953d45f84cba67edd655d129a8 + script_id: update_WidgetCurrency_from_Widget_e72813bd1a8767343222b77bf53be31c type: WidgetCurrency - data_params: amount_cents: @@ -7946,7 +7946,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: created_at routing_value_source: workspace_id2 - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Widget - data_params: widget_cost: @@ -7977,7 +7977,7 @@ object_types_by_name: version: cardinality: one relationship: widget - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Component WidgetAggregatedValues: graphql_fields_by_name: @@ -8214,7 +8214,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: introduced_on routing_value_source: primary_continent - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: WidgetCurrency WidgetCurrencyAggregatedValues: graphql_fields_by_name: @@ -9241,7 +9241,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: WidgetWorkspace - data_params: workspace_name: @@ -9262,7 +9262,7 @@ object_types_by_name: relationship: workspace rollover_timestamp_value_source: widget.created_at routing_value_source: id - script_id: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + script_id: update_index_data_db769e42fd258f3884d2c5b721bb3c03 type: Widget WidgetWorkspaceAggregatedValues: graphql_fields_by_name: @@ -9536,4 +9536,4 @@ static_script_ids_by_scoped_name: field/as_day_of_week: field_as_day_of_week_f2b5c7d9e8f75bf2457b52412bfb6537 field/as_time_of_day: field_as_time_of_day_ed82aba44fc66bff5635bec4305c1c66 filter/by_time_of_day: filter_by_time_of_day_ea12d0561b24961789ab68ed38435612 - update/index_data: update_index_data_1fdfaf1c9261c96019decc89b515bd9a + update/index_data: update_index_data_db769e42fd258f3884d2c5b721bb3c03 diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/operation/count_accumulator.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/operation/count_accumulator.rb index 1450e72ee..4d97597b9 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/operation/count_accumulator.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/operation/count_accumulator.rb @@ -56,7 +56,7 @@ module Operation # @implements CountAccumulator def self.merge_list_counts_into(params, mapping:, list_counts_field_paths_for_source:) # Here we compute the counts of our list elements so that we can index it. - data = compute_list_counts_of(params.fetch("data"), CountAccumulator.new_parent( + data = compute_list_counts_of(params.fetch("topLevelFields"), CountAccumulator.new_parent( # We merge in `type: nested` since the `nested` type indicates a new count accumulator parent and we want that applied at the root. mapping.merge("type" => "nested"), list_counts_field_paths_for_source @@ -67,7 +67,7 @@ def self.merge_list_counts_into(params, mapping:, list_counts_field_paths_for_so # fields from multiple sources, we need `__counts` to get merged properly. So here we "promote" it from # `data.__counts` to being a root-level parameter. params.merge( - "data" => data.except(LIST_COUNTS_FIELD), + "topLevelFields" => data.except(LIST_COUNTS_FIELD), LIST_COUNTS_FIELD => data[LIST_COUNTS_FIELD] ) end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb index e1a57663a..6d77a53e5 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb @@ -240,21 +240,21 @@ def type_name_for_index(index_name) expect(submitted_body[0]).to eq({update: {_id: "1", _index: "widgets", retry_on_conflict: 5}}) expect(submitted_body[1]).to include( - script: a_hash_including(id: INDEX_DATA_UPDATE_SCRIPT_ID, params: a_hash_including("data")), + script: a_hash_including(id: INDEX_DATA_UPDATE_SCRIPT_ID, params: a_hash_including("topLevelFields")), scripted_upsert: true, upsert: {} ) expect(submitted_body[2]).to eq({update: {_id: "2", _index: "widgets", retry_on_conflict: 5}}) expect(submitted_body[3]).to include( - script: a_hash_including(id: INDEX_DATA_UPDATE_SCRIPT_ID, params: a_hash_including("data")), + script: a_hash_including(id: INDEX_DATA_UPDATE_SCRIPT_ID, params: a_hash_including("topLevelFields")), scripted_upsert: true, upsert: {} ) expect(submitted_body[4]).to eq({update: {_id: "1", _index: "components", retry_on_conflict: 5}}) expect(submitted_body[5]).to include( - script: a_hash_including(id: INDEX_DATA_UPDATE_SCRIPT_ID, params: a_hash_including("data")), + script: a_hash_including(id: INDEX_DATA_UPDATE_SCRIPT_ID, params: a_hash_including("topLevelFields")), scripted_upsert: true, upsert: {} ) @@ -265,7 +265,7 @@ def type_name_for_index(index_name) upsert: {}, script: a_hash_including( id: /WidgetCurrency_from_Widget_/, - params: {"data" => {"name" => ["thing1"]}, "id" => "USD"} + params: {"topLevelFields" => {"name" => ["thing1"]}, "id" => "USD"} ) ) end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb index 2114e954e..d420a40a3 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb @@ -148,7 +148,7 @@ module Operation params = script_params_for(data: data, source_type: "Manufacturer", destination_type: "Manufacturer") expect(params[LIST_COUNTS_FIELD]).to eq nil - expect(params["data"]).to exclude(LIST_COUNTS_FIELD) + expect(params["topLevelFields"]).to exclude(LIST_COUNTS_FIELD) end end @@ -188,14 +188,14 @@ module Operation {"currency" => "USD", "amount_cents" => 725} ]}) - expect(params.dig("data", "forbes_valuation_moneys_nested")).to eq [ + expect(params.dig("topLevelFields", "forbes_valuation_moneys_nested")).to eq [ {"currency" => "USD", "amount_cents" => 525}, {"currency" => "USD", "amount_cents" => 725} ] end def current_players_counts_for_team_event(data) - script_params_for_team_event(data).dig("data", "current_players_nested").map do |player| + script_params_for_team_event(data).dig("topLevelFields", "current_players_nested").map do |player| player.fetch(LIST_COUNTS_FIELD) end end @@ -259,7 +259,7 @@ def current_players_counts_for_team_event(data) end def seasons_for_team_event(data) - script_params_for_team_event(data).dig("data", "seasons_object") + script_params_for_team_event(data).dig("topLevelFields", "seasons_object") end end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb index fa6de53f2..4899ae56a 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb @@ -73,7 +73,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => {"name" => ["thing1"]}, + "topLevelFields" => {"name" => ["thing1"]}, "id" => "17" }}, scripted_upsert: true, @@ -102,7 +102,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: INDEX_DATA_UPDATE_SCRIPT_ID, params: { - "data" => {"name" => "thing1"}, + "topLevelFields" => {"name" => "thing1"}, "id" => "17", "staticValue" => 47, "sourceType" => "Widget", @@ -156,7 +156,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => {"name" => []}, + "topLevelFields" => {"name" => []}, "id" => "17" }}, scripted_upsert: true, @@ -177,7 +177,7 @@ module Operation {update: {_id: "embedded_workspace_id", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => {"name" => ["thing1"]}, + "topLevelFields" => {"name" => ["thing1"]}, "id" => "embedded_workspace_id" }}, scripted_upsert: true, @@ -202,7 +202,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => {"embedded_values.missing_field" => [], "name" => nil}, + "topLevelFields" => {"embedded_values.missing_field" => [], "name" => nil}, "id" => "17" }}, scripted_upsert: true, @@ -229,7 +229,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => { + "topLevelFields" => { "embedded_values" => ["thing1"], "name" => { "name" => "embedded_name", @@ -261,7 +261,7 @@ module Operation { script: {id: operations.first.update_target.script_id, params: { # Float-typed integer values are coerced to true ints before indexing - "data" => {"size" => [an_instance_of(::Integer).and(eq_to(4))]}, + "topLevelFields" => {"size" => [an_instance_of(::Integer).and(eq_to(4))]}, "id" => "17" }}, scripted_upsert: true, @@ -282,7 +282,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => {"name" => ["thing1"]}, + "topLevelFields" => {"name" => ["thing1"]}, "id" => "17" }}, scripted_upsert: true, @@ -291,7 +291,7 @@ module Operation {update: {_id: "18", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => {"name" => ["thing1"]}, + "topLevelFields" => {"name" => ["thing1"]}, "id" => "18" }}, scripted_upsert: true, @@ -300,7 +300,7 @@ module Operation {update: {_id: "19", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "data" => {"name" => ["thing1"]}, + "topLevelFields" => {"name" => ["thing1"]}, "id" => "19" }}, scripted_upsert: true, diff --git a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb index 2e8fd0653..dfb6c33a5 100644 --- a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb +++ b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb @@ -74,7 +74,7 @@ def params_for(doc_id:, event:, prepared_record:) [name, param.value_for(event)] end - meta.merge({"id" => doc_id, "data" => data}) + meta.merge({"id" => doc_id, "topLevelFields" => data}) end end end diff --git a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb index ea03bc500..482cc70c8 100644 --- a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb +++ b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb @@ -84,7 +84,7 @@ module RuntimeMetadata } ) - without_id_or_data = params.except("id", "data") + without_id_or_data = params.except("id", "topLevelFields") expect(without_id_or_data).to eq( "foo" => 43, @@ -93,7 +93,7 @@ module RuntimeMetadata ) end - it "extracts `event_params` from `prepared_record` and include them under `data`" do + it "extracts `event_params` from `prepared_record` and include them under `topLevelFields`" do params = params_for( data_params: { "foo" => static_param_with(43), @@ -108,7 +108,7 @@ module RuntimeMetadata } ) - expect(params.fetch("data")).to eq( + expect(params.fetch("topLevelFields")).to eq( "foo" => 43, "bar" => "hello", "bazz" => [12] diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb index da6183cc3..4adcd4544 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb @@ -336,7 +336,7 @@ def was_noop_variable(field) SCRIPT_ERRORS_VAR = "scriptErrors" STATIC_SETUP_STATEMENTS = <<~EOS.strip - Map data = params.data; + Map data = params.topLevelFields; // A variable to accumulate script errors so that we can surface _all_ issues and not just the first. List #{SCRIPT_ERRORS_VAR} = new ArrayList(); EOS diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/mixins/has_indices.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/mixins/has_indices.rb index 577f80faa..0e644f6ad 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/mixins/has_indices.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/mixins/has_indices.rb @@ -431,7 +431,7 @@ def self_update_target return nil if abstract? || !root_document_type? # We exclude `id` from `data_params` because `Indexer::Operator::Update` automatically includes - # `params.id` so we don't want it duplicated at `params.data.id` alongside other data params. + # `params.id` so we don't want it duplicated at `params.topLevelFields.id` alongside other data params. # # In addition, we exclude fields that have an alternate `source` -- those fields will get populated # by a different event and we don't want to risk "stomping" their value via this update target. diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless index cc73bbb1b..3d956b777 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless @@ -1,59 +1,59 @@ -Map source = ctx._source; -String sourceId = params.sourceId; -String relationship = params.relationship; - -// Numbers in JSON appear to be parsed as doubles, but we want the version stored as a long, so we need to cast it here. -long eventVersion = (long) params.version; +// --- Helper Functions --- // +void setup(Map source, String relationship, Map counts) { + if (source.__sources == null) { + source.__sources = []; + } -if (source.__sources == null) { - source.__sources = []; -} + if (source.__versions == null) { + source.__versions = [:]; + } -if (source.__versions == null) { - source.__versions = [:]; -} + if (source.__versions[relationship] == null) { + source.__versions[relationship] = [:]; + } -if (source.__versions[relationship] == null) { - source.__versions[relationship] = [:]; + if (counts != null && source.__counts == null) { + source.__counts = [:]; + } } -Map relationshipVersionsMap = source.__versions.get(relationship); -List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(id -> id != sourceId).collect(Collectors.toList()); +void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) { + Map relationshipVersionsMap = source.__versions.get(relationship); + List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); -if (previousSourceIdsForRelationship.size() > 0) { - String previousIdDescription = previousSourceIdsForRelationship.size() == 1 ? previousSourceIdsForRelationship.get(0) : previousSourceIdsForRelationship.toString(); - throw new IllegalArgumentException( - "Cannot update document " + params.id + " " + - "with data from related " + relationship + " " + sourceId + " " + - "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + - "but mutations of relationships used with `sourced_from` are not supported because " + - "allowing it could break ElasticGraph's out-of-order processing guarantees." - ); -} + if (previousSourceIdsForRelationship.size() > 0) { + throw new IllegalArgumentException( + "Cannot update document " + id + " " + + "with data from related " + relationship + " " + sourceId + " " + + "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + + "but mutations of relationships used with `sourced_from` are not supported because " + + "allowing it could break ElasticGraph's out-of-order processing guarantees." + ); + } -Number maybeDocVersion = source.__versions.get(params.relationship)?.get(params.sourceId); + Number maybeDocVersion = relationshipVersionsMap.get(sourceId); -// Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. -long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); + // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. + long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); -if (docVersion >= eventVersion) { - throw new IllegalArgumentException("ElasticGraph update was a no-op: [" + - params.id + "]: version conflict, current version [" + - docVersion + "] is higher or equal to the one provided [" + - eventVersion + "]"); -} else { - source.putAll(params.data); - Map __counts = params.__counts; + if (docVersion >= eventVersion) { + throw new IllegalArgumentException("ElasticGraph update was a no-op: [" + + id + "]: version conflict, current version [" + + docVersion + "] is higher or equal to the one provided [" + + eventVersion + "]"); + } +} - if (__counts != null) { - if (source.__counts == null) { - source.__counts = [:]; - } +void applyTopLevelFields(Map source, Map topLevelFields, Map counts) { + source.putAll(topLevelFields); - source.__counts.putAll(__counts); + if (counts != null) { + source.__counts.putAll(counts); } +} - source.id = params.id; +void recordSource(Map source, String id, String relationship, String sourceId, long eventVersion) { + source.id = id; source.__versions[relationship][sourceId] = eventVersion; // Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list. @@ -70,3 +70,16 @@ if (docVersion >= eventVersion) { source.__sources.add(-sourceBinarySearchResult - 1, relationship); } } + +// --- Main script body --- // +Map source = ctx._source; +String id = params.id; +String relationship = params.relationship; +String sourceId = params.sourceId; +long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles +Map counts = params.__counts; + +setup(source, relationship, counts); +validateSource(source, id, relationship, sourceId, eventVersion); +applyTopLevelFields(source, params.topLevelFields, counts); +recordSource(source, id, relationship, sourceId, eventVersion); diff --git a/elasticgraph-support/lib/elastic_graph/constants.rb b/elasticgraph-support/lib/elastic_graph/constants.rb index 26c22d9f5..6a49d5fe2 100644 --- a/elasticgraph-support/lib/elastic_graph/constants.rb +++ b/elasticgraph-support/lib/elastic_graph/constants.rb @@ -140,7 +140,7 @@ module ElasticGraph # # Note: this constant is automatically kept up-to-date by our `schema_artifacts:dump` rake task. # @private - INDEX_DATA_UPDATE_SCRIPT_ID = "update_index_data_1fdfaf1c9261c96019decc89b515bd9a" + INDEX_DATA_UPDATE_SCRIPT_ID = "update_index_data_db769e42fd258f3884d2c5b721bb3c03" # When an update script has a no-op result we often want to communicate more information about # why it was a no-op back to ElatsicGraph from the script. The only way to do that is to throw diff --git a/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb b/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb index 4fe462741..bf1d8e8af 100644 --- a/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb +++ b/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb @@ -116,7 +116,7 @@ def build_jsonl_file_from(operations) next nil if op.update_target.type != op.event.fetch("type") params = op.to_datastore_bulk[1].fetch(:script).fetch(:params) - data = params.fetch("data").merge({ + data = params.fetch("topLevelFields").merge({ "id" => params.fetch("id"), "__eg_version" => params.fetch("version") })