diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index f686efc5e11..15f519615af 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7612,9 +7612,12 @@ version = "0.8.0" dependencies = [ "anyhow", "arrow", + "chrono", "parquet", "proptest", + "prost 0.14.3", "quickwit-common", + "quickwit-proto", "sea-query", "serde", "serde_json", diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 9f79032d7c5..e29d4af2540 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -68,7 +68,7 @@ pub struct IndexingSchedulerState { /// /// Scheduling executes the following steps: /// 1. Builds a [`PhysicalIndexingPlan`] from the list of logical indexing tasks. See -/// [`build_physical_indexing_plan`] for the implementation details. +/// `build_physical_indexing_plan` for the implementation details. /// 2. Apply the [`PhysicalIndexingPlan`]: for each indexer, the scheduler send the indexing tasks /// by gRPC. An indexer immediately returns an Ok and apply asynchronously the received plan. Any /// errors (network) happening in this step are ignored. The scheduler runs a control loop that @@ -98,7 +98,7 @@ pub struct IndexingSchedulerState { /// Concretely, it will send the faulty nodes of the plan they are supposed to follow. // /// Finally, in order to give the time for each indexer to run their indexing tasks, the control -/// plane will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired +/// plane will wait at least `MIN_DURATION_BETWEEN_SCHEDULING` before comparing the desired /// plan with the running plan. 
pub struct IndexingScheduler { cluster_id: String, diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 1827af7a153..448b7a4e312 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -66,7 +66,7 @@ pub enum UploaderType { /// [`SplitsUpdateMailbox`] wraps either a [`Mailbox>`] or [`Mailbox

`]. /// /// It makes it possible to send a splits update either to the [`Sequencer`] or directly -/// to the publisher actor `P`. It is used in combination with [`SplitsUpdateSender`] that +/// to the publisher actor `P`. It is used in combination with `SplitsUpdateSender` that /// will do the send. /// /// This is useful as we have different requirements between the indexing pipeline and diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 6fd5ce244be..e35618b99f8 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! [`FileBackedIndex`] module. It is public so that the crate `quickwit-backward-compat` can -//! import [`FileBackedIndex`] and run backward-compatibility tests. You should not have to import +//! `FileBackedIndex` module. It is public so that the crate `quickwit-backward-compat` can +//! import `FileBackedIndex` and run backward-compatibility tests. You should not have to import //! anything from here directly. mod serialize; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs index c13711de0b2..43979334f55 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs @@ -24,7 +24,7 @@ use tracing::error; use super::file_backed_index::FileBackedIndex; use super::store_operations::{METASTORE_FILE_NAME, load_index}; -/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first +/// Lazy `FileBackedIndex`. 
It loads a `FileBackedIndex` on demand. When the index is first /// loaded, it optionally spawns a task to periodically poll the storage and update the index. pub(crate) struct LazyFileBackedIndex { index_id: IndexId, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 00791488e65..74e0f05abae 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. //! Module for [`FileBackedMetastore`]. It is public so that the crate `quickwit-backward-compat` -//! can import [`FileBackedIndex`] and run backward-compatibility tests. You should not have to +//! can import `FileBackedIndex` and run backward-compatibility tests. You should not have to //! import anything from here directly. pub mod file_backed_index; @@ -116,9 +116,9 @@ impl From for MutationOccurred<()> { /// into as many files and stores a map of indexes /// (index_id, index_status) in a dedicated file `manifest.json`. /// -/// A [`LazyIndexStatus`] describes the lifecycle of an index: [`LazyIndexStatus::Creating`] and -/// [`LazyIndexStatus::Deleting`] are transitioning states that indicates that the index is not -/// yet available. On the contrary, the [`LazyIndexStatus::Active`] status indicates the index is +/// A `LazyIndexStatus` describes the lifecycle of an index: `LazyIndexStatus::Creating` and +/// `LazyIndexStatus::Deleting` are transitioning states that indicates that the index is not +/// yet available. On the contrary, the `LazyIndexStatus::Active` status indicates the index is /// ready to be fetched and updated. 
/// /// Transitioning states are useful to track inconsistencies between the in-memory and on-disk data diff --git a/quickwit/quickwit-metastore/src/metastore_resolver.rs b/quickwit/quickwit-metastore/src/metastore_resolver.rs index 7793fdbbf45..265d9069f3c 100644 --- a/quickwit/quickwit-metastore/src/metastore_resolver.rs +++ b/quickwit/quickwit-metastore/src/metastore_resolver.rs @@ -45,7 +45,7 @@ impl fmt::Debug for MetastoreResolver { } impl MetastoreResolver { - /// Creates an empty [`MetastoreResolverBuilder`]. + /// Creates an empty `MetastoreResolverBuilder`. pub fn builder() -> MetastoreResolverBuilder { MetastoreResolverBuilder::default() } diff --git a/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs b/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs index 17eab282bc1..994bf7cb324 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs @@ -886,9 +886,9 @@ mod tests { Some("unknown_service") ); // No metric_unit tag when unit is empty - assert!(dp.tags.get("metric_unit").is_none()); + assert!(!dp.tags.contains_key("metric_unit")); // No start_timestamp_secs tag when start time is 0 - assert!(dp.tags.get("start_timestamp_secs").is_none()); + assert!(!dp.tags.contains_key("start_timestamp_secs")); // Only "service" should be in tags (no attributes, no unit, no start time) assert_eq!(dp.tags.len(), 1); } diff --git a/quickwit/quickwit-parquet-engine/Cargo.toml b/quickwit/quickwit-parquet-engine/Cargo.toml index 9842ceb6f00..9744dbcc2bc 100644 --- a/quickwit/quickwit-parquet-engine/Cargo.toml +++ b/quickwit/quickwit-parquet-engine/Cargo.toml @@ -13,8 +13,10 @@ license.workspace = true [dependencies] anyhow = { workspace = true } arrow = { workspace = true } +chrono = { workspace = true } parquet = { workspace = true } quickwit-common = { workspace = true } +quickwit-proto = { workspace = true } sea-query = { workspace = true, optional = true } serde = { workspace = 
true } serde_json = { workspace = true } diff --git a/quickwit/quickwit-parquet-engine/src/lib.rs b/quickwit/quickwit-parquet-engine/src/lib.rs index 309ebf4f442..d34c67c665d 100644 --- a/quickwit/quickwit-parquet-engine/src/lib.rs +++ b/quickwit/quickwit-parquet-engine/src/lib.rs @@ -24,8 +24,10 @@ pub mod index; pub mod ingest; pub mod metrics; pub mod schema; +pub mod sort_fields; pub mod split; pub mod storage; +pub mod table_config; #[cfg(any(test, feature = "testsuite"))] pub mod test_helpers; diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/column_type.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/column_type.rs new file mode 100644 index 00000000000..21690f01e0e --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/column_type.rs @@ -0,0 +1,240 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Column type identification from name suffixes and string names. +//! +//! Type can be specified via Husky-convention suffixes (`__s`, `__i`, `__nf`) +//! or inferred from well-known bare names. The discriminant values match +//! the Go iota exactly for cross-system interoperability. + +use std::str::FromStr; + +use super::SortFieldsError; + +/// Well-known column name for timestamps. +pub const TIMESTAMP: &str = "timestamp"; + +/// Well-known column name for tiebreaker. +pub const TIEBREAKER: &str = "tiebreaker"; + +/// Well-known column name for timeseries ID hash. 
+pub const TIMESERIES_ID: &str = "timeseries_id"; + +/// Well-known column name for metric value. +pub const METRIC_VALUE: &str = "metric_value"; + +/// Column type IDs matching Go `types.TypeID` iota values. +/// +/// Only the types that appear in sort schemas are included here. +/// The discriminant values MUST match Go exactly for cross-system interop. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u64)] +pub enum ColumnTypeId { + Int64 = 2, + Float64 = 10, + String = 14, + Sketch = 17, + CpcSketch = 20, + ItemSketch = 22, +} + +impl ColumnTypeId { + /// The Husky-convention suffix for this column type. + /// + /// Used when serializing back to the string format with explicit types. + pub fn suffix(self) -> &'static str { + match self { + Self::Int64 => "__i", + Self::Float64 => "__nf", + Self::String => "__s", + Self::Sketch => "__sk", + Self::CpcSketch => "__cpcsk", + Self::ItemSketch => "__isk", + } + } + + /// Human-readable type name matching Go `TypeID.String()`. + pub fn as_str(self) -> &'static str { + match self { + Self::Int64 => "dense-int64", + Self::Float64 => "dense-float64", + Self::String => "dense-string", + Self::Sketch => "dense-sketch", + Self::CpcSketch => "dense-cpc-sketch", + Self::ItemSketch => "dense-item-sketch", + } + } + + /// Resolve column type from a column name, stripping any type suffix. + /// + /// Returns `(bare_name, type)`. Type resolution order: + /// 1. Explicit suffix (`__s`, `__i`, `__nf`, etc.) — stripped, type from suffix + /// 2. Well-known bare name defaults: + /// - `timestamp`, `tiebreaker`, `timeseries_id` → Int64 + /// - `metric_value` → Float64 + /// - everything else → String + pub fn from_column_name(name: &str) -> Result<(&str, Self), SortFieldsError> { + // Try explicit suffixes first (longest match first to avoid ambiguity). 
+ if let Some(bare) = name.strip_suffix("__isk") { + return Ok((bare, Self::ItemSketch)); + } + if let Some(bare) = name.strip_suffix("__cpcsk") { + return Ok((bare, Self::CpcSketch)); + } + if let Some(bare) = name.strip_suffix("__sk") { + return Ok((bare, Self::Sketch)); + } + if let Some(bare) = name.strip_suffix("__nf") { + return Ok((bare, Self::Float64)); + } + if let Some(bare) = name.strip_suffix("__i") { + return Ok((bare, Self::Int64)); + } + if let Some(bare) = name.strip_suffix("__s") { + return Ok((bare, Self::String)); + } + + // No suffix — use well-known name defaults. + Ok((name, default_type_for_name(name))) + } +} + +/// Default column type and sort direction for a bare column name. +/// +/// This is the single source of truth for well-known column defaults. +/// Used by the parser (type inference, default direction), display +/// (suffix omission, direction omission), and validation. +pub struct ColumnDefaults { + pub column_type: ColumnTypeId, + /// True if the default sort direction is descending. + pub descending: bool, +} + +/// Well-known name → default type and sort direction lookup table. +/// +/// Columns not in this table default to String, ascending. 
+static WELL_KNOWN_COLUMNS: &[(&str, ColumnDefaults)] = &[ + ( + TIMESTAMP, + ColumnDefaults { + column_type: ColumnTypeId::Int64, + descending: true, + }, + ), + ( + "timestamp_secs", + ColumnDefaults { + column_type: ColumnTypeId::Int64, + descending: true, + }, + ), + ( + TIEBREAKER, + ColumnDefaults { + column_type: ColumnTypeId::Int64, + descending: false, + }, + ), + ( + TIMESERIES_ID, + ColumnDefaults { + column_type: ColumnTypeId::Int64, + descending: false, + }, + ), + ( + METRIC_VALUE, + ColumnDefaults { + column_type: ColumnTypeId::Float64, + descending: false, + }, + ), + ( + "value", + ColumnDefaults { + column_type: ColumnTypeId::Float64, + descending: false, + }, + ), +]; + +const DEFAULT_COLUMN: ColumnDefaults = ColumnDefaults { + column_type: ColumnTypeId::String, + descending: false, +}; + +/// Look up default type and direction for a bare column name. +pub fn column_defaults(name: &str) -> &'static ColumnDefaults { + WELL_KNOWN_COLUMNS + .iter() + .find(|(n, _)| *n == name) + .map(|(_, d)| d) + .unwrap_or(&DEFAULT_COLUMN) +} + +/// Default column type for a bare name (convenience wrapper). +pub fn default_type_for_name(name: &str) -> ColumnTypeId { + column_defaults(name).column_type +} + +/// Whether this bare name defaults to descending sort. +pub fn default_is_descending(name: &str) -> bool { + column_defaults(name).descending +} + +impl std::fmt::Display for ColumnTypeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Parse a type name string (e.g., "dense-int64") into a `ColumnTypeId`. 
+impl FromStr for ColumnTypeId { + type Err = SortFieldsError; + + fn from_str(s: &str) -> Result { + match s { + "dense-int64" => Ok(Self::Int64), + "dense-float64" => Ok(Self::Float64), + "dense-string" => Ok(Self::String), + "dense-sketch" => Ok(Self::Sketch), + "dense-cpc-sketch" => Ok(Self::CpcSketch), + "dense-item-sketch" => Ok(Self::ItemSketch), + _ => Err(SortFieldsError::UnknownColumnType(format!( + "unknown column type '{}'", + s + ))), + } + } +} + +/// Convert a proto `column_type` u64 back to a `ColumnTypeId`. +impl TryFrom for ColumnTypeId { + type Error = SortFieldsError; + + fn try_from(value: u64) -> Result { + match value { + 2 => Ok(Self::Int64), + 10 => Ok(Self::Float64), + 14 => Ok(Self::String), + 17 => Ok(Self::Sketch), + 20 => Ok(Self::CpcSketch), + 22 => Ok(Self::ItemSketch), + _ => Err(SortFieldsError::UnknownColumnType(format!( + "unknown column type id: {}", + value + ))), + } + } +} diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/display.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/display.rs new file mode 100644 index 00000000000..8a6d74f482a --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/display.rs @@ -0,0 +1,124 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Sort schema to string serialization -- direct port of Go `SchemaToString` and +//! `SchemaToStringShort`. +//! +//! 
The proto `SortColumn.name` stores the bare Parquet column name (no type suffix). +//! These functions reconstruct the Husky-format suffixed name for serialization +//! using `SortColumn.column_type` to determine the suffix. + +use quickwit_proto::sortschema::{SortColumn, SortColumnDirection, SortSchema}; + +use super::column_type::{ColumnTypeId, default_is_descending, default_type_for_name}; + +fn direction_str(sort_direction: i32) -> &'static str { + match SortColumnDirection::try_from(sort_direction) { + Ok(SortColumnDirection::SortDirectionAscending) => ":+", + Ok(SortColumnDirection::SortDirectionDescending) => ":-", + _ => ":???", + } +} + +fn type_str(column_type: u64) -> &'static str { + match ColumnTypeId::try_from(column_type) { + Ok(ct) => ct.as_str(), + Err(_) => "unknown", + } +} + +/// Reconstruct the column name for the Husky string format. +/// +/// Only appends the type suffix when the column's type differs from the +/// default for its bare name. This keeps the string short and readable: +/// `metric_name` (default String) → no suffix needed +/// `timestamp` (default Int64) → no suffix needed +/// `my_counter__i` → suffix needed (Int64 differs from default String) +fn display_name(col: &SortColumn) -> String { + let col_type = match ColumnTypeId::try_from(col.column_type) { + Ok(ct) => ct, + Err(_) => return col.name.clone(), + }; + let default_type = default_type_for_name(&col.name); + if col_type == default_type { + col.name.clone() + } else { + format!("{}{}", col.name, col_type.suffix()) + } +} + +/// Convert a `SortSchema` to its full string representation. +/// +/// Format: `[name=]column__suffix:type:+/-[|...][/V#]` +/// +/// Direct port of Go `SchemaToString`. +pub fn schema_to_string(schema: &SortSchema) -> String { + schema_to_string_inner(schema, true) +} + +/// Convert a `SortSchema` to its short string representation. 
+/// +/// Format: `[name=]column__suffix[|...][/V#]` +/// +/// Omits the explicit type and skips the sort direction when it matches the +/// default (ascending for non-timestamp, descending for timestamp). +/// +/// Direct port of Go `SchemaToStringShort`. +pub fn schema_to_string_short(schema: &SortSchema) -> String { + schema_to_string_inner(schema, false) +} + +/// Shared implementation for both full and short schema string formats. +/// +/// When `verbose` is true, includes the explicit type and always emits direction. +/// When `verbose` is false, omits type and skips direction when it matches the default. +fn schema_to_string_inner(schema: &SortSchema, verbose: bool) -> String { + let mut rv = String::new(); + + if !schema.name.is_empty() { + rv.push_str(&schema.name); + rv.push('='); + } + + for (i, col) in schema.column.iter().enumerate() { + if i > 0 { + rv.push('|'); + } + if schema.lsm_comparison_cutoff > 0 && i == schema.lsm_comparison_cutoff as usize { + rv.push('&'); + } + rv.push_str(&display_name(col)); + + if verbose { + rv.push(':'); + rv.push_str(type_str(col.column_type)); + rv.push_str(direction_str(col.sort_direction)); + } else { + let is_default_direction = if default_is_descending(&col.name) { + col.sort_direction == SortColumnDirection::SortDirectionDescending as i32 + } else { + col.sort_direction == SortColumnDirection::SortDirectionAscending as i32 + }; + if !is_default_direction { + rv.push_str(direction_str(col.sort_direction)); + } + } + } + + if schema.sort_version > 0 { + rv.push_str(&format!("/V{}", schema.sort_version)); + } + + rv +} diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/equivalence.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/equivalence.rs new file mode 100644 index 00000000000..403ec9fd135 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/equivalence.rs @@ -0,0 +1,62 @@ +// Copyright 2021-Present Datadog, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Sort schema equivalence comparison -- direct port of Go `EquivalentSchemas` +//! and `EquivalentSchemasForCompaction`. + +use quickwit_proto::sortschema::SortSchema; + +/// Base comparison: checks sort_version and all columns match (name, type, direction). +/// +/// Hand-rolled comparison (not proto equality) because the Go compactor calls this +/// in tight loops on 10s-100s of thousands of fragments and proto.Equal allocates. +fn equivalent_schemas_base(a: &SortSchema, b: &SortSchema) -> bool { + if a.sort_version != b.sort_version { + return false; + } + if a.column.len() != b.column.len() { + return false; + } + for (a_col, b_col) in a.column.iter().zip(b.column.iter()) { + if a_col.name != b_col.name { + return false; + } + if a_col.column_type != b_col.column_type { + return false; + } + if a_col.sort_direction != b_col.sort_direction { + return false; + } + } + true +} + +/// Check if two schemas are equivalent, ignoring names and versioned schema. +/// +/// Compares columns, sort_version, and `lsm_comparison_cutoff`. +/// +/// Direct port of Go `EquivalentSchemas`. +pub fn equivalent_schemas(a: &SortSchema, b: &SortSchema) -> bool { + equivalent_schemas_base(a, b) && a.lsm_comparison_cutoff == b.lsm_comparison_cutoff +} + +/// Check if two schemas are equivalent for compaction purposes. 
+/// +/// Same as `equivalent_schemas` but ignores `lsm_comparison_cutoff`, providing +/// backward compatibility when old cplanners send steps without LSM cutoff info. +/// +/// Direct port of Go `EquivalentSchemasForCompaction`. +pub fn equivalent_schemas_for_compaction(a: &SortSchema, b: &SortSchema) -> bool { + equivalent_schemas_base(a, b) +} diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/mod.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/mod.rs new file mode 100644 index 00000000000..3043531c557 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/mod.rs @@ -0,0 +1,52 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Sort fields types, parsing, and time-window arithmetic for metrics compaction. +//! +//! The sort fields define how rows are ordered within a Parquet split, +//! which determines merge and compaction behavior. The parser is a direct +//! port of Husky's Go `schemautils.go` for cross-system interoperability. + +// TableConfig.effective_sort_fields() is wired into ParquetWriter at construction +// time. The writer resolves sort field names to physical ParquetField columns; +// columns not yet in the schema (e.g., timeseries_id) are skipped during sort +// but recorded in metadata. +// +// TODO(Phase 32): Wire per-index TableConfig into IndexConfig so each index can +// override the default sort fields. 
Currently all metrics indexes use +// ProductType::Metrics default. +// +// When accepting user-supplied sort_fields for metrics indexes, validation MUST +// reject schemas that do not include timeseries_id__i immediately before timestamp. +// The timeseries_id tiebreaker is mandatory for metrics to ensure deterministic +// sort order across splits with identical tag combinations. + +pub mod column_type; +pub mod display; +pub mod equivalence; +pub mod parser; +pub mod validation; +pub mod window; + +#[cfg(test)] +mod tests; + +// Public API re-exports. +pub use column_type::ColumnTypeId; +pub use display::{schema_to_string, schema_to_string_short}; +pub use equivalence::{equivalent_schemas, equivalent_schemas_for_compaction}; +pub use parser::parse_sort_fields; +pub use quickwit_proto::SortFieldsError; +pub use validation::validate_schema; +pub use window::{validate_window_duration, window_start}; diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/parser.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/parser.rs new file mode 100644 index 00000000000..77b11757b29 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/parser.rs @@ -0,0 +1,338 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Sort schema string parser -- direct port of Go `StringToSchema`. +//! +//! Parses Husky-style sort schema strings like: +//! `"metric_name|host|env|timeseries_id|timestamp/V2"` +//! 
into a `SortSchema` proto with correct column names, types, directions, and version. + +use quickwit_proto::sortschema::{SortColumn, SortColumnDirection, SortSchema}; + +use super::SortFieldsError; +use super::column_type::{ColumnTypeId, default_is_descending}; +use super::validation::validate_schema; + +/// The minimum sort version we accept. V0 (INCORRECT_TRIM) and V1 (TRIMMED_WITH_BUDGET) +/// are rejected per the strict V2-only decision. +const MINIMUM_SORT_VERSION: i32 = 2; + +/// Parse a sort schema string into a `SortSchema` proto. +/// +/// Direct port of Go `StringToSchema`. Accepts the format: +/// `[name=]column[|column...][/V#]` +/// +/// Each column can be: +/// - `name` (1-part): infer type from suffix, default direction +/// - `+name` or `name+` (1-part with prefix/suffix direction) +/// - `name:+/-` (2-part): infer type from suffix, explicit direction +/// - `name:type:+/-` (3-part): explicit type + verify matches suffix +/// +/// Direction (`+`/`-`) may appear as a prefix (`+name`), suffix (`name+`), +/// or after a colon (`name:+`). It is an error for direction to appear in +/// more than one position for a given column (e.g., `+name+` or `+name:+`). +/// +/// The `&` marker before a column name indicates the LSM comparison cutoff. +/// +/// **V2-only enforcement**: Only sort_version >= 2 is accepted. Unversioned +/// strings (default to 0), V0, and V1 are rejected. +pub fn parse_sort_fields(s: &str) -> Result { + let mut schema = SortSchema::default(); + let mut input = s; + + // Split on `=` for optional name prefix. + if let Some((name, rest)) = split_once_max2(input, '=', s)? { + schema.name = name.to_string(); + input = rest; + } + + // Split on `/` for version suffix. 
+ let sort_version = parse_version_suffix(&mut input, s)?; + if sort_version < MINIMUM_SORT_VERSION { + return Err(SortFieldsError::UnsupportedVersion { + version: sort_version, + minimum: MINIMUM_SORT_VERSION, + }); + } + schema.sort_version = sort_version; + + // Parse columns. + let mut cutoff_marker_count = 0; + + for (i, col_str) in input.split('|').enumerate() { + let col_remaining = parse_cutoff_marker(col_str, i, &mut cutoff_marker_count, &mut schema)?; + + let (prefix_dir, after_prefix) = strip_direction_prefix(col_remaining); + let parts: Vec<&str> = after_prefix.split(':').collect(); + + let column = match parts.len() { + 3 => parse_3part(parts, prefix_dir, col_str)?, + 2 => parse_2part(parts, prefix_dir, col_str)?, + 1 => { + schema + .column + .push(parse_1part(parts[0], prefix_dir, col_str)?); + continue; + } + _ => { + return Err(SortFieldsError::InvalidColumnFormat(format!( + "columns should be of the form 'name:type:+/-' or 'name:+/-' or 'name', \ + found: {}", + col_str + ))); + } + }; + + schema.column.push(column); + } + + if cutoff_marker_count > 0 && schema.column.len() < 2 { + return Err(SortFieldsError::InvalidCutoffPlacement( + "LSM cutoff marker (&) requires at least 2 columns".to_string(), + )); + } + + validate_schema(&schema)?; + Ok(schema) +} + +/// Parse the `/V#` version suffix, updating `input` to point at the columns portion. +/// Returns 0 if no version suffix is present. +fn parse_version_suffix(input: &mut &str, original: &str) -> Result { + let Some((columns, version_str)) = split_once_max2(input, '/', original)? 
else { + return Ok(0); + }; + let version_str = version_str.strip_prefix('V').ok_or_else(|| { + SortFieldsError::BadSortVersion(format!( + "mal-formatted sort schema '{}' -- bad sort version", + *input + )) + })?; + let version = version_str.parse::().map_err(|_| { + SortFieldsError::BadSortVersion(format!( + "mal-formatted sort schema '{}' parsing sort version", + *input + )) + })?; + *input = columns; + Ok(version) +} + +/// Handle the `&` LSM cutoff marker at the start of a column string. +/// Returns the remaining column string after stripping `&`. +fn parse_cutoff_marker<'a>( + col_str: &'a str, + column_index: usize, + cutoff_count: &mut usize, + schema: &mut SortSchema, +) -> Result<&'a str, SortFieldsError> { + let Some(rest) = col_str.strip_prefix('&') else { + if col_str.contains('&') { + return Err(SortFieldsError::MalformedSchema(format!( + "LSM cutoff marker (&) must appear at the beginning of column name, found in \ + middle of: {}", + col_str + ))); + } + return Ok(col_str); + }; + + *cutoff_count += 1; + if *cutoff_count > 1 { + return Err(SortFieldsError::InvalidCutoffPlacement( + "only one LSM cutoff marker (&) is allowed per schema".to_string(), + )); + } + schema.lsm_comparison_cutoff = column_index as i32; + if rest.is_empty() { + return Err(SortFieldsError::InvalidCutoffPlacement( + "LSM cutoff marker (&) must be followed by a valid column name".to_string(), + )); + } + if column_index == 0 { + return Err(SortFieldsError::InvalidCutoffPlacement( + "LSM cutoff marker (&) cannot be used on the first column as it would ignore all \ + columns" + .to_string(), + )); + } + if rest.contains('&') { + return Err(SortFieldsError::MalformedSchema(format!( + "LSM cutoff marker (&) must appear at the beginning of column name, found in middle \ + of: {}", + rest + ))); + } + Ok(rest) +} + +/// Resolve bare name and type from a column name string via `ColumnTypeId::from_column_name`. 
+fn resolve_name_type(name: &str) -> Result<(&str, ColumnTypeId), SortFieldsError> { + ColumnTypeId::from_column_name(name).map_err(|_| { + SortFieldsError::UnknownColumnType(format!( + "error determining type for column {} from suffix", + name + )) + }) +} + +/// Parse a 3-part column: `name__suffix:type:+/-`. +fn parse_3part( + parts: Vec<&str>, + prefix_dir: Option, + col_str: &str, +) -> Result { + let explicit_type: ColumnTypeId = parts[1].parse().map_err(|_| { + SortFieldsError::UnknownColumnType(format!( + "error determining type for column {}: unknown type '{}'", + parts[0], parts[1] + )) + })?; + let (bare_name, suffix_type) = resolve_name_type(parts[0])?; + if explicit_type != suffix_type { + return Err(SortFieldsError::TypeMismatch { + column: parts[0].to_string(), + from_suffix: suffix_type.to_string(), + explicit: explicit_type.to_string(), + }); + } + let colon_dir = parse_direction(parts[2])?; + if prefix_dir.is_some() { + return Err(SortFieldsError::DuplicateDirection(col_str.to_string())); + } + Ok(SortColumn { + name: bare_name.to_string(), + column_type: explicit_type as u64, + sort_direction: colon_dir, + }) +} + +/// Parse a 2-part column: `name__suffix:+/-`. +fn parse_2part( + parts: Vec<&str>, + prefix_dir: Option, + col_str: &str, +) -> Result { + // Reject direction suffix embedded in the name part: `name-:-` has + // direction in both the name suffix and the colon position. 
+ let (embedded_suffix_dir, name_without_suffix) = strip_direction_suffix(parts[0]); + if embedded_suffix_dir.is_some() { + return Err(SortFieldsError::DuplicateDirection(col_str.to_string())); + } + let (bare_name, col_type) = resolve_name_type(name_without_suffix)?; + let colon_dir = parse_direction(parts[1])?; + if prefix_dir.is_some() { + return Err(SortFieldsError::DuplicateDirection(col_str.to_string())); + } + Ok(SortColumn { + name: bare_name.to_string(), + column_type: col_type as u64, + sort_direction: colon_dir, + }) +} + +/// Parse a 1-part column: `name__suffix` with optional direction prefix/suffix. +fn parse_1part( + part: &str, + prefix_dir: Option, + col_str: &str, +) -> Result { + let (suffix_dir, suffixed_name) = strip_direction_suffix(part); + if prefix_dir.is_some() && suffix_dir.is_some() { + return Err(SortFieldsError::DuplicateDirection(col_str.to_string())); + } + let (bare_name, col_type) = resolve_name_type(suffixed_name)?; + let direction = prefix_dir.or(suffix_dir).unwrap_or_else(|| { + if default_is_descending(bare_name) { + SortColumnDirection::SortDirectionDescending as i32 + } else { + SortColumnDirection::SortDirectionAscending as i32 + } + }); + Ok(SortColumn { + name: bare_name.to_string(), + column_type: col_type as u64, + sort_direction: direction, + }) +} + +/// Split `input` on the first `sep`, returning None if no separator. +/// Errors if there are more than 2 parts (i.e., multiple separators). 
+fn split_once_max2<'a>( + input: &'a str, + sep: char, + original: &str, +) -> Result, SortFieldsError> { + let mut iter = input.splitn(3, sep); + let first = iter.next().unwrap(); // always present + let second = match iter.next() { + Some(s) => s, + None => return Ok(None), + }; + if iter.next().is_some() { + return Err(SortFieldsError::MalformedSchema(format!( + "mal-formatted sort schema '{}'", + original + ))); + } + Ok(Some((first, second))) +} + +/// Strip a leading `+` or `-` from a string, returning the direction and remainder. +fn strip_direction_prefix(s: &str) -> (Option, &str) { + if let Some(rest) = s.strip_prefix('+') { + ( + Some(SortColumnDirection::SortDirectionAscending as i32), + rest, + ) + } else if let Some(rest) = s.strip_prefix('-') { + ( + Some(SortColumnDirection::SortDirectionDescending as i32), + rest, + ) + } else { + (None, s) + } +} + +/// Strip a trailing `+` or `-` from a string, returning the direction and trimmed name. +fn strip_direction_suffix(s: &str) -> (Option, &str) { + if s.len() > 1 { + if let Some(rest) = s.strip_suffix('+') { + return ( + Some(SortColumnDirection::SortDirectionAscending as i32), + rest, + ); + } + if let Some(rest) = s.strip_suffix('-') { + return ( + Some(SortColumnDirection::SortDirectionDescending as i32), + rest, + ); + } + } + (None, s) +} + +/// Parse a sort direction string ("+" or "-") into the proto enum value. 
+fn parse_direction(s: &str) -> Result { + match s { + "+" => Ok(SortColumnDirection::SortDirectionAscending as i32), + "-" => Ok(SortColumnDirection::SortDirectionDescending as i32), + _ => Err(SortFieldsError::UnknownSortDirection(format!( + "unknown sort direction '{}'", + s + ))), + } +} diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/tests.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/tests.rs new file mode 100644 index 00000000000..ee8e188916f --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/tests.rs @@ -0,0 +1,963 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Test suite ported from Go `schemautils_test.go` plus strict V2-only enforcement tests. 
+ +use quickwit_proto::sortschema::{SortColumnDirection, SortSchema}; + +use super::column_type::ColumnTypeId; +use super::display::{schema_to_string, schema_to_string_short}; +use super::equivalence::{equivalent_schemas, equivalent_schemas_for_compaction}; +use super::parser::parse_sort_fields; + +// --------------------------------------------------------------------------- +// Helper +// --------------------------------------------------------------------------- + +fn must_parse(s: &str) -> SortSchema { + parse_sort_fields(s).unwrap_or_else(|e| panic!("failed to parse '{}': {}", s, e)) +} + +// --------------------------------------------------------------------------- +// Strict V2-only enforcement tests +// --------------------------------------------------------------------------- + +#[test] +fn test_v2_only_rejects_unversioned() { + // No version suffix defaults to version 0 -> rejected. + let err = parse_sort_fields("timestamp").unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("unsupported") || msg.contains("version"), + "expected unsupported version error, got: {}", + msg + ); +} + +#[test] +fn test_v2_only_rejects_v0() { + let err = parse_sort_fields("timestamp/V0").unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("unsupported") || msg.contains("version"), + "expected unsupported version error, got: {}", + msg + ); +} + +#[test] +fn test_v2_only_rejects_v1() { + let err = parse_sort_fields("timestamp/V1").unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("unsupported") || msg.contains("version"), + "expected unsupported version error, got: {}", + msg + ); +} + +#[test] +fn test_v2_only_accepts_v2() { + let schema = must_parse("timestamp/V2"); + assert_eq!(schema.sort_version, 2); + assert_eq!(schema.column.len(), 1); + assert_eq!(schema.column[0].name, "timestamp"); +} + +#[test] +fn test_v2_only_accepts_v3() { + let schema = must_parse("timestamp/V3"); + assert_eq!(schema.sort_version, 3); +} + 
+// --------------------------------------------------------------------------- +// Port of Go TestStringToSchema -- error paths +// --------------------------------------------------------------------------- + +#[test] +fn test_string_to_schema_mangled() { + // Mangled schema with multiple `=` + assert!(parse_sort_fields("a=b=c/V2").is_err(), "must disallow >1 ="); +} + +#[test] +fn test_string_to_schema_invalid_column_format() { + assert!( + parse_sort_fields("name:dense-int64:+:what-is-this?/V2").is_err(), + "must disallow invalid column formats" + ); + assert!( + parse_sort_fields("name:dense-int64:+:what-is-this?:really?/V2").is_err(), + "must disallow invalid column formats" + ); +} + +#[test] +fn test_string_to_schema_3part_errors() { + // Invalid type name. + assert!( + parse_sort_fields("name__i:invalid-type:+|timestamp/V2").is_err(), + "must disallow an invalid type" + ); + // Type mismatch: suffix says int64 but explicit says string. + assert!( + parse_sort_fields("name__i:dense-string:+|timestamp/V2").is_err(), + "must disallow mismatch between type suffix and explicit type" + ); + // Invalid sort direction. + assert!( + parse_sort_fields("name__i:dense-int64:invalid-sort-direction|timestamp/V2").is_err(), + "must disallow an invalid sort direction" + ); +} + +#[test] +fn test_string_to_schema_2part_errors() { + // `name__x` no longer errors: unknown suffixes are treated as bare column names + // with default type (String), so `name__x` is a valid column name. + assert!( + parse_sort_fields("name__x:+|timestamp/V2").is_ok(), + "bare column names with unknown suffix-like patterns are now valid" + ); + // Invalid sort direction. + assert!( + parse_sort_fields("name__i:invalid-sort-direction|timestamp/V2").is_err(), + "must disallow an invalid sort direction" + ); +} + +#[test] +fn test_string_to_schema_missing_timestamp() { + // Semantically invalid: missing timestamp column. 
+ assert!( + parse_sort_fields("name__i:dense-int64:+/V2").is_err(), + "must disallow schema with missing timestamp column" + ); +} + +#[test] +fn test_string_to_schema_bad_version() { + // `/X` is not a valid version specification. + assert!( + parse_sort_fields("timestamp/X").is_err(), + "/X isn't a valid version specification" + ); + // `/VX` -- V followed by non-numeric. + assert!( + parse_sort_fields("timestamp/VX").is_err(), + "/VX isn't a valid version specification" + ); +} + +// --------------------------------------------------------------------------- +// LSM cutoff marker error paths (from Go TestStringToSchema) +// --------------------------------------------------------------------------- + +#[test] +fn test_lsm_cutoff_multiple_markers() { + assert!( + parse_sort_fields("service__s|&env__s|&source__s|timestamp/V2").is_err(), + "must disallow multiple LSM cutoff markers" + ); +} + +#[test] +fn test_lsm_cutoff_double_ampersand() { + assert!( + parse_sort_fields("service__s|&&env__s|timestamp/V2").is_err(), + "must disallow multiple consecutive LSM cutoff markers" + ); +} + +#[test] +fn test_lsm_cutoff_empty_after_marker() { + assert!( + parse_sort_fields("service__s|&|timestamp/V2").is_err(), + "must disallow empty column name after LSM cutoff marker" + ); +} + +#[test] +fn test_lsm_cutoff_in_middle() { + assert!( + parse_sort_fields("service__s|env&__s|timestamp/V2").is_err(), + "must disallow LSM cutoff marker in middle of column name" + ); +} + +#[test] +fn test_lsm_cutoff_single_column() { + assert!( + parse_sort_fields("×tamp/V2").is_err(), + "must disallow LSM cutoff marker on single column schema" + ); +} + +#[test] +fn test_lsm_cutoff_first_column() { + assert!( + parse_sort_fields("&service__s|env__s|timestamp/V2").is_err(), + "must disallow LSM cutoff marker on first column" + ); +} + +// --------------------------------------------------------------------------- +// Port of Go TestStringToSchema -- valid schemas +// 
--------------------------------------------------------------------------- + +#[test] +fn test_string_to_schema_timestamp_only() { + let s = must_parse("timestamp/V2"); + assert_eq!(s.column.len(), 1); + assert_eq!(s.column[0].name, "timestamp"); + assert_eq!(s.column[0].column_type, ColumnTypeId::Int64 as u64); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_string_to_schema_named_timestamp() { + let s = must_parse("defaultTimestampSchema=timestamp/V2"); + assert_eq!(s.name, "defaultTimestampSchema"); + assert_eq!(s.column.len(), 1); + assert_eq!(s.column[0].name, "timestamp"); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_string_to_schema_named_timestamp_explicit_direction() { + let s = must_parse("defaultTimestampSchema=timestamp:-/V2"); + assert_eq!(s.name, "defaultTimestampSchema"); + assert_eq!(s.column[0].name, "timestamp"); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_string_to_schema_named_timestamp_explicit_type() { + let s = must_parse("defaultTimestampSchema=timestamp:dense-int64:-/V2"); + assert_eq!(s.name, "defaultTimestampSchema"); + assert_eq!(s.column[0].name, "timestamp"); + assert_eq!(s.column[0].column_type, ColumnTypeId::Int64 as u64); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_string_to_schema_multi_column() { + let s = must_parse("testSchema=columnA__s|columnB__i:-|timestamp/V2"); + assert_eq!(s.name, "testSchema"); + assert_eq!(s.sort_version, 2); + assert_eq!(s.column.len(), 3); + + // columnA__s: string, ascending (default) + assert_eq!(s.column[0].name, "columnA"); + assert_eq!(s.column[0].column_type, ColumnTypeId::String as u64); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionAscending 
as i32 + ); + + // columnB__i: int64, descending (explicit) + assert_eq!(s.column[1].name, "columnB"); + assert_eq!(s.column[1].column_type, ColumnTypeId::Int64 as u64); + assert_eq!( + s.column[1].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); + + // timestamp: int64, descending (default) + assert_eq!(s.column[2].name, "timestamp"); + assert_eq!(s.column[2].column_type, ColumnTypeId::Int64 as u64); + assert_eq!( + s.column[2].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_string_to_schema_multi_column_explicit_type() { + let s = must_parse("testSchema=columnA__s:dense-string:+|columnB__i:-|timestamp/V2"); + assert_eq!(s.column[0].column_type, ColumnTypeId::String as u64); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); +} + +// --------------------------------------------------------------------------- +// SchemaToString and SchemaToStringShort +// --------------------------------------------------------------------------- + +#[test] +fn test_schema_to_string_full() { + let s = must_parse("testSchema=columnA__s|columnB__i:-|timestamp/V2"); + let full = schema_to_string(&s); + // columnA has type String == default for a generic name, so no suffix in output. + // columnB has type Int64 != default String, so __i suffix is preserved. + assert_eq!( + full, + "testSchema=columnA:dense-string:+|columnB__i:dense-int64:-|timestamp:dense-int64:-/V2" + ); +} + +#[test] +fn test_schema_to_string_short() { + let s = must_parse("testSchema=columnA__s|columnB__i:-|timestamp/V2"); + let short = schema_to_string_short(&s); + // columnA has type String == default, so no suffix. columnB keeps __i (Int64 != default). 
+ assert_eq!(short, "testSchema=columnA|columnB__i:-|timestamp/V2"); +} + +// --------------------------------------------------------------------------- +// Round-trip tests +// --------------------------------------------------------------------------- + +#[test] +fn test_round_trip_short_form() { + // Inputs that round-trip to themselves: bare names (no suffix) and non-default + // typed columns already serialize without a suffix change. + let exact_round_trip_cases = [ + "timestamp/V2", + // `service` with no suffix: default String, serializes as `service` (no suffix). + "service|timestamp/V2", + "service|env|timestamp/V2", + // columnB__i: Int64 != default String, keeps __i suffix. + "testSchema=columnA|columnB__i:-|timestamp/V2", + ]; + for input in exact_round_trip_cases { + let parsed = must_parse(input); + let short = schema_to_string_short(&parsed); + assert_eq!(short, input, "short round-trip failed for '{}'", input); + + // Also verify full round-trip: parse(to_string(schema)) == schema. + let full = schema_to_string(&parsed); + let reparsed = must_parse(&full); + assert_eq!( + parsed.column.len(), + reparsed.column.len(), + "column count mismatch after full round-trip for '{}'", + input + ); + for (a, b) in parsed.column.iter().zip(reparsed.column.iter()) { + assert_eq!(a.name, b.name); + assert_eq!(a.column_type, b.column_type); + assert_eq!(a.sort_direction, b.sort_direction); + } + assert_eq!(parsed.sort_version, reparsed.sort_version); + assert_eq!(parsed.lsm_comparison_cutoff, reparsed.lsm_comparison_cutoff); + } + + // Inputs with explicit __s suffix on default-String columns: the short form drops + // the suffix (it is not needed), but the semantic content is preserved. 
+ let semantic_round_trip_cases = [ + ("service__s|timestamp/V2", "service|timestamp/V2"), + ("service__s|env__s|timestamp/V2", "service|env|timestamp/V2"), + ( + "testSchema=columnA__s|columnB__i:-|timestamp/V2", + "testSchema=columnA|columnB__i:-|timestamp/V2", + ), + ]; + for (input, expected_short) in semantic_round_trip_cases { + let parsed = must_parse(input); + let short = schema_to_string_short(&parsed); + assert_eq!(short, expected_short, "short form mismatch for '{}'", input); + + // Verify semantic round-trip: parse the short output and compare protos. + let reparsed = must_parse(&short); + assert_eq!( + parsed.column.len(), + reparsed.column.len(), + "column count mismatch after semantic round-trip for '{}'", + input + ); + for (a, b) in parsed.column.iter().zip(reparsed.column.iter()) { + assert_eq!(a.name, b.name); + assert_eq!(a.column_type, b.column_type); + assert_eq!(a.sort_direction, b.sort_direction); + } + assert_eq!(parsed.sort_version, reparsed.sort_version); + assert_eq!(parsed.lsm_comparison_cutoff, reparsed.lsm_comparison_cutoff); + } +} + +// --------------------------------------------------------------------------- +// Port of Go TestEquivalentSchemas +// --------------------------------------------------------------------------- + +#[test] +fn test_equivalent_schemas_identical() { + let a = must_parse("timestamp/V2"); + let b = must_parse("timestamp/V2"); + assert!(equivalent_schemas(&a, &b)); + assert!(equivalent_schemas_for_compaction(&a, &b)); +} + +#[test] +fn test_equivalent_schemas_different_column_counts() { + let a = must_parse("service__s|timestamp/V2"); + let b = must_parse("timestamp/V2"); + assert!(!equivalent_schemas(&a, &b)); + assert!(!equivalent_schemas_for_compaction(&a, &b)); +} + +#[test] +fn test_equivalent_schemas_same_columns_different_names() { + let a = must_parse("service__s|timestamp/V2"); + let b = must_parse("serviceSchema=service__s|timestamp/V2"); + assert!(equivalent_schemas(&a, &b)); + 
assert!(equivalent_schemas_for_compaction(&a, &b)); +} + +#[test] +fn test_equivalent_schemas_different_column_order() { + let a = must_parse("env__s|service__s|timestamp/V2"); + let b = must_parse("serviceSchema=service__s|timestamp/V2"); + assert!(!equivalent_schemas(&a, &b)); + assert!(!equivalent_schemas_for_compaction(&a, &b)); +} + +#[test] +fn test_equivalent_schemas_different_versions() { + let a = must_parse("service__s|timestamp/V2"); + let b = must_parse("serviceSchema=service__s|timestamp/V3"); + assert!(!equivalent_schemas(&a, &b)); + assert!(!equivalent_schemas_for_compaction(&a, &b)); +} + +#[test] +fn test_equivalent_schemas_different_lsm_cutoffs() { + // Different LSM cutoffs: affects EquivalentSchemas but NOT EquivalentSchemasForCompaction. + let a = must_parse("service__s|&env__s|timestamp/V2"); + let b = must_parse("service__s|env__s|timestamp/V2"); + assert!( + !equivalent_schemas(&a, &b), + "different LSM cutoffs should not be equivalent" + ); + assert!( + equivalent_schemas_for_compaction(&a, &b), + "different LSM cutoffs should be equivalent for compaction" + ); +} + +#[test] +fn test_equivalent_schemas_different_cutoff_positions() { + let a = must_parse("service__s|&env__s|timestamp/V2"); + let b = must_parse("service__s|env__s|×tamp/V2"); + assert!(!equivalent_schemas(&a, &b)); + assert!(equivalent_schemas_for_compaction(&a, &b)); +} + +#[test] +fn test_equivalent_schemas_identical_lsm_cutoffs() { + let a = must_parse("service__s|&env__s|timestamp/V2"); + let b = must_parse("service__s|&env__s|timestamp/V2"); + assert!(equivalent_schemas(&a, &b)); + assert!(equivalent_schemas_for_compaction(&a, &b)); +} + +// --------------------------------------------------------------------------- +// Port of Go TestStringToSchemaWithLSMCutoff +// --------------------------------------------------------------------------- + +#[test] +fn test_lsm_cutoff_after_first_column() { + let s = must_parse("service__s|&env__s|timestamp/V2"); + 
assert_eq!(s.lsm_comparison_cutoff, 1); + assert_eq!(s.column[1].name, "env"); // "&" stripped +} + +#[test] +fn test_lsm_cutoff_after_second_column() { + let s = must_parse("service__s|env__s|&source__s|timestamp/V2"); + assert_eq!(s.lsm_comparison_cutoff, 2); + assert_eq!(s.column[2].name, "source"); +} + +#[test] +fn test_lsm_cutoff_before_timestamp() { + let s = must_parse("service__s|env__s|source__s|×tamp/V2"); + assert_eq!(s.lsm_comparison_cutoff, 3); + assert_eq!(s.column[3].name, "timestamp"); +} + +#[test] +fn test_lsm_cutoff_named_schema() { + let s = must_parse("testSchema=service__s|&env__s|timestamp/V2"); + assert_eq!(s.name, "testSchema"); + assert_eq!(s.lsm_comparison_cutoff, 1); +} + +#[test] +fn test_lsm_cutoff_with_version() { + let s = must_parse("service__s|&env__s|timestamp/V2"); + assert_eq!(s.lsm_comparison_cutoff, 1); + assert_eq!(s.sort_version, 2); +} + +#[test] +fn test_lsm_cutoff_with_explicit_type_direction() { + let s = must_parse("service__s:dense-string:+|&env__s:dense-string:+|timestamp/V2"); + assert_eq!(s.lsm_comparison_cutoff, 1); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); +} + +#[test] +fn test_no_lsm_cutoff() { + let s = must_parse("service__s|env__s|timestamp/V2"); + assert_eq!(s.lsm_comparison_cutoff, 0); +} + +// --------------------------------------------------------------------------- +// Port of Go TestSchemaToStringWithLSMCutoff +// --------------------------------------------------------------------------- + +#[test] +fn test_schema_to_string_preserves_cutoff_marker_short() { + let s = must_parse("service__s|&env__s|timestamp/V2"); + // String columns with default type serialize without the __s suffix. 
+ assert_eq!(schema_to_string_short(&s), "service|&env|timestamp/V2"); +} + +#[test] +fn test_schema_to_string_preserves_cutoff_marker_full() { + let s = must_parse("service__s|&env__s|timestamp/V2"); + // String columns with default type serialize without the __s suffix in full form too. + let expected = "service:dense-string:+|&env:dense-string:+|timestamp:dense-int64:-/V2"; + assert_eq!(schema_to_string(&s), expected); +} + +#[test] +fn test_schema_to_string_named_with_cutoff() { + let s = must_parse("testSchema=service__s|&env__s|timestamp/V2"); + // String columns with default type serialize without the __s suffix. + assert_eq!( + schema_to_string_short(&s), + "testSchema=service|&env|timestamp/V2" + ); +} + +#[test] +fn test_schema_to_string_no_cutoff() { + let s = must_parse("service__s|env__s|timestamp/V2"); + // String columns with default type serialize without the __s suffix. + assert_eq!(schema_to_string_short(&s), "service|env|timestamp/V2"); +} + +// --------------------------------------------------------------------------- +// Port of Go TestLSMCutoffRoundTrip +// --------------------------------------------------------------------------- + +#[test] +fn test_lsm_cutoff_round_trip() { + // Pairs of (input, expected_short_output). + // String columns with default type serialize without the __s suffix, so inputs + // using __s produce shorter output that is semantically equivalent. 
+ let test_cases = [ + ( + "service__s|&env__s|timestamp/V2", + "service|&env|timestamp/V2", + ), + ( + "service__s|env__s|&source__s|timestamp/V2", + "service|env|&source|timestamp/V2", + ), + ( + "service__s|env__s|source__s|×tamp/V2", + "service|env|source|×tamp/V2", + ), + ( + "testSchema=service__s|&env__s|timestamp/V2", + "testSchema=service|&env|timestamp/V2", + ), + ("service__s|env__s|timestamp/V2", "service|env|timestamp/V2"), + ]; + for (input, expected_short) in test_cases { + let parsed = must_parse(input); + let result = schema_to_string_short(&parsed); + assert_eq!(result, expected_short, "round-trip failed for '{}'", input); + + // Verify the output parses back to the same proto (semantic round-trip). + let reparsed = must_parse(&result); + assert_eq!( + parsed.column.len(), + reparsed.column.len(), + "column count mismatch after round-trip for '{}'", + input + ); + for (a, b) in parsed.column.iter().zip(reparsed.column.iter()) { + assert_eq!(a.name, b.name); + assert_eq!(a.column_type, b.column_type); + assert_eq!(a.sort_direction, b.sort_direction); + } + assert_eq!( + parsed.lsm_comparison_cutoff, reparsed.lsm_comparison_cutoff, + "LSM cutoff mismatch for '{}'", + input + ); + } +} + +// --------------------------------------------------------------------------- +// Direction prefix/suffix tests +// --------------------------------------------------------------------------- + +#[test] +fn test_direction_prefix_ascending() { + let s = must_parse("+service__s|timestamp/V2"); + assert_eq!(s.column[0].name, "service"); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); +} + +#[test] +fn test_direction_prefix_descending() { + let s = must_parse("-service__s|timestamp/V2"); + assert_eq!(s.column[0].name, "service"); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_direction_suffix_ascending() { + let s = 
must_parse("service__s+|timestamp/V2"); + assert_eq!(s.column[0].name, "service"); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); +} + +#[test] +fn test_direction_suffix_descending() { + let s = must_parse("service__s-|timestamp/V2"); + assert_eq!(s.column[0].name, "service"); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_direction_prefix_descending_on_timestamp() { + // Explicit descending prefix on timestamp matches the default and is accepted. + let s = must_parse("service__s|-timestamp/V2"); + assert_eq!(s.column[1].name, "timestamp"); + assert_eq!( + s.column[1].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_direction_prefix_ascending_on_timestamp_rejected() { + // Ascending timestamp is rejected by validation (timestamp must be descending). + assert!( + parse_sort_fields("service__s|+timestamp/V2").is_err(), + "ascending timestamp must be rejected by validation" + ); +} + +#[test] +fn test_direction_suffix_on_timestamp() { + let s = must_parse("service__s|timestamp-/V2"); + assert_eq!(s.column[1].name, "timestamp"); + assert_eq!( + s.column[1].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +#[test] +fn test_direction_prefix_and_suffix_is_error() { + // +name+ : direction on both sides + assert!( + parse_sort_fields("+service__s+|timestamp/V2").is_err(), + "must reject direction on both prefix and suffix" + ); + // -name- : same direction both sides, still error + assert!( + parse_sort_fields("-service__s-|timestamp/V2").is_err(), + "must reject direction on both prefix and suffix (same direction)" + ); + // +name- : conflicting directions + assert!( + parse_sort_fields("+service__s-|timestamp/V2").is_err(), + "must reject conflicting direction prefix and suffix" + ); +} + +#[test] +fn test_direction_prefix_with_colon_direction_is_error() { + // 
+name:+ : prefix direction + colon direction + assert!( + parse_sort_fields("+service__s:+|timestamp/V2").is_err(), + "must reject direction in both prefix and colon form" + ); +} + +#[test] +fn test_direction_suffix_with_colon_direction_is_error() { + // name-:- : suffix direction + colon direction + assert!( + parse_sort_fields("service__s-:-|timestamp/V2").is_err(), + "must reject direction in both suffix and colon form" + ); +} + +#[test] +fn test_direction_prefix_multi_column() { + let s = must_parse("-metric_name__s|+host__s|timestamp/V2"); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); + assert_eq!( + s.column[1].sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); + assert_eq!( + s.column[2].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 // default for timestamp + ); +} + +#[test] +fn test_direction_prefix_with_lsm_cutoff() { + // &+env__s : cutoff marker then direction prefix + let s = must_parse("service__s|&+env__s|timestamp/V2"); + assert_eq!(s.lsm_comparison_cutoff, 1); + assert_eq!(s.column[1].name, "env"); + assert_eq!( + s.column[1].sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); +} + +#[test] +fn test_direction_suffix_with_lsm_cutoff() { + let s = must_parse("service__s|&env__s-|timestamp/V2"); + assert_eq!(s.lsm_comparison_cutoff, 1); + assert_eq!(s.column[1].name, "env"); + assert_eq!( + s.column[1].sort_direction, + SortColumnDirection::SortDirectionDescending as i32 + ); +} + +// --------------------------------------------------------------------------- +// Bare name parsing (no type suffix — uses defaults) +// --------------------------------------------------------------------------- + +#[test] +fn test_bare_names_default_to_string() { + let s = must_parse("service|env|host|timestamp/V2"); + assert_eq!(s.column[0].name, "service"); + assert_eq!(s.column[0].column_type, ColumnTypeId::String as u64); + 
assert_eq!(s.column[1].name, "env"); + assert_eq!(s.column[1].column_type, ColumnTypeId::String as u64); + assert_eq!(s.column[2].name, "host"); + assert_eq!(s.column[2].column_type, ColumnTypeId::String as u64); +} + +#[test] +fn test_bare_timestamp_defaults_to_int64() { + let s = must_parse("service|timestamp/V2"); + assert_eq!(s.column[1].name, "timestamp"); + assert_eq!(s.column[1].column_type, ColumnTypeId::Int64 as u64); +} + +#[test] +fn test_bare_timeseries_id_defaults_to_int64() { + let s = must_parse("service|timeseries_id|timestamp/V2"); + assert_eq!(s.column[1].name, "timeseries_id"); + assert_eq!(s.column[1].column_type, ColumnTypeId::Int64 as u64); +} + +#[test] +fn test_bare_tiebreaker_defaults_to_int64() { + let s = must_parse("service|timestamp|tiebreaker/V2"); + assert_eq!(s.column[2].name, "tiebreaker"); + assert_eq!(s.column[2].column_type, ColumnTypeId::Int64 as u64); +} + +#[test] +fn test_bare_metric_value_defaults_to_float64() { + let s = must_parse("metric_value|timestamp/V2"); + assert_eq!(s.column[0].name, "metric_value"); + assert_eq!(s.column[0].column_type, ColumnTypeId::Float64 as u64); +} + +#[test] +fn test_bare_and_suffixed_produce_same_proto() { + let bare = must_parse("metric_name|host|timeseries_id|timestamp/V2"); + let suffixed = must_parse("metric_name__s|host__s|timeseries_id__i|timestamp/V2"); + assert_eq!(bare.column.len(), suffixed.column.len()); + for (a, b) in bare.column.iter().zip(suffixed.column.iter()) { + assert_eq!(a.name, b.name, "names should match"); + assert_eq!( + a.column_type, b.column_type, + "types should match for {}", + a.name + ); + assert_eq!( + a.sort_direction, b.sort_direction, + "directions should match for {}", + a.name + ); + } +} + +#[test] +fn test_suffix_overrides_default() { + // metric_value defaults to Float64, but __i suffix forces Int64 + let s = must_parse("metric_value__i|timestamp/V2"); + assert_eq!(s.column[0].name, "metric_value"); + assert_eq!(s.column[0].column_type, 
ColumnTypeId::Int64 as u64); +} + +#[test] +fn test_display_omits_default_suffix() { + let s = must_parse("metric_name|host|timestamp/V2"); + let short = schema_to_string_short(&s); + // All columns use default types, so no suffixes in output. + assert_eq!(short, "metric_name|host|timestamp/V2"); +} + +#[test] +fn test_display_includes_non_default_suffix() { + // Force host to Int64 (non-default for bare "host" which defaults to String) + let s = must_parse("metric_name|host__i|timestamp/V2"); + let short = schema_to_string_short(&s); + assert_eq!(short, "metric_name|host__i|timestamp/V2"); +} + +// --------------------------------------------------------------------------- +// timeseries_id handling +// --------------------------------------------------------------------------- + +#[test] +fn test_timeseries_id_as_int64() { + let s = must_parse("metric_name__s|host__s|timeseries_id__i|timestamp/V2"); + assert_eq!(s.column.len(), 4); + + // timeseries_id__i should be TypeIDInt64, ascending. 
+ let ts_id_col = &s.column[2]; + assert_eq!(ts_id_col.name, "timeseries_id"); + assert_eq!(ts_id_col.column_type, ColumnTypeId::Int64 as u64); + assert_eq!( + ts_id_col.sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); +} + +// --------------------------------------------------------------------------- +// Float and sketch column types +// --------------------------------------------------------------------------- + +#[test] +fn test_float_column() { + let s = must_parse("value__nf|timestamp/V2"); + assert_eq!(s.column[0].column_type, ColumnTypeId::Float64 as u64); + assert_eq!( + s.column[0].sort_direction, + SortColumnDirection::SortDirectionAscending as i32 + ); +} + +#[test] +fn test_sketch_column() { + let s = must_parse("latency__sk|timestamp/V2"); + assert_eq!(s.column[0].column_type, ColumnTypeId::Sketch as u64); +} + +// --------------------------------------------------------------------------- +// SchemasToString / SchemasToStringShort multi-schema (convenience) +// --------------------------------------------------------------------------- + +#[test] +fn test_schemas_to_string() { + let schema1 = must_parse("test=key1__s|timestamp/V2"); + let schema2 = must_parse("key2__i|timestamp/V2"); + + let schemas = [schema1, schema2]; + let strings: Vec = schemas.iter().map(schema_to_string).collect(); + let actual = strings.join(","); + + // key1 has type String == default, so no suffix. key2 has type Int64 != default, keeps __i. + assert_eq!( + actual, + "test=key1:dense-string:+|timestamp:dense-int64:-/V2,key2__i:dense-int64:+|timestamp:\ + dense-int64:-/V2" + ); +} + +#[test] +fn test_schemas_to_string_short() { + let schema1 = must_parse("test=key1__s|timestamp/V2"); + let schema2 = must_parse("key2__i|timestamp/V2"); + + let schemas = [schema1, schema2]; + let strings: Vec = schemas.iter().map(schema_to_string_short).collect(); + let actual = strings.join(","); + + // key1 has type String == default, so no suffix. 
key2 has type Int64 != default, keeps __i. + assert_eq!(actual, "test=key1|timestamp/V2,key2__i|timestamp/V2"); +} + +// --------------------------------------------------------------------------- +// ColumnTypeId TryFrom (proto deserialization path) +// --------------------------------------------------------------------------- + +#[test] +fn test_column_type_try_from_u64_valid() { + assert_eq!(ColumnTypeId::try_from(2u64).unwrap(), ColumnTypeId::Int64); + assert_eq!( + ColumnTypeId::try_from(10u64).unwrap(), + ColumnTypeId::Float64 + ); + assert_eq!(ColumnTypeId::try_from(14u64).unwrap(), ColumnTypeId::String); + assert_eq!(ColumnTypeId::try_from(17u64).unwrap(), ColumnTypeId::Sketch); + assert_eq!( + ColumnTypeId::try_from(20u64).unwrap(), + ColumnTypeId::CpcSketch + ); + assert_eq!( + ColumnTypeId::try_from(22u64).unwrap(), + ColumnTypeId::ItemSketch + ); +} + +#[test] +fn test_column_type_try_from_u64_invalid() { + assert!(ColumnTypeId::try_from(0u64).is_err()); + assert!(ColumnTypeId::try_from(1u64).is_err()); + assert!(ColumnTypeId::try_from(99u64).is_err()); +} diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/validation.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/validation.rs new file mode 100644 index 00000000000..3a33db82f6f --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/validation.rs @@ -0,0 +1,114 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Sort schema validation -- direct port of Go `ValidateSchema`. + +use std::collections::HashSet; + +use quickwit_proto::sortschema::{SortColumnDirection, SortSchema}; + +use super::SortFieldsError; +use super::column_type::{ColumnTypeId, TIEBREAKER, default_is_descending}; + +/// Name used for the special skip-builder schema that does not require timestamp. +const DEFAULT_SKIP_BUILDER_SCHEMA_NAME: &str = "defaultSkipBuilderSchema"; + +/// Check if a bare column name is a timestamp column (defaults to descending). +fn is_timestamp_column(name: &str) -> bool { + default_is_descending(name) +} + +/// Validate a sort schema, enforcing all rules from Go `ValidateSchema`. +/// +/// Rules: +/// - Schema must have at least one column. +/// - No duplicate column names. +/// - Sort direction must not be Unknown. +/// - `timestamp` must be present (unless schema name is `defaultSkipBuilderSchema`). +/// - `timestamp` must be Int64 and descending (unless it's a msgid schema). +/// - `timestamp` must come before `tiebreaker`. +/// - No non-tiebreaker columns may appear after `timestamp`. 
+pub fn validate_schema(schema: &SortSchema) -> Result<(), SortFieldsError> {
+    if schema.column.is_empty() {
+        return Err(SortFieldsError::ValidationError("empty schema".to_string()));
+    }
+
+    let mut seen: HashSet<&str> = HashSet::new();
+    let is_msgid = schema.version == 2 || schema.name == "defaultMsgIDsSchema";
+
+    for col in &schema.column {
+        let name = col.name.as_str();
+
+        if seen.contains(name) {
+            return Err(SortFieldsError::ValidationError(format!(
+                "column {} is duplicated in schema",
+                name
+            )));
+        }
+        seen.insert(name);
+
+        if col.sort_direction == SortColumnDirection::SortDirectionUnknown as i32 {
+            return Err(SortFieldsError::ValidationError(format!(
+                "column {} does not specify a sort direction in schema",
+                name
+            )));
+        }
+
+        let has_seen_timestamp = seen.iter().any(|s| is_timestamp_column(s));
+
+        if is_timestamp_column(name) {
+            if seen.contains(TIEBREAKER) {
+                return Err(SortFieldsError::ValidationError(format!(
+                    "{} column must come before {} in schema",
+                    name, TIEBREAKER
+                )));
+            }
+            if col.sort_direction != SortColumnDirection::SortDirectionDescending as i32
+                && !is_msgid
+            {
+                return Err(SortFieldsError::ValidationError(format!(
+                    "{} column must be sorted in descending order in schema",
+                    name
+                )));
+            }
+            if col.column_type != ColumnTypeId::Int64 as u64 {
+                return Err(SortFieldsError::ValidationError(format!(
+                    "{} column must be of type int64 in schema",
+                    name
+                )));
+            }
+        } else if name == TIEBREAKER {
+            if !has_seen_timestamp {
+                return Err(SortFieldsError::ValidationError(format!(
+                    "timestamp column must come before {} in schema",
+                    TIEBREAKER
+                )));
+            }
+        } else if has_seen_timestamp && !is_msgid {
+            return Err(SortFieldsError::ValidationError(format!(
+                "column {} is after timestamp but timestamp must be the last schema column",
+                name
+            )));
+        }
+    }
+
+    let has_timestamp = schema.column.iter().any(|c| is_timestamp_column(&c.name));
+    if !has_timestamp && schema.name != DEFAULT_SKIP_BUILDER_SCHEMA_NAME {
+        return 
Err(SortFieldsError::ValidationError( + "timestamp column is required, but is missing from schema".to_string(), + )); + } + + Ok(()) +} diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/window.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/window.rs new file mode 100644 index 00000000000..b1ad896e31f --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/window.rs @@ -0,0 +1,240 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Canonical time-window functions for metrics compaction. +//! +//! `window_start` is the foundational time-partitioning function used by +//! ingestion (Phase 32), merge policy (Phase 33), and compaction planning +//! (Phase 35). Correctness at boundary conditions -- especially negative +//! timestamps and zero-crossing -- is critical because an off-by-one error +//! silently misroutes data to wrong windows. +//! +//! The implementation uses `rem_euclid` instead of the `%` operator to handle +//! negative timestamps correctly. Standard `%` truncates toward zero, which +//! gives wrong results for negative inputs: +//! - `-1 % 900 = -1` (wrong: would compute window_start as 0) +//! - `(-1i64).rem_euclid(900) = 899` (correct: window_start = -1 - 899 = -900) + +use chrono::{DateTime, Utc}; + +use super::SortFieldsError; + +/// Validate that a window duration evenly divides one hour (3600 seconds). 
+/// +/// This is ADR-003 invariant TW-2: all windows within an hour must have +/// identical boundaries regardless of when counting starts. 3600 has 45 +/// positive divisors (1, 2, 3, ..., 1800, 3600). Any of these are accepted. +/// In practice, metrics systems use durations >= 60s: 60, 120, 180, 240, +/// 300, 360, 600, 720, 900, 1200, 1800, 3600. +/// +/// A duration of 0 is rejected as nonsensical. Durations that do not evenly +/// divide 3600 are rejected because they would produce inconsistent window +/// boundaries across different starting points within an hour. +pub fn validate_window_duration(duration_secs: u32) -> Result<(), SortFieldsError> { + if duration_secs == 0 { + return Err(SortFieldsError::InvalidWindowDuration { + duration_secs, + reason: "must be positive", + }); + } + if 3600 % duration_secs != 0 { + return Err(SortFieldsError::InvalidWindowDuration { + duration_secs, + reason: "must evenly divide 3600 (one hour)", + }); + } + Ok(()) +} + +/// Compute the start of the time window containing the given timestamp. +/// +/// Uses `rem_euclid` for correct handling of negative timestamps (before Unix +/// epoch). Standard `%` truncates toward zero: `-1 % 900 = -1` (wrong). +/// `rem_euclid` always returns non-negative: `(-1i64).rem_euclid(900) = 899`. +/// So `window_start(-1, 900) = -1 - 899 = -900` (correct: timestamp -1 is in +/// window [-900, 0)). +/// +/// # Invariants (verified by proptest) +/// - Window start is aligned: `window_start % duration == 0` +/// - Timestamp is contained: `window_start <= timestamp < window_start + duration` +/// - Deterministic: same inputs always produce same output +/// +/// # Errors +/// Returns `SortFieldsError::WindowStartOutOfRange` if the computed start +/// timestamp cannot be represented as a `DateTime`. 
+pub fn window_start( + timestamp_secs: i64, + duration_secs: i64, +) -> Result, SortFieldsError> { + debug_assert!(duration_secs > 0, "window duration must be positive"); + // TW-2 (ADR-003): window duration must evenly divide one hour. + // This ensures window boundaries align across hours and days. + debug_assert!( + 3600 % duration_secs == 0, + "TW-2 violated: duration_secs={} does not divide 3600", + duration_secs + ); + let remainder = timestamp_secs.rem_euclid(duration_secs); + let start_secs = timestamp_secs - remainder; + DateTime::from_timestamp(start_secs, 0).ok_or(SortFieldsError::WindowStartOutOfRange { + timestamp_secs: start_secs, + }) +} + +#[cfg(test)] +mod tests { + use proptest::prelude::*; + + use super::*; + + // ----------------------------------------------------------------------- + // Proptest properties + // ----------------------------------------------------------------------- + + proptest! { + #[test] + fn window_start_is_aligned( + ts in -1_000_000_000i64..2_000_000_000i64, + dur in prop::sample::select(vec![60i64, 120, 180, 240, 300, 360, + 600, 720, 900, 1200, 1800, 3600]) + ) { + let ws = window_start(ts, dur).unwrap(); + let ws_secs = ws.timestamp(); + // window_start is aligned to duration + prop_assert_eq!(ws_secs.rem_euclid(dur), 0); + // timestamp is within [window_start, window_start + duration) + prop_assert!(ws_secs <= ts); + prop_assert!(ts < ws_secs + dur); + } + + #[test] + fn window_start_is_deterministic( + ts in -1_000_000_000i64..2_000_000_000i64, + dur in prop::sample::select(vec![60i64, 300, 900, 3600]) + ) { + let ws1 = window_start(ts, dur).unwrap(); + let ws2 = window_start(ts, dur).unwrap(); + prop_assert_eq!(ws1, ws2); + } + + #[test] + fn adjacent_windows_do_not_overlap( + ts in 0i64..1_000_000_000i64, + dur in prop::sample::select(vec![60i64, 300, 900, 3600]) + ) { + let ws = window_start(ts, dur).unwrap(); + let next_ws = window_start(ws.timestamp() + dur, dur).unwrap(); + // Next window starts exactly at 
current window end + prop_assert_eq!(next_ws.timestamp(), ws.timestamp() + dur); + } + } + + // ----------------------------------------------------------------------- + // Unit tests: edge cases + // ----------------------------------------------------------------------- + + #[test] + fn test_negative_timestamp_crossing() { + let ws = window_start(-1, 900).unwrap(); + assert_eq!(ws.timestamp(), -900); + } + + #[test] + fn test_zero_timestamp() { + let ws = window_start(0, 900).unwrap(); + assert_eq!(ws.timestamp(), 0); + } + + #[test] + fn test_exactly_on_boundary() { + let ws = window_start(900, 900).unwrap(); + assert_eq!(ws.timestamp(), 900); + } + + #[test] + fn test_one_before_boundary() { + let ws = window_start(899, 900).unwrap(); + assert_eq!(ws.timestamp(), 0); + } + + #[test] + fn test_large_negative_timestamp() { + let ws = window_start(-3601, 3600).unwrap(); + assert_eq!(ws.timestamp(), -7200); + } + + #[test] + fn test_60s_window() { + let ws = window_start(1_700_000_042, 60).unwrap(); + assert_eq!(ws.timestamp(), 1_700_000_040); + } + + // ----------------------------------------------------------------------- + // Validation tests + // ----------------------------------------------------------------------- + + #[test] + fn test_valid_window_durations() { + let valid = [60, 120, 180, 240, 300, 360, 600, 720, 900, 1200, 1800, 3600]; + for dur in valid { + assert!( + validate_window_duration(dur).is_ok(), + "duration {} should be valid", + dur + ); + } + } + + #[test] + fn test_invalid_window_durations() { + // None of these evenly divide 3600. + let invalid = [0, 7, 11, 13, 17, 700, 1000, 1500, 2000, 2400, 7200]; + for dur in invalid { + assert!( + validate_window_duration(dur).is_err(), + "duration {} should be invalid", + dur + ); + } + } + + #[test] + fn test_small_valid_divisors_also_accepted() { + // The function accepts all positive divisors of 3600, not just >= 60. 
+ let small_valid = [ + 1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 30, 36, 40, 45, 48, 50, + ]; + for dur in small_valid { + assert!( + validate_window_duration(dur).is_ok(), + "duration {} should be valid (divides 3600)", + dur + ); + } + } + + #[test] + fn test_zero_duration_error_message() { + let err = validate_window_duration(0).unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("must be positive"), "got: {msg}"); + } + + #[test] + fn test_non_divisor_error_message() { + let err = validate_window_duration(7).unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("must evenly divide 3600"), "got: {msg}"); + } +} diff --git a/quickwit/quickwit-parquet-engine/src/storage/config.rs b/quickwit/quickwit-parquet-engine/src/storage/config.rs index 13b8d395a44..2eb63d73510 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/config.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/config.rs @@ -202,8 +202,6 @@ impl ParquetWriterConfig { #[cfg(test)] mod tests { - use std::sync::Arc; - use arrow::datatypes::Field; use super::*; diff --git a/quickwit/quickwit-parquet-engine/src/table_config.rs b/quickwit/quickwit-parquet-engine/src/table_config.rs new file mode 100644 index 00000000000..0573b8177db --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/table_config.rs @@ -0,0 +1,191 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Default table configuration for Parquet pipeline product types. +//! +//! When a table (index) does not specify an explicit sort fields configuration, +//! these defaults are applied based on the product type. The `ParquetWriter` +//! resolves these sort field names to physical `ParquetField` columns at +//! construction time; columns not yet in the schema (e.g., `timeseries_id`) +//! are recorded in metadata but skipped during physical sort. + +use serde::{Deserialize, Serialize}; + +/// Product types supported by the Parquet pipeline. +/// +/// Each product type has a default sort fields schema that matches the common +/// query predicates for that signal type. See ADR-002 for the rationale. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ProductType { + Metrics, + Logs, + Traces, +} + +impl ProductType { + /// Default sort fields string for this product type. + /// + /// The metrics default includes `timeseries_id` as a tiebreaker before + /// `timestamp_secs`. Since `timeseries_id` is not yet a physical column, + /// the writer skips it during sort but records it in the metadata string. + /// When the column is added to the schema, sorting will include it + /// automatically. + /// + /// Logs and traces defaults are placeholders — they will be refined + /// when the Parquet pipeline is extended to those signal types. + pub fn default_sort_fields(self) -> &'static str { + match self { + Self::Metrics => "metric_name|tag_service|tag_env|tag_datacenter|tag_region|tag_host|timeseries_id|timestamp_secs/V2", + // Placeholder: column names TBD when logs Parquet schema is defined. + Self::Logs => "service_name|level|host|timestamp_secs/V2", + // Placeholder: column names TBD when traces Parquet schema is defined. + Self::Traces => "service_name|operation_name|trace_id|timestamp_secs/V2", + } + } +} + +/// Table-level configuration for the Parquet pipeline. +/// +/// Stored per-index. 
When `sort_fields` is `None`, the default for the +/// product type is used (see `ProductType::default_sort_fields()`). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TableConfig { + /// The product type determines schema defaults. + pub product_type: ProductType, + + /// Explicit sort fields override. When `None`, the product-type default + /// is used. When `Some`, this exact schema string is applied. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sort_fields: Option, + + /// Window duration in seconds for time-windowed compaction. + /// Default: 900 (15 minutes). Must divide 3600. + #[serde(default = "default_window_duration_secs")] + pub window_duration_secs: u32, +} + +fn default_window_duration_secs() -> u32 { + 900 +} + +impl TableConfig { + /// Create a new TableConfig for the given product type with defaults. + pub fn new(product_type: ProductType) -> Self { + Self { + product_type, + sort_fields: None, + window_duration_secs: default_window_duration_secs(), + } + } + + /// Get the effective sort fields string for this table. + /// + /// Returns the explicit override if set, otherwise the product-type default. + pub fn effective_sort_fields(&self) -> &str { + match &self.sort_fields { + Some(sf) => sf.as_str(), + None => self.product_type.default_sort_fields(), + } + } +} + +impl Default for TableConfig { + fn default() -> Self { + Self::new(ProductType::Metrics) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::sort_fields::parse_sort_fields; + + #[test] + fn test_metrics_default_sort_fields_parses() { + let schema = parse_sort_fields(ProductType::Metrics.default_sort_fields()) + .expect("metrics default sort fields must parse"); + assert_eq!(schema.column.len(), 8); + // Proto names are bare (suffixes stripped by parser). 
+ assert_eq!(schema.column[0].name, "metric_name"); + assert_eq!(schema.column[1].name, "tag_service"); + assert_eq!(schema.column[6].name, "timeseries_id"); + assert_eq!(schema.column[7].name, "timestamp_secs"); + } + + #[test] + fn test_logs_default_sort_fields_parses() { + let schema = parse_sort_fields(ProductType::Logs.default_sort_fields()) + .expect("logs default sort fields must parse"); + assert_eq!(schema.column.len(), 4); + assert_eq!(schema.column[0].name, "service_name"); + } + + #[test] + fn test_traces_default_sort_fields_parses() { + let schema = parse_sort_fields(ProductType::Traces.default_sort_fields()) + .expect("traces default sort fields must parse"); + assert_eq!(schema.column.len(), 4); + assert_eq!(schema.column[0].name, "service_name"); + } + + #[test] + fn test_effective_sort_fields_uses_default() { + let config = TableConfig::new(ProductType::Metrics); + assert_eq!( + config.effective_sort_fields(), + ProductType::Metrics.default_sort_fields() + ); + } + + #[test] + fn test_effective_sort_fields_uses_override() { + let mut config = TableConfig::new(ProductType::Metrics); + config.sort_fields = Some("custom__s|timestamp/V2".to_string()); + assert_eq!(config.effective_sort_fields(), "custom__s|timestamp/V2"); + } + + #[test] + fn test_default_window_duration() { + let config = TableConfig::default(); + assert_eq!(config.window_duration_secs, 900); + } + + #[test] + fn test_table_config_serde_roundtrip() { + let config = TableConfig::new(ProductType::Traces); + let json = serde_json::to_string(&config).unwrap(); + let recovered: TableConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(recovered.product_type, ProductType::Traces); + assert!(recovered.sort_fields.is_none()); + assert_eq!(recovered.window_duration_secs, 900); + } + + #[test] + fn test_table_config_serde_with_override() { + let mut config = TableConfig::new(ProductType::Metrics); + config.sort_fields = Some("host__s|timestamp/V2".to_string()); + 
config.window_duration_secs = 3600; + + let json = serde_json::to_string(&config).unwrap(); + assert!(json.contains("host__s|timestamp/V2")); + + let recovered: TableConfig = serde_json::from_str(&json).unwrap(); + assert_eq!( + recovered.sort_fields.as_deref(), + Some("host__s|timestamp/V2") + ); + assert_eq!(recovered.window_duration_secs, 3600); + } +} diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index 569d9b5315b..37e7d7c8cb1 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -223,6 +223,19 @@ fn main() -> Result<(), Box> { ], )?; + // Event Store sort schema proto (vendored from dd-source). + let sortschema_prost_config = prost_build::Config::default(); + tonic_prost_build::configure() + .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") + .out_dir("src/codegen/sortschema") + .compile_with_config( + sortschema_prost_config, + &[std::path::PathBuf::from( + "protos/event_store_sortschema/event_store_sortschema.proto", + )], + &[std::path::PathBuf::from("protos/event_store_sortschema")], + )?; + // OTEL proto let mut prost_config = prost_build::Config::default(); prost_config.protoc_arg("--experimental_allow_proto3_optional"); diff --git a/quickwit/quickwit-proto/protos/event_store_sortschema/event_store_sortschema.proto b/quickwit/quickwit-proto/protos/event_store_sortschema/event_store_sortschema.proto new file mode 100644 index 00000000000..a2e1d245b6d --- /dev/null +++ b/quickwit/quickwit-proto/protos/event_store_sortschema/event_store_sortschema.proto @@ -0,0 +1,231 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +// Vendored from dd-source: domains/event-platform/shared/libs/event-store-proto/protos/event_store_sortschema/ +// Keep identical to upstream -- do NOT strip unused fields. + +package sortschema; + +option java_package = "com.dd.event.store.sortschema.proto"; +option java_multiple_files = true; +option go_package = "github.com/DataDog/dd-source/domains/event-platform/shared/libs/event-store-proto/protos/event_store_sortschema"; + +// NOTE: Be thoughtful making changes to this file. Everything included here is +// included in the metadata of every fragment, and in each fragment header. + +enum SortColumnDirection { + // SortDirectionUnknown will pop up if someone forgets to initialize a field. + SortDirectionUnknown = 0; + + // SortDirectionAscending sorts columns in ascending value order. + SortDirectionAscending = 1; + + // SortDirectionDescending sorts columns in descending value order. + SortDirectionDescending = 2; +} + +// SortColumn represents a single column that's participating in the sort order +// for rows in the fragment files. +// +// NOTE: Carefully consider whether you need to modify the implementation of +// schemautils.go/EquivalentSchemas function whenever this struct is modified. +message SortColumn { + string name = 1; + // This integer corresponds 1:1 with the application-level column type which is also a number. 
+ // We do that instead of breaking out a separate ENUM here to avoid having to maintain a bunch + // of mapping code that converts from column types to protobuf ENUMs and back again for no reason. + uint64 column_type = 2; + SortColumnDirection sort_direction = 3; + + // NOTE: If we decide later to form lexicographic keys from the schema, + // it may be useful to supply an integer here which would indicate + // the sort precedence and could be used for tagging for e.g. + // orderedcode lexicographic keys. +} + +// SortSchema represents a set of column names and precedence values which define +// the partial lexicographical sort order of rows in a fragment file, built from the +// individual column orderings. Precedence of the columns for sorting is given by +// the order of the columns in the SortSchema. "timestamp" must appear, with +// SortColumnType INT64, SortDirection DESCENDING, last. +// +// NOTE: Carefully consider whether you need to modify the implementation of +// schemautils.go/EquivalentSchemas function whenever this struct is modified. +message SortSchema { + // Very common schemas used all over the place are assigned a unique version number (reference + // in dd-go/logs/apps/logs-event-store/storage/fragment/schema_versions.go). + // + // This version number can be used in place of the complete schema description in the fragment + // metadata entries, to reduce their size. When this value is not null, the other fields in the + // SortSchema are not used and don't need to be set. + uint64 version = 4; + + repeated SortColumn column = 1; + + // Used in metrics, etc. + string name = 2; + + // SortVersion specifies what type of sorting of the data has been done. + // Fragments with different SortVersion numbers are *never* merged by the + // compactor. 
+ enum SortVersion { + // The initial version of per-file locality sorting had a bug where it sorted + // the rows by the full values of string fields in the sort columns, but reported + // trimmed values in the RowKeys. This could yield what appeared to be + // "out of order" keys because keys that only differ in the portion that was + // trimmed could sort differently than trimmed values, and this would yield + // what looked like overlapping fragments after m:n merges. Fragments produced + // this way are not compatible with the LSM algorithm and must be excluded. + INCORRECT_TRIM = 0; + + // Files marked with this version use LessTrimmed both in their production + // in the writer, and in the merges produced by the compactor. Trimming has + // a "budget" that allows the leading string fields to take up "more" + // characters if they need to without overflowing into huge values in the + // metadata server. This budgeting is intended to cover when e.g. "service" + // is a leading tag with long service names. + TRIMMED_WITH_BUDGET = 1; + } + + SortVersion sort_version = 3; + + // Cutoff position for LSM compaction comparisons. Only columns 0 through + // lsm_comparison_cutoff-1 are considered for fragment locality decisions. + // Allows sorting by columns that improve compression without creating + // unnecessary extra compaction work. + // This cutoff is represented in the sort schema string format with an `&` + // at the beginning of the name of the first column that should be + // ignored, like "service__s|&env__s|timestamp/V1" which would only use + // "service__s" for locality comparisons. + // When unset, or explicitly set to 0, the schema utils will use the + // legacy logic of ignoring timestamp and tiebreaker, if they are present. + int32 lsm_comparison_cutoff = 5; +} + +// A ColumnValue is a string, int, uint or float which corresponds to a sort column. 
+message ColumnValue { + oneof value { + bytes type_string = 1; + int64 type_int = 2; + double type_float = 3; + } +} + +// ColumnValues represent a set of column values that correspond with a particular +// SortSchema. +message ColumnValues { + repeated ColumnValue column = 1; +} + +// RowKeys represent the "row keys" of the first and last rows of a fragment file. +// The values are the values of the sort columns at the first and last row, respectively. +message RowKeys { + // These are the values of the sort colums at the first row, thus defining the + // lower inclusive boundary of the fragment file, i.e. using the minimum values of any + // multi-values in the column values(1), which is what the sorting operates upon. + ColumnValues min_row_values = 1; + + // These are the values of the sort columns at the last row, thus defining the + // upper boundary of the sorted rows of the fragment file, i.e. using the minimum + // values of any multi-values in the column values(1), which is what the sorting + // operates upon. + // + // Note that the string fields in min_row_values and max_row_values are trimmed + // to avoid storing "too large" values in the metadata server, and the sort order + // of the rows in the fragment files are determined using these trimmed keys to + // match. + // + // During m:n merging of fragments, when boundaries of fragments are selected, + // they are selected at transitions of key values of the trimmed keys so that the + // [min_row_values, max_row_values] ranges of the output fragments do not overlap. + // This property is important for two reasons. First, we want to prune fragments + // from queries based on the values of the sort schema columns. By making the + // fragments non-overlapping, we enable this pruning. Secondly, we don't want + // compaction strategies like the classic LSM strategy to re-merge the same + // fragments over and over. 
+ // + // An interesting consequence of the key trimming is that a fragment may have + // a row in it with values that come "after" the max_row_values keys outside + // of the "trimmed" range of the column values. For example, if the trim budget + // for a column is four characters, and the fragment ends at "abcd", "abcd" and + // "abcd123" would yield the same trimmed key value and would both be stored in + // the same fragment. However "abce" would be stored in a different fragment. + ColumnValues max_row_values = 2; + + // This set of values defines the all-inclusive range boundary of the file considering + // the max values of any multi-values in the column values(2). This boundary is the + // boundary w.r.t. queries. + ColumnValues all_inclusive_max_row_values = 3; + + + // For track table expirations (splitting M fragments into N1 expired fragments and N2 live fragments), + // there is a potential for output fragments to overlap in key range. This is because the sort schema of track + // table starts with retention at intake + scopeID, and retention can change between the moment the fragment + // is created and the moment it is expired. To give an example, say a fragment has scopeIDs 1, 2, 3, 4, 5, + // all with a retention at intake of 7 days. If the retention for 3 and 5 is changed to 15 days, + // then 7 days later at expiration the fragment will be split into two fragments, + // one "expired" with scopeIDs 1, 2 and 4 and one "live" with scopeIDs 3 and 5. + // The row keys will be (7:1..., 7:4...) for the first fragment, and (7:3...,7:5...) for the second, + // which overlap. To avoid this, we add an `expired` boolean to the row keys, so we allow a live and an expired + // fragment to overlap in key range. 
+ bool expired = 4; + + // Footnotes: + // + // (1) For min_row_values and max_row_values with multi-values in the column + // values, the sort order is by the minimum value of a multi-value for an + // ascending-sort-order column and the maximum value for a descending-sort-order + // column. + // (2) For all_inclusive_max_row_values, the value of any multi-values encountered + // that is used is the maximum value for an ascending-sort-order column and + // the minimum value for a descending-sort-order column. + + + + // Example: + // + // Let's say we have the following columns. Values at each row enclosed with [brackets]. + // Column B is sorted in descending order. The others are sorted in ascending order. + // + // Column A Column B Column C + // ======== ======== ======== + // 1: [1, 2, 99] [B] [x, y] + // 2: [1, 4] [A, C] [y] + // 3: [2] [B, C] [z] + // 4: [2] [A, B, C] [x] + // 5: [2] [A] [y, z] + // 6: [3] [B] [x, y, z] + // + // min_row_key is {A=1, B=B, C=x} + // max_row_key is {A=3, B=B, C=x} + // all_inclusive_max_row_key is {A=99, B=B, C=y} + // + // The idea with these keys is that [min_row_key, max_row_key] defines the range that + // covers a single fragment file for the purposes of computing overlaps for deciding + // what to perform an m:n merge to "flatten" files that overlap in key range so that + // queries don't have to visit all the files. + // + // If there are no multi-valued sort columns, this is the end of the story. However + // if there are multi-valued sort columns, the range that a given file covers in + // terms of queries is given by [min_row_key, all_inclusive_max_row_key]. A lot of + // overlaps with these ranges indicate "atomic rows" for a table that cause queries + // to hit more files, degrading performance. + // + // Initially the compactor will only consider [min_row_key, max_row_key], with the + // all_inclusive_max_row_key present for diagnostics. 
In the future we may use + // this key to determine whether we want to do anything "special" with files that + // have atomic rows, such as duplicating the rows at each value of the multi-value. +} diff --git a/quickwit/quickwit-proto/src/codegen/sortschema/sortschema.rs b/quickwit/quickwit-proto/src/codegen/sortschema/sortschema.rs new file mode 100644 index 00000000000..2550c847794 --- /dev/null +++ b/quickwit/quickwit-proto/src/codegen/sortschema/sortschema.rs @@ -0,0 +1,230 @@ +// This file is @generated by prost-build. +/// SortColumn represents a single column that's participating in the sort order +/// for rows in the fragment files. +/// +/// NOTE: Carefully consider whether you need to modify the implementation of +/// schemautils.go/EquivalentSchemas function whenever this struct is modified. +#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SortColumn { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + /// This integer corresponds 1:1 with the application-level column type which is also a number. + /// We do that instead of breaking out a separate ENUM here to avoid having to maintain a bunch + /// of mapping code that converts from column types to protobuf ENUMs and back again for no reason. + #[prost(uint64, tag = "2")] + pub column_type: u64, + #[prost(enumeration = "SortColumnDirection", tag = "3")] + pub sort_direction: i32, +} +/// SortSchema represents a set of column names and precedence values which define +/// the partial lexicographical sort order of rows in a fragment file, built from the +/// individual column orderings. Precedence of the columns for sorting is given by +/// the order of the columns in the SortSchema. "timestamp" must appear, with +/// SortColumnType INT64, SortDirection DESCENDING, last. 
+///
+/// NOTE: Carefully consider whether you need to modify the implementation of
+/// schemautils.go/EquivalentSchemas function whenever this struct is modified.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SortSchema {
+ /// Very common schemas used all over the place are assigned a unique version number (reference
+ /// in dd-go/logs/apps/logs-event-store/storage/fragment/schema_versions.go).
+ ///
+ /// This version number can be used in place of the complete schema description in the fragment
+ /// metadata entries, to reduce their size. When this value is not null, the other fields in the
+ /// SortSchema are not used and don't need to be set.
+ #[prost(uint64, tag = "4")]
+ pub version: u64,
+ #[prost(message, repeated, tag = "1")]
+ pub column: ::prost::alloc::vec::Vec<SortColumn>,
+ /// Used in metrics, etc.
+ #[prost(string, tag = "2")]
+ pub name: ::prost::alloc::string::String,
+ #[prost(enumeration = "sort_schema::SortVersion", tag = "3")]
+ pub sort_version: i32,
+ /// Cutoff position for LSM compaction comparisons. Only columns 0 through
+ /// lsm_comparison_cutoff-1 are considered for fragment locality decisions.
+ /// Allows sorting by columns that improve compression without creating
+ /// unnecessary extra compaction work.
+ /// This cutoff is represented in the sort schema string format with an `&`
+ /// at the beginning of the name of the first column that should be
+ /// ignored, like "service__s|&env__s|timestamp/V1" which would only use
+ /// "service__s" for locality comparisons.
+ /// When unset, or explicitly set to 0, the schema utils will use the
+ /// legacy logic of ignoring timestamp and tiebreaker, if they are present.
+ #[prost(int32, tag = "5")]
+ pub lsm_comparison_cutoff: i32,
+}
+/// Nested message and enum types in `SortSchema`.
+pub mod sort_schema {
+ /// SortVersion specifies what type of sorting of the data has been done. 
+ /// Fragments with different SortVersion numbers are *never* merged by the + /// compactor. + #[derive(serde::Serialize, serde::Deserialize)] + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum SortVersion { + /// The initial version of per-file locality sorting had a bug where it sorted + /// the rows by the full values of string fields in the sort columns, but reported + /// trimmed values in the RowKeys. This could yield what appeared to be + /// "out of order" keys because keys that only differ in the portion that was + /// trimmed could sort differently than trimmed values, and this would yield + /// what looked like overlapping fragments after m:n merges. Fragments produced + /// this way are not compatible with the LSM algorithm and must be excluded. + IncorrectTrim = 0, + /// Files marked with this version use LessTrimmed both in their production + /// in the writer, and in the merges produced by the compactor. Trimming has + /// a "budget" that allows the leading string fields to take up "more" + /// characters if they need to without overflowing into huge values in the + /// metadata server. This budgeting is intended to cover when e.g. "service" + /// is a leading tag with long service names. + TrimmedWithBudget = 1, + } + impl SortVersion { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::IncorrectTrim => "INCORRECT_TRIM", + Self::TrimmedWithBudget => "TRIMMED_WITH_BUDGET", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. 
+ pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
+ match value {
+ "INCORRECT_TRIM" => Some(Self::IncorrectTrim),
+ "TRIMMED_WITH_BUDGET" => Some(Self::TrimmedWithBudget),
+ _ => None,
+ }
+ }
+ }
+}
+/// A ColumnValue is a string, int, uint or float which corresponds to a sort column.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ColumnValue {
+ #[prost(oneof = "column_value::Value", tags = "1, 2, 3")]
+ pub value: ::core::option::Option<column_value::Value>,
+}
+/// Nested message and enum types in `ColumnValue`.
+pub mod column_value {
+ #[derive(serde::Serialize, serde::Deserialize)]
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Value {
+ #[prost(bytes, tag = "1")]
+ TypeString(::prost::alloc::vec::Vec<u8>),
+ #[prost(int64, tag = "2")]
+ TypeInt(i64),
+ #[prost(double, tag = "3")]
+ TypeFloat(f64),
+ }
+}
+/// ColumnValues represent a set of column values that correspond with a particular
+/// SortSchema.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ColumnValues {
+ #[prost(message, repeated, tag = "1")]
+ pub column: ::prost::alloc::vec::Vec<ColumnValue>,
+}
+/// RowKeys represent the "row keys" of the first and last rows of a fragment file.
+/// The values are the values of the sort columns at the first and last row, respectively.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RowKeys {
+ /// These are the values of the sort columns at the first row, thus defining the
+ /// lower inclusive boundary of the fragment file, i.e. using the minimum values of any
+ /// multi-values in the column values(1), which is what the sorting operates upon.
+ #[prost(message, optional, tag = "1")]
+ pub min_row_values: ::core::option::Option<ColumnValues>,
+ /// These are the values of the sort columns at the last row, thus defining the
+ /// upper boundary of the sorted rows of the fragment file, i.e. 
using the minimum
+ /// values of any multi-values in the column values(1), which is what the sorting
+ /// operates upon.
+ ///
+ /// Note that the string fields in min_row_values and max_row_values are trimmed
+ /// to avoid storing "too large" values in the metadata server, and the sort order
+ /// of the rows in the fragment files is determined using these trimmed keys to
+ /// match.
+ ///
+ /// During m:n merging of fragments, when boundaries of fragments are selected,
+ /// they are selected at transitions of key values of the trimmed keys so that the
+ /// \[min_row_values, max_row_values\] ranges of the output fragments do not overlap.
+ /// This property is important for two reasons. First, we want to prune fragments
+ /// from queries based on the values of the sort schema columns. By making the
+ /// fragments non-overlapping, we enable this pruning. Secondly, we don't want
+ /// compaction strategies like the classic LSM strategy to re-merge the same
+ /// fragments over and over.
+ ///
+ /// An interesting consequence of the key trimming is that a fragment may have
+ /// a row in it with values that come "after" the max_row_values keys outside
+ /// of the "trimmed" range of the column values. For example, if the trim budget
+ /// for a column is four characters, and the fragment ends at "abcd", "abcd" and
+ /// "abcd123" would yield the same trimmed key value and would both be stored in
+ /// the same fragment. However "abce" would be stored in a different fragment.
+ #[prost(message, optional, tag = "2")]
+ pub max_row_values: ::core::option::Option<ColumnValues>,
+ /// This set of values defines the all-inclusive range boundary of the file considering
+ /// the max values of any multi-values in the column values(2). This boundary is the
+ /// boundary w.r.t. queries. 
+ #[prost(message, optional, tag = "3")]
+ pub all_inclusive_max_row_values: ::core::option::Option<ColumnValues>,
+ /// For track table expirations (splitting M fragments into N1 expired fragments and N2 live fragments),
+ /// there is a potential for output fragments to overlap in key range. This is because the sort schema of track
+ /// table starts with retention at intake + scopeID, and retention can change between the moment the fragment
+ /// is created and the moment it is expired. To give an example, say a fragment has scopeIDs 1, 2, 3, 4, 5,
+ /// all with a retention at intake of 7 days. If the retention for 3 and 5 is changed to 15 days,
+ /// then 7 days later at expiration the fragment will be split into two fragments,
+ /// one "expired" with scopeIDs 1, 2 and 4 and one "live" with scopeIDs 3 and 5.
+ /// The row keys will be (7:1..., 7:4...) for the first fragment, and (7:3...,7:5...) for the second,
+ /// which overlap. To avoid this, we add an `expired` boolean to the row keys, so we allow a live and an expired
+ /// fragment to overlap in key range.
+ #[prost(bool, tag = "4")]
+ pub expired: bool,
+}
+#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
+pub enum SortColumnDirection {
+ /// SortDirectionUnknown will pop up if someone forgets to initialize a field.
+ SortDirectionUnknown = 0,
+ /// SortDirectionAscending sorts columns in ascending value order.
+ SortDirectionAscending = 1,
+ /// SortDirectionDescending sorts columns in descending value order.
+ SortDirectionDescending = 2,
+}
+impl SortColumnDirection {
+ /// String value of the enum field names used in the ProtoBuf definition.
+ ///
+ /// The values are not transformed in any way and thus are considered stable
+ /// (if the ProtoBuf definition does not change) and safe for programmatic use. 
+ pub fn as_str_name(&self) -> &'static str { + match self { + Self::SortDirectionUnknown => "SortDirectionUnknown", + Self::SortDirectionAscending => "SortDirectionAscending", + Self::SortDirectionDescending => "SortDirectionDescending", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SortDirectionUnknown" => Some(Self::SortDirectionUnknown), + "SortDirectionAscending" => Some(Self::SortDirectionAscending), + "SortDirectionDescending" => Some(Self::SortDirectionDescending), + _ => None, + } + } +} diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index dbe850b55b7..3a7f7ea8992 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -15,6 +15,7 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![allow(clippy::disallowed_methods)] #![allow(clippy::doc_lazy_continuation)] +#![allow(deprecated)] // prost::DecodeError::new is deprecated but used in generated decode impls #![allow(rustdoc::invalid_html_tags)] use std::cmp::Ordering; @@ -37,10 +38,16 @@ pub mod indexing; pub mod ingest; pub mod metastore; pub mod search; +pub mod sort_fields_error; pub mod types; pub use error::{GrpcServiceError, ServiceError, ServiceErrorCode}; use search::ReportSplitsRequest; +pub use sort_fields_error::SortFieldsError; + +pub mod sortschema { + include!("codegen/sortschema/sortschema.rs"); +} pub mod jaeger { pub mod api_v2 { @@ -123,7 +130,8 @@ impl TryFrom for search::SearchRequest { pub struct MutMetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); impl Injector for MutMetadataMap<'_> { - /// Sets a key-value pair in the [`MetadataMap`]. No-op if the key or value is invalid. + /// Sets a key-value pair in the [`tonic::metadata::MetadataMap`]. No-op if the key or value + /// is invalid. 
fn set(&mut self, key: &str, value: String) { if let Ok(metadata_key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) && let Ok(metadata_value) = tonic::metadata::MetadataValue::try_from(&value) @@ -134,13 +142,13 @@ impl Injector for MutMetadataMap<'_> { } impl Extractor for MutMetadataMap<'_> { - /// Gets a value for a key from the MetadataMap. If the value can't be converted to &str, + /// Gets a value for a key from the `MetadataMap`. If the value can't be converted to &str, /// returns None. fn get(&self, key: &str) -> Option<&str> { self.0.get(key).and_then(|metadata| metadata.to_str().ok()) } - /// Collect all the keys from the MetadataMap. + /// Collect all the keys from the `MetadataMap`. fn keys(&self) -> Vec<&str> { self.0 .keys() @@ -174,13 +182,13 @@ impl Interceptor for SpanContextInterceptor { struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); impl Extractor for MetadataMap<'_> { - /// Gets a value for a key from the MetadataMap. If the value can't be converted to &str, + /// Gets a value for a key from the `MetadataMap`. If the value can't be converted to &str, /// returns None. fn get(&self, key: &str) -> Option<&str> { self.0.get(key).and_then(|metadata| metadata.to_str().ok()) } - /// Collect all the keys from the MetadataMap. + /// Collect all the keys from the `MetadataMap`. 
fn keys(&self) -> Vec<&str> { self.0 .keys() diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index ba371c13d4a..4f53b9abb5c 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -193,6 +193,15 @@ impl From for MetastoreError { } } +impl From for MetastoreError { + fn from(err: crate::SortFieldsError) -> Self { + MetastoreError::Internal { + message: "sort fields error".to_string(), + cause: err.to_string(), + } + } +} + impl ServiceError for MetastoreError { fn error_code(&self) -> ServiceErrorCode { match self { diff --git a/quickwit/quickwit-proto/src/sort_fields_error.rs b/quickwit/quickwit-proto/src/sort_fields_error.rs new file mode 100644 index 00000000000..8fa6ebc15b3 --- /dev/null +++ b/quickwit/quickwit-proto/src/sort_fields_error.rs @@ -0,0 +1,78 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Error types for sort fields parsing and validation. + +/// Errors arising from sort fields parsing, validation, and time-window arithmetic. +#[derive(Debug, thiserror::Error)] +pub enum SortFieldsError { + /// Window duration does not meet the divisibility constraint. + #[error("invalid window duration {duration_secs}s: {reason}")] + InvalidWindowDuration { + duration_secs: u32, + reason: &'static str, + }, + + /// Schema string is syntactically malformed. 
+ #[error("{0}")] + MalformedSchema(String), + + /// Version suffix could not be parsed. + #[error("{0}")] + BadSortVersion(String), + + /// Sort version is below the minimum accepted (V2-only enforcement). + #[error("unsupported sort version {version}, minimum is {minimum}")] + UnsupportedVersion { version: i32, minimum: i32 }, + + /// Invalid placement or usage of the `&` LSM cutoff marker. + #[error("{0}")] + InvalidCutoffPlacement(String), + + /// Column specification has wrong number of parts. + #[error("{0}")] + InvalidColumnFormat(String), + + /// Unknown column type (from suffix or explicit name). + #[error("{0}")] + UnknownColumnType(String), + + /// Explicit column type does not match the type inferred from the suffix. + #[error( + "column type doesn't match type deduced from suffix for {column}, deduced={from_suffix}, \ + explicit={explicit}" + )] + TypeMismatch { + column: String, + from_suffix: String, + explicit: String, + }, + + /// Unrecognized sort direction string. + #[error("{0}")] + UnknownSortDirection(String), + + /// Sort direction specified in multiple places (e.g., both prefix and suffix, + /// or prefix/suffix combined with colon-separated direction). + #[error("sort direction specified multiple times for column '{0}'")] + DuplicateDirection(String), + + /// Schema is structurally invalid (missing timestamp, wrong order, etc.). + #[error("{0}")] + ValidationError(String), + + /// window_start timestamp cannot be represented as a DateTime. + #[error("window_start timestamp {timestamp_secs} is out of representable range")] + WindowStartOutOfRange { timestamp_secs: i64 }, +} diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 79f6ba81702..0a4518174ce 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -138,7 +138,7 @@ impl ClusterClient { /// Attempts to store a given key value pair within the cluster. 
/// - /// Tries to replicate the pair to [`TARGET_NUM_REPLICATION`] nodes, but this function may fail + /// Tries to replicate the pair to `TARGET_NUM_REPLICATION` nodes, but this function may fail /// silently (e.g if no client was available). Even in case of success, this storage is not /// persistent. For instance during a rolling upgrade, all replicas will be lost as there is no /// mechanism to maintain the replication count. diff --git a/quickwit/quickwit-storage/src/storage_resolver.rs b/quickwit/quickwit-storage/src/storage_resolver.rs index 6203d6a8d02..85329c19a86 100644 --- a/quickwit/quickwit-storage/src/storage_resolver.rs +++ b/quickwit/quickwit-storage/src/storage_resolver.rs @@ -43,7 +43,7 @@ impl fmt::Debug for StorageResolver { } impl StorageResolver { - /// Creates an empty [`StorageResolverBuilder`]. + /// Creates an empty `StorageResolverBuilder`. pub fn builder() -> StorageResolverBuilder { StorageResolverBuilder::default() }