From b6eb5958d7d56fc2065c7dd01c513a28035d44f5 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Tue, 31 Mar 2026 17:32:41 -0400 Subject: [PATCH] =?UTF-8?q?feat(31):=20compaction=20metadata=20types=20?= =?UTF-8?q?=E2=80=94=20extend=20split=20metadata,=20postgres=20model,=20fi?= =?UTF-8?q?eld=20lookup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/metastore/postgres/metastore.rs | 6 + .../src/split/metadata.rs | 305 ++++++++++++++++++ .../src/split/postgres.rs | 39 ++- 3 files changed, 349 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index d459ceb243b..bfa08ff3b9d 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2312,6 +2312,12 @@ impl MetastoreService for PostgresqlMetastore { size_bytes: row.13, split_metadata_json: row.14, update_timestamp: row.15, + window_start: None, + window_duration_secs: 0, + sort_fields: String::new(), + num_merge_ops: 0, + row_keys: None, + zonemap_regexes: String::new(), }; let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged); diff --git a/quickwit/quickwit-parquet-engine/src/split/metadata.rs b/quickwit/quickwit-parquet-engine/src/split/metadata.rs index 992775be862..5bf85ed987b 100644 --- a/quickwit/quickwit-parquet-engine/src/split/metadata.rs +++ b/quickwit/quickwit-parquet-engine/src/split/metadata.rs @@ -15,6 +15,7 @@ //! Metrics split metadata definitions. use std::collections::{HashMap, HashSet}; +use std::ops::Range; use std::time::SystemTime; use serde::{Deserialize, Serialize}; @@ -120,7 +121,12 @@ impl std::fmt::Display for MetricsSplitState { } /// Metadata for a metrics split. +/// +/// The `window` field stores the time window as `[start, start + duration)`. +/// For JSON serialization, it is decomposed into `window_start` and +/// `window_duration_secs` for backward compatibility with pre-Phase-31 code. #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(from = "MetricsSplitMetadataSerde", into = "MetricsSplitMetadataSerde")] pub struct MetricsSplitMetadata { /// Unique split identifier. pub split_id: SplitId, @@ -153,6 +159,117 @@ pub struct MetricsSplitMetadata { /// When this split was created. pub created_at: SystemTime, + + /// Parquet file path relative to storage root. + pub parquet_file: String, + + /// Time window as `[start, start + duration)` in epoch seconds. + /// None for pre-Phase-31 splits (backward compat). + pub window: Option>, + + /// Sort schema as Husky-style string (e.g., "metric_name|host|timestamp/V2"). + /// Empty string for pre-Phase-31 splits. + pub sort_fields: String, + + /// Number of merge operations this split has been through. + /// 0 for newly ingested splits. + pub num_merge_ops: u32, + + /// RowKeys (sort-key min/max boundaries) as proto bytes. + /// None for pre-Phase-31 splits or splits without sort schema. + pub row_keys_proto: Option>, + + /// Per-column zonemap regex strings, keyed by column name. + /// Empty for pre-Phase-31 splits. + pub zonemap_regexes: HashMap, +} + +/// Serde helper struct that uses `window_start` / `window_duration_secs` field +/// names for JSON backward compatibility while the in-memory representation uses +/// `Option>`. +#[derive(Serialize, Deserialize)] +struct MetricsSplitMetadataSerde { + split_id: SplitId, + index_uid: String, + time_range: TimeRange, + num_rows: u64, + size_bytes: u64, + metric_names: HashSet, + low_cardinality_tags: HashMap>, + high_cardinality_tag_keys: HashSet, + created_at: SystemTime, + parquet_file: String, + + #[serde(default, skip_serializing_if = "Option::is_none")] + window_start: Option, + + #[serde(default)] + window_duration_secs: u32, + + #[serde(default)] + sort_fields: String, + + #[serde(default)] + num_merge_ops: u32, + + #[serde(default, skip_serializing_if = "Option::is_none")] + row_keys_proto: Option>, + + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + zonemap_regexes: HashMap, +} + +impl From for MetricsSplitMetadata { + fn from(s: MetricsSplitMetadataSerde) -> Self { + let window = match (s.window_start, s.window_duration_secs) { + (Some(start), dur) if dur > 0 => Some(start..start + dur as i64), + _ => None, + }; + Self { + split_id: s.split_id, + index_uid: s.index_uid, + time_range: s.time_range, + num_rows: s.num_rows, + size_bytes: s.size_bytes, + metric_names: s.metric_names, + low_cardinality_tags: s.low_cardinality_tags, + high_cardinality_tag_keys: s.high_cardinality_tag_keys, + created_at: s.created_at, + parquet_file: s.parquet_file, + window, + sort_fields: s.sort_fields, + num_merge_ops: s.num_merge_ops, + row_keys_proto: s.row_keys_proto, + zonemap_regexes: s.zonemap_regexes, + } + } +} + +impl From for MetricsSplitMetadataSerde { + fn from(m: MetricsSplitMetadata) -> Self { + let (window_start, window_duration_secs) = match &m.window { + Some(w) => (Some(w.start), (w.end - w.start) as u32), + None => (None, 0), + }; + Self { + split_id: m.split_id, + index_uid: m.index_uid, + time_range: m.time_range, + num_rows: m.num_rows, + size_bytes: m.size_bytes, + metric_names: m.metric_names, + low_cardinality_tags: m.low_cardinality_tags, + high_cardinality_tag_keys: m.high_cardinality_tag_keys, + created_at: m.created_at, + parquet_file: m.parquet_file, + window_start, + window_duration_secs, + sort_fields: m.sort_fields, + num_merge_ops: m.num_merge_ops, + row_keys_proto: m.row_keys_proto, + zonemap_regexes: m.zonemap_regexes, + } + } } impl MetricsSplitMetadata { @@ -167,6 +284,19 @@ impl MetricsSplitMetadata { /// Tags with >= CARDINALITY_THRESHOLD unique values use Parquet bloom filters. pub const CARDINALITY_THRESHOLD: usize = 1000; + /// Returns the window start in epoch seconds, or `None` for pre-Phase-31 splits. + pub fn window_start(&self) -> Option { + self.window.as_ref().map(|w| w.start) + } + + /// Returns the window duration in seconds, or 0 for pre-Phase-31 splits. + pub fn window_duration_secs(&self) -> u32 { + match &self.window { + Some(w) => (w.end - w.start) as u32, + None => 0, + } + } + /// Create a new MetricsSplitMetadata builder. pub fn builder() -> MetricsSplitMetadataBuilder { MetricsSplitMetadataBuilder::default() @@ -221,8 +351,19 @@ pub struct MetricsSplitMetadataBuilder { metric_names: HashSet, low_cardinality_tags: HashMap>, high_cardinality_tag_keys: HashSet, + parquet_file: String, + window_start: Option, + window_duration_secs: u32, + sort_fields: String, + num_merge_ops: u32, + row_keys_proto: Option>, + zonemap_regexes: HashMap, } +// The builder still accepts window_start and window_duration_secs separately +// to remain compatible with callers that compute them independently (e.g., +// split_writer). The `build()` method fuses them into `Option>`. + impl MetricsSplitMetadataBuilder { pub fn split_id(mut self, id: SplitId) -> Self { self.split_id = Some(id); @@ -284,7 +425,71 @@ impl MetricsSplitMetadataBuilder { self } + pub fn parquet_file(mut self, path: impl Into) -> Self { + self.parquet_file = path.into(); + self + } + + pub fn window_start_secs(mut self, epoch_secs: i64) -> Self { + self.window_start = Some(epoch_secs); + self + } + + pub fn window_duration_secs(mut self, dur: u32) -> Self { + self.window_duration_secs = dur; + self + } + + pub fn sort_fields(mut self, schema: impl Into) -> Self { + self.sort_fields = schema.into(); + self + } + + pub fn num_merge_ops(mut self, ops: u32) -> Self { + self.num_merge_ops = ops; + self + } + + pub fn row_keys_proto(mut self, bytes: Vec) -> Self { + self.row_keys_proto = Some(bytes); + self + } + + pub fn add_zonemap_regex( + mut self, + column: impl Into, + regex: impl Into, + ) -> Self { + self.zonemap_regexes.insert(column.into(), regex.into()); + self + } + pub fn build(self) -> MetricsSplitMetadata { + // TW-2 (ADR-003): window_duration must evenly divide 3600. + // Enforced at build time so no invalid metadata propagates to storage. + debug_assert!( + self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0, + "TW-2 violated: window_duration_secs={} does not divide 3600", + self.window_duration_secs + ); + + // TW-1 (ADR-003, partial): window_start and window_duration_secs are paired. + // If one is set, the other must be too. Pre-Phase-31 splits have both at defaults. + debug_assert!( + (self.window_start.is_none() && self.window_duration_secs == 0) + || (self.window_start.is_some() && self.window_duration_secs > 0), + "TW-1 violated: window_start and window_duration_secs must be set together \ + (window_start={:?}, window_duration_secs={})", + self.window_start, + self.window_duration_secs + ); + + // Fuse the two builder fields into a single Range. + let window = match (self.window_start, self.window_duration_secs) { + (Some(start), dur) if dur > 0 => Some(start..start + dur as i64), + _ => None, + }; + MetricsSplitMetadata { split_id: self.split_id.unwrap_or_else(SplitId::generate), index_uid: self.index_uid.expect("index_uid is required"), @@ -295,6 +500,12 @@ impl MetricsSplitMetadataBuilder { low_cardinality_tags: self.low_cardinality_tags, high_cardinality_tag_keys: self.high_cardinality_tag_keys, created_at: SystemTime::now(), + parquet_file: self.parquet_file, + window, + sort_fields: self.sort_fields, + num_merge_ops: self.num_merge_ops, + row_keys_proto: self.row_keys_proto, + zonemap_regexes: self.zonemap_regexes, } } } @@ -401,4 +612,98 @@ mod tests { ); assert_eq!(format!("{}", MetricsSplitState::Published), "Published"); } + + #[test] + fn test_backward_compat_deserialize_pre_phase31_json() { + // Simulate a JSON string from pre-Phase-31 code (no compaction fields). + let pre_phase31_json = r#"{ + "split_id": "metrics_abc123", + "index_uid": "test-index:00000000000000000000000000", + "time_range": {"start_secs": 1000, "end_secs": 2000}, + "num_rows": 500, + "size_bytes": 1024, + "metric_names": ["cpu.usage"], + "low_cardinality_tags": {}, + "high_cardinality_tag_keys": [], + "created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0}, + "parquet_file": "split1.parquet" + }"#; + + let metadata: MetricsSplitMetadata = + serde_json::from_str(pre_phase31_json).expect("should deserialize pre-Phase-31 JSON"); + + // New fields should be at their defaults. + assert!(metadata.window.is_none()); + assert!(metadata.window_start().is_none()); + assert_eq!(metadata.window_duration_secs(), 0); + assert_eq!(metadata.sort_fields, ""); + assert_eq!(metadata.num_merge_ops, 0); + assert!(metadata.row_keys_proto.is_none()); + assert!(metadata.zonemap_regexes.is_empty()); + + // Existing fields should be intact. + assert_eq!(metadata.split_id.as_str(), "metrics_abc123"); + assert_eq!(metadata.index_uid, "test-index:00000000000000000000000000"); + assert_eq!(metadata.num_rows, 500); + } + + #[test] + fn test_round_trip_with_compaction_fields() { + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("roundtrip-compaction")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(500) + .window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .num_merge_ops(3) + .row_keys_proto(vec![0x08, 0x01, 0x10, 0x02]) + .add_zonemap_regex("metric_name", "cpu\\..*") + .add_zonemap_regex("host", "host-\\d+") + .build(); + + let json = serde_json::to_string(&metadata).expect("should serialize"); + let recovered: MetricsSplitMetadata = + serde_json::from_str(&json).expect("should deserialize"); + + assert_eq!(recovered.window, Some(1700000000..1700003600)); + assert_eq!(recovered.window_start(), Some(1700000000)); + assert_eq!(recovered.window_duration_secs(), 3600); + assert_eq!(recovered.sort_fields, "metric_name|host|timestamp/V2"); + assert_eq!(recovered.num_merge_ops, 3); + assert_eq!(recovered.row_keys_proto, Some(vec![0x08, 0x01, 0x10, 0x02])); + assert_eq!(recovered.zonemap_regexes.len(), 2); + assert_eq!( + recovered.zonemap_regexes.get("metric_name").unwrap(), + "cpu\\..*" + ); + assert_eq!(recovered.zonemap_regexes.get("host").unwrap(), "host-\\d+"); + } + + #[test] + fn test_skip_serializing_empty_compaction_fields() { + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("skip-test")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .build(); + + let json = serde_json::to_string(&metadata).expect("should serialize"); + + // Optional fields with skip_serializing_if should be absent. + assert!( + !json.contains("\"window_start\""), + "window_start should not appear when None" + ); + assert!( + !json.contains("\"row_keys_proto\""), + "row_keys_proto should not appear when None" + ); + assert!( + !json.contains("\"zonemap_regexes\""), + "zonemap_regexes should not appear when empty" + ); + } } diff --git a/quickwit/quickwit-parquet-engine/src/split/postgres.rs b/quickwit/quickwit-parquet-engine/src/split/postgres.rs index 8b2c8767846..0ea753181bb 100644 --- a/quickwit/quickwit-parquet-engine/src/split/postgres.rs +++ b/quickwit/quickwit-parquet-engine/src/split/postgres.rs @@ -71,6 +71,12 @@ pub struct PgMetricsSplit { pub size_bytes: i64, pub split_metadata_json: String, pub update_timestamp: i64, + pub window_start: Option, + pub window_duration_secs: i32, + pub sort_fields: String, + pub num_merge_ops: i32, + pub row_keys: Option>, + pub zonemap_regexes: String, } /// Insertable row for metrics_splits table. @@ -92,6 +98,12 @@ pub struct InsertableMetricsSplit { pub num_rows: i64, pub size_bytes: i64, pub split_metadata_json: String, + pub window_start: Option, + pub window_duration_secs: i32, + pub sort_fields: String, + pub num_merge_ops: i32, + pub row_keys: Option>, + pub zonemap_regexes: String, } impl InsertableMetricsSplit { @@ -102,6 +114,12 @@ impl InsertableMetricsSplit { ) -> Result { let split_metadata_json = serde_json::to_string(metadata)?; + let zonemap_regexes_json = if metadata.zonemap_regexes.is_empty() { + "{}".to_string() + } else { + serde_json::to_string(&metadata.zonemap_regexes)? + }; + Ok(Self { split_id: metadata.split_id.as_str().to_string(), split_state: state.as_str().to_string(), @@ -118,6 +136,12 @@ impl InsertableMetricsSplit { num_rows: metadata.num_rows as i64, size_bytes: metadata.size_bytes as i64, split_metadata_json, + window_start: metadata.window_start(), + window_duration_secs: metadata.window_duration_secs() as i32, + sort_fields: metadata.sort_fields.clone(), + num_merge_ops: metadata.num_merge_ops as i32, + row_keys: metadata.row_keys_proto.clone(), + zonemap_regexes: zonemap_regexes_json, }) } } @@ -134,10 +158,17 @@ impl PgMetricsSplit { // Primary path: deserialize from JSON (authoritative) let metadata: MetricsSplitMetadata = serde_json::from_str(&self.split_metadata_json)?; - // Overlay database columns (for consistency verification in debug builds) + // SS-5: Verify consistency between JSON blob and SQL columns. debug_assert_eq!(metadata.split_id.as_str(), self.split_id); debug_assert_eq!(metadata.time_range.start_secs, self.time_range_start as u64); debug_assert_eq!(metadata.time_range.end_secs, self.time_range_end as u64); + debug_assert_eq!(metadata.window_start(), self.window_start); + debug_assert_eq!( + metadata.window_duration_secs(), + self.window_duration_secs as u32 + ); + debug_assert_eq!(metadata.sort_fields, self.sort_fields); + debug_assert_eq!(metadata.num_merge_ops, self.num_merge_ops as u32); Ok(metadata) } @@ -259,6 +290,12 @@ mod tests { size_bytes: insertable.size_bytes, split_metadata_json: insertable.split_metadata_json, update_timestamp: 1704067200, + window_start: insertable.window_start, + window_duration_secs: insertable.window_duration_secs, + sort_fields: insertable.sort_fields, + num_merge_ops: insertable.num_merge_ops, + row_keys: insertable.row_keys, + zonemap_regexes: insertable.zonemap_regexes, }; let recovered = pg_row.to_metadata().expect("should deserialize");