Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion elasticgraph-indexer/lib/elastic_graph/indexer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def initialize(
config:,
datastore_core:,
datastore_router: nil,
ingestion_schema: nil,
monotonic_clock: nil,
clock: nil
)
Expand All @@ -38,6 +39,7 @@ def initialize(
@logger = datastore_core.logger
@datastore_router = datastore_router
@schema_artifacts = @datastore_core.schema_artifacts
@ingestion_schema = ingestion_schema
@monotonic_clock = monotonic_clock
@clock = clock || ::Time
end
Expand Down Expand Up @@ -81,7 +83,8 @@ def operation_factory
record_preparer_factory: record_preparer_factory,
logger: datastore_core.logger,
skip_derived_indexing_type_updates: config.skip_derived_indexing_type_updates,
configure_record_validator: nil
configure_record_validator: nil,
ingestion_schema: @ingestion_schema
)
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/support/json_schema/validator_factory"

module ElasticGraph
class Indexer
module IngestionSchemas
# Default ingestion schema implementation based on ElasticGraph JSON schema artifacts.
class JSONSchema
# Builds an ingestion schema adapter over ElasticGraph JSON schema artifacts.
def initialize(schema_artifacts:, record_preparer_factory:, configure_record_validator:)
@schema_artifacts = schema_artifacts
@record_preparer_factory = record_preparer_factory
@configure_record_validator = configure_record_validator
end

# Returns all known JSON schema versions that can be selected for ingestion.
def available_versions
@schema_artifacts.available_json_schema_versions
end

# Returns a validator for the given type and JSON schema version.
def validator_for(type, version)
factory = validator_factories_by_version[version] # : Support::JSONSchema::ValidatorFactory
factory.validator_for(type)
end

# Returns a record preparer configured for the given JSON schema version.
def record_preparer_for(version)
@record_preparer_factory.for_json_schema_version(version)
end

private

# Lazily builds and memoizes validator factories by schema version.
def validator_factories_by_version
@validator_factories_by_version ||= ::Hash.new do |hash, schema_version|
factory = Support::JSONSchema::ValidatorFactory.new(
schema: @schema_artifacts.json_schemas_for(schema_version),
sanitize_pii: true
)
factory = @configure_record_validator.call(factory) if @configure_record_validator
hash[schema_version] = factory
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
require "elastic_graph/constants"
require "elastic_graph/indexer/event_id"
require "elastic_graph/indexer/failed_event_error"
require "elastic_graph/indexer/ingestion_schemas/json_schema"
require "elastic_graph/indexer/operation/update"
require "elastic_graph/indexer/record_preparer"
require "elastic_graph/support/json_schema/validator_factory"
require "elastic_graph/support/memoizable_data"

module ElasticGraph
Expand All @@ -23,34 +23,41 @@ class Factory < Support::MemoizableData.define(
:record_preparer_factory,
:logger,
:skip_derived_indexing_type_updates,
:configure_record_validator
:configure_record_validator,
:ingestion_schema
)
REQUIRED_INGESTION_SCHEMA_METHODS = %i[
available_versions
validator_for
record_preparer_for
].freeze

def build(event)
event = prepare_event(event)

selected_json_schema_version = select_json_schema_version(event) { |failure| return failure }
selected_schema_version = select_schema_version(event) { |failure| return failure }

# Because the `select_json_schema_version` picks the closest-matching json schema version, the incoming
# Because `select_schema_version` picks the closest-matching schema version, the incoming
# event might not match the expected json_schema_version value in the json schema (which is a `const` field).
# This is by design, since we're picking a schema based on best-effort, so to avoid that by-design validation error,
# performing the envelope validation on a "patched" version of the event.
event_with_patched_envelope = event.merge({JSON_SCHEMA_VERSION_KEY => selected_json_schema_version})
event_with_patched_envelope = event.merge({JSON_SCHEMA_VERSION_KEY => selected_schema_version})

if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_json_schema_version).validate_with_error_message(event_with_patched_envelope))
if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_schema_version).validate_with_error_message(event_with_patched_envelope))
return build_failed_result(event, "event payload", error_message)
end

failed_result = validate_record_returning_failure(event, selected_json_schema_version)
failed_result = validate_record_returning_failure(event, selected_schema_version)
failed_result || BuildResult.success(build_all_operations_for(
event,
record_preparer_factory.for_json_schema_version(selected_json_schema_version)
resolved_ingestion_schema.record_preparer_for(selected_schema_version)
))
end

private

def select_json_schema_version(event)
available_json_schema_versions = schema_artifacts.available_json_schema_versions
def select_schema_version(event)
available_schema_versions = resolved_ingestion_schema.available_versions

requested_json_schema_version = event[JSON_SCHEMA_VERSION_KEY]

Expand All @@ -70,7 +77,7 @@ def select_json_schema_version(event)
# behavior is in the event of a tie (highly unlikely, there shouldn't be a gap in available json schema versions), the higher version
# should be selected. So to get that behavior, the list is sorted in descending order.
#
selected_json_schema_version = available_json_schema_versions.sort.reverse.min_by { |version| (requested_json_schema_version - version).abs }
selected_json_schema_version = available_schema_versions.sort.reverse.min_by { |version| (requested_json_schema_version - version).abs }

if selected_json_schema_version != requested_json_schema_version
logger.info({
Expand All @@ -87,27 +94,15 @@ def select_json_schema_version(event)
yield build_failed_result(
event, JSON_SCHEMA_VERSION_KEY,
"Failed to select json schema version. Requested version: #{event[JSON_SCHEMA_VERSION_KEY]}. \
Available json schema versions: #{available_json_schema_versions.sort.join(", ")}"
Available json schema versions: #{available_schema_versions.sort.join(", ")}"
)
end

selected_json_schema_version
end

def validator(type, selected_json_schema_version)
factory = validator_factories_by_version[selected_json_schema_version] # : Support::JSONSchema::ValidatorFactory
factory.validator_for(type)
end

def validator_factories_by_version
@validator_factories_by_version ||= ::Hash.new do |hash, json_schema_version|
factory = Support::JSONSchema::ValidatorFactory.new(
schema: schema_artifacts.json_schemas_for(json_schema_version),
sanitize_pii: true
)
factory = configure_record_validator.call(factory) if configure_record_validator
hash[json_schema_version] = factory
end
resolved_ingestion_schema.validator_for(type, selected_json_schema_version)
end

# This copies the `id` from event into the actual record
Expand All @@ -130,14 +125,44 @@ def validate_record_returning_failure(event, selected_json_schema_version)
def build_failed_result(event, payload_description, validation_message)
message = "Malformed #{payload_description}. #{validation_message}"

# Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid JSON schema
# version number in this case (which is usually required to get a `RecordPreparer` from the factory), and
# Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid schema
# version number in this case (which is usually required to get a non-identity record preparer), and
# we won't wind up using the record preparer for real on these operations, anyway.
operations = build_all_operations_for(event, RecordPreparer::Identity)

BuildResult.failure(FailedEventError.new(event: event, operations: operations.to_set, main_message: message))
end

def resolved_ingestion_schema
@resolved_ingestion_schema ||= if ingestion_schema
validate_ingestion_schema!(ingestion_schema)
else
IngestionSchemas::JSONSchema.new(
schema_artifacts: schema_artifacts,
record_preparer_factory: record_preparer_factory,
configure_record_validator: configure_record_validator
)
end
end

# Ensures custom ingestion schema overrides implement the required interface.
def validate_ingestion_schema!(candidate)
missing_methods = REQUIRED_INGESTION_SCHEMA_METHODS.reject do |method_name|
candidate.respond_to?(method_name)
end

return candidate if missing_methods.empty?

required_methods = REQUIRED_INGESTION_SCHEMA_METHODS.map { |method_name| "`#{method_name}`" }.join(", ")
missing_methods_display = missing_methods.map { |method_name| "`#{method_name}`" }.join(", ")

::Kernel.raise(
::ArgumentError,
"Invalid ingestion schema override. Expected an object responding to #{required_methods}. " \
"Missing methods: #{missing_methods_display}."
)
end

def build_all_operations_for(event, record_preparer)
# If `type` is missing or is not a known type (as indicated by `runtime_metadata` being nil)
# then we can't build a derived indexing type update operation. That case will only happen when we build
Expand Down
2 changes: 2 additions & 0 deletions elasticgraph-indexer/sig/elastic_graph/indexer.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module ElasticGraph
config: Config,
datastore_core: DatastoreCore,
?datastore_router: Indexer::_DatastoreRouter?,
?ingestion_schema: IngestionSchemas::_IngestionSchema?,
?monotonic_clock: Support::MonotonicClock?,
?clock: singleton(::Time)?
) -> void
Expand All @@ -33,6 +34,7 @@ module ElasticGraph

private

@ingestion_schema: IngestionSchemas::_IngestionSchema?
@clock: singleton(::Time)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module ElasticGraph
class Indexer
module IngestionSchemas
interface _IngestionSchema
def available_versions: () -> ::Set[::Integer]
def validator_for: (::String, ::Integer) -> Support::JSONSchema::Validator
def record_preparer_for: (::Integer) -> _RecordPreparer
end

class JSONSchema
include _IngestionSchema

def initialize: (
schema_artifacts: schemaArtifacts,
record_preparer_factory: RecordPreparer::Factory,
configure_record_validator: (^(Support::JSONSchema::ValidatorFactory) -> Support::JSONSchema::ValidatorFactory)?
) -> void

private

@schema_artifacts: schemaArtifacts
@record_preparer_factory: RecordPreparer::Factory
@configure_record_validator: (^(Support::JSONSchema::ValidatorFactory) -> Support::JSONSchema::ValidatorFactory)?
@validator_factories_by_version: ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]?

def validator_factories_by_version: () -> ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ module ElasticGraph
attr_reader logger: ::Logger
attr_reader skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]
attr_reader configure_record_validator: (^(validatorFactory) -> validatorFactory)?
attr_reader ingestion_schema: IngestionSchemas::_IngestionSchema?

def initialize: (
schema_artifacts: schemaArtifacts,
index_definitions_by_graphql_type: ::Hash[::String, ::Array[DatastoreCore::_IndexDefinition]],
record_preparer_factory: RecordPreparer::Factory,
logger: ::Logger,
skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]],
configure_record_validator: (^(validatorFactory) -> validatorFactory)?
configure_record_validator: (^(validatorFactory) -> validatorFactory)?,
ingestion_schema: IngestionSchemas::_IngestionSchema?
) -> void

def with: (
Expand All @@ -28,7 +30,8 @@ module ElasticGraph
?record_preparer_factory: RecordPreparer::Factory,
?logger: ::Logger,
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]],
?configure_record_validator: (^(validatorFactory) -> validatorFactory)?
?configure_record_validator: (^(validatorFactory) -> validatorFactory)?,
?ingestion_schema: IngestionSchemas::_IngestionSchema?
) -> instance
end

Expand All @@ -39,10 +42,11 @@ module ElasticGraph

def validator: (::String, ::Integer) -> Support::JSONSchema::Validator

@validator_factories_by_version: ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]?
def validator_factories_by_version: () -> ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]
@resolved_ingestion_schema: IngestionSchemas::_IngestionSchema?
def resolved_ingestion_schema: () -> IngestionSchemas::_IngestionSchema
def validate_ingestion_schema!: (untyped) -> IngestionSchemas::_IngestionSchema

def select_json_schema_version: (event) { (BuildResult) -> bot } -> (::Integer | bot)
def select_schema_version: (event) { (BuildResult) -> bot } -> (::Integer | bot)
def prepare_event: (event) -> event
def validate_record_returning_failure: (event, ::Integer) -> BuildResult?
def build_failed_result: (event, ::String, ::String) -> BuildResult
Expand Down
Loading