Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,12 @@ impl MetastoreService for PostgresqlMetastore {
size_bytes: row.13,
split_metadata_json: row.14,
update_timestamp: row.15,
window_start: None,
window_duration_secs: 0,
sort_fields: String::new(),
num_merge_ops: 0,
row_keys: None,
zonemap_regexes: String::new(),
};

let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged);
Expand Down
305 changes: 305 additions & 0 deletions quickwit/quickwit-parquet-engine/src/split/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Metrics split metadata definitions.

use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::time::SystemTime;

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -120,7 +121,12 @@ impl std::fmt::Display for MetricsSplitState {
}

/// Metadata for a metrics split.
///
/// The `window` field stores the time window as `[start, start + duration)`.
/// For JSON serialization, it is decomposed into `window_start` and
/// `window_duration_secs` for backward compatibility with pre-Phase-31 code.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(from = "MetricsSplitMetadataSerde", into = "MetricsSplitMetadataSerde")]
pub struct MetricsSplitMetadata {
/// Unique split identifier.
pub split_id: SplitId,
Expand Down Expand Up @@ -153,6 +159,117 @@ pub struct MetricsSplitMetadata {

/// When this split was created.
pub created_at: SystemTime,

/// Parquet file path relative to storage root.
pub parquet_file: String,

/// Time window as `[start, start + duration)` in epoch seconds.
/// None for pre-Phase-31 splits (backward compat).
pub window: Option<Range<i64>>,

/// Sort schema as Husky-style string (e.g., "metric_name|host|timestamp/V2").
/// Empty string for pre-Phase-31 splits.
pub sort_fields: String,

/// Number of merge operations this split has been through.
/// 0 for newly ingested splits.
pub num_merge_ops: u32,

/// RowKeys (sort-key min/max boundaries) as proto bytes.
/// None for pre-Phase-31 splits or splits without sort schema.
pub row_keys_proto: Option<Vec<u8>>,

/// Per-column zonemap regex strings, keyed by column name.
/// Empty for pre-Phase-31 splits.
pub zonemap_regexes: HashMap<String, String>,
}

/// Serde helper struct that uses `window_start` / `window_duration_secs` field
/// names for JSON backward compatibility while the in-memory representation uses
/// `Option<Range<i64>>`.
///
/// Every post-Phase-31 field carries `#[serde(default)]` so JSON written by
/// pre-Phase-31 code (which lacks these fields entirely) still deserializes.
/// Field order here defines the JSON key order — do not reorder.
#[derive(Serialize, Deserialize)]
struct MetricsSplitMetadataSerde {
    // --- Fields present since before Phase 31 (always serialized) ---
    split_id: SplitId,
    index_uid: String,
    time_range: TimeRange,
    num_rows: u64,
    size_bytes: u64,
    metric_names: HashSet<String>,
    low_cardinality_tags: HashMap<String, HashSet<String>>,
    high_cardinality_tag_keys: HashSet<String>,
    created_at: SystemTime,
    parquet_file: String,

    // --- Phase-31 compaction fields (all optional / defaulted) ---

    // Window start in epoch seconds; key is omitted from JSON when None.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    window_start: Option<i64>,

    // Window duration in seconds; 0 means "no window" and pairs with
    // window_start == None. Always serialized (no skip), even when 0.
    #[serde(default)]
    window_duration_secs: u32,

    // Husky-style sort schema string; empty for pre-Phase-31 splits.
    #[serde(default)]
    sort_fields: String,

    // Merge-operation count; 0 for freshly ingested splits.
    #[serde(default)]
    num_merge_ops: u32,

    // RowKeys proto bytes; key is omitted from JSON when None.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    row_keys_proto: Option<Vec<u8>>,

    // Per-column zonemap regexes; key is omitted from JSON when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    zonemap_regexes: HashMap<String, String>,
}

impl From<MetricsSplitMetadataSerde> for MetricsSplitMetadata {
    /// Rebuilds the in-memory form from the wire-format helper.
    fn from(s: MetricsSplitMetadataSerde) -> Self {
        // A window is only meaningful when both halves of the serialized pair
        // are usable: a `window_start` paired with a zero duration collapses
        // to `None`, exactly like a pre-Phase-31 split.
        let window = s
            .window_start
            .filter(|_| s.window_duration_secs > 0)
            .map(|start| start..start + i64::from(s.window_duration_secs));

        Self {
            split_id: s.split_id,
            index_uid: s.index_uid,
            time_range: s.time_range,
            num_rows: s.num_rows,
            size_bytes: s.size_bytes,
            metric_names: s.metric_names,
            low_cardinality_tags: s.low_cardinality_tags,
            high_cardinality_tag_keys: s.high_cardinality_tag_keys,
            created_at: s.created_at,
            parquet_file: s.parquet_file,
            window,
            sort_fields: s.sort_fields,
            num_merge_ops: s.num_merge_ops,
            row_keys_proto: s.row_keys_proto,
            zonemap_regexes: s.zonemap_regexes,
        }
    }
}

impl From<MetricsSplitMetadata> for MetricsSplitMetadataSerde {
    /// Decomposes the in-memory `window: Option<Range<i64>>` back into the
    /// `window_start` / `window_duration_secs` pair used by the JSON format.
    fn from(m: MetricsSplitMetadata) -> Self {
        // Reuse the public accessors so the Range -> (start, duration)
        // decomposition lives in exactly one place instead of being
        // duplicated here (keeps the truncating `as u32` cast centralized).
        let window_start = m.window_start();
        let window_duration_secs = m.window_duration_secs();
        Self {
            split_id: m.split_id,
            index_uid: m.index_uid,
            time_range: m.time_range,
            num_rows: m.num_rows,
            size_bytes: m.size_bytes,
            metric_names: m.metric_names,
            low_cardinality_tags: m.low_cardinality_tags,
            high_cardinality_tag_keys: m.high_cardinality_tag_keys,
            created_at: m.created_at,
            parquet_file: m.parquet_file,
            window_start,
            window_duration_secs,
            sort_fields: m.sort_fields,
            num_merge_ops: m.num_merge_ops,
            row_keys_proto: m.row_keys_proto,
            zonemap_regexes: m.zonemap_regexes,
        }
    }
}

impl MetricsSplitMetadata {
Expand All @@ -167,6 +284,19 @@ impl MetricsSplitMetadata {
/// Tags with >= CARDINALITY_THRESHOLD unique values use Parquet bloom filters.
pub const CARDINALITY_THRESHOLD: usize = 1000;

/// Returns the window start in epoch seconds, or `None` for pre-Phase-31 splits.
pub fn window_start(&self) -> Option<i64> {
    let range = self.window.as_ref()?;
    Some(range.start)
}

/// Returns the window duration in seconds, or 0 for pre-Phase-31 splits.
pub fn window_duration_secs(&self) -> u32 {
    self.window
        .as_ref()
        .map_or(0, |range| (range.end - range.start) as u32)
}

/// Create a new MetricsSplitMetadata builder.
pub fn builder() -> MetricsSplitMetadataBuilder {
MetricsSplitMetadataBuilder::default()
Expand Down Expand Up @@ -221,8 +351,19 @@ pub struct MetricsSplitMetadataBuilder {
metric_names: HashSet<String>,
low_cardinality_tags: HashMap<String, HashSet<String>>,
high_cardinality_tag_keys: HashSet<String>,
parquet_file: String,
window_start: Option<i64>,
window_duration_secs: u32,
sort_fields: String,
num_merge_ops: u32,
row_keys_proto: Option<Vec<u8>>,
zonemap_regexes: HashMap<String, String>,
}

// The builder still accepts window_start and window_duration_secs separately
// to remain compatible with callers that compute them independently (e.g.,
// split_writer). The `build()` method fuses them into `Option<Range<i64>>`.

impl MetricsSplitMetadataBuilder {
pub fn split_id(mut self, id: SplitId) -> Self {
self.split_id = Some(id);
Expand Down Expand Up @@ -284,7 +425,71 @@ impl MetricsSplitMetadataBuilder {
self
}

/// Sets the Parquet file path, relative to the storage root.
pub fn parquet_file(mut self, path: impl Into<String>) -> Self {
    self.parquet_file = path.into();
    self
}

/// Sets the window start in epoch seconds.
///
/// Must be paired with a non-zero `window_duration_secs` (TW-1); `build()`
/// debug-asserts the pairing and fuses both into `Option<Range<i64>>`.
pub fn window_start_secs(mut self, epoch_secs: i64) -> Self {
    self.window_start = Some(epoch_secs);
    self
}

/// Sets the window duration in seconds.
///
/// Per TW-2 (ADR-003) the duration must evenly divide 3600; `build()`
/// debug-asserts this invariant.
pub fn window_duration_secs(mut self, dur: u32) -> Self {
    self.window_duration_secs = dur;
    self
}

/// Sets the sort schema as a Husky-style string
/// (e.g. "metric_name|host|timestamp/V2").
pub fn sort_fields(mut self, schema: impl Into<String>) -> Self {
    self.sort_fields = schema.into();
    self
}

/// Sets the number of merge operations this split has been through
/// (0 for newly ingested splits).
pub fn num_merge_ops(mut self, ops: u32) -> Self {
    self.num_merge_ops = ops;
    self
}

/// Sets the RowKeys (sort-key min/max boundaries) as proto bytes.
pub fn row_keys_proto(mut self, bytes: Vec<u8>) -> Self {
    self.row_keys_proto = Some(bytes);
    self
}

/// Adds a zonemap regex for `column`, replacing any regex previously
/// registered for the same column.
pub fn add_zonemap_regex(
    mut self,
    column: impl Into<String>,
    regex: impl Into<String>,
) -> Self {
    self.zonemap_regexes.insert(column.into(), regex.into());
    self
}

pub fn build(self) -> MetricsSplitMetadata {
// TW-2 (ADR-003): window_duration must evenly divide 3600.
// Enforced at build time so no invalid metadata propagates to storage.
debug_assert!(
self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0,
"TW-2 violated: window_duration_secs={} does not divide 3600",
self.window_duration_secs
);

// TW-1 (ADR-003, partial): window_start and window_duration_secs are paired.
// If one is set, the other must be too. Pre-Phase-31 splits have both at defaults.
debug_assert!(
(self.window_start.is_none() && self.window_duration_secs == 0)
|| (self.window_start.is_some() && self.window_duration_secs > 0),
"TW-1 violated: window_start and window_duration_secs must be set together \
(window_start={:?}, window_duration_secs={})",
self.window_start,
self.window_duration_secs
);

// Fuse the two builder fields into a single Range.
let window = match (self.window_start, self.window_duration_secs) {
(Some(start), dur) if dur > 0 => Some(start..start + dur as i64),
_ => None,
};

MetricsSplitMetadata {
split_id: self.split_id.unwrap_or_else(SplitId::generate),
index_uid: self.index_uid.expect("index_uid is required"),
Expand All @@ -295,6 +500,12 @@ impl MetricsSplitMetadataBuilder {
low_cardinality_tags: self.low_cardinality_tags,
high_cardinality_tag_keys: self.high_cardinality_tag_keys,
created_at: SystemTime::now(),
parquet_file: self.parquet_file,
window,
sort_fields: self.sort_fields,
num_merge_ops: self.num_merge_ops,
row_keys_proto: self.row_keys_proto,
zonemap_regexes: self.zonemap_regexes,
}
}
}
Expand Down Expand Up @@ -401,4 +612,98 @@ mod tests {
);
assert_eq!(format!("{}", MetricsSplitState::Published), "Published");
}

#[test]
fn test_backward_compat_deserialize_pre_phase31_json() {
    // JSON exactly as emitted by pre-Phase-31 code: none of the compaction
    // fields are present at all.
    let pre_phase31_json = r#"{
"split_id": "metrics_abc123",
"index_uid": "test-index:00000000000000000000000000",
"time_range": {"start_secs": 1000, "end_secs": 2000},
"num_rows": 500,
"size_bytes": 1024,
"metric_names": ["cpu.usage"],
"low_cardinality_tags": {},
"high_cardinality_tag_keys": [],
"created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
"parquet_file": "split1.parquet"
}"#;

    let meta: MetricsSplitMetadata =
        serde_json::from_str(pre_phase31_json).expect("should deserialize pre-Phase-31 JSON");

    // The pre-existing fields round-trip unchanged.
    assert_eq!(meta.split_id.as_str(), "metrics_abc123");
    assert_eq!(meta.index_uid, "test-index:00000000000000000000000000");
    assert_eq!(meta.num_rows, 500);

    // Every Phase-31 addition falls back to its documented default.
    assert_eq!(meta.window, None);
    assert_eq!(meta.window_start(), None);
    assert_eq!(meta.window_duration_secs(), 0);
    assert!(meta.sort_fields.is_empty());
    assert_eq!(meta.num_merge_ops, 0);
    assert_eq!(meta.row_keys_proto, None);
    assert!(meta.zonemap_regexes.is_empty());
}

#[test]
fn test_round_trip_with_compaction_fields() {
    // Fully-populated metadata, including every Phase-31 compaction field.
    let original = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("roundtrip-compaction"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .num_rows(100)
        .size_bytes(500)
        .window_start_secs(1700000000)
        .window_duration_secs(3600)
        .sort_fields("metric_name|host|timestamp/V2")
        .num_merge_ops(3)
        .row_keys_proto(vec![0x08, 0x01, 0x10, 0x02])
        .add_zonemap_regex("metric_name", "cpu\\..*")
        .add_zonemap_regex("host", "host-\\d+")
        .build();

    let json = serde_json::to_string(&original).expect("should serialize");
    let decoded: MetricsSplitMetadata =
        serde_json::from_str(&json).expect("should deserialize");

    // The window survives as a half-open Range, and both accessor
    // projections agree with it.
    assert_eq!(decoded.window, Some(1700000000..1700003600));
    assert_eq!(decoded.window_start(), Some(1700000000));
    assert_eq!(decoded.window_duration_secs(), 3600);

    // The remaining compaction fields survive byte-for-byte.
    assert_eq!(decoded.sort_fields, "metric_name|host|timestamp/V2");
    assert_eq!(decoded.num_merge_ops, 3);
    assert_eq!(decoded.row_keys_proto, Some(vec![0x08, 0x01, 0x10, 0x02]));
    assert_eq!(decoded.zonemap_regexes.len(), 2);
    assert_eq!(
        decoded.zonemap_regexes.get("metric_name").map(String::as_str),
        Some("cpu\\..*")
    );
    assert_eq!(
        decoded.zonemap_regexes.get("host").map(String::as_str),
        Some("host-\\d+")
    );
}

#[test]
fn test_skip_serializing_empty_compaction_fields() {
    // Build with only the mandatory fields so every compaction field stays
    // at its default.
    let meta = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("skip-test"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .build();

    let serialized = serde_json::to_string(&meta).expect("should serialize");

    // Keys guarded by `skip_serializing_if` must be entirely absent.
    assert!(
        !serialized.contains("\"window_start\""),
        "window_start should not appear when None"
    );
    assert!(
        !serialized.contains("\"row_keys_proto\""),
        "row_keys_proto should not appear when None"
    );
    assert!(
        !serialized.contains("\"zonemap_regexes\""),
        "zonemap_regexes should not appear when empty"
    );
}
}
Loading
Loading