From 4fed2a078426ef0274994e3a1603298343f94db4 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Thu, 12 Mar 2026 16:18:34 -0400 Subject: [PATCH 1/4] feat(31): add shared invariants module to quickwit-dst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract duplicated invariant logic into a shared `invariants/` module within `quickwit-dst`. This is the "single source of truth" layer in the verification pyramid — used by stateright models, production debug_assert checks, and (future) Datadog metrics emission. Key changes: - `invariants/registry.rs`: InvariantId enum (20 variants) with Display - `invariants/window.rs`: shared window_start_secs(), is_valid_window_duration() - `invariants/sort.rs`: generic compare_with_null_ordering() for SS-2 - `invariants/check.rs`: check_invariant! macro wrapping debug_assert - stateright gated behind `model-checking` feature (optional dep) - quickwit-parquet-engine uses shared functions and check_invariant! Co-Authored-By: Claude Opus 4.6 (1M context) --- quickwit/Cargo.toml | 3 + quickwit/quickwit-dst/Cargo.toml | 16 + quickwit/quickwit-dst/src/invariants/check.rs | 45 ++ quickwit/quickwit-dst/src/invariants/mod.rs | 33 + .../quickwit-dst/src/invariants/registry.rs | 180 +++++ quickwit/quickwit-dst/src/invariants/sort.rs | 109 +++ .../quickwit-dst/src/invariants/window.rs | 87 +++ quickwit/quickwit-dst/src/lib.rs | 47 ++ quickwit/quickwit-dst/src/models/mod.rs | 24 + .../src/models/parquet_data_model.rs | 416 +++++++++++ .../quickwit-dst/src/models/sort_schema.rs | 668 ++++++++++++++++++ .../src/models/time_windowed_compaction.rs | 635 +++++++++++++++++ .../quickwit-dst/tests/stateright_models.rs | 77 ++ quickwit/quickwit-parquet-engine/Cargo.toml | 1 + .../src/sort_fields/window.rs | 19 +- .../src/split/metadata.rs | 14 +- .../src/split/postgres.rs | 32 +- .../src/storage/writer.rs | 33 +- 18 files changed, 2402 insertions(+), 37 deletions(-) create mode 100644 
quickwit/quickwit-dst/Cargo.toml create mode 100644 quickwit/quickwit-dst/src/invariants/check.rs create mode 100644 quickwit/quickwit-dst/src/invariants/mod.rs create mode 100644 quickwit/quickwit-dst/src/invariants/registry.rs create mode 100644 quickwit/quickwit-dst/src/invariants/sort.rs create mode 100644 quickwit/quickwit-dst/src/invariants/window.rs create mode 100644 quickwit/quickwit-dst/src/lib.rs create mode 100644 quickwit/quickwit-dst/src/models/mod.rs create mode 100644 quickwit/quickwit-dst/src/models/parquet_data_model.rs create mode 100644 quickwit/quickwit-dst/src/models/sort_schema.rs create mode 100644 quickwit/quickwit-dst/src/models/time_windowed_compaction.rs create mode 100644 quickwit/quickwit-dst/tests/stateright_models.rs diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index d252652b27f..cfd732a61f7 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -13,6 +13,7 @@ members = [ "quickwit-datetime", "quickwit-directories", "quickwit-doc-mapper", + "quickwit-dst", "quickwit-index-management", "quickwit-indexing", "quickwit-ingest", @@ -233,6 +234,7 @@ serde_yaml = "0.9" serial_test = { version = "3.2", features = ["file_locks"] } sha2 = "0.10" siphasher = "1.0" +stateright = "0.30" smallvec = "1" sqlx = { version = "0.8", features = [ "migrate", @@ -352,6 +354,7 @@ quickwit-control-plane = { path = "quickwit-control-plane" } quickwit-datetime = { path = "quickwit-datetime" } quickwit-directories = { path = "quickwit-directories" } quickwit-doc-mapper = { path = "quickwit-doc-mapper" } +quickwit-dst = { path = "quickwit-dst" } quickwit-index-management = { path = "quickwit-index-management" } quickwit-indexing = { path = "quickwit-indexing" } quickwit-ingest = { path = "quickwit-ingest" } diff --git a/quickwit/quickwit-dst/Cargo.toml b/quickwit/quickwit-dst/Cargo.toml new file mode 100644 index 00000000000..f9a8abae202 --- /dev/null +++ b/quickwit/quickwit-dst/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "quickwit-dst" 
+version.workspace = true +edition.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true +authors.workspace = true +license.workspace = true +description = "Deterministic simulation testing and stateright model checking for Quickhouse-Pomsky" + +[features] +model-checking = ["stateright"] + +[dependencies] +stateright = { workspace = true, optional = true } diff --git a/quickwit/quickwit-dst/src/invariants/check.rs b/quickwit/quickwit-dst/src/invariants/check.rs new file mode 100644 index 00000000000..06c59ee9a3c --- /dev/null +++ b/quickwit/quickwit-dst/src/invariants/check.rs @@ -0,0 +1,45 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Invariant checking macro. +//! +//! Wraps `debug_assert!` with the invariant ID, providing a single hook point +//! for future Datadog metrics emission (Layer 4 of the verification stack). + +/// Check an invariant condition. In debug builds, panics on violation. +/// In release builds, currently a no-op (future: emit Datadog metric). 
/// Checks an invariant condition, tagging any failure with the invariant ID.
///
/// When debug assertions are enabled, a violated condition panics with the
/// message `"<id> violated"`, immediately followed by the optional format
/// string and its arguments. When debug assertions are disabled the condition
/// is still type-checked but never evaluated, so the macro is currently a
/// no-op (future: emit Datadog metric).
///
/// `$id` only needs to implement `Display`. A trailing comma is accepted in
/// both forms.
///
/// # Examples
///
/// ```
/// use quickwit_dst::check_invariant;
/// use quickwit_dst::invariants::InvariantId;
///
/// let duration_secs = 900u32;
/// check_invariant!(InvariantId::TW2, 3600 % duration_secs == 0, ": duration={}", duration_secs);
/// ```
#[macro_export]
macro_rules! check_invariant {
    // Bare form: only the invariant ID appears in the failure message.
    ($id:expr, $cond:expr $(,)?) => {
        debug_assert!($cond, "{} violated", $id);
    };
    // Contextual form: `$fmt` is appended verbatim to "<id> violated", so
    // callers conventionally start it with ": ".
    ($id:expr, $cond:expr, $fmt:literal $($arg:tt)*) => {
        debug_assert!($cond, concat!("{} violated", $fmt), $id $($arg)*);
    };
}

// ---------------------------------------------------------------------------
// Next file in this patch: quickwit/quickwit-dst/src/invariants/mod.rs (new file)
//
// Shared invariant definitions — the single source of truth.
//
// This module contains pure-Rust functions and types that express the
// invariants verified across all layers of the verification pyramid:
// TLA+ specs, stateright models, DST tests, and production code.
//
// No external dependencies — only `std`.
+ +mod check; +pub mod registry; +pub mod sort; +pub mod window; + +pub use registry::InvariantId; diff --git a/quickwit/quickwit-dst/src/invariants/registry.rs b/quickwit/quickwit-dst/src/invariants/registry.rs new file mode 100644 index 00000000000..9704f0da891 --- /dev/null +++ b/quickwit/quickwit-dst/src/invariants/registry.rs @@ -0,0 +1,180 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Invariant ID catalog — single source of truth for all invariant definitions. +//! +//! Each variant corresponds to a named invariant verified across the TLA+ specs, +//! stateright models, and production code. See `docs/internals/specs/tla/` for +//! the formal definitions. + +use std::fmt; + +/// Unique identifier for each verified invariant. 
/// Unique identifier for each verified invariant.
///
/// The naming convention is `<PREFIX>-<NUMBER>`:
/// - SS: SortSchema.tla (ADR-002)
/// - TW: TimeWindowedCompaction.tla time-window invariants (ADR-003)
/// - CS: TimeWindowedCompaction.tla compaction-scope invariants (ADR-003)
/// - MC: TimeWindowedCompaction.tla merge-correctness invariants (ADR-003)
/// - DM: ParquetDataModel.tla (ADR-001)
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum InvariantId {
    /// SS-1: all rows within a split are sorted according to the split's schema
    SS1,
    /// SS-2: null values sort correctly per direction (nulls last asc, first desc)
    SS2,
    /// SS-3: missing sort columns are treated as NULL
    SS3,
    /// SS-4: a split's sort schema never changes after write
    SS4,
    /// SS-5: three copies of sort schema are identical per split
    SS5,

    /// TW-1: every split belongs to exactly one time window
    TW1,
    /// TW-2: window_duration evenly divides one hour (3600 seconds)
    TW2,
    /// TW-3: data is never merged across window boundaries
    TW3,

    /// CS-1: only splits sharing all six scope components may be merged
    CS1,
    /// CS-2: within a scope, only same window_start splits merge
    CS2,
    /// CS-3: splits before compaction_start_time are never compacted
    CS3,

    /// MC-1: row multiset preserved through compaction (no add/remove/duplicate)
    MC1,
    /// MC-2: row contents unchanged through compaction
    MC2,
    /// MC-3: output is sorted according to sort schema
    MC3,
    /// MC-4: column set is the union of input column sets
    MC4,

    /// DM-1: each row has all required fields populated
    DM1,
    /// DM-2: no last-write-wins; duplicate ingests both survive
    DM2,
    /// DM-3: storage only contains ingested points (no interpolation)
    DM3,
    /// DM-4: same tags produce same timeseries_id (deterministic TSID)
    DM4,
    /// DM-5: timeseries_id persists through compaction without recomputation
    DM5,
}

impl InvariantId {
    /// Every invariant, in declaration order.
    ///
    /// Add new variants here as well: the compiler does not enforce that this
    /// list is exhaustive, but tests and registries iterate over it.
    pub const ALL: [Self; 20] = [
        Self::SS1,
        Self::SS2,
        Self::SS3,
        Self::SS4,
        Self::SS5,
        Self::TW1,
        Self::TW2,
        Self::TW3,
        Self::CS1,
        Self::CS2,
        Self::CS3,
        Self::MC1,
        Self::MC2,
        Self::MC3,
        Self::MC4,
        Self::DM1,
        Self::DM2,
        Self::DM3,
        Self::DM4,
        Self::DM5,
    ];

    /// Human-readable description of this invariant.
    pub fn description(self) -> &'static str {
        match self {
            Self::SS1 => "rows sorted by split schema",
            Self::SS2 => "null ordering correct per direction",
            Self::SS3 => "missing sort columns treated as NULL",
            Self::SS4 => "sort schema immutable after write",
            Self::SS5 => "three copies of sort schema identical",

            Self::TW1 => "one window per split",
            Self::TW2 => "window_duration divides 3600",
            Self::TW3 => "no cross-window merge",

            Self::CS1 => "scope compatibility for merge",
            Self::CS2 => "same window_start for merge",
            Self::CS3 => "compaction start time respected",

            Self::MC1 => "row set preserved through compaction",
            Self::MC2 => "row contents unchanged through compaction",
            Self::MC3 => "sort order preserved after compaction",
            Self::MC4 => "column union after compaction",

            Self::DM1 => "point per row — all fields populated",
            Self::DM2 => "no last-write-wins",
            Self::DM3 => "no interpolation — only ingested points",
            Self::DM4 => "deterministic TSID from tags",
            Self::DM5 => "TSID persists through compaction",
        }
    }
}

/// Formats the invariant as its catalog code, e.g. `SS-1` or `DM-5`.
impl fmt::Display for InvariantId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            Self::SS1 => "SS-1",
            Self::SS2 => "SS-2",
            Self::SS3 => "SS-3",
            Self::SS4 => "SS-4",
            Self::SS5 => "SS-5",

            Self::TW1 => "TW-1",
            Self::TW2 => "TW-2",
            Self::TW3 => "TW-3",

            Self::CS1 => "CS-1",
            Self::CS2 => "CS-2",
            Self::CS3 => "CS-3",

            Self::MC1 => "MC-1",
            Self::MC2 => "MC-2",
            Self::MC3 => "MC-3",
            Self::MC4 => "MC-4",

            Self::DM1 => "DM-1",
            Self::DM2 => "DM-2",
            Self::DM3 => "DM-3",
            Self::DM4 => "DM-4",
            Self::DM5 => "DM-5",
        };
        f.write_str(s)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn display_format() {
        assert_eq!(InvariantId::SS1.to_string(), "SS-1");
        assert_eq!(InvariantId::TW2.to_string(), "TW-2");
        assert_eq!(InvariantId::CS3.to_string(), "CS-3");
        assert_eq!(InvariantId::MC4.to_string(), "MC-4");
        assert_eq!(InvariantId::DM5.to_string(), "DM-5");
    }

    #[test]
    fn descriptions_non_empty() {
        for id in InvariantId::ALL {
            assert!(
                !id.description().is_empty(),
                "{} has empty description",
                id
            );
        }
    }
}

// ---------------------------------------------------------------------------
// Next file in this patch: quickwit/quickwit-dst/src/invariants/sort.rs (new file)
//
// Shared null-aware comparison for SS-2 (null ordering invariant).
//
// This is the single source of truth for how nulls sort relative to non-null
// values. Used by both the stateright sort_schema model and production code.
use std::cmp::Ordering;

/// Compares two optional values with null ordering per SS-2.
///
/// `None` stands for a null value:
///
/// - Ascending: nulls sort AFTER non-null (nulls last).
/// - Descending: nulls sort BEFORE non-null (nulls first).
///
/// For two non-null values, the natural `Ord` ordering is used (reversed for
/// descending). Two nulls compare as equal in either direction.
///
/// `T` may be unsized, so `Option<&str>` or `Option<&[u8]>` can be compared
/// directly.
pub fn compare_with_null_ordering<T: Ord + ?Sized>(
    a: Option<&T>,
    b: Option<&T>,
    ascending: bool,
) -> Ordering {
    // Work out the ascending (nulls-last) answer once; the descending answer
    // is its exact mirror image in every case, including null vs. non-null.
    let nulls_last = match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => Ordering::Greater, // null after non-null
        (Some(_), None) => Ordering::Less,    // non-null before null
        (Some(lhs), Some(rhs)) => lhs.cmp(rhs),
    };
    if ascending {
        nulls_last
    } else {
        nulls_last.reverse()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ascending_null_ordering() {
        // null > non-null in ascending
        assert_eq!(
            compare_with_null_ordering(None::<&i32>, Some(&1), true),
            Ordering::Greater
        );
        assert_eq!(
            compare_with_null_ordering(Some(&1), None::<&i32>, true),
            Ordering::Less
        );
        // null == null
        assert_eq!(
            compare_with_null_ordering(None::<&i32>, None::<&i32>, true),
            Ordering::Equal
        );
        // non-null comparison
        assert_eq!(
            compare_with_null_ordering(Some(&1), Some(&2), true),
            Ordering::Less
        );
    }

    #[test]
    fn descending_null_ordering() {
        // null < non-null in descending
        assert_eq!(
            compare_with_null_ordering(None::<&i32>, Some(&1), false),
            Ordering::Less
        );
        assert_eq!(
            compare_with_null_ordering(Some(&1), None::<&i32>, false),
            Ordering::Greater
        );
        // non-null comparison reversed
        assert_eq!(
            compare_with_null_ordering(Some(&1), Some(&2), false),
            Ordering::Greater
        );
    }
}
// File: quickwit/quickwit-dst/src/invariants/window.rs (new file in this patch)
//
// Shared window math for time-windowed compaction.
//
// These pure functions are the single source of truth for window arithmetic,
// used by both stateright models and production code.

/// Computes `window_start` for a timestamp: the largest multiple of
/// `duration_secs` that is less than or equal to `timestamp_secs`.
///
/// Uses `rem_euclid` for correct handling of negative timestamps (before Unix
/// epoch). Standard `%` truncates toward zero: `-1 % 900 = -1` (wrong).
/// `rem_euclid` always returns non-negative: `(-1i64).rem_euclid(900) = 899`.
///
/// Mirrors TLA+ `WindowStart(t) == t - (t % WindowDuration)`.
///
/// # Panics
///
/// Panics if `duration_secs` is zero. Callers are expected to pass a duration
/// accepted by [`is_valid_window_duration`]. For timestamps within one window
/// of `i64::MIN` the true window start is not representable, and the
/// subtraction overflows (a panic when overflow checks are enabled).
pub fn window_start_secs(timestamp_secs: i64, duration_secs: i64) -> i64 {
    // `rem_euclid` would panic on a zero divisor anyway; fail with a message
    // that names the actual precondition instead.
    assert!(duration_secs != 0, "window duration must be non-zero");
    timestamp_secs - timestamp_secs.rem_euclid(duration_secs)
}

/// TW-2: window_duration must evenly divide one hour (3600 seconds).
///
/// Returns true if the duration is a positive divisor of 3600. This ensures
/// window boundaries align across hours and days regardless of starting point.
pub const fn is_valid_window_duration(duration_secs: u32) -> bool {
    // The zero check must come first: `3600 % 0` would panic.
    duration_secs > 0 && 3600 % duration_secs == 0
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn window_start_basic() {
        assert_eq!(window_start_secs(0, 2), 0);
        assert_eq!(window_start_secs(1, 2), 0);
        assert_eq!(window_start_secs(2, 2), 2);
        assert_eq!(window_start_secs(3, 2), 2);
        assert_eq!(window_start_secs(5, 3), 3);
    }

    #[test]
    fn window_start_negative_timestamps() {
        assert_eq!(window_start_secs(-1, 900), -900);
        assert_eq!(window_start_secs(-3601, 3600), -7200);
    }

    #[test]
    fn window_start_on_boundary() {
        assert_eq!(window_start_secs(900, 900), 900);
        assert_eq!(window_start_secs(899, 900), 0);
    }

    #[test]
    fn valid_window_durations() {
        let valid = [
            1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 30, 36, 40, 45, 48, 50, 60,
            72, 75, 80, 90, 100, 120, 144, 150, 180, 200, 225, 240, 300, 360, 400, 450, 600, 720,
            900, 1200, 1800, 3600,
        ];
        for dur in valid {
            assert!(is_valid_window_duration(dur), "expected {} to be valid", dur);
        }
    }

    #[test]
    fn invalid_window_durations() {
        assert!(!is_valid_window_duration(0));
        assert!(!is_valid_window_duration(7));
        assert!(!is_valid_window_duration(11));
        assert!(!is_valid_window_duration(7200));
    }
}

// ---------------------------------------------------------------------------
// Next file in this patch: quickwit/quickwit-dst/src/lib.rs (new file)
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Deterministic simulation testing and shared invariants for Quickhouse-Pomsky. +//! +//! # Invariants (always available) +//! +//! The [`invariants`] module contains pure-Rust definitions shared across the +//! entire verification pyramid: TLA+ specs, stateright models, DST tests, and +//! production `debug_assert!` checks. Zero external dependencies. +//! +//! # Models (feature = "model-checking") +//! +//! The [`models`] module contains exhaustive model-checking models that mirror +//! the TLA+ specs in `docs/internals/specs/tla/`. Each model verifies the same +//! invariants as the corresponding TLA+ spec, but runs as a Rust test via +//! [stateright](https://docs.rs/stateright). Requires the `model-checking` +//! feature. +//! +//! ## Models +//! +//! - `models::sort_schema` — SS-1..SS-5 (ADR-002, `SortSchema.tla`) +//! - `models::time_windowed_compaction` — TW-1..TW-3, CS-1..CS-3, MC-1..MC-4 +//! (ADR-003, `TimeWindowedCompaction.tla`) +//! - `models::parquet_data_model` — DM-1..DM-5 (ADR-001, +//! `ParquetDataModel.tla`) + +pub mod invariants; + +#[cfg(feature = "model-checking")] +pub mod models; diff --git a/quickwit/quickwit-dst/src/models/mod.rs b/quickwit/quickwit-dst/src/models/mod.rs new file mode 100644 index 00000000000..a46f5552d35 --- /dev/null +++ b/quickwit/quickwit-dst/src/models/mod.rs @@ -0,0 +1,24 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Stateright models mirroring the TLA+ specifications. + +pub mod parquet_data_model; +pub mod sort_schema; +pub mod time_windowed_compaction; diff --git a/quickwit/quickwit-dst/src/models/parquet_data_model.rs b/quickwit/quickwit-dst/src/models/parquet_data_model.rs new file mode 100644 index 00000000000..321dbc5dfc5 --- /dev/null +++ b/quickwit/quickwit-dst/src/models/parquet_data_model.rs @@ -0,0 +1,416 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Stateright model for Parquet Data Model invariants (ADR-001). +//! +//! 
Mirrors `docs/internals/specs/tla/ParquetDataModel.tla`. +//! +//! # Invariants +//! - DM-1: Each row is exactly one data point (all required fields populated) +//! - DM-2: No last-write-wins; duplicate (metric, tags, ts) from separate +//! ingests both survive +//! - DM-3: No interpolation; storage contains only ingested points +//! - DM-4: timeseries_id is deterministic for a given tag set +//! - DM-5: timeseries_id persists through compaction without recomputation + +use std::collections::BTreeSet; + +use stateright::*; + +/// Node identifier. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Node(pub u8); + +/// Metric name. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct MetricName(pub u8); + +/// Tag set (opaque identifier; deterministic hash is identity). +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct TagSet(pub u8); + +/// Deterministic hash of a tag set to a timeseries_id. +/// Mirrors TLA+ `TSIDHash(tags) == CHOOSE n \in 0..100 : TRUE`. +/// We use the tag set's inner value as the hash (deterministic + injective). +fn tsid_hash(tags: TagSet) -> u32 { + tags.0 as u32 +} + +/// A data point. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct DataPoint { + pub metric_name: MetricName, + pub tags: TagSet, + pub timestamp: i64, + pub value: i32, + pub request_id: u32, + pub timeseries_id: u32, +} + +/// A split in object storage. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct DataModelSplit { + pub split_id: u32, + pub rows: BTreeSet, +} + +/// Model state. Mirrors TLA+ `VARIABLES`. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct DataModelState { + /// Per-node pending batches. + pub pending: Vec<(Node, BTreeSet)>, + /// Published splits in object storage. + pub splits: BTreeSet, + /// All points ever ingested (ghost variable for DM-3). 
+ pub all_ingested_points: BTreeSet, + pub next_split_id: u32, + pub next_request_id: u32, +} + +impl DataModelState { + fn pending_for(&self, node: Node) -> &BTreeSet { + for (n, set) in &self.pending { + if *n == node { + return set; + } + } + // Should not happen in well-formed model. + panic!("node not found in pending"); + } + + fn pending_for_mut(&mut self, node: Node) -> &mut BTreeSet { + for (n, set) in &mut self.pending { + if *n == node { + return set; + } + } + panic!("node not found in pending"); + } + + fn all_stored_rows(&self) -> BTreeSet { + self.splits.iter().flat_map(|s| s.rows.iter().cloned()).collect() + } + + fn all_pending_rows(&self) -> BTreeSet { + self.pending.iter().flat_map(|(_, set)| set.iter().cloned()).collect() + } +} + +/// Actions. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub enum DataModelAction { + IngestPoint { + node: Node, + metric_name: MetricName, + tags: TagSet, + timestamp: i64, + }, + FlushSplit { + node: Node, + }, + CompactSplits { + selected_ids: BTreeSet, + }, +} + +/// Model configuration. Mirrors TLA+ `CONSTANTS`. +#[derive(Clone, Debug)] +pub struct DataModelModel { + pub nodes: Vec, + pub metric_names: Vec, + pub tag_sets: Vec, + pub timestamps: Vec, + pub request_count_max: u32, +} + +impl DataModelModel { + /// Small model matching `ParquetDataModel_small.cfg`. + pub fn small() -> Self { + DataModelModel { + nodes: vec![Node(1)], + metric_names: vec![MetricName(1)], + tag_sets: vec![TagSet(1)], + timestamps: vec![1], + request_count_max: 3, + } + } +} + +/// Generate all subsets of size >= 2 from a set of split IDs. 
+fn subsets_ge2(ids: &[u32]) -> Vec> { + let n = ids.len(); + let mut result = Vec::new(); + for mask in 0..(1u32 << n) { + if mask.count_ones() >= 2 { + let mut subset = BTreeSet::new(); + for (i, &id) in ids.iter().enumerate() { + if mask & (1 << i) != 0 { + subset.insert(id); + } + } + result.push(subset); + } + } + result +} + +impl Model for DataModelModel { + type State = DataModelState; + type Action = DataModelAction; + + fn init_states(&self) -> Vec { + let pending: Vec<(Node, BTreeSet)> = + self.nodes.iter().map(|&n| (n, BTreeSet::new())).collect(); + vec![DataModelState { + pending, + splits: BTreeSet::new(), + all_ingested_points: BTreeSet::new(), + next_split_id: 1, + next_request_id: 1, + }] + } + + fn actions(&self, state: &Self::State, actions: &mut Vec) { + // IngestPoint + if state.next_request_id < self.request_count_max { + for &node in &self.nodes { + for &mn in &self.metric_names { + for &tags in &self.tag_sets { + for &ts in &self.timestamps { + actions.push(DataModelAction::IngestPoint { + node, + metric_name: mn, + tags, + timestamp: ts, + }); + } + } + } + } + } + + // FlushSplit + for &node in &self.nodes { + if !state.pending_for(node).is_empty() { + actions.push(DataModelAction::FlushSplit { node }); + } + } + + // CompactSplits + if state.splits.len() >= 2 { + let ids: Vec = state.splits.iter().map(|s| s.split_id).collect(); + for subset in subsets_ge2(&ids) { + actions.push(DataModelAction::CompactSplits { + selected_ids: subset, + }); + } + } + } + + fn next_state( + &self, + state: &Self::State, + action: Self::Action, + ) -> Option { + let mut next = state.clone(); + + match action { + DataModelAction::IngestPoint { + node, + metric_name, + tags, + timestamp, + } => { + let point = DataPoint { + metric_name, + tags, + timestamp, + value: 1, + request_id: next.next_request_id, + timeseries_id: tsid_hash(tags), + }; + next.pending_for_mut(node).insert(point.clone()); + next.all_ingested_points.insert(point); + next.next_request_id += 
1; + } + DataModelAction::FlushSplit { node } => { + let rows = next.pending_for(node).clone(); + if rows.is_empty() { + return None; + } + let new_split = DataModelSplit { + split_id: next.next_split_id, + rows, + }; + next.splits.insert(new_split); + next.next_split_id += 1; + *next.pending_for_mut(node) = BTreeSet::new(); + } + DataModelAction::CompactSplits { selected_ids } => { + let selected: Vec = next + .splits + .iter() + .filter(|s| selected_ids.contains(&s.split_id)) + .cloned() + .collect(); + + if selected.len() < 2 { + return None; + } + + let merged_rows: BTreeSet = + selected.iter().flat_map(|s| s.rows.iter().cloned()).collect(); + let new_split = DataModelSplit { + split_id: next.next_split_id, + rows: merged_rows, + }; + + for s in &selected { + next.splits.remove(s); + } + next.splits.insert(new_split); + next.next_split_id += 1; + } + } + + Some(next) + } + + fn properties(&self) -> Vec> { + vec![ + // DM-1: Each row is exactly one data point. + // Every row has all required fields populated. + // Mirrors ParquetDataModel.tla lines 174-180 + Property::always( + "DM-1: point per row", + |model: &DataModelModel, state: &DataModelState| { + for s in &state.splits { + for row in &s.rows { + if !model.metric_names.contains(&row.metric_name) { + return false; + } + if !model.tag_sets.contains(&row.tags) { + return false; + } + if !model.timestamps.contains(&row.timestamp) { + return false; + } + if row.timeseries_id != tsid_hash(row.tags) { + return false; + } + } + } + true + }, + ), + // DM-2: No last-write-wins. + // If two points share (metric, tags, ts) with different request_id, + // and both have been flushed, both must be in storage. 
+ // Mirrors ParquetDataModel.tla lines 198-208 + Property::always( + "DM-2: no LWW", + |_model: &DataModelModel, state: &DataModelState| { + let stored = state.all_stored_rows(); + let pending = state.all_pending_rows(); + for p1 in &state.all_ingested_points { + for p2 in &state.all_ingested_points { + if p1.metric_name == p2.metric_name + && p1.tags == p2.tags + && p1.timestamp == p2.timestamp + && p1.request_id != p2.request_id + && !pending.contains(p1) + && !pending.contains(p2) + && (!stored.contains(p1) || !stored.contains(p2)) + { + return false; + } + } + } + true + }, + ), + // DM-3: No interpolation. + // Storage only contains ingested points. + // Mirrors ParquetDataModel.tla lines 214-215 + Property::always( + "DM-3: no interpolation", + |_model: &DataModelModel, state: &DataModelState| { + let stored = state.all_stored_rows(); + stored.is_subset(&state.all_ingested_points) + }, + ), + // DM-4: Deterministic timeseries_id. + // Same tags => same timeseries_id. + // Mirrors ParquetDataModel.tla lines 221-224 + Property::always( + "DM-4: deterministic TSID", + |_model: &DataModelModel, state: &DataModelState| { + let stored = state.all_stored_rows(); + let pending = state.all_pending_rows(); + let all: BTreeSet<&DataPoint> = + stored.iter().chain(pending.iter()).collect(); + for r1 in &all { + for r2 in &all { + if r1.tags == r2.tags && r1.timeseries_id != r2.timeseries_id { + return false; + } + } + } + true + }, + ), + // DM-5: timeseries_id persists through compaction. + // Every stored row's timeseries_id equals TSIDHash(tags). 
+ // Mirrors ParquetDataModel.tla lines 234-236 + Property::always( + "DM-5: TSID persistence", + |_model: &DataModelModel, state: &DataModelState| { + for row in state.all_stored_rows() { + if row.timeseries_id != tsid_hash(row.tags) { + return false; + } + } + true + }, + ), + ] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_data_model_small() { + let model = DataModelModel::small(); + model + .checker() + .spawn_bfs() + .join() + .assert_properties(); + } + + #[test] + fn tsid_hash_deterministic() { + let t1 = TagSet(42); + assert_eq!(tsid_hash(t1), tsid_hash(t1)); + assert_eq!(tsid_hash(TagSet(1)), tsid_hash(TagSet(1))); + } +} diff --git a/quickwit/quickwit-dst/src/models/sort_schema.rs b/quickwit/quickwit-dst/src/models/sort_schema.rs new file mode 100644 index 00000000000..89746360993 --- /dev/null +++ b/quickwit/quickwit-dst/src/models/sort_schema.rs @@ -0,0 +1,668 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Stateright model for Sort Schema invariants (ADR-002). +//! +//! Mirrors `docs/internals/specs/tla/SortSchema.tla`. +//! +//! # Invariants +//! - SS-1: All rows within a split are sorted according to the split's schema +//! 
- SS-2: Null values sort correctly per direction (nulls last asc, first desc)
+//! - SS-3: Missing sort columns are treated as NULL
+//! - SS-4: A split's sort schema never changes after write
+//! - SS-5: Three copies of sort schema are identical per split
+
+use std::collections::BTreeMap;
+
+use stateright::*;
+
+/// Column identifier (small domain for model checking).
+///
+/// The domain holds a single column, so every schema entry and every row cell
+/// refers to `C1`.
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum Column {
+    C1,
+}
+
+impl Column {
+    /// Every column in the model's domain.
+    pub const ALL: &[Column] = &[Column::C1];
+}
+
+/// Sort direction.
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum Direction {
+    Asc,
+    Desc,
+}
+
+/// A single sort column specification.
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct SortColumn {
+    pub column: Column,
+    pub direction: Direction,
+}
+
+/// A cell value, modeling TLA+ `ValuesWithNull`.
+/// NULL is represented by the [`Value::Null`] variant.
+///
+/// The derived `Ord` is purely structural (`Null` sorts before every `Val`,
+/// whatever the direction). The direction-dependent null ordering required by
+/// SS-2 is implemented separately in `compare_values`.
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum Value {
+    Null,
+    Val(i8),
+}
+
+impl Value {
+    /// Non-null values domain (matches TLA+ `Values == {1, 2, 3}`).
+    pub const NON_NULL: &[Value] = &[Value::Val(1), Value::Val(2), Value::Val(3)];
+
+    /// Full value domain: NULL followed by every non-null value.
+    pub const ALL: &[Value] = &[Value::Null, Value::Val(1), Value::Val(2), Value::Val(3)];
+
+    /// Returns `true` when this value is [`Value::Null`].
+    fn is_null(self) -> bool {
+        matches!(self, Value::Null)
+    }
+}
+
+/// A row: maps present columns to values.
+/// Columns absent from the map are treated as NULL (SS-3).
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct Row {
+    pub cells: BTreeMap<Column, Value>,
+}
+
+impl Row {
+    /// Returns the value stored for `col`, or [`Value::Null`] when the row has
+    /// no cell for that column.
+    fn get_value(&self, col: Column) -> Value {
+        self.cells.get(&col).copied().unwrap_or(Value::Null)
+    }
+}
+
+/// A split in object storage.
+/// Mirrors the TLA+ split record with all three schema copies.
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct Split {
+    pub id: u32,
+    pub rows: Vec<Row>,
+    /// Schema the split was written with; SS-1 checks `rows` against it.
+    pub sort_schema: Vec<SortColumn>,
+    /// Schema copy compared with `sort_schema` by SS-5.
+    pub metadata_sort_schema: Vec<SortColumn>,
+    /// Schema copy compared with `sort_schema` by SS-5.
+    pub kv_sort_schema: Vec<SortColumn>,
+    /// Schema copy compared with `sort_schema` by SS-5.
+    pub sorting_columns_schema: Vec<SortColumn>,
+    /// Columns physically present in the split; SS-3 requires every other
+    /// sort column to read as NULL.
+    pub columns_present: Vec<Column>,
+}
+
+/// Sort schema model state. Mirrors TLA+ `VARIABLES`.
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub struct SortSchemaState {
+    /// Schema applied to newly ingested splits.
+    pub metastore_schema: Vec<SortColumn>,
+    pub splits: Vec<Split>,
+    pub next_split_id: u32,
+    /// Number of `ChangeSchema` actions taken so far.
+    pub schema_change_count: u32,
+    /// Schema recorded for each split id at creation time (checked by SS-4).
+    pub split_schema_history: BTreeMap<u32, Vec<SortColumn>>,
+}
+
+/// Actions. Mirrors TLA+ `Next`.
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub enum SortSchemaAction {
+    /// Ingest a batch with specific columns present and specific row data.
+    IngestBatch {
+        columns_present: Vec<Column>,
+        rows: Vec<Row>,
+    },
+    /// Change the metastore schema to a new value.
+    ChangeSchema {
+        new_schema: Vec<SortColumn>,
+    },
+    /// Compact two splits (identified by index in the splits vec).
+    CompactSplits {
+        s1_idx: usize,
+        s2_idx: usize,
+        merged_rows: Vec<Row>,
+    },
+}
+
+/// Model configuration. Mirrors TLA+ `CONSTANTS`.
+#[derive(Clone, Debug)]
+pub struct SortSchemaModel {
+    /// Upper bound on rows in an ingested or compacted split.
+    pub rows_per_split_max: usize,
+    /// Ingest is only enabled while fewer than this many splits exist.
+    pub splits_max: usize,
+    /// Upper bound on `ChangeSchema` actions along one trace.
+    pub schema_changes_max: u32,
+}
+
+impl SortSchemaModel {
+    /// Small model matching `SortSchema_small.cfg`.
+    pub fn small() -> Self {
+        SortSchemaModel {
+            rows_per_split_max: 2,
+            splits_max: 2,
+            schema_changes_max: 1,
+        }
+    }
+}
+
+/// Compare two values with null ordering (SS-2).
+/// Returns `Ordering`: Less, Equal, Greater.
+/// Ascending: nulls sort AFTER non-null.
+/// Descending: nulls sort BEFORE non-null.
+///
+/// Delegates to the shared [`crate::invariants::sort::compare_with_null_ordering`]
+/// for the null ordering logic, converting model-specific `Value` to `Option<i8>`.
+fn compare_values(v1: Value, v2: Value, direction: Direction) -> std::cmp::Ordering { + let a = match v1 { + Value::Null => None, + Value::Val(v) => Some(v), + }; + let b = match v2 { + Value::Null => None, + Value::Val(v) => Some(v), + }; + let ascending = matches!(direction, Direction::Asc); + crate::invariants::sort::compare_with_null_ordering(a.as_ref(), b.as_ref(), ascending) +} + +/// Check if row1 <= row2 according to the schema (lexicographic). +/// Mirrors TLA+ `RowLEQ`. +fn row_leq(row1: &Row, row2: &Row, schema: &[SortColumn]) -> bool { + for sc in schema { + let v1 = row1.get_value(sc.column); + let v2 = row2.get_value(sc.column); + let cmp = compare_values(v1, v2, sc.direction); + match cmp { + std::cmp::Ordering::Less => return true, + std::cmp::Ordering::Greater => return false, + std::cmp::Ordering::Equal => continue, + } + } + true // all columns equal +} + +/// Check if rows are sorted. Mirrors TLA+ `IsSorted`. +fn is_sorted(rows: &[Row], schema: &[SortColumn]) -> bool { + rows.windows(2).all(|w| row_leq(&w[0], &w[1], schema)) +} + +/// Generate all possible sort schemas (length 0, 1, or 2). +/// Mirrors TLA+ `AllSortSchemas`. +fn all_sort_schemas() -> Vec> { + let directions = [Direction::Asc, Direction::Desc]; + let mut schemas = vec![vec![]]; // empty schema + + // Length 1 + for &col in Column::ALL { + for &dir in &directions { + schemas.push(vec![SortColumn { + column: col, + direction: dir, + }]); + } + } + + // Length 2 + for &c1 in Column::ALL { + for &d1 in &directions { + for &c2 in Column::ALL { + for &d2 in &directions { + schemas.push(vec![ + SortColumn { + column: c1, + direction: d1, + }, + SortColumn { + column: c2, + direction: d2, + }, + ]); + } + } + } + } + + schemas +} + +/// Generate all possible rows for a given column set with n rows. +fn all_row_sequences(columns_present: &[Column], n: usize) -> Vec> { + if n == 0 { + return vec![vec![]]; + } + + // Each row maps each present column to a value (including null). 
+ let single_rows = all_single_rows(columns_present); + + // Generate all sequences of length n. + let mut result = vec![vec![]]; + for _ in 0..n { + let mut next = Vec::new(); + for prefix in &result { + for row in &single_rows { + let mut extended = prefix.clone(); + extended.push(row.clone()); + next.push(extended); + } + } + result = next; + } + result +} + +fn all_single_rows(columns_present: &[Column]) -> Vec { + if columns_present.is_empty() { + return vec![Row { + cells: BTreeMap::new(), + }]; + } + + // Generate all combinations of values for each column. + let mut rows = vec![BTreeMap::new()]; + for &col in columns_present { + let mut next = Vec::new(); + for partial in &rows { + for &val in Value::ALL { + let mut full = partial.clone(); + full.insert(col, val); + next.push(full); + } + } + rows = next; + } + rows.into_iter().map(|cells| Row { cells }).collect() +} + +/// Generate all subsets of columns. +fn all_column_subsets() -> Vec> { + let mut subsets = Vec::new(); + let cols = Column::ALL; + // 2^n subsets + for mask in 0..(1u32 << cols.len()) { + let mut subset = Vec::new(); + for (i, &col) in cols.iter().enumerate() { + if mask & (1 << i) != 0 { + subset.push(col); + } + } + subsets.push(subset); + } + subsets +} + +/// Check if merged_rows is a valid permutation of the union of s1 and s2 rows, +/// accounting for column extension (missing columns become NULL). +fn is_valid_merge( + merged_rows: &[Row], + s1: &Split, + s2: &Split, + merged_columns: &[Column], +) -> bool { + let total_rows = s1.rows.len() + s2.rows.len(); + if merged_rows.len() != total_rows { + return false; + } + + // Each merged row must come from either s1 or s2 (extended with NULLs). + // Build extended versions of input rows. 
+ let extend_row = |row: &Row, merged_cols: &[Column]| -> Row { + let mut cells = BTreeMap::new(); + for &col in merged_cols { + cells.insert(col, row.get_value(col)); + } + Row { cells } + }; + + let s1_extended: Vec = s1.rows.iter().map(|r| extend_row(r, merged_columns)).collect(); + let s2_extended: Vec = s2.rows.iter().map(|r| extend_row(r, merged_columns)).collect(); + + // Check that merged_rows is a permutation of s1_extended ++ s2_extended. + let mut all_input: Vec = s1_extended; + all_input.extend(s2_extended); + all_input.sort(); + + let mut sorted_merged = merged_rows.to_vec(); + sorted_merged.sort(); + + sorted_merged == all_input +} + +impl Model for SortSchemaModel { + type State = SortSchemaState; + type Action = SortSchemaAction; + + fn init_states(&self) -> Vec { + // TLA+ Init: metastore_schema \in AllSortSchemas, splits = {}, etc. + all_sort_schemas() + .into_iter() + .map(|schema| SortSchemaState { + metastore_schema: schema, + splits: Vec::new(), + next_split_id: 1, + schema_change_count: 0, + split_schema_history: BTreeMap::new(), + }) + .collect() + } + + fn actions(&self, state: &Self::State, actions: &mut Vec) { + // IngestBatch: if splits < SplitsMax + if state.splits.len() < self.splits_max { + let current_schema = &state.metastore_schema; + + for columns_present in all_column_subsets() { + for n in 1..=self.rows_per_split_max { + for rows in all_row_sequences(&columns_present, n) { + // Only add if rows are sorted by current schema. 
+ if is_sorted(&rows, current_schema) { + actions.push(SortSchemaAction::IngestBatch { + columns_present: columns_present.clone(), + rows, + }); + } + } + } + } + } + + // ChangeSchema: if schema_change_count < SchemaChangesMax + if state.schema_change_count < self.schema_changes_max { + for new_schema in all_sort_schemas() { + if new_schema != state.metastore_schema { + actions.push(SortSchemaAction::ChangeSchema { new_schema }); + } + } + } + + // CompactSplits: merge two splits with same sort_schema + for (i, s1) in state.splits.iter().enumerate() { + for (j, s2) in state.splits.iter().enumerate() { + if i >= j { + continue; + } + if s1.sort_schema != s2.sort_schema { + continue; + } + + let total_rows = s1.rows.len() + s2.rows.len(); + if total_rows > self.rows_per_split_max { + continue; + } + + let merged_schema = &s1.sort_schema; + let mut merged_columns: Vec = s1.columns_present.clone(); + for &col in &s2.columns_present { + if !merged_columns.contains(&col) { + merged_columns.push(col); + } + } + merged_columns.sort(); + + // Generate all valid merged row sequences. 
+ for merged_rows in all_row_sequences(&merged_columns, total_rows) { + if is_sorted(&merged_rows, merged_schema) + && is_valid_merge(&merged_rows, s1, s2, &merged_columns) + { + actions.push(SortSchemaAction::CompactSplits { + s1_idx: i, + s2_idx: j, + merged_rows, + }); + } + } + } + } + } + + fn next_state( + &self, + state: &Self::State, + action: Self::Action, + ) -> Option { + let mut next = state.clone(); + + match action { + SortSchemaAction::IngestBatch { + columns_present, + rows, + } => { + let new_id = next.next_split_id; + let current_schema = next.metastore_schema.clone(); + let split = Split { + id: new_id, + rows, + sort_schema: current_schema.clone(), + metadata_sort_schema: current_schema.clone(), + kv_sort_schema: current_schema.clone(), + sorting_columns_schema: current_schema.clone(), + columns_present, + }; + next.splits.push(split); + next.splits.sort(); + next.next_split_id += 1; + next.split_schema_history.insert(new_id, current_schema); + } + SortSchemaAction::ChangeSchema { new_schema } => { + next.metastore_schema = new_schema; + next.schema_change_count += 1; + } + SortSchemaAction::CompactSplits { + s1_idx, + s2_idx, + merged_rows, + } => { + let s1 = &state.splits[s1_idx]; + let s2 = &state.splits[s2_idx]; + let merged_schema = s1.sort_schema.clone(); + let mut merged_columns: Vec = s1.columns_present.clone(); + for &col in &s2.columns_present { + if !merged_columns.contains(&col) { + merged_columns.push(col); + } + } + merged_columns.sort(); + + let new_id = next.next_split_id; + let new_split = Split { + id: new_id, + rows: merged_rows, + sort_schema: merged_schema.clone(), + metadata_sort_schema: merged_schema.clone(), + kv_sort_schema: merged_schema.clone(), + sorting_columns_schema: merged_schema.clone(), + columns_present: merged_columns, + }; + + // Remove old splits (higher index first to preserve indices). 
+ let (lo, hi) = if s1_idx < s2_idx { + (s1_idx, s2_idx) + } else { + (s2_idx, s1_idx) + }; + next.splits.remove(hi); + next.splits.remove(lo); + next.splits.push(new_split); + next.splits.sort(); + next.next_split_id += 1; + next.split_schema_history.insert(new_id, merged_schema); + } + } + + Some(next) + } + + fn properties(&self) -> Vec> { + vec![ + // SS-1: All rows within a split are sorted according to its schema. + // Mirrors SortSchema.tla line 217-219 + Property::always( + "SS-1: rows sorted", + |_model: &SortSchemaModel, state: &SortSchemaState| { + state + .splits + .iter() + .all(|s| is_sorted(&s.rows, &s.sort_schema)) + }, + ), + // SS-2: Null values ordered correctly per direction. + // Ascending: null must NOT appear before non-null. + // Descending: non-null must NOT appear before null. + // Mirrors SortSchema.tla lines 228-247 + Property::always( + "SS-2: null ordering", + |_model: &SortSchemaModel, state: &SortSchemaState| { + for s in &state.splits { + for w in s.rows.windows(2) { + let (row_curr, row_next) = (&w[0], &w[1]); + for (k, sc) in s.sort_schema.iter().enumerate() { + let v_curr = row_curr.get_value(sc.column); + let v_next = row_next.get_value(sc.column); + + // Check only when earlier columns are equal. + let earlier_equal = + s.sort_schema[..k].iter().all(|prev_sc| { + row_curr.get_value(prev_sc.column) + == row_next.get_value(prev_sc.column) + }); + + if earlier_equal { + // Ascending: null must not appear before non-null. + if sc.direction == Direction::Asc + && v_curr.is_null() + && !v_next.is_null() + { + return false; + } + // Descending: non-null must not appear before null. + if sc.direction == Direction::Desc + && !v_curr.is_null() + && v_next.is_null() + { + return false; + } + } + } + } + } + true + }, + ), + // SS-3: Missing sort columns treated as NULL. 
+ // Mirrors SortSchema.tla lines 253-259 + Property::always( + "SS-3: missing columns null", + |_model: &SortSchemaModel, state: &SortSchemaState| { + for s in &state.splits { + for sc in &s.sort_schema { + if !s.columns_present.contains(&sc.column) { + for row in &s.rows { + if row.get_value(sc.column) != Value::Null { + return false; + } + } + } + } + } + true + }, + ), + // SS-4: Schema immutable after write. + // Mirrors SortSchema.tla lines 263-266 + Property::always( + "SS-4: schema immutable", + |_model: &SortSchemaModel, state: &SortSchemaState| { + for s in &state.splits { + if let Some(historical) = state.split_schema_history.get(&s.id) + && *historical != s.sort_schema + { + return false; + } + } + true + }, + ), + // SS-5: Three copies of sort schema are identical. + // Mirrors SortSchema.tla lines 270-274 + Property::always( + "SS-5: three-copy consistency", + |_model: &SortSchemaModel, state: &SortSchemaState| { + state.splits.iter().all(|s| { + s.sort_schema == s.metadata_sort_schema + && s.sort_schema == s.kv_sort_schema + && s.sort_schema == s.sorting_columns_schema + }) + }, + ), + ] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_sort_schema_small() { + let model = SortSchemaModel::small(); + model + .checker() + .spawn_bfs() + .join() + .assert_properties(); + } + + #[test] + fn compare_values_null_ordering() { + // Ascending: null > non-null + assert_eq!( + compare_values(Value::Null, Value::Val(1), Direction::Asc), + std::cmp::Ordering::Greater + ); + assert_eq!( + compare_values(Value::Val(1), Value::Null, Direction::Asc), + std::cmp::Ordering::Less + ); + + // Descending: null < non-null + assert_eq!( + compare_values(Value::Null, Value::Val(1), Direction::Desc), + std::cmp::Ordering::Less + ); + assert_eq!( + compare_values(Value::Val(1), Value::Null, Direction::Desc), + std::cmp::Ordering::Greater + ); + } + + #[test] + fn is_sorted_basic() { + let schema = vec![SortColumn { + column: Column::C1, + direction: 
Direction::Asc, + }]; + let rows = vec![ + Row { + cells: [(Column::C1, Value::Val(1))].into(), + }, + Row { + cells: [(Column::C1, Value::Val(2))].into(), + }, + ]; + assert!(is_sorted(&rows, &schema)); + + let rows_unsorted = vec![ + Row { + cells: [(Column::C1, Value::Val(2))].into(), + }, + Row { + cells: [(Column::C1, Value::Val(1))].into(), + }, + ]; + assert!(!is_sorted(&rows_unsorted, &schema)); + } +} diff --git a/quickwit/quickwit-dst/src/models/time_windowed_compaction.rs b/quickwit/quickwit-dst/src/models/time_windowed_compaction.rs new file mode 100644 index 00000000000..0723f93e453 --- /dev/null +++ b/quickwit/quickwit-dst/src/models/time_windowed_compaction.rs @@ -0,0 +1,635 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Stateright model for Time-Windowed Compaction invariants (ADR-003). +//! +//! Mirrors `docs/internals/specs/tla/TimeWindowedCompaction.tla`. +//! +//! # Invariants +//! - TW-1: Every split belongs to exactly one time window +//! - TW-2: window_duration evenly divides one hour +//! - TW-3: Data is never merged across window boundaries +//! - CS-1: Only splits sharing all six scope components may be merged +//! 
- CS-2: Within a scope, only same window_start splits merge
+//! - CS-3: Splits before compaction_start_time are never compacted
+//! - MC-1: Row multiset preserved through compaction
+//! - MC-2: Row contents unchanged through compaction
+//! - MC-3: Output is sorted according to sort schema
+//! - MC-4: Column set is the union of input column sets
+
+use std::collections::{BTreeMap, BTreeSet};
+
+use stateright::*;
+
+/// Scope identifier (abstract; in TLA+ this is a 6-tuple).
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct Scope(pub u8);
+
+/// A column name.
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum ColumnName {
+    M,
+    V,
+}
+
+impl ColumnName {
+    /// Every column name in the model's domain.
+    pub const ALL: &[ColumnName] = &[ColumnName::M, ColumnName::V];
+}
+
+/// A row in a split.
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct CompactionRow {
+    /// Identifier assigned at ingest; keys `CompactionState::row_history`.
+    pub point_id: u32,
+    pub timestamp: i64,
+    /// Key the rows of a split are ordered by (ascending).
+    pub sort_key: i64,
+    pub columns: BTreeSet<ColumnName>,
+    /// Unique value per (point, column) for MC-2 tracking.
+    pub values: BTreeMap<ColumnName, (u32, ColumnName)>,
+}
+
+/// A split in object storage.
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct CompactionSplit {
+    pub id: u32,
+    pub scope: Scope,
+    pub window_start: i64,
+    pub rows: Vec<CompactionRow>,
+    /// Union of the column sets of the rows (or input splits) it was built from.
+    pub columns: BTreeSet<ColumnName>,
+    /// When set, MC-3 requires `rows` to be ordered by `sort_key`.
+    pub sorted: bool,
+}
+
+/// Compaction log entry for invariant checking.
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct CompactionLogEntry {
+    pub input_split_ids: BTreeSet<u32>,
+    pub output_split_id: u32,
+    // NOTE(review): point ids are kept as sets, so a row duplicated during a
+    // merge would not change `output_point_ids`; MC-1 is named a multiset
+    // check. Confirm whether counts should be tracked as well.
+    pub input_point_ids: BTreeSet<u32>,
+    pub output_point_ids: BTreeSet<u32>,
+    /// Scope of each input split, keyed by split id (checked by CS-1).
+    pub input_scopes: BTreeMap<u32, Scope>,
+    /// Window start of each input split, keyed by split id (TW-3, CS-2, CS-3).
+    pub input_window_starts: BTreeMap<u32, i64>,
+    pub output_columns: BTreeSet<ColumnName>,
+    pub input_column_union: BTreeSet<ColumnName>,
+}
+
+/// Ingest buffer key: (scope, window_start).
+type BufferKey = (Scope, i64);
+
+/// Model state. Mirrors TLA+ `VARIABLES`.
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub struct CompactionState {
+    pub current_time: i64,
+    /// Splits that have been flushed or produced by compaction.
+    pub object_storage: BTreeSet<CompactionSplit>,
+    /// Rows ingested but not yet flushed, grouped by (scope, window_start).
+    pub ingest_buffer: BTreeMap<BufferKey, Vec<CompactionRow>>,
+    pub next_split_id: u32,
+    pub next_point_id: u32,
+    pub points_ingested: u32,
+    pub compactions_performed: u32,
+    /// Each row as it was ingested, keyed by point id (reference for MC-2).
+    pub row_history: BTreeMap<u32, CompactionRow>,
+    /// One entry per compaction performed; read by the TW-3, CS-* and MC-1/4 checks.
+    pub compaction_log: BTreeSet<CompactionLogEntry>,
+}
+
+/// Actions.
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub enum CompactionAction {
+    /// Move `current_time` forward to `new_time`.
+    AdvanceTime { new_time: i64 },
+    /// Buffer one new point under `(scope, window_start(timestamp))`.
+    IngestPoint {
+        timestamp: i64,
+        sort_key: i64,
+        scope: Scope,
+        columns: BTreeSet<ColumnName>,
+    },
+    /// Turn the buffered rows for `key` into a sorted split.
+    FlushSplit { key: BufferKey },
+    /// Merge the splits named by `split_ids` into a single split.
+    CompactWindow {
+        scope: Scope,
+        window_start: i64,
+        split_ids: BTreeSet<u32>,
+    },
+}
+
+/// Model configuration. Mirrors TLA+ `CONSTANTS`.
+#[derive(Clone, Debug)]
+pub struct CompactionModel {
+    /// Candidate values for ingest timestamps and for `AdvanceTime` targets.
+    pub timestamps: Vec<i64>,
+    pub scopes: Vec<Scope>,
+    pub window_duration: i64,
+    /// Length of the "hour" that `window_duration` must divide (TW-2).
+    pub hour_seconds: i64,
+    /// Only windows starting at or after this time are offered for compaction.
+    pub compaction_start_time: i64,
+    /// Points older than `current_time` minus this window are not ingested.
+    pub late_data_acceptance_window: i64,
+    pub max_time: i64,
+    pub max_points: u32,
+    pub max_compactions: u32,
+    pub sort_keys: Vec<i64>,
+}
+
+impl CompactionModel {
+    /// Small model matching `TimeWindowedCompaction_small.cfg`.
+    pub fn small() -> Self {
+        CompactionModel {
+            timestamps: vec![0, 1],
+            scopes: vec![Scope(1)],
+            window_duration: 2,
+            hour_seconds: 4,
+            compaction_start_time: 0,
+            late_data_acceptance_window: 2,
+            max_time: 1,
+            max_points: 2,
+            max_compactions: 1,
+            sort_keys: vec![1, 2],
+        }
+    }
+}
+
+/// Compute window_start for a timestamp.
+/// Mirrors TLA+ `WindowStart(t) == t - (t % WindowDuration)`.
+///
+/// Delegates to the shared [`crate::invariants::window::window_start_secs`].
+fn window_start(t: i64, window_duration: i64) -> i64 {
+    crate::invariants::window::window_start_secs(t, window_duration)
+}
+
+/// Check if a sequence of rows is sorted by sort_key ascending.
+///
+/// Equal keys are allowed next to each other; empty and single-row inputs
+/// are trivially sorted.
+fn is_sorted_by_key(rows: &[CompactionRow]) -> bool {
+    rows.windows(2).all(|w| w[0].sort_key <= w[1].sort_key)
+}
+
+/// Merge-sort two sorted sequences by sort_key.
+fn merge_sorted(s1: &[CompactionRow], s2: &[CompactionRow]) -> Vec<CompactionRow> {
+    let mut result = Vec::with_capacity(s1.len() + s2.len());
+    let (mut i, mut j) = (0, 0);
+    while i < s1.len() && j < s2.len() {
+        // On equal keys the row from `s1` goes first.
+        if s1[i].sort_key <= s2[j].sort_key {
+            result.push(s1[i].clone());
+            i += 1;
+        } else {
+            result.push(s2[j].clone());
+            j += 1;
+        }
+    }
+    // At most one of the two tails is non-empty here.
+    result.extend_from_slice(&s1[i..]);
+    result.extend_from_slice(&s2[j..]);
+    result
+}
+
+/// Insertion sort by sort_key (for small sequences at flush time).
+///
+/// Returns a new vector ordered by ascending `sort_key`; rows with equal keys
+/// keep their input order.
+fn insertion_sort(rows: &[CompactionRow]) -> Vec<CompactionRow> {
+    let mut sorted = rows.to_vec();
+    // `sort_by_key` is a stable sort, so ties keep their input order exactly
+    // as inserting each row before the first strictly greater key would.
+    sorted.sort_by_key(|row| row.sort_key);
+    sorted
+}
+
+/// Generate all non-empty subsets of a set of column names.
+///
+/// Subsets are enumerated by bitmask over `ColumnName::ALL`, starting at 1 so
+/// the empty set is skipped.
+fn all_nonempty_column_subsets() -> Vec<BTreeSet<ColumnName>> {
+    let cols = ColumnName::ALL;
+    (1..(1u32 << cols.len()))
+        .map(|mask| {
+            cols.iter()
+                .enumerate()
+                .filter(|&(i, _)| mask & (1 << i) != 0)
+                .map(|(_, &col)| col)
+                .collect()
+        })
+        .collect()
+}
+
+/// Generate all subsets of size >= 2 from a set of split IDs.
+fn subsets_of_size_ge2(ids: &BTreeSet) -> Vec> { + let id_vec: Vec = ids.iter().copied().collect(); + let n = id_vec.len(); + let mut result = Vec::new(); + for mask in 0..(1u32 << n) { + if mask.count_ones() >= 2 { + let mut subset = BTreeSet::new(); + for (i, &id) in id_vec.iter().enumerate() { + if mask & (1 << i) != 0 { + subset.insert(id); + } + } + result.push(subset); + } + } + result +} + +impl Model for CompactionModel { + type State = CompactionState; + type Action = CompactionAction; + + fn init_states(&self) -> Vec { + vec![CompactionState { + current_time: 0, + object_storage: BTreeSet::new(), + ingest_buffer: BTreeMap::new(), + next_split_id: 1, + next_point_id: 1, + points_ingested: 0, + compactions_performed: 0, + row_history: BTreeMap::new(), + compaction_log: BTreeSet::new(), + }] + } + + fn actions(&self, state: &Self::State, actions: &mut Vec) { + // AdvanceTime + if state.current_time < self.max_time { + for &t in &self.timestamps { + if t > state.current_time && t <= self.max_time { + actions.push(CompactionAction::AdvanceTime { new_time: t }); + } + } + } + + // IngestPoint + if state.points_ingested < self.max_points { + for &ts in &self.timestamps { + if ts > state.current_time { + continue; + } + if ts < state.current_time - self.late_data_acceptance_window { + continue; + } + for &sk in &self.sort_keys { + for &scope in &self.scopes { + for cols in all_nonempty_column_subsets() { + actions.push(CompactionAction::IngestPoint { + timestamp: ts, + sort_key: sk, + scope, + columns: cols, + }); + } + } + } + } + } + + // FlushSplit + for (key, buf) in &state.ingest_buffer { + if !buf.is_empty() { + actions.push(CompactionAction::FlushSplit { key: *key }); + } + } + + // CompactWindow + if state.compactions_performed < self.max_compactions { + for &scope in &self.scopes { + // Collect valid window starts from current splits. 
+ let valid_ws: BTreeSet = state + .object_storage + .iter() + .filter(|s| s.scope == scope && s.window_start >= self.compaction_start_time) + .map(|s| s.window_start) + .collect(); + + for &ws in &valid_ws { + let candidate_ids: BTreeSet = state + .object_storage + .iter() + .filter(|s| s.scope == scope && s.window_start == ws) + .map(|s| s.id) + .collect(); + + if candidate_ids.len() < 2 { + continue; + } + + for subset in subsets_of_size_ge2(&candidate_ids) { + actions.push(CompactionAction::CompactWindow { + scope, + window_start: ws, + split_ids: subset, + }); + } + } + } + } + } + + fn next_state( + &self, + state: &Self::State, + action: Self::Action, + ) -> Option { + let mut next = state.clone(); + + match action { + CompactionAction::AdvanceTime { new_time } => { + next.current_time = new_time; + } + CompactionAction::IngestPoint { + timestamp, + sort_key, + scope, + columns, + } => { + let pid = next.next_point_id; + let values: BTreeMap = + columns.iter().map(|&c| (c, (pid, c))).collect(); + let row = CompactionRow { + point_id: pid, + timestamp, + sort_key, + columns: columns.clone(), + values, + }; + let ws = window_start(timestamp, self.window_duration); + let key = (scope, ws); + next.ingest_buffer + .entry(key) + .or_default() + .push(row.clone()); + next.next_point_id += 1; + next.points_ingested += 1; + next.row_history.insert(pid, row); + } + CompactionAction::FlushSplit { key } => { + let rows = next.ingest_buffer.remove(&key).unwrap_or_default(); + if rows.is_empty() { + return None; + } + let sorted_rows = insertion_sort(&rows); + let all_cols: BTreeSet = + rows.iter().flat_map(|r| r.columns.iter().copied()).collect(); + let new_split = CompactionSplit { + id: next.next_split_id, + scope: key.0, + window_start: key.1, + rows: sorted_rows, + columns: all_cols, + sorted: true, + }; + next.object_storage.insert(new_split); + next.next_split_id += 1; + } + CompactionAction::CompactWindow { + scope, + window_start: ws, + split_ids, + } => { + let 
merge_splits: Vec = next + .object_storage + .iter() + .filter(|s| split_ids.contains(&s.id)) + .cloned() + .collect(); + + if merge_splits.len() < 2 { + return None; + } + + // Multi-way sorted merge. + let mut merged_rows = Vec::new(); + for s in &merge_splits { + merged_rows = merge_sorted(&merged_rows, &s.rows); + } + + let all_cols: BTreeSet = merge_splits + .iter() + .flat_map(|s| s.columns.iter().copied()) + .collect(); + + let output_split = CompactionSplit { + id: next.next_split_id, + scope, + window_start: ws, + rows: merged_rows.clone(), + columns: all_cols.clone(), + sorted: true, + }; + + // Build compaction log entry. + let input_ids: BTreeSet = merge_splits.iter().map(|s| s.id).collect(); + let input_point_ids: BTreeSet = merge_splits + .iter() + .flat_map(|s| s.rows.iter().map(|r| r.point_id)) + .collect(); + let output_point_ids: BTreeSet = + merged_rows.iter().map(|r| r.point_id).collect(); + let input_scopes: BTreeMap = + merge_splits.iter().map(|s| (s.id, s.scope)).collect(); + let input_ws: BTreeMap = merge_splits + .iter() + .map(|s| (s.id, s.window_start)) + .collect(); + + let log_entry = CompactionLogEntry { + input_split_ids: input_ids, + output_split_id: next.next_split_id, + input_point_ids, + output_point_ids, + input_scopes, + input_window_starts: input_ws, + output_columns: all_cols.clone(), + input_column_union: all_cols, + }; + + // Remove input splits, add output. + for s in &merge_splits { + next.object_storage.remove(s); + } + next.object_storage.insert(output_split); + next.next_split_id += 1; + next.compactions_performed += 1; + next.compaction_log.insert(log_entry); + } + } + + Some(next) + } + + fn properties(&self) -> Vec> { + vec![ + // TW-1: Every split belongs to exactly one time window. + // All rows have the same window_start as the split metadata. 
+ // Mirrors TimeWindowedCompaction.tla lines 274-277 + Property::always("TW-1: one window per split", |model: &CompactionModel, state: &CompactionState| { + let wd = model.window_duration; + state.object_storage.iter().all(|split| { + split + .rows + .iter() + .all(|row| window_start(row.timestamp, wd) == split.window_start) + }) + }), + // TW-2: window_duration evenly divides one hour. + // Mirrors TimeWindowedCompaction.tla lines 283-284 + Property::always("TW-2: duration divides hour", |model: &CompactionModel, _state: &CompactionState| { + model.hour_seconds % model.window_duration == 0 + }), + // TW-3: No cross-window merges. + // Mirrors TimeWindowedCompaction.tla lines 295-305 + Property::always("TW-3: no cross-window merge", |_model: &CompactionModel, state: &CompactionState| { + state.compaction_log.iter().all(|entry| { + // All input window_starts are identical. + let ws_values: BTreeSet = + entry.input_window_starts.values().copied().collect(); + if ws_values.len() > 1 { + return false; + } + // Output split (if in storage) matches. + for s in &state.object_storage { + if s.id == entry.output_split_id { + for &input_ws in entry.input_window_starts.values() { + if s.window_start != input_ws { + return false; + } + } + } + } + true + }) + }), + // CS-1: Only splits sharing scope may be merged. + // Mirrors TimeWindowedCompaction.tla lines 311-314 + Property::always("CS-1: scope compatibility", |_model: &CompactionModel, state: &CompactionState| { + state.compaction_log.iter().all(|entry| { + let scopes: BTreeSet = + entry.input_scopes.values().copied().collect(); + scopes.len() <= 1 + }) + }), + // CS-2: Same window_start within scope. 
+ // Mirrors TimeWindowedCompaction.tla lines 320-323 + Property::always("CS-2: same window_start", |_model: &CompactionModel, state: &CompactionState| { + state.compaction_log.iter().all(|entry| { + let ws_values: BTreeSet = + entry.input_window_starts.values().copied().collect(); + ws_values.len() <= 1 + }) + }), + // CS-3: Splits before compaction_start_time never compacted. + // Mirrors TimeWindowedCompaction.tla lines 329-332 + Property::always("CS-3: compaction start time", |model: &CompactionModel, state: &CompactionState| { + let cst = model.compaction_start_time; + state.compaction_log.iter().all(|entry| { + entry + .input_window_starts + .values() + .all(|&ws| ws >= cst) + }) + }), + // MC-1: Row set preserved (no add/remove); point_ids are compared as sets, so duplicated rows are not caught. + // Mirrors TimeWindowedCompaction.tla lines 339-344 + Property::always("MC-1: row set preserved", |_model: &CompactionModel, state: &CompactionState| { + state.compaction_log.iter().all(|entry| { + entry.input_point_ids == entry.output_point_ids + }) + }), + // MC-2: Row contents unchanged through compaction. + // Mirrors TimeWindowedCompaction.tla lines 351-360 + Property::always("MC-2: row contents preserved", |_model: &CompactionModel, state: &CompactionState| { + for split in &state.object_storage { + for row in &split.rows { + if let Some(original) = state.row_history.get(&row.point_id) { + if row.timestamp != original.timestamp + || row.sort_key != original.sort_key + || row.columns != original.columns + || row.values != original.values + { + return false; + } + } else { + return false; + } + } + } + true + }), + // MC-3: Output is sorted. + // Mirrors TimeWindowedCompaction.tla lines 366-368 + Property::always("MC-3: sort order preserved", |_model: &CompactionModel, state: &CompactionState| { + state.object_storage.iter().all(|split| { + if split.sorted { + is_sorted_by_key(&split.rows) + } else { + true + } + }) + }), + // MC-4: Column set is the union of inputs.
+ // Mirrors TimeWindowedCompaction.tla lines 376-378 + Property::always("MC-4: column union", |_model: &CompactionModel, state: &CompactionState| { + state.compaction_log.iter().all(|entry| { + entry.output_columns == entry.input_column_union + }) + }), + ] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_compaction_small() { + let model = CompactionModel::small(); + model + .checker() + .spawn_bfs() + .join() + .assert_properties(); + } + + #[test] + fn window_start_computation() { + assert_eq!(window_start(0, 2), 0); + assert_eq!(window_start(1, 2), 0); + assert_eq!(window_start(2, 2), 2); + assert_eq!(window_start(3, 2), 2); + assert_eq!(window_start(5, 3), 3); + } + + #[test] + fn merge_sort_basic() { + let r1 = CompactionRow { + point_id: 1, + timestamp: 0, + sort_key: 1, + columns: BTreeSet::new(), + values: BTreeMap::new(), + }; + let r2 = CompactionRow { + point_id: 2, + timestamp: 0, + sort_key: 3, + columns: BTreeSet::new(), + values: BTreeMap::new(), + }; + let r3 = CompactionRow { + point_id: 3, + timestamp: 0, + sort_key: 2, + columns: BTreeSet::new(), + values: BTreeMap::new(), + }; + let merged = merge_sorted(&[r1.clone(), r2.clone()], std::slice::from_ref(&r3)); + assert_eq!(merged.len(), 3); + assert_eq!(merged[0].point_id, 1); + assert_eq!(merged[1].point_id, 3); + assert_eq!(merged[2].point_id, 2); + } +} diff --git a/quickwit/quickwit-dst/tests/stateright_models.rs b/quickwit/quickwit-dst/tests/stateright_models.rs new file mode 100644 index 00000000000..7b94fb38fd8 --- /dev/null +++ b/quickwit/quickwit-dst/tests/stateright_models.rs @@ -0,0 +1,77 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Integration tests for stateright models. +//! +//! Each test runs exhaustive BFS model checking against the small TLA+ config +//! equivalents and verifies all invariants hold. +//! +//! Requires `--features model-checking` to compile and run. + +#![cfg(feature = "model-checking")] + +use quickwit_dst::models::{ + parquet_data_model::DataModelModel, sort_schema::SortSchemaModel, + time_windowed_compaction::CompactionModel, +}; +use stateright::{Checker, Model}; + +/// SS-1..SS-5: Sort schema invariants (ADR-002). +/// Mirrors SortSchema_small.cfg: Columns={c1}, RowsPerSplitMax=2, +/// SplitsMax=2, SchemaChangesMax=1. +#[test] +fn exhaustive_sort_schema() { + let model = SortSchemaModel::small(); + let result = model.checker().spawn_bfs().join(); + result.assert_properties(); + println!( + "SortSchema: states={}, unique={}", + result.state_count(), + result.unique_state_count() + ); +} + +/// TW-1..TW-3, CS-1..CS-3, MC-1..MC-4: Compaction invariants (ADR-003). +/// Mirrors TimeWindowedCompaction_small.cfg. 
+#[test] +fn exhaustive_compaction() { + let model = CompactionModel::small(); + let result = model.checker().spawn_bfs().join(); + result.assert_properties(); + println!( + "Compaction: states={}, unique={}", + result.state_count(), + result.unique_state_count() + ); +} + +/// DM-1..DM-5: Data model invariants (ADR-001). +/// Mirrors ParquetDataModel_small.cfg: Nodes={n1}, MetricNames={m1}, +/// TagSets={tags1}, Timestamps={1}, RequestCountMax=3. +#[test] +fn exhaustive_data_model() { + let model = DataModelModel::small(); + let result = model.checker().spawn_bfs().join(); + result.assert_properties(); + println!( + "DataModel: states={}, unique={}", + result.state_count(), + result.unique_state_count() + ); +} diff --git a/quickwit/quickwit-parquet-engine/Cargo.toml b/quickwit/quickwit-parquet-engine/Cargo.toml index 39918c0948c..ca1e8d72150 100644 --- a/quickwit/quickwit-parquet-engine/Cargo.toml +++ b/quickwit/quickwit-parquet-engine/Cargo.toml @@ -18,6 +18,7 @@ chrono = { workspace = true } parquet = { workspace = true } prost = { workspace = true } quickwit-common = { workspace = true } +quickwit-dst = { workspace = true } quickwit-proto = { workspace = true } sea-query = { workspace = true, optional = true } serde = { workspace = true } diff --git a/quickwit/quickwit-parquet-engine/src/sort_fields/window.rs b/quickwit/quickwit-parquet-engine/src/sort_fields/window.rs index b1ad896e31f..d34a3ec15ea 100644 --- a/quickwit/quickwit-parquet-engine/src/sort_fields/window.rs +++ b/quickwit/quickwit-parquet-engine/src/sort_fields/window.rs @@ -77,16 +77,19 @@ pub fn window_start( timestamp_secs: i64, duration_secs: i64, ) -> Result, SortFieldsError> { - debug_assert!(duration_secs > 0, "window duration must be positive"); - // TW-2 (ADR-003): window duration must evenly divide one hour. - // This ensures window boundaries align across hours and days. 
- debug_assert!( + use quickwit_dst::check_invariant; + use quickwit_dst::invariants::InvariantId; + + check_invariant!(InvariantId::TW2, duration_secs > 0, ": duration_secs must be positive"); + check_invariant!( + InvariantId::TW2, 3600 % duration_secs == 0, - "TW-2 violated: duration_secs={} does not divide 3600", - duration_secs + ": duration_secs={} does not divide 3600", duration_secs + ); + let start_secs = quickwit_dst::invariants::window::window_start_secs( + timestamp_secs, + duration_secs, ); - let remainder = timestamp_secs.rem_euclid(duration_secs); - let start_secs = timestamp_secs - remainder; DateTime::from_timestamp(start_secs, 0).ok_or(SortFieldsError::WindowStartOutOfRange { timestamp_secs: start_secs, }) diff --git a/quickwit/quickwit-parquet-engine/src/split/metadata.rs b/quickwit/quickwit-parquet-engine/src/split/metadata.rs index 5bf85ed987b..d280d846d8e 100644 --- a/quickwit/quickwit-parquet-engine/src/split/metadata.rs +++ b/quickwit/quickwit-parquet-engine/src/split/metadata.rs @@ -467,21 +467,21 @@ impl MetricsSplitMetadataBuilder { pub fn build(self) -> MetricsSplitMetadata { // TW-2 (ADR-003): window_duration must evenly divide 3600. // Enforced at build time so no invalid metadata propagates to storage. - debug_assert!( + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::TW2, self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0, - "TW-2 violated: window_duration_secs={} does not divide 3600", - self.window_duration_secs + ": window_duration_secs={} does not divide 3600", self.window_duration_secs ); // TW-1 (ADR-003, partial): window_start and window_duration_secs are paired. // If one is set, the other must be too. Pre-Phase-31 splits have both at defaults. 
- debug_assert!( + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::TW1, (self.window_start.is_none() && self.window_duration_secs == 0) || (self.window_start.is_some() && self.window_duration_secs > 0), - "TW-1 violated: window_start and window_duration_secs must be set together \ + ": window_start and window_duration_secs must be set together \ (window_start={:?}, window_duration_secs={})", - self.window_start, - self.window_duration_secs + self.window_start, self.window_duration_secs ); // Fuse the two builder fields into a single Range. diff --git a/quickwit/quickwit-parquet-engine/src/split/postgres.rs b/quickwit/quickwit-parquet-engine/src/split/postgres.rs index 6b63f46f17f..5da90cf319c 100644 --- a/quickwit/quickwit-parquet-engine/src/split/postgres.rs +++ b/quickwit/quickwit-parquet-engine/src/split/postgres.rs @@ -160,13 +160,33 @@ impl PgMetricsSplit { debug_assert_eq!(metadata.split_id.as_str(), self.split_id); debug_assert_eq!(metadata.time_range.start_secs, self.time_range_start as u64); debug_assert_eq!(metadata.time_range.end_secs, self.time_range_end as u64); - debug_assert_eq!(metadata.window_start(), self.window_start); - debug_assert_eq!( - metadata.window_duration_secs(), - self.window_duration_secs.unwrap_or(0) as u32 + + // SS-5 (SortSchema.tla): sort_fields must be identical in JSON metadata + // and the dedicated SQL column. Inconsistency would cause the compaction + // planner to select wrong splits or miss eligible ones. + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::SS5, + metadata.sort_fields == self.sort_fields, + ": sort_fields mismatch between JSON ('{}') and SQL column ('{}')", + metadata.sort_fields, self.sort_fields + ); + + // SS-5 continued: window_start must match between JSON and SQL column. 
+ quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::SS5, + metadata.window_start() == self.window_start, + ": window_start mismatch between JSON ({:?}) and SQL column ({:?})", + metadata.window_start(), self.window_start + ); + + // SS-4 (SortSchema.tla, sort_fields immutable after write) cannot be verified + // at read time (no history available). As a proxy, check that row_keys_proto + // is identical in JSON and the SQL column; a mismatch is recorded as SS-5. + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::SS5, + metadata.row_keys_proto == self.row_keys, + ": row_keys_proto mismatch between JSON and SQL column" ); - debug_assert_eq!(metadata.sort_fields, self.sort_fields); - debug_assert_eq!(metadata.num_merge_ops, self.num_merge_ops as u32); Ok(metadata) } diff --git a/quickwit/quickwit-parquet-engine/src/storage/writer.rs b/quickwit/quickwit-parquet-engine/src/storage/writer.rs index 3ec633573bc..60b39dd34c5 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/writer.rs @@ -52,10 +52,12 @@ pub(crate) const PARQUET_META_ROW_KEYS_JSON: &str = "qh.row_keys_json"; pub(crate) fn build_compaction_key_value_metadata( metadata: &MetricsSplitMetadata, ) -> Vec { - // TW-2: window_duration must divide 3600. - debug_assert!( + // TW-2: window_duration must divide 3600 (also checked at build time, + // but belt-and-suspenders at the serialization boundary).
+ quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::TW2, metadata.window_duration_secs() == 0 || 3600 % metadata.window_duration_secs() == 0, - "TW-2 violated at Parquet write: window_duration_secs={} does not divide 3600", + " at Parquet write: window_duration_secs={} does not divide 3600", metadata.window_duration_secs() ); @@ -116,18 +118,18 @@ fn verify_ss5_kv_consistency(metadata: &MetricsSplitMetadata, kvs: &[KeyValue]) }; if !metadata.sort_fields.is_empty() { - debug_assert_eq!( - find_kv(PARQUET_META_SORT_FIELDS), - Some(metadata.sort_fields.as_str()), - "SS-5 violated: sort_fields in kv_metadata does not match MetricsSplitMetadata" + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::SS5, + find_kv(PARQUET_META_SORT_FIELDS) == Some(metadata.sort_fields.as_str()), + ": sort_fields in kv_metadata does not match MetricsSplitMetadata" ); } if let Some(ws) = metadata.window_start() { - debug_assert_eq!( - find_kv(PARQUET_META_WINDOW_START), - Some(ws.to_string()).as_deref(), - "SS-5 violated: window_start in kv_metadata does not match MetricsSplitMetadata" + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::SS5, + find_kv(PARQUET_META_WINDOW_START) == Some(ws.to_string()).as_deref(), + ": window_start in kv_metadata does not match MetricsSplitMetadata" ); } } @@ -283,11 +285,10 @@ impl ParquetWriter { let verify_indices = lexsort_to_indices(&verify_columns, None) .expect("SS-1 verification sort failed"); for i in 0..verify_indices.len() { - debug_assert_eq!( - verify_indices.value(i) as usize, - i, - "SS-1 violated: row {} is out of sort order after sort_batch()", - i + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::SS1, + verify_indices.value(i) as usize == i, + ": row {} is out of sort order after sort_batch()", i ); } } From 228e0a4cd334bd12a626f8056ebd56b8c313701f Mon Sep 17 00:00:00 2001 From: George Talbot Date: Thu, 12 Mar 2026 16:36:01 -0400 Subject: [PATCH 
2/4] feat(31): check invariants in release builds, add pluggable recorder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The check_invariant! macro now always evaluates the condition — not just in debug builds. This implements Layer 4 (Production) of the verification stack: invariant checks run in release, with results forwarded to a pluggable InvariantRecorder for Datadog metrics emission. - Debug builds: panic on violation (debug_assert, Layer 3) - All builds: evaluate condition, call recorder (Layer 4) - set_invariant_recorder() wires up statsd at process startup - No recorder registered = no-op (single OnceLock load) Co-Authored-By: Claude Opus 4.6 (1M context) --- quickwit/quickwit-dst/src/invariants/check.rs | 36 +++-- quickwit/quickwit-dst/src/invariants/mod.rs | 2 + .../quickwit-dst/src/invariants/recorder.rs | 130 ++++++++++++++++++ 3 files changed, 157 insertions(+), 11 deletions(-) create mode 100644 quickwit/quickwit-dst/src/invariants/recorder.rs diff --git a/quickwit/quickwit-dst/src/invariants/check.rs b/quickwit/quickwit-dst/src/invariants/check.rs index 06c59ee9a3c..6c65377ae63 100644 --- a/quickwit/quickwit-dst/src/invariants/check.rs +++ b/quickwit/quickwit-dst/src/invariants/check.rs @@ -17,13 +17,23 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -//! Invariant checking macro. +//! Invariant checking macro — Layers 3 + 4 of the verification stack. //! -//! Wraps `debug_assert!` with the invariant ID, providing a single hook point -//! for future Datadog metrics emission (Layer 4 of the verification stack). +//! The condition is **always evaluated** (debug and release). Results are: +//! +//! - **Debug builds (Layer 3 — Prevention):** panics on violation via +//! `debug_assert!`, catching bugs during development and testing. +//! - **All builds (Layer 4 — Production):** forwards the result to the +//! 
registered [`InvariantRecorder`](super::recorder::InvariantRecorder) +//! for Datadog metrics emission. No-op if no recorder is set. -/// Check an invariant condition. In debug builds, panics on violation. -/// In release builds, currently a no-op (future: emit Datadog metric). +/// Check an invariant condition in all build profiles. +/// +/// The condition is always evaluated. In debug builds, a violation panics. +/// In all builds, the result is forwarded to the registered invariant +/// recorder for metrics emission (see [`set_invariant_recorder`]). +/// +/// [`set_invariant_recorder`]: crate::invariants::set_invariant_recorder /// /// # Examples /// @@ -36,10 +46,14 @@ /// ``` #[macro_export] macro_rules! check_invariant { - ($id:expr, $cond:expr) => { - debug_assert!($cond, "{} violated", $id); - }; - ($id:expr, $cond:expr, $fmt:literal $($arg:tt)*) => { - debug_assert!($cond, concat!("{} violated", $fmt), $id $($arg)*); - }; + ($id:expr, $cond:expr) => {{ + let passed = $cond; + $crate::invariants::record_invariant_check($id, passed); + debug_assert!(passed, "{} violated", $id); + }}; + ($id:expr, $cond:expr, $fmt:literal $($arg:tt)*) => {{ + let passed = $cond; + $crate::invariants::record_invariant_check($id, passed); + debug_assert!(passed, concat!("{} violated", $fmt), $id $($arg)*); + }}; } diff --git a/quickwit/quickwit-dst/src/invariants/mod.rs b/quickwit/quickwit-dst/src/invariants/mod.rs index b33ed7849c9..b9679683a7e 100644 --- a/quickwit/quickwit-dst/src/invariants/mod.rs +++ b/quickwit/quickwit-dst/src/invariants/mod.rs @@ -26,8 +26,10 @@ //! No external dependencies — only `std`. 
mod check; +pub mod recorder; pub mod registry; pub mod sort; pub mod window; +pub use recorder::{record_invariant_check, set_invariant_recorder}; pub use registry::InvariantId; diff --git a/quickwit/quickwit-dst/src/invariants/recorder.rs b/quickwit/quickwit-dst/src/invariants/recorder.rs new file mode 100644 index 00000000000..f3859e03849 --- /dev/null +++ b/quickwit/quickwit-dst/src/invariants/recorder.rs @@ -0,0 +1,130 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Pluggable invariant recorder — Layer 4 of the verification stack. +//! +//! Every call to [`check_invariant!`](crate::check_invariant) evaluates the +//! condition in **all** build profiles (debug and release). The result is +//! forwarded to a recorder function that can emit Datadog metrics, log +//! violations, or take any other action. +//! +//! # Wiring up Datadog metrics +//! +//! Call [`set_invariant_recorder`] once at process startup: +//! +//! ```rust +//! use quickwit_dst::invariants::{InvariantId, set_invariant_recorder}; +//! +//! fn my_recorder(id: InvariantId, passed: bool) { +//! // statsd.count("pomsky.invariant.checked", 1, &[&format!("name:{}", id)]); +//! // if !passed { +//! 
// statsd.count("pomsky.invariant.violated", 1, &[&format!("name:{}", id)]); +//! // } +//! if !passed { +//! eprintln!("{} violated in production", id); +//! } +//! } +//! +//! set_invariant_recorder(my_recorder); +//! ``` + +use std::sync::OnceLock; + +use super::InvariantId; + +/// Signature for an invariant recorder function. +/// +/// Called on every `check_invariant!` invocation with the invariant ID and +/// whether the check passed. Implementations must be cheap — this is called +/// on hot paths. +pub type InvariantRecorder = fn(InvariantId, bool); + +/// Global recorder. When unset, [`record_invariant_check`] is a no-op. +static RECORDER: OnceLock = OnceLock::new(); + +/// Register a global invariant recorder. +/// +/// Should be called once at process startup. Subsequent calls are ignored +/// (first writer wins). This is safe to call from any thread. +pub fn set_invariant_recorder(recorder: InvariantRecorder) { + // OnceLock::set returns Err if already initialized — that's fine. + let _ = RECORDER.set(recorder); +} + +/// Record an invariant check result. +/// +/// Called by [`check_invariant!`](crate::check_invariant) on every invocation, +/// in both debug and release builds. If no recorder has been registered via +/// [`set_invariant_recorder`], this is a no-op (single atomic load). +#[inline] +pub fn record_invariant_check(id: InvariantId, passed: bool) { + if let Some(recorder) = RECORDER.get() { + recorder(id, passed); + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + + use super::*; + + // Note: these tests use a process-global OnceLock, so only the first + // caller of `set` can install a recorder. Each test is therefore written + // to pass whether or not its own recorder ended up being installed. + + #[test] + fn record_without_recorder_is_noop() { + // Before any recorder is set, this should not panic. + // (In the test binary, another test may have set it, so we just + // verify it doesn't panic either way.)
+ record_invariant_check(InvariantId::SS1, true); + record_invariant_check(InvariantId::SS1, false); + } + + #[test] + fn recorder_receives_calls() { + static CHECKS: AtomicU32 = AtomicU32::new(0); + static VIOLATIONS: AtomicU32 = AtomicU32::new(0); + + fn test_recorder(_id: InvariantId, passed: bool) { + CHECKS.fetch_add(1, Ordering::Relaxed); + if !passed { + VIOLATIONS.fetch_add(1, Ordering::Relaxed); + } + } + + // May fail if another test already set the recorder — that's OK, + // the test still verifies the function doesn't panic. + let _ = RECORDER.set(test_recorder); + + let before_checks = CHECKS.load(Ordering::Relaxed); + let before_violations = VIOLATIONS.load(Ordering::Relaxed); + + record_invariant_check(InvariantId::TW2, true); + record_invariant_check(InvariantId::TW2, false); + + // If our recorder was set, we should see the increments. + // If another recorder was set first, we can't assert on counts. + if RECORDER.get() == Some(&(test_recorder as InvariantRecorder)) { + assert_eq!(CHECKS.load(Ordering::Relaxed), before_checks + 2); + assert_eq!(VIOLATIONS.load(Ordering::Relaxed), before_violations + 1); + } + } +} From b764097b6f7afbac3a48301099955522392e3120 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Thu, 12 Mar 2026 16:50:49 -0400 Subject: [PATCH 3/4] feat(31): wire invariant recorder to DogStatsD metrics Emit cloudprem.pomsky.invariant.checked and .violated counters with invariant label via the metrics crate / DogStatsD exporter at process startup, completing Layer 4 of the verification stack. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- quickwit/Cargo.lock | 160 +++++++++++++++++- quickwit/Cargo.toml | 2 + quickwit/quickwit-cli/Cargo.toml | 4 + quickwit/quickwit-cli/src/logger.rs | 59 +++++++ quickwit/quickwit-cli/src/main.rs | 6 + .../quickwit-dst/src/invariants/registry.rs | 60 ++++--- 6 files changed, 263 insertions(+), 28 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index df969d83d7e..020b281625f 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -407,6 +407,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + [[package]] name = "ascii-canvas" version = "4.0.0" @@ -1808,6 +1814,12 @@ dependencies = [ "zstd", ] +[[package]] +name = "choice" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3b71fc821deaf602a933ada5c845d088156d0cdf2ebf43ede390afe93466553" + [[package]] name = "chrono" version = "0.4.44" @@ -1832,6 +1844,12 @@ dependencies = [ "phf", ] +[[package]] +name = "chunked_transfer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" + [[package]] name = "ciborium" version = "0.2.2" @@ -2883,6 +2901,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "enum-iterator" version = "2.3.0" @@ -4183,6 +4207,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" +[[package]] +name = "id-set" +version = "0.2.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9633fadf6346456cf8531119ba4838bc6d82ac4ce84d9852126dd2aa34d49264" + [[package]] name = "ident_case" version = "1.0.1" @@ -4934,6 +4964,51 @@ dependencies = [ "libc", ] +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-dogstatsd" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "961f3712d8a7cfe14caaf74c3af503fe701cee6439ff49a7a3ebd04bf49c0502" +dependencies = [ + "bytes", + "itoa", + "metrics", + "metrics-util", + "ryu", + "thiserror 2.0.18", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap 2.13.0", + "metrics", + "ordered-float 5.3.0", + "quanta", + "radix_trie", + "rand 0.9.2", + "rand_xoshiro", + "sketches-ddsketch 0.3.1", +] + [[package]] name = "mime" version = "0.3.17" @@ -5124,6 +5199,15 @@ dependencies = [ "regex", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.26.4" @@ -5141,6 +5225,12 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65" +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "nom" version = "7.1.3" @@ -6975,6 +7065,8 @@ dependencies = [ "humantime", "indicatif", "itertools 0.14.0", + "metrics", + "metrics-exporter-dogstatsd", "numfmt", "once_cell", "openssl-probe 0.1.6", @@ -6987,6 +7079,7 @@ dependencies = [ "quickwit-cluster", "quickwit-common", "quickwit-config", + "quickwit-dst", "quickwit-index-management", "quickwit-indexing", "quickwit-ingest", @@ -7251,6 +7344,13 @@ dependencies = [ "utoipa", ] +[[package]] +name = "quickwit-dst" +version = "0.8.0" +dependencies = [ + "stateright", +] + [[package]] name = "quickwit-index-management" version = "0.8.0" @@ -7617,6 +7717,7 @@ dependencies = [ "proptest", "prost 0.14.3", "quickwit-common", + "quickwit-dst", "quickwit-proto", "sea-query", "serde", @@ -7988,6 +8089,16 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.7.3" @@ -8124,6 +8235,15 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.5", +] + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -9331,6 +9451,12 @@ dependencies = [ "serde", ] +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.12" @@ -9662,6 +9788,26 @@ version = 
"1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "stateright" +version = "0.30.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd37c74ff38ca9e5d370efb7af3c49ecab91cb5644affa23dc54a061d0f3a59" +dependencies = [ + "ahash", + "choice", + "crossbeam-utils", + "dashmap 5.5.3", + "id-set", + "log", + "nohash-hasher", + "parking_lot 0.12.5", + "rand 0.8.5", + "serde", + "serde_json", + "tiny_http", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -9873,7 +10019,7 @@ dependencies = [ "rustc-hash", "serde", "serde_json", - "sketches-ddsketch", + "sketches-ddsketch 0.3.0", "smallvec", "tantivy-bitpacker", "tantivy-columnar", @@ -10177,6 +10323,18 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tiny_http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" +dependencies = [ + "ascii", + "chunked_transfer", + "httpdate", + "log", +] + [[package]] name = "tinystr" version = "0.8.2" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index cfd732a61f7..fc73ec93684 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -156,6 +156,8 @@ libz-sys = "1.1" lru = "0.16" matches = "0.1" md5 = "0.8" +metrics = "0.24" +metrics-exporter-dogstatsd = "0.9" mime_guess = "2.0" mini-moka = "0.10.3" mockall = "0.14" diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index 5d9dc955107..5af056d0820 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -56,10 +56,14 @@ tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } +metrics = { workspace = true } +metrics-exporter-dogstatsd = { workspace = true } + quickwit-actors = { workspace = true } quickwit-cluster = { workspace 
= true } quickwit-common = { workspace = true } quickwit-config = { workspace = true } +quickwit-dst = { workspace = true } quickwit-index-management = { workspace = true } quickwit-indexing = { workspace = true } quickwit-ingest = { workspace = true } diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index e1e60a14f93..fe91769b6cc 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -161,6 +161,65 @@ pub fn setup_logging_and_tracing( )) } +#[cfg(not(any(test, feature = "testsuite")))] +pub fn setup_dogstatsd_exporter(build_info: &BuildInfo) -> anyhow::Result<()> { + // Reading both `CLOUDPREM_*` and `CP_*` env vars for backward compatibility. The former is + // deprecated and can be removed after 2026-04-01. + let host: String = quickwit_common::get_from_env_opt("CLOUDPREM_DOGSTATSD_SERVER_HOST", false) + .unwrap_or_else(|| { + quickwit_common::get_from_env( + "CP_DOGSTATSD_SERVER_HOST", + "127.0.0.1".to_string(), + false, + ) + }); + let port: u16 = quickwit_common::get_from_env_opt("CLOUDPREM_DOGSTATSD_SERVER_PORT", false) + .unwrap_or_else(|| quickwit_common::get_from_env("CP_DOGSTATSD_SERVER_PORT", 8125, false)); + let addr = format!("{host}:{port}"); + + let mut global_labels = vec![::metrics::Label::new("version", build_info.version.clone())]; + let keys = [ + ("IMAGE_NAME", "image_name"), + ("IMAGE_TAG", "image_tag"), + ("KUBERNETES_COMPONENT", "kube_component"), + ("KUBERNETES_NAMESPACE", "kube_namespace"), + ("KUBERNETES_POD_NAME", "kube_pod_name"), + ("QW_CLUSTER_ID", "cloudprem_cluster_id"), + ("QW_NODE_ID", "cloudprem_node_id"), + ]; + for (env_var_key, label_key) in keys { + if let Some(label_val) = quickwit_common::get_from_env_opt::(env_var_key, false) { + global_labels.push(::metrics::Label::new(label_key, label_val)); + } + } + metrics_exporter_dogstatsd::DogStatsDBuilder::default() + .set_global_prefix("cloudprem") + .with_global_labels(global_labels) + 
.with_remote_address(addr) + .context("failed to parse DogStatsD server address")? + .install() + .context("failed to register DogStatsD exporter")?; + Ok(()) +} + +/// Register the invariant recorder that emits DogStatsD counters. +/// +/// Must be called after [`setup_dogstatsd_exporter`] so the `metrics` crate +/// has a registered recorder. +#[cfg(not(any(test, feature = "testsuite")))] +pub fn setup_invariant_recorder() { + quickwit_dst::invariants::set_invariant_recorder(invariant_recorder); +} + +#[cfg(not(any(test, feature = "testsuite")))] +fn invariant_recorder(id: quickwit_dst::invariants::InvariantId, passed: bool) { + let name = id.as_str(); + metrics::counter!("pomsky.invariant.checked", "invariant" => name).increment(1); + if !passed { + metrics::counter!("pomsky.invariant.violated", "invariant" => name).increment(1); + } +} + /// We do not rely on the RFC3339 implementation, because it has a nanosecond precision. /// See discussion here: https://github.com/time-rs/time/discussions/418 fn time_formatter() -> UtcTime>> { diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 4a1f9ce036e..38b696d5037 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -101,6 +101,12 @@ async fn main_impl() -> anyhow::Result<()> { let (env_filter_reload_fn, tracer_provider_opt) = setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?; + #[cfg(not(any(test, feature = "testsuite")))] + quickwit_cli::logger::setup_dogstatsd_exporter(build_info)?; + + #[cfg(not(any(test, feature = "testsuite")))] + quickwit_cli::logger::setup_invariant_recorder(); + let return_code: i32 = if let Err(command_error) = command.execute(env_filter_reload_fn).await { error!(error=%command_error, "command failed"); eprintln!( diff --git a/quickwit/quickwit-dst/src/invariants/registry.rs b/quickwit/quickwit-dst/src/invariants/registry.rs index 9704f0da891..cccc1480f6f 100644 --- 
a/quickwit/quickwit-dst/src/invariants/registry.rs +++ b/quickwit/quickwit-dst/src/invariants/registry.rs @@ -82,6 +82,38 @@ pub enum InvariantId { } impl InvariantId { + /// Short identifier string (e.g. `"SS-1"`). + /// + /// Returns a `&'static str` to avoid allocation on the hot path. + pub fn as_str(self) -> &'static str { + match self { + Self::SS1 => "SS-1", + Self::SS2 => "SS-2", + Self::SS3 => "SS-3", + Self::SS4 => "SS-4", + Self::SS5 => "SS-5", + + Self::TW1 => "TW-1", + Self::TW2 => "TW-2", + Self::TW3 => "TW-3", + + Self::CS1 => "CS-1", + Self::CS2 => "CS-2", + Self::CS3 => "CS-3", + + Self::MC1 => "MC-1", + Self::MC2 => "MC-2", + Self::MC3 => "MC-3", + Self::MC4 => "MC-4", + + Self::DM1 => "DM-1", + Self::DM2 => "DM-2", + Self::DM3 => "DM-3", + Self::DM4 => "DM-4", + Self::DM5 => "DM-5", + } + } + /// Human-readable description of this invariant. pub fn description(self) -> &'static str { match self { @@ -115,33 +147,7 @@ impl InvariantId { impl fmt::Display for InvariantId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match self { - Self::SS1 => "SS-1", - Self::SS2 => "SS-2", - Self::SS3 => "SS-3", - Self::SS4 => "SS-4", - Self::SS5 => "SS-5", - - Self::TW1 => "TW-1", - Self::TW2 => "TW-2", - Self::TW3 => "TW-3", - - Self::CS1 => "CS-1", - Self::CS2 => "CS-2", - Self::CS3 => "CS-3", - - Self::MC1 => "MC-1", - Self::MC2 => "MC-2", - Self::MC3 => "MC-3", - Self::MC4 => "MC-4", - - Self::DM1 => "DM-1", - Self::DM2 => "DM-2", - Self::DM3 => "DM-3", - Self::DM4 => "DM-4", - Self::DM5 => "DM-5", - }; - f.write_str(s) + f.write_str(self.as_str()) } } From 29cc07ff014660c3add8a074e49f2670427499ca Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 1 Apr 2026 14:59:03 -0400 Subject: [PATCH 4/4] fix: license headers + cfg(not(test)) for quickwit-dst and quickwit-cli --- quickwit/quickwit-cli/src/logger.rs | 6 ++--- quickwit/quickwit-cli/src/main.rs | 4 +-- quickwit/quickwit-dst/src/invariants/check.rs | 25 ++++++++----------- 
quickwit/quickwit-dst/src/invariants/mod.rs | 25 ++++++++----------- .../quickwit-dst/src/invariants/recorder.rs | 25 ++++++++----------- .../quickwit-dst/src/invariants/registry.rs | 25 ++++++++----------- quickwit/quickwit-dst/src/invariants/sort.rs | 25 ++++++++----------- .../quickwit-dst/src/invariants/window.rs | 25 ++++++++----------- quickwit/quickwit-dst/src/lib.rs | 25 ++++++++----------- quickwit/quickwit-dst/src/models/mod.rs | 25 ++++++++----------- .../src/models/parquet_data_model.rs | 25 ++++++++----------- .../quickwit-dst/src/models/sort_schema.rs | 25 ++++++++----------- .../src/models/time_windowed_compaction.rs | 25 ++++++++----------- 13 files changed, 115 insertions(+), 170 deletions(-) diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index fe91769b6cc..a1d1dda8197 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -161,7 +161,7 @@ pub fn setup_logging_and_tracing( )) } -#[cfg(not(any(test, feature = "testsuite")))] +#[cfg(not(test))] pub fn setup_dogstatsd_exporter(build_info: &BuildInfo) -> anyhow::Result<()> { // Reading both `CLOUDPREM_*` and `CP_*` env vars for backward compatibility. The former is // deprecated and can be removed after 2026-04-01. @@ -206,12 +206,12 @@ pub fn setup_dogstatsd_exporter(build_info: &BuildInfo) -> anyhow::Result<()> { /// /// Must be called after [`setup_dogstatsd_exporter`] so the `metrics` crate /// has a registered recorder. 
-#[cfg(not(any(test, feature = "testsuite")))] +#[cfg(not(test))] pub fn setup_invariant_recorder() { quickwit_dst::invariants::set_invariant_recorder(invariant_recorder); } -#[cfg(not(any(test, feature = "testsuite")))] +#[cfg(not(test))] fn invariant_recorder(id: quickwit_dst::invariants::InvariantId, passed: bool) { let name = id.as_str(); metrics::counter!("pomsky.invariant.checked", "invariant" => name).increment(1); diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 38b696d5037..a448d543eb9 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -101,10 +101,10 @@ async fn main_impl() -> anyhow::Result<()> { let (env_filter_reload_fn, tracer_provider_opt) = setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?; - #[cfg(not(any(test, feature = "testsuite")))] + #[cfg(not(test))] quickwit_cli::logger::setup_dogstatsd_exporter(build_info)?; - #[cfg(not(any(test, feature = "testsuite")))] + #[cfg(not(test))] quickwit_cli::logger::setup_invariant_recorder(); let return_code: i32 = if let Err(command_error) = command.execute(env_filter_reload_fn).await { diff --git a/quickwit/quickwit-dst/src/invariants/check.rs b/quickwit/quickwit-dst/src/invariants/check.rs index 6c65377ae63..3db376027b1 100644 --- a/quickwit/quickwit-dst/src/invariants/check.rs +++ b/quickwit/quickwit-dst/src/invariants/check.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Invariant checking macro — Layers 3 + 4 of the verification stack. //! diff --git a/quickwit/quickwit-dst/src/invariants/mod.rs b/quickwit/quickwit-dst/src/invariants/mod.rs index b9679683a7e..88a731eb730 100644 --- a/quickwit/quickwit-dst/src/invariants/mod.rs +++ b/quickwit/quickwit-dst/src/invariants/mod.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Shared invariant definitions — the single source of truth. //! diff --git a/quickwit/quickwit-dst/src/invariants/recorder.rs b/quickwit/quickwit-dst/src/invariants/recorder.rs index f3859e03849..09432ad9045 100644 --- a/quickwit/quickwit-dst/src/invariants/recorder.rs +++ b/quickwit/quickwit-dst/src/invariants/recorder.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Pluggable invariant recorder — Layer 4 of the verification stack. //! diff --git a/quickwit/quickwit-dst/src/invariants/registry.rs b/quickwit/quickwit-dst/src/invariants/registry.rs index cccc1480f6f..f0d9d2bf9d8 100644 --- a/quickwit/quickwit-dst/src/invariants/registry.rs +++ b/quickwit/quickwit-dst/src/invariants/registry.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Invariant ID catalog — single source of truth for all invariant definitions. //! diff --git a/quickwit/quickwit-dst/src/invariants/sort.rs b/quickwit/quickwit-dst/src/invariants/sort.rs index 68a4ad05cb7..a79be010de1 100644 --- a/quickwit/quickwit-dst/src/invariants/sort.rs +++ b/quickwit/quickwit-dst/src/invariants/sort.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Shared null-aware comparison for SS-2 (null ordering invariant). //! diff --git a/quickwit/quickwit-dst/src/invariants/window.rs b/quickwit/quickwit-dst/src/invariants/window.rs index 31276c1da6b..8f96ee23ef8 100644 --- a/quickwit/quickwit-dst/src/invariants/window.rs +++ b/quickwit/quickwit-dst/src/invariants/window.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Shared window math for time-windowed compaction. //! diff --git a/quickwit/quickwit-dst/src/lib.rs b/quickwit/quickwit-dst/src/lib.rs index 2fa71cbf47c..194af267b52 100644 --- a/quickwit/quickwit-dst/src/lib.rs +++ b/quickwit/quickwit-dst/src/lib.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Deterministic simulation testing and shared invariants for Quickhouse-Pomsky. //! diff --git a/quickwit/quickwit-dst/src/models/mod.rs b/quickwit/quickwit-dst/src/models/mod.rs index a46f5552d35..d2a74fa424a 100644 --- a/quickwit/quickwit-dst/src/models/mod.rs +++ b/quickwit/quickwit-dst/src/models/mod.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Stateright models mirroring the TLA+ specifications. diff --git a/quickwit/quickwit-dst/src/models/parquet_data_model.rs b/quickwit/quickwit-dst/src/models/parquet_data_model.rs index 321dbc5dfc5..08d9a27dc3c 100644 --- a/quickwit/quickwit-dst/src/models/parquet_data_model.rs +++ b/quickwit/quickwit-dst/src/models/parquet_data_model.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Stateright model for Parquet Data Model invariants (ADR-001). //! diff --git a/quickwit/quickwit-dst/src/models/sort_schema.rs b/quickwit/quickwit-dst/src/models/sort_schema.rs index 89746360993..d741ceddffa 100644 --- a/quickwit/quickwit-dst/src/models/sort_schema.rs +++ b/quickwit/quickwit-dst/src/models/sort_schema.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Stateright model for Sort Schema invariants (ADR-002). //! diff --git a/quickwit/quickwit-dst/src/models/time_windowed_compaction.rs b/quickwit/quickwit-dst/src/models/time_windowed_compaction.rs index 0723f93e453..b50d8a695b7 100644 --- a/quickwit/quickwit-dst/src/models/time_windowed_compaction.rs +++ b/quickwit/quickwit-dst/src/models/time_windowed_compaction.rs @@ -1,21 +1,16 @@ -// Copyright (C) 2024 Quickwit, Inc. +// Copyright 2021-Present Datadog, Inc. // -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at // -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. +// http://www.apache.org/licenses/LICENSE-2.0 // -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Stateright model for Time-Windowed Compaction invariants (ADR-003). //!