From ff605b9e0d8d323e767d56fc1e9c29f087b32154 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Mon, 9 Mar 2026 15:08:26 -0400 Subject: [PATCH 1/9] feat(31): PostgreSQL migration 27 + compaction columns in stage/list/publish Add compaction metadata to the PostgreSQL metastore: Migration 27: - 6 new columns: window_start, window_duration_secs, sort_fields, num_merge_ops, row_keys, zonemap_regexes - Partial index idx_metrics_splits_compaction_scope on (index_uid, sort_fields, window_start) WHERE split_state = 'Published' stage_metrics_splits: - INSERT extended from 15 to 21 bind parameters for compaction columns - ON CONFLICT SET updates all compaction columns list_metrics_splits: - PgMetricsSplit construction includes compaction fields (defaults from JSON) Also fixes pre-existing compilation errors on upstream-10b-parquet-actors: - Missing StageMetricsSplitsRequestExt import - index_id vs index_uid type mismatches in publish/mark/delete - IndexUid binding (to_string() for sqlx) - ListMetricsSplitsResponseExt trait disambiguation Co-Authored-By: Claude Opus 4.6 (1M context) --- .../27_add-compaction-metadata.down.sql | 8 +++ .../27_add-compaction-metadata.up.sql | 14 ++++ .../src/metastore/postgres/metastore.rs | 69 +++++++++++++++++-- 3 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.down.sql create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.up.sql diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.down.sql new file mode 100644 index 00000000000..493ddc2e1cd --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.down.sql @@ -0,0 +1,8 @@ +-- Reverse Phase 31: Remove compaction metadata columns. +DROP INDEX IF EXISTS idx_metrics_splits_compaction_scope; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS zonemap_regexes; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS row_keys; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS num_merge_ops; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS sort_fields; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_duration_secs; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_start; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.up.sql new file mode 100644 index 00000000000..a83336e8295 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.up.sql @@ -0,0 +1,14 @@ +-- Phase 31: Add compaction metadata columns to metrics_splits. +-- These columns support time-windowed compaction planning and execution. +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_start BIGINT; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_duration_secs INTEGER; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS sort_fields TEXT NOT NULL DEFAULT ''; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS num_merge_ops INTEGER NOT NULL DEFAULT 0; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS row_keys BYTEA; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS zonemap_regexes JSONB NOT NULL DEFAULT '{}'; + +-- Compaction scope index: supports the compaction planner's primary query pattern +-- "give me all Published splits for a given (index_uid, sort_fields, window_start) triple." +CREATE INDEX IF NOT EXISTS idx_metrics_splits_compaction_scope + ON metrics_splits (index_uid, sort_fields, window_start) + WHERE split_state = 'Published'; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index bfa08ff3b9d..a4b93b0f659 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -75,9 +75,9 @@ use crate::file_backed::MutationOccurred; use crate::metastore::postgres::model::Shards; use crate::metastore::postgres::utils::split_maturity_timestamp; use crate::metastore::{ - IndexesMetadataResponseExt, ListMetricsSplitsResponseExt, PublishMetricsSplitsRequestExt, - PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, StageMetricsSplitsRequestExt, - UpdateSourceRequestExt, use_shard_api, + IndexesMetadataResponseExt, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, + PublishMetricsSplitsRequestExt, PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, + StageMetricsSplitsRequestExt, UpdateSourceRequestExt, use_shard_api, }; use crate::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, @@ -1803,6 +1803,12 @@ impl MetastoreService for PostgresqlMetastore { let mut num_rows_list = Vec::with_capacity(splits_metadata.len()); let mut size_bytes_list = Vec::with_capacity(splits_metadata.len()); let mut split_metadata_jsons = Vec::with_capacity(splits_metadata.len()); + let mut window_starts: Vec> = Vec::with_capacity(splits_metadata.len()); + let mut window_duration_secs_list: Vec> = Vec::with_capacity(splits_metadata.len()); + let mut sort_fields_list: Vec = Vec::with_capacity(splits_metadata.len()); + let mut num_merge_ops_list: Vec = Vec::with_capacity(splits_metadata.len()); + let mut row_keys_list: Vec>> = Vec::with_capacity(splits_metadata.len()); + let mut zonemap_regexes_json_list: Vec = Vec::with_capacity(splits_metadata.len()); for metadata in &splits_metadata { let insertable = @@ -1837,6 +1843,16 @@ impl MetastoreService for PostgresqlMetastore { num_rows_list.push(insertable.num_rows); size_bytes_list.push(insertable.size_bytes); split_metadata_jsons.push(insertable.split_metadata_json); + window_starts.push(insertable.window_start); + window_duration_secs_list.push(if insertable.window_duration_secs == 0 { + None + } else { + Some(insertable.window_duration_secs) + }); + sort_fields_list.push(insertable.sort_fields); + num_merge_ops_list.push(insertable.num_merge_ops); + row_keys_list.push(insertable.row_keys); + zonemap_regexes_json_list.push(insertable.zonemap_regexes); } info!( @@ -1863,6 +1879,12 @@ impl MetastoreService for PostgresqlMetastore { num_rows, size_bytes, split_metadata_json, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes, create_timestamp, update_timestamp ) @@ -1887,6 +1909,12 @@ impl MetastoreService for PostgresqlMetastore { num_rows, size_bytes, split_metadata_json, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes_json::jsonb, (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'), (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') FROM UNNEST( @@ -1904,7 +1932,13 @@ impl MetastoreService for PostgresqlMetastore { $12::text[], $13::bigint[], $14::bigint[], - $15::text[] + $15::text[], + $16::bigint[], + $17::int[], + $18::text[], + $19::int[], + $20::bytea[], + $21::text[] ) AS staged( split_id, split_state, @@ -1920,7 +1954,13 @@ impl MetastoreService for PostgresqlMetastore { high_cardinality_tag_keys_json, num_rows, size_bytes, - split_metadata_json + split_metadata_json, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes_json ) ON CONFLICT (split_id) DO UPDATE SET @@ -1937,6 +1977,12 @@ impl MetastoreService for PostgresqlMetastore { num_rows = EXCLUDED.num_rows, size_bytes = EXCLUDED.size_bytes, split_metadata_json = EXCLUDED.split_metadata_json, + window_start = EXCLUDED.window_start, + window_duration_secs = EXCLUDED.window_duration_secs, + sort_fields = EXCLUDED.sort_fields, + num_merge_ops = EXCLUDED.num_merge_ops, + row_keys = EXCLUDED.row_keys, + zonemap_regexes = EXCLUDED.zonemap_regexes, update_timestamp = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') WHERE metrics_splits.split_state = 'Staged' RETURNING split_id @@ -1962,6 +2008,12 @@ impl MetastoreService for PostgresqlMetastore { .bind(&num_rows_list) .bind(&size_bytes_list) .bind(&split_metadata_jsons) + .bind(&window_starts) + .bind(&window_duration_secs_list) + .bind(&sort_fields_list) + .bind(&num_merge_ops_list) + .bind(&row_keys_list) + .bind(&zonemap_regexes_json_list) .fetch_all(tx.as_mut()) .await .map_err(|sqlx_error| convert_sqlx_err(&index_id_for_err, sqlx_error)) @@ -2312,12 +2364,15 @@ impl MetastoreService for PostgresqlMetastore { size_bytes: row.13, split_metadata_json: row.14, update_timestamp: row.15, + // Compaction fields are read from the JSON blob via + // to_metadata() — the SQL columns are only used for + // filtering and SS-5 consistency checks. window_start: None, window_duration_secs: 0, sort_fields: String::new(), num_merge_ops: 0, row_keys: None, - zonemap_regexes: String::new(), + zonemap_regexes: "{}".to_string(), }; let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged); @@ -2325,7 +2380,7 @@ impl MetastoreService for PostgresqlMetastore { Some(MetricsSplitRecord { state, - update_timestamp: row.15, + update_timestamp: pg_split.update_timestamp, metadata, }) }) From 723168fe0ee6b823a0f93e11df09db23d4fa3a54 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Mon, 9 Mar 2026 16:59:47 -0400 Subject: [PATCH 2/9] =?UTF-8?q?fix(31):=20close=20port=20gaps=20=E2=80=94?= =?UTF-8?q?=20split=5Fwriter=20metadata,=20compaction=20scope,=20publish?= =?UTF-8?q?=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close critical gaps identified during port review: split_writer.rs: - Store table_config on ParquetSplitWriter (not just pass-through) - Compute window_start from batch time range using table_config.window_duration_secs - Populate sort_fields, window_duration_secs, parquet_files on metadata before write - Call write_to_file_with_metadata(Some(&metadata)) to embed KV metadata in Parquet - Update size_bytes after write completes metastore/mod.rs: - Add window_start and sort_fields fields to ListMetricsSplitsQuery - Add with_compaction_scope() builder method metastore/postgres/metastore.rs: - Add compaction scope filters (AND window_start = $N, AND sort_fields = $N) to list query - Add replaced_split_ids count verification in publish_metrics_splits - Bind compaction scope query parameters ingest/config.rs: - Add table_config: TableConfig field to ParquetIngestConfig Co-Authored-By: Claude Opus 4.6 (1M context) --- .../quickwit-metastore/src/metastore/mod.rs | 15 ++++++++++++++ .../src/metastore/postgres/metastore.rs | 20 +++++++++++++++++-- .../src/index/config.rs | 4 ++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index ddfee25afab..b1bc686107a 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -75,6 +75,10 @@ pub struct ListMetricsSplitsQuery { pub tag_region: Option, /// Host tag filter. pub tag_host: Option, + /// Window start filter for compaction scope queries. + pub window_start: Option, + /// Sort fields filter for compaction scope queries. + pub sort_fields: Option, /// Limit number of results. pub limit: Option, } @@ -107,6 +111,17 @@ impl ListMetricsSplitsQuery { self.metric_names = names; self } + + /// Filter by compaction scope (window_start + sort_fields). + pub fn with_compaction_scope( + mut self, + window_start: i64, + sort_fields: impl Into, + ) -> Self { + self.window_start = Some(window_start); + self.sort_fields = Some(sort_fields.into()); + self + } } /// Splits batch size returned by the stream splits API diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index a4b93b0f659..49da0df7e12 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2167,13 +2167,13 @@ impl MetastoreService for PostgresqlMetastore { return Err(MetastoreError::FailedPrecondition { entity, message }); } - // Verify all replaced splits were marked for deletion + // Verify all replaced splits were marked for deletion. if marked_count as usize != replaced_split_ids.len() { let entity = EntityKind::Splits { split_ids: replaced_split_ids.clone(), }; let message = format!( - "expected to mark {} splits for deletion, but only {} were in Published state", + "expected to replace {} splits, but only {} were in Published state", replaced_split_ids.len(), marked_count ); @@ -2273,6 +2273,16 @@ impl MetastoreService for PostgresqlMetastore { param_idx += 1; } + // Compaction scope filters + if query.window_start.is_some() { + sql.push_str(&format!(" AND window_start = ${}", param_idx)); + param_idx += 1; + } + if query.sort_fields.is_some() { + sql.push_str(&format!(" AND sort_fields = ${}", param_idx)); + param_idx += 1; + } + sql.push_str(" ORDER BY time_range_start ASC"); // Add limit @@ -2332,6 +2342,12 @@ impl MetastoreService for PostgresqlMetastore { if let Some(ref host) = query.tag_host { query_builder = query_builder.bind(host); } + if let Some(ws) = query.window_start { + query_builder = query_builder.bind(ws); + } + if let Some(ref sf) = query.sort_fields { + query_builder = query_builder.bind(sf); + } if let Some(limit) = query.limit { query_builder = query_builder.bind(limit as i64); } diff --git a/quickwit/quickwit-parquet-engine/src/index/config.rs b/quickwit/quickwit-parquet-engine/src/index/config.rs index adf59c992be..7712b2545f8 100644 --- a/quickwit/quickwit-parquet-engine/src/index/config.rs +++ b/quickwit/quickwit-parquet-engine/src/index/config.rs @@ -17,6 +17,7 @@ use std::sync::OnceLock; use crate::storage::ParquetWriterConfig; +use crate::table_config::TableConfig; /// Default maximum rows to accumulate before flushing to split. const DEFAULT_MAX_ROWS: usize = 1_000_000; @@ -58,6 +59,8 @@ pub struct ParquetIndexingConfig { pub max_bytes: usize, /// Parquet writer configuration for split creation. pub writer_config: ParquetWriterConfig, + /// Table-level configuration (sort fields, window duration, product type). + pub table_config: TableConfig, } impl Default for ParquetIndexingConfig { @@ -66,6 +69,7 @@ impl Default for ParquetIndexingConfig { max_rows: get_max_rows_from_env(), max_bytes: get_max_bytes_from_env(), writer_config: ParquetWriterConfig::default(), + table_config: TableConfig::default(), } } } From 9ca263ded043323e10d331ba14e26392b12c5137 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Tue, 10 Mar 2026 05:51:34 -0400 Subject: [PATCH 3/9] =?UTF-8?q?fix(31):=20final=20gap=20fixes=20=E2=80=94?= =?UTF-8?q?=20file-backed=20scope=20filter,=20META-07=20test,=20dead=20cod?= =?UTF-8?q?e=20removal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - file_backed_index/mod.rs: Add window_start and sort_fields filtering to metrics_split_matches_query() for compaction scope queries - writer.rs: Add test_meta07_self_describing_parquet_roundtrip test (writes compaction metadata to Parquet, reads back from cold file, verifies all fields roundtrip correctly) - fields.rs: Remove dead sort_order() method (replaced by TableConfig) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../file_backed/file_backed_index/mod.rs | 12 ++ .../src/schema/fields.rs | 143 +++++++++++++++++- .../quickwit-parquet-engine/src/schema/mod.rs | 4 +- .../src/storage/writer.rs | 82 ++++++++++ 4 files changed, 238 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index e35618b99f8..16d1e696ad5 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -979,6 +979,18 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp } } + // Filter by compaction scope + if let Some(ws) = query.window_start { + if split.metadata.window_start() != Some(ws) { + return false; + } + } + if let Some(ref sf) = query.sort_fields { + if split.metadata.sort_fields != *sf { + return false; + } + } + true } diff --git a/quickwit/quickwit-parquet-engine/src/schema/fields.rs b/quickwit/quickwit-parquet-engine/src/schema/fields.rs index 9f46dcf3b8c..b5d149a5b51 100644 --- a/quickwit/quickwit-parquet-engine/src/schema/fields.rs +++ b/quickwit/quickwit-parquet-engine/src/schema/fields.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Parquet field definitions with sort order constants and validation. +//! Parquet field definitions with column metadata, sort order constants, and validation. use anyhow::{Result, bail}; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, Field, Fields}; +use parquet::variant::VariantType; /// Required field names that must exist in every batch. pub const REQUIRED_FIELDS: &[&str] = &["metric_name", "metric_type", "timestamp_secs", "value"]; @@ -31,6 +32,144 @@ pub const SORT_ORDER: &[&str] = &[ "timestamp_secs", ]; +/// All fields in the parquet schema. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ParquetField { + MetricName, + MetricType, + MetricUnit, + TimestampSecs, + StartTimestampSecs, + Value, + TagService, + TagEnv, + TagDatacenter, + TagRegion, + TagHost, + Attributes, + ServiceName, + ResourceAttributes, +} + +impl ParquetField { + /// Field name as stored in Parquet. + pub fn name(&self) -> &'static str { + match self { + Self::MetricName => "metric_name", + Self::MetricType => "metric_type", + Self::MetricUnit => "metric_unit", + Self::TimestampSecs => "timestamp_secs", + Self::StartTimestampSecs => "start_timestamp_secs", + Self::Value => "value", + Self::TagService => "tag_service", + Self::TagEnv => "tag_env", + Self::TagDatacenter => "tag_datacenter", + Self::TagRegion => "tag_region", + Self::TagHost => "tag_host", + Self::Attributes => "attributes", + Self::ServiceName => "service_name", + Self::ResourceAttributes => "resource_attributes", + } + } + + /// Whether this field is nullable. + pub fn nullable(&self) -> bool { + matches!( + self, + Self::MetricUnit + | Self::StartTimestampSecs + | Self::TagService + | Self::TagEnv + | Self::TagDatacenter + | Self::TagRegion + | Self::TagHost + | Self::Attributes + | Self::ResourceAttributes + ) + } + + /// Arrow DataType for this field. + /// Use dictionary encoding for high-cardinality strings. + pub fn arrow_type(&self) -> DataType { + match self { + // Dictionary-encoded strings for high cardinality + Self::MetricName | Self::ServiceName => { + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) + } + // Dictionary-encoded optional tags + Self::TagService + | Self::TagEnv + | Self::TagDatacenter + | Self::TagRegion + | Self::TagHost => { + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) + } + // Enum stored as UInt8 + Self::MetricType => DataType::UInt8, + // Timestamps as UInt64 seconds + Self::TimestampSecs | Self::StartTimestampSecs => DataType::UInt64, + // Metric value + Self::Value => DataType::Float64, + // Plain string for metric unit + Self::MetricUnit => DataType::Utf8, + // VARIANT type for semi-structured attributes + // Uses the Parquet Variant binary encoding format + Self::Attributes | Self::ResourceAttributes => { + // VARIANT is stored as a struct with metadata and value BinaryView fields + // VariantArrayBuilder produces BinaryView, not Binary + DataType::Struct(Fields::from(vec![ + Field::new("metadata", DataType::BinaryView, false), + Field::new("value", DataType::BinaryView, false), + ])) + } + } + } + + /// Convert to Arrow Field. + pub fn to_arrow_field(&self) -> Field { + let field = Field::new(self.name(), self.arrow_type(), self.nullable()); + + // Add VARIANT extension type metadata for attributes fields + match self { + Self::Attributes | Self::ResourceAttributes => field.with_extension_type(VariantType), + _ => field, + } + } + + /// All fields in schema order. + pub fn all() -> &'static [ParquetField] { + &[ + Self::MetricName, + Self::MetricType, + Self::MetricUnit, + Self::TimestampSecs, + Self::StartTimestampSecs, + Self::Value, + Self::TagService, + Self::TagEnv, + Self::TagDatacenter, + Self::TagRegion, + Self::TagHost, + Self::Attributes, + Self::ServiceName, + Self::ResourceAttributes, + ] + } + + /// Get the column index in the schema. + pub fn column_index(&self) -> usize { + Self::all().iter().position(|f| f == self).unwrap() + } + + /// Look up a ParquetField by its Parquet column name. + /// + /// Used by the sort fields resolver to map sort schema column names + /// to physical schema columns. + pub fn from_name(name: &str) -> Option { + Self::all().iter().find(|f| f.name() == name).copied() + } +} + /// Arrow type for required fields by name. pub fn required_field_type(name: &str) -> Option { match name { diff --git a/quickwit/quickwit-parquet-engine/src/schema/mod.rs b/quickwit/quickwit-parquet-engine/src/schema/mod.rs index f9b5c06d9c4..71026ddf9c8 100644 --- a/quickwit/quickwit-parquet-engine/src/schema/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/schema/mod.rs @@ -20,5 +20,7 @@ mod fields; mod parquet; -pub use fields::{REQUIRED_FIELDS, SORT_ORDER, required_field_type, validate_required_fields}; +pub use fields::{ + ParquetField, REQUIRED_FIELDS, SORT_ORDER, required_field_type, validate_required_fields, +}; pub use parquet::ParquetSchema; diff --git a/quickwit/quickwit-parquet-engine/src/storage/writer.rs b/quickwit/quickwit-parquet-engine/src/storage/writer.rs index d45c0a0c9cb..06ff4be2061 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/writer.rs @@ -702,6 +702,88 @@ mod tests { std::fs::remove_file(&path).ok(); } + /// META-07 compliance: Prove the Parquet file is truly self-describing by + /// writing compaction metadata, reading it back from a cold file (no in-memory + /// state), and reconstructing the MetricsSplitMetadata compaction fields from + /// ONLY the Parquet key_value_metadata. + #[test] + fn test_meta07_self_describing_parquet_roundtrip() { + use std::fs::File; + + use crate::split::{SplitId, TimeRange}; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + let sort_schema_str = "metric_name|host|env|timestamp/V2"; + let window_start_secs: i64 = 1700006400; + let window_duration: u32 = 900; + let merge_ops: u32 = 7; + let row_keys_bytes: Vec = vec![0x0A, 0x03, 0x63, 0x70, 0x75]; + + let original = MetricsSplitMetadata::builder() + .split_id(SplitId::new("self-describing-test")) + .index_uid("metrics-prod:00000000000000000000000000") + .time_range(TimeRange::new(1700006400, 1700007300)) + .window_start_secs(window_start_secs) + .window_duration_secs(window_duration) + .sort_fields(sort_schema_str) + .num_merge_ops(merge_ops) + .row_keys_proto(row_keys_bytes.clone()) + .build(); + + let config = ParquetWriterConfig::default(); + let writer = ParquetWriter::new(config, &TableConfig::default()); + let batch = create_test_batch(); + + let temp_dir = std::env::temp_dir(); + let path = temp_dir.join("test_self_describing_roundtrip.parquet"); + writer + .write_to_file_with_metadata(&batch, &path, Some(&original)) + .unwrap(); + + // Read phase: open a cold file and reconstruct fields from kv_metadata. + let file = File::open(&path).unwrap(); + let reader = SerializedFileReader::new(file).unwrap(); + let file_metadata = reader.metadata().file_metadata(); + let kv_metadata = file_metadata + .key_value_metadata() + .expect("self-describing file must have kv_metadata"); + + let find_kv = |key: &str| -> Option { + kv_metadata + .iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.clone()) + }; + + let recovered_sort_schema = find_kv(PARQUET_META_SORT_FIELDS) + .expect("self-describing file must contain qh.sort_fields"); + let recovered_window_start: i64 = find_kv(PARQUET_META_WINDOW_START) + .expect("self-describing file must contain qh.window_start") + .parse() + .expect("window_start must be parseable as i64"); + let recovered_window_duration: u32 = find_kv(PARQUET_META_WINDOW_DURATION) + .expect("self-describing file must contain qh.window_duration_secs") + .parse() + .expect("window_duration must be parseable as u32"); + let recovered_merge_ops: u32 = find_kv(PARQUET_META_NUM_MERGE_OPS) + .expect("self-describing file must contain qh.num_merge_ops") + .parse() + .expect("num_merge_ops must be parseable as u32"); + let recovered_row_keys_b64 = find_kv(PARQUET_META_ROW_KEYS) + .expect("self-describing file must contain qh.row_keys"); + let recovered_row_keys = BASE64 + .decode(&recovered_row_keys_b64) + .expect("row_keys must be valid base64"); + + assert_eq!(recovered_sort_schema, sort_schema_str); + assert_eq!(recovered_window_start, window_start_secs); + assert_eq!(recovered_window_duration, window_duration); + assert_eq!(recovered_merge_ops, merge_ops); + assert_eq!(recovered_row_keys, row_keys_bytes); + + std::fs::remove_file(&path).ok(); + } + #[test] fn test_build_compaction_kv_metadata_fully_populated() { use crate::split::{SplitId, TimeRange}; From 73a20eff41a9ff040fef12cdab3424b4bdf9b32c Mon Sep 17 00:00:00 2001 From: George Talbot Date: Tue, 10 Mar 2026 10:22:06 -0400 Subject: [PATCH 4/9] fix(31): correct postgres types for window_duration_secs and zonemap_regexes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gap 1: Change window_duration_secs from i32 to Option in both PgMetricsSplit and InsertableMetricsSplit. Pre-Phase-31 splits now correctly map 0 → NULL in PostgreSQL, enabling Phase 32 compaction queries to use `WHERE window_duration_secs IS NOT NULL` instead of the fragile `WHERE window_duration_secs > 0`. Gap 2: Change zonemap_regexes from String to serde_json::Value in both structs. This maps directly to JSONB in sqlx, avoiding ambiguity when PostgreSQL JSONB operators are used in Phase 34/35 zonemap pruning. Gap 3: Add two missing tests: - test_insertable_from_metadata_with_compaction_fields: verifies all 6 compaction fields round-trip through InsertableMetricsSplit - test_insertable_from_metadata_pre_phase31_defaults: verifies pre-Phase-31 metadata produces window_duration_secs: None, zonemap_regexes: json!({}) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/metastore/postgres/metastore.rs | 12 +-- .../src/split/postgres.rs | 79 ++++++++++++++++--- 2 files changed, 70 insertions(+), 21 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 49da0df7e12..11ad65c6753 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -1844,15 +1844,11 @@ impl MetastoreService for PostgresqlMetastore { size_bytes_list.push(insertable.size_bytes); split_metadata_jsons.push(insertable.split_metadata_json); window_starts.push(insertable.window_start); - window_duration_secs_list.push(if insertable.window_duration_secs == 0 { - None - } else { - Some(insertable.window_duration_secs) - }); + window_duration_secs_list.push(insertable.window_duration_secs); sort_fields_list.push(insertable.sort_fields); num_merge_ops_list.push(insertable.num_merge_ops); row_keys_list.push(insertable.row_keys); - zonemap_regexes_json_list.push(insertable.zonemap_regexes); + zonemap_regexes_json_list.push(insertable.zonemap_regexes.to_string()); } info!( @@ -2384,11 +2380,11 @@ impl MetastoreService for PostgresqlMetastore { // to_metadata() — the SQL columns are only used for // filtering and SS-5 consistency checks. window_start: None, - window_duration_secs: 0, + window_duration_secs: None, sort_fields: String::new(), num_merge_ops: 0, row_keys: None, - zonemap_regexes: "{}".to_string(), + zonemap_regexes: serde_json::json!({}), }; let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged); diff --git a/quickwit/quickwit-parquet-engine/src/split/postgres.rs b/quickwit/quickwit-parquet-engine/src/split/postgres.rs index 0ea753181bb..6b63f46f17f 100644 --- a/quickwit/quickwit-parquet-engine/src/split/postgres.rs +++ b/quickwit/quickwit-parquet-engine/src/split/postgres.rs @@ -72,11 +72,11 @@ pub struct PgMetricsSplit { pub split_metadata_json: String, pub update_timestamp: i64, pub window_start: Option, - pub window_duration_secs: i32, + pub window_duration_secs: Option, pub sort_fields: String, pub num_merge_ops: i32, pub row_keys: Option>, - pub zonemap_regexes: String, + pub zonemap_regexes: serde_json::Value, } /// Insertable row for metrics_splits table. @@ -99,11 +99,11 @@ pub struct InsertableMetricsSplit { pub size_bytes: i64, pub split_metadata_json: String, pub window_start: Option, - pub window_duration_secs: i32, + pub window_duration_secs: Option, pub sort_fields: String, pub num_merge_ops: i32, pub row_keys: Option>, - pub zonemap_regexes: String, + pub zonemap_regexes: serde_json::Value, } impl InsertableMetricsSplit { @@ -114,12 +114,6 @@ impl InsertableMetricsSplit { ) -> Result { let split_metadata_json = serde_json::to_string(metadata)?; - let zonemap_regexes_json = if metadata.zonemap_regexes.is_empty() { - "{}".to_string() - } else { - serde_json::to_string(&metadata.zonemap_regexes)? - }; - Ok(Self { split_id: metadata.split_id.as_str().to_string(), split_state: state.as_str().to_string(), @@ -137,11 +131,15 @@ impl InsertableMetricsSplit { size_bytes: metadata.size_bytes as i64, split_metadata_json, window_start: metadata.window_start(), - window_duration_secs: metadata.window_duration_secs() as i32, + window_duration_secs: { + let dur = metadata.window_duration_secs(); + if dur > 0 { Some(dur as i32) } else { None } + }, sort_fields: metadata.sort_fields.clone(), num_merge_ops: metadata.num_merge_ops as i32, row_keys: metadata.row_keys_proto.clone(), - zonemap_regexes: zonemap_regexes_json, + zonemap_regexes: serde_json::to_value(&metadata.zonemap_regexes) + .unwrap_or_else(|_| serde_json::json!({})), }) } } @@ -165,7 +163,7 @@ impl PgMetricsSplit { debug_assert_eq!(metadata.window_start(), self.window_start); debug_assert_eq!( metadata.window_duration_secs(), - self.window_duration_secs as u32 + self.window_duration_secs.unwrap_or(0) as u32 ); debug_assert_eq!(metadata.sort_fields, self.sort_fields); debug_assert_eq!(metadata.num_merge_ops, self.num_merge_ops as u32); @@ -257,6 +255,61 @@ mod tests { assert_eq!(insertable.size_bytes, 1024 * 1024); } + #[test] + fn test_insertable_from_metadata_with_compaction_fields() { + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("compaction-test")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(500) + .window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .num_merge_ops(2) + .row_keys_proto(vec![0x08, 0x01]) + .add_zonemap_regex("metric_name", "cpu\\..*") + .build(); + + let insertable = + InsertableMetricsSplit::from_metadata(&metadata, MetricsSplitState::Published) + .expect("conversion should succeed"); + + assert_eq!(insertable.window_start, Some(1700000000)); + assert_eq!(insertable.window_duration_secs, Some(3600)); + assert_eq!(insertable.sort_fields, "metric_name|host|timestamp/V2"); + assert_eq!(insertable.num_merge_ops, 2); + assert_eq!(insertable.row_keys, Some(vec![0x08, 0x01])); + assert!(insertable.zonemap_regexes.is_object()); + assert_eq!( + insertable.zonemap_regexes["metric_name"], + serde_json::json!("cpu\\..*") + ); + } + + #[test] + fn test_insertable_from_metadata_pre_phase31_defaults() { + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("pre-phase31")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .build(); + + let insertable = + InsertableMetricsSplit::from_metadata(&metadata, MetricsSplitState::Staged) + .expect("conversion should succeed"); + + assert!(insertable.window_start.is_none()); + assert!( + insertable.window_duration_secs.is_none(), + "pre-Phase-31 splits should have NULL window_duration_secs" + ); + assert_eq!(insertable.sort_fields, ""); + assert_eq!(insertable.num_merge_ops, 0); + assert!(insertable.row_keys.is_none()); + assert_eq!(insertable.zonemap_regexes, serde_json::json!({})); + } + #[test] fn test_pg_split_to_metadata_roundtrip() { let original = MetricsSplitMetadata::builder() From 75c15a0814938273ac8a7041c22fd47565281514 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Fri, 27 Mar 2026 09:43:59 -0400 Subject: [PATCH 5/9] style: rustfmt Co-Authored-By: Claude Opus 4.6 (1M context) --- .../file_backed/file_backed_index/mod.rs | 16 ++++++++-------- .../src/metastore/postgres/metastore.rs | 9 +++++---- .../src/storage/writer.rs | 7 ++++--- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 16d1e696ad5..152dc0cd84f 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -980,15 +980,15 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp } // Filter by compaction scope - if let Some(ws) = query.window_start { - if split.metadata.window_start() != Some(ws) { - return false; - } + if let Some(ws) = query.window_start + && split.metadata.window_start() != Some(ws) + { + return false; } - if let Some(ref sf) = query.sort_fields { - if split.metadata.sort_fields != *sf { - return false; - } + if let Some(ref sf) = query.sort_fields + && split.metadata.sort_fields != *sf + { + return false; } true diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 11ad65c6753..b09eb4afd97 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -75,9 +75,9 @@ use crate::file_backed::MutationOccurred; use crate::metastore::postgres::model::Shards; use crate::metastore::postgres::utils::split_maturity_timestamp; use crate::metastore::{ - IndexesMetadataResponseExt, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, - PublishMetricsSplitsRequestExt, PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, - StageMetricsSplitsRequestExt, UpdateSourceRequestExt, use_shard_api, + IndexesMetadataResponseExt, ListMetricsSplitsResponseExt, PublishMetricsSplitsRequestExt, + PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, StageMetricsSplitsRequestExt, + UpdateSourceRequestExt, use_shard_api, }; use crate::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, @@ -1804,7 +1804,8 @@ impl MetastoreService for PostgresqlMetastore { let mut size_bytes_list = Vec::with_capacity(splits_metadata.len()); let mut split_metadata_jsons = Vec::with_capacity(splits_metadata.len()); let mut window_starts: Vec> = Vec::with_capacity(splits_metadata.len()); - let mut window_duration_secs_list: Vec> = Vec::with_capacity(splits_metadata.len()); + let mut window_duration_secs_list: Vec> = + Vec::with_capacity(splits_metadata.len()); let mut sort_fields_list: Vec = Vec::with_capacity(splits_metadata.len()); let mut num_merge_ops_list: Vec = Vec::with_capacity(splits_metadata.len()); let mut row_keys_list: Vec>> = Vec::with_capacity(splits_metadata.len()); diff --git a/quickwit/quickwit-parquet-engine/src/storage/writer.rs b/quickwit/quickwit-parquet-engine/src/storage/writer.rs index 06ff4be2061..3ec633573bc 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/writer.rs @@ -710,9 +710,10 @@ mod tests { fn test_meta07_self_describing_parquet_roundtrip() { use std::fs::File; - use crate::split::{SplitId, TimeRange}; use parquet::file::reader::{FileReader, SerializedFileReader}; + use crate::split::{SplitId, TimeRange}; + let sort_schema_str = "metric_name|host|env|timestamp/V2"; let window_start_secs: i64 = 1700006400; let window_duration: u32 = 900; @@ -769,8 +770,8 @@ mod tests { .expect("self-describing file must contain qh.num_merge_ops") .parse() .expect("num_merge_ops must be parseable as u32"); - let recovered_row_keys_b64 = find_kv(PARQUET_META_ROW_KEYS) - .expect("self-describing file must contain qh.row_keys"); + let recovered_row_keys_b64 = + find_kv(PARQUET_META_ROW_KEYS).expect("self-describing file must contain qh.row_keys"); let recovered_row_keys = BASE64 .decode(&recovered_row_keys_b64) .expect("row_keys must be valid base64"); From ef218594746df4f195cf6c01b8aab2a23ee8b3d1 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 1 Apr 2026 11:20:16 -0400 Subject: [PATCH 6/9] test(31): add metrics split test suite to shared metastore_test_suite! macro 11 tests covering the full metrics split lifecycle: - stage (happy path + non-existent index error) - stage upsert (ON CONFLICT update) - list by state, time range, metric name, compaction scope - publish (happy path + non-existent split error) - mark for deletion - delete (happy path + idempotent non-existent) Tests are generic and run against both file-backed and PostgreSQL backends. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../quickwit-metastore/src/tests/metrics.rs | 652 ++++++++++++++++++ quickwit/quickwit-metastore/src/tests/mod.rs | 86 +++ 2 files changed, 738 insertions(+) create mode 100644 quickwit/quickwit-metastore/src/tests/metrics.rs diff --git a/quickwit/quickwit-metastore/src/tests/metrics.rs b/quickwit/quickwit-metastore/src/tests/metrics.rs new file mode 100644 index 00000000000..7288cbdfa13 --- /dev/null +++ b/quickwit/quickwit-metastore/src/tests/metrics.rs @@ -0,0 +1,652 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use quickwit_common::rand::append_random_suffix; +use quickwit_config::IndexConfig; +use quickwit_parquet_engine::split::{ + MetricsSplitMetadata, MetricsSplitState, SplitId, TimeRange, TAG_ENV, TAG_HOST, TAG_SERVICE, +}; +use quickwit_proto::metastore::{ + CreateIndexRequest, DeleteMetricsSplitsRequest, EntityKind, ListMetricsSplitsRequest, + MarkMetricsSplitsForDeletionRequest, MetastoreError, PublishMetricsSplitsRequest, + StageMetricsSplitsRequest, +}; +use quickwit_proto::types::IndexUid; + +use super::DefaultForTest; +use crate::tests::cleanup_index; +use crate::{ + CreateIndexRequestExt, ListMetricsSplitsQuery, ListMetricsSplitsRequestExt, + ListMetricsSplitsResponseExt, MetastoreServiceExt, StageMetricsSplitsRequestExt, +}; + +/// Helper to create a test index and return the actual IndexUid assigned by the metastore. +async fn create_test_index( + metastore: &mut dyn MetastoreServiceExt, + index_id: &str, +) -> IndexUid { + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(index_id, &index_uri); + let create_index_request = + CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone() +} + +/// Build a simple MetricsSplitMetadata for tests. +fn build_test_split( + split_id: &str, + index_uid: &IndexUid, + time_range: TimeRange, +) -> MetricsSplitMetadata { + MetricsSplitMetadata::builder() + .split_id(SplitId::new(split_id)) + .index_uid(index_uid.to_string()) + .time_range(time_range) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file(format!("{split_id}.parquet")) + .build() +} + +pub async fn test_metastore_stage_metrics_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-stage-metrics-splits"); + + // Stage on a non-existent index should fail. + { + let fake_uid = IndexUid::new_with_random_ulid("index-not-found"); + let split = build_test_split("split-1", &fake_uid, TimeRange::new(1000, 2000)); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(fake_uid, &[split]).unwrap(); + let error = metastore.stage_metrics_splits(request).await.unwrap_err(); + assert!(matches!( + error, + MetastoreError::NotFound(EntityKind::Index { .. }) + )); + } + + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage two splits. + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Verify both splits are listed in Staged state. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + + for split in &splits { + assert_eq!(split.state, MetricsSplitState::Staged); + } + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_stage_metrics_splits_upsert< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-stage-metrics-upsert"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id = format!("{index_id}--split-1"); + + // Stage a split with 100 rows. + let split_v1 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(&split_id)) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file(format!("{split_id}.parquet")) + .build(); + + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_v1], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Stage the same split_id again with 200 rows (upsert). + let split_v2 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(&split_id)) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(200) + .size_bytes(8192) + .add_metric_name("cpu.usage") + .parquet_file(format!("{split_id}.parquet")) + .build(); + + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_v2], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Verify only one split exists and it has the updated num_rows. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.num_rows, 200); + assert_eq!(splits[0].metadata.size_bytes, 8192); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_state< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-by-state"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage two splits. + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Publish split_1 only. + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + // List only Published splits. + { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_1); + assert_eq!(splits[0].state, MetricsSplitState::Published); + } + + // List only Staged splits. + { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_2); + assert_eq!(splits[0].state, MetricsSplitState::Staged); + } + + // List both states. + { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string(), "Staged".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + } + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_time_range< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-time-range"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage splits at different time ranges. + let split_1 = build_test_split( + &format!("{index_id}--split-1"), + &index_uid, + TimeRange::new(1000, 2000), + ); + let split_2 = build_test_split( + &format!("{index_id}--split-2"), + &index_uid, + TimeRange::new(3000, 4000), + ); + let split_3 = build_test_split( + &format!("{index_id}--split-3"), + &index_uid, + TimeRange::new(5000, 6000), + ); + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2, split_3], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Query for time range that overlaps only the first two splits. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]) + .with_time_range(1500, 3500); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + // Should match splits whose time range overlaps [1500, 3500]. + // split_1: [1000,2000) overlaps [1500,3500] => yes + // split_2: [3000,4000) overlaps [1500,3500] => yes + // split_3: [5000,6000) overlaps [1500,3500] => no + assert_eq!(splits.len(), 2); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_metric_name< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-by-name"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage splits with different metric names. + let split_1 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-1"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-1.parquet") + .build(); + + let split_2 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-2"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("memory.used") + .parquet_file("split-2.parquet") + .build(); + + let split_3 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-3"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .add_metric_name("memory.used") + .parquet_file("split-3.parquet") + .build(); + + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2, split_3], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Query for "cpu.usage" should return split_1 and split_3. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]) + .with_metric_names(vec!["cpu.usage".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_compaction_scope< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-compaction"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage splits with different compaction scopes. + let split_1 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-1"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-1.parquet") + .window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .build(); + + let split_2 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-2"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-2.parquet") + .window_start_secs(1700003600) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .build(); + + let split_3 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-3"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-3.parquet") + .window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("different_schema/V1") + .build(); + + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2, split_3], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Query by compaction scope: window_start=1700000000, sort_fields matching split_1. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]) + .with_compaction_scope(1700000000, "metric_name|host|timestamp/V2"); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + // Only split_1 matches both window_start and sort_fields. + assert_eq!(splits.len(), 1); + assert_eq!( + splits[0].metadata.split_id.as_str(), + format!("{index_id}--split-1") + ); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_publish_metrics_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-publish-metrics-splits"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + + // Stage both. + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Publish both. + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + // Verify they are now Published. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + + for split in &splits { + assert_eq!(split.state, MetricsSplitState::Published); + } + + // Verify no Staged splits remain. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 0); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_publish_metrics_splits_nonexistent< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-publish-metrics-nonexistent"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Publish a split_id that was never staged. + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec!["nonexistent-split".to_string()], + ..Default::default() + }; + let error = metastore + .publish_metrics_splits(publish_request) + .await + .unwrap_err(); + assert!( + matches!( + error, + MetastoreError::NotFound(EntityKind::Splits { .. }) + ), + "expected NotFound(Splits), got: {error:?}" + ); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_mark_metrics_splits_for_deletion< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-mark-metrics-deletion"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + + // Stage and publish. + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + // Mark split_1 for deletion. + let mark_request = MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec![split_id_1.clone()], + }; + metastore + .mark_metrics_splits_for_deletion(mark_request) + .await + .unwrap(); + + // Verify split_1 is MarkedForDeletion. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["MarkedForDeletion".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_1); + + // Verify split_2 is still Published. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_2); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_delete_metrics_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-delete-metrics-splits"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id_1 = format!("{index_id}--split-1"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + + // Stage, publish, mark for deletion. + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + let mark_request = MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec![split_id_1.clone()], + }; + metastore + .mark_metrics_splits_for_deletion(mark_request) + .await + .unwrap(); + + // Delete. + let delete_request = DeleteMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec![split_id_1.clone()], + }; + metastore + .delete_metrics_splits(delete_request) + .await + .unwrap(); + + // Verify it is gone (list all states). + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()).with_split_states(vec![ + "Staged".to_string(), + "Published".to_string(), + "MarkedForDeletion".to_string(), + ]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 0); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_delete_metrics_splits_nonexistent< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-delete-metrics-nonexistent"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Delete a split_id that doesn't exist — should succeed (idempotent). + let delete_request = DeleteMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec!["nonexistent-split".to_string()], + }; + metastore + .delete_metrics_splits(delete_request) + .await + .unwrap(); + + cleanup_index(&mut metastore, index_uid).await; +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index d6e549baf25..4433221a87d 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -29,6 +29,7 @@ pub(crate) mod delete_task; pub(crate) mod get_identity; pub(crate) mod index; pub(crate) mod list_splits; +pub(crate) mod metrics; pub(crate) mod shard; pub(crate) mod source; pub(crate) mod split; @@ -575,6 +576,91 @@ macro_rules! metastore_test_suite { let _ = tracing_subscriber::fmt::try_init(); $crate::tests::get_identity::test_metastore_get_identity::<$metastore_type>().await; } + + // Metrics Split API tests + // + // - stage_metrics_splits + // - publish_metrics_splits + // - list_metrics_splits + // - mark_metrics_splits_for_deletion + // - delete_metrics_splits + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_stage_metrics_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_stage_metrics_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_stage_metrics_splits_upsert() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_stage_metrics_splits_upsert::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_state() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_state::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_time_range() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_time_range::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_metric_name() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_metric_name::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_compaction_scope() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_compaction_scope::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_publish_metrics_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_publish_metrics_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_publish_metrics_splits_nonexistent() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_publish_metrics_splits_nonexistent::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_mark_metrics_splits_for_deletion() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_mark_metrics_splits_for_deletion::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_delete_metrics_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_delete_metrics_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_delete_metrics_splits_nonexistent() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_delete_metrics_splits_nonexistent::<$metastore_type>().await; + } } }; } From b4dac4644182577bafb35a5795e1ce93d81d33bc Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 1 Apr 2026 11:29:42 -0400 Subject: [PATCH 7/9] fix(31): read compaction columns in list_metrics_splits, fix cleanup_index FK --- .../src/metastore/postgres/metastore.rs | 79 ++++++++----------- .../quickwit-metastore/src/tests/metrics.rs | 17 ++-- quickwit/quickwit-metastore/src/tests/mod.rs | 47 +++++++++++ 3 files changed, 91 insertions(+), 52 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index b09eb4afd97..646299ae62d 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2218,7 +2218,13 @@ impl MetastoreService for PostgresqlMetastore { num_rows, size_bytes, split_metadata_json, - EXTRACT(EPOCH FROM update_timestamp)::bigint as update_timestamp + EXTRACT(EPOCH FROM update_timestamp)::bigint as update_timestamp, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes FROM metrics_splits WHERE index_uid = $1 "#, @@ -2288,27 +2294,7 @@ impl MetastoreService for PostgresqlMetastore { } // Execute query with bindings - let mut query_builder = sqlx::query_as::< - _, - ( - String, // split_id - String, // split_state - String, // index_uid - i64, // time_range_start - i64, // time_range_end - Vec, // metric_names - Option>, // tag_service - Option>, // tag_env - Option>, // tag_datacenter - Option>, // tag_region - Option>, // tag_host - Vec, // high_cardinality_tag_keys - i64, // num_rows - i64, // size_bytes - String, // split_metadata_json - i64, // update_timestamp - ), - >(&sql); + let mut query_builder = sqlx::query(&sql); query_builder = query_builder.bind(query.index_uid.to_string()); @@ -2360,32 +2346,31 @@ impl MetastoreService for PostgresqlMetastore { .filter_map(|row| { use quickwit_parquet_engine::split::{MetricsSplitState, PgMetricsSplit}; + use sqlx::Row as _; + let pg_split = PgMetricsSplit { - split_id: row.0, - split_state: row.1, - index_uid: row.2, - time_range_start: row.3, - time_range_end: row.4, - metric_names: row.5, - tag_service: row.6, - tag_env: row.7, - tag_datacenter: row.8, - tag_region: row.9, - tag_host: row.10, - high_cardinality_tag_keys: row.11, - num_rows: row.12, - size_bytes: row.13, - split_metadata_json: row.14, - update_timestamp: row.15, - // Compaction fields are read from the JSON blob via - // to_metadata() — the SQL columns are only used for - // filtering and SS-5 consistency checks. - window_start: None, - window_duration_secs: None, - sort_fields: String::new(), - num_merge_ops: 0, - row_keys: None, - zonemap_regexes: serde_json::json!({}), + split_id: row.get("split_id"), + split_state: row.get("split_state"), + index_uid: row.get("index_uid"), + time_range_start: row.get("time_range_start"), + time_range_end: row.get("time_range_end"), + metric_names: row.get("metric_names"), + tag_service: row.get("tag_service"), + tag_env: row.get("tag_env"), + tag_datacenter: row.get("tag_datacenter"), + tag_region: row.get("tag_region"), + tag_host: row.get("tag_host"), + high_cardinality_tag_keys: row.get("high_cardinality_tag_keys"), + num_rows: row.get("num_rows"), + size_bytes: row.get("size_bytes"), + split_metadata_json: row.get("split_metadata_json"), + update_timestamp: row.get("update_timestamp"), + window_start: row.get("window_start"), + window_duration_secs: row.get("window_duration_secs"), + sort_fields: row.get("sort_fields"), + num_merge_ops: row.get("num_merge_ops"), + row_keys: row.get("row_keys"), + zonemap_regexes: row.get("zonemap_regexes"), }; let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged); diff --git a/quickwit/quickwit-metastore/src/tests/metrics.rs b/quickwit/quickwit-metastore/src/tests/metrics.rs index 7288cbdfa13..b44fd8ecc22 100644 --- a/quickwit/quickwit-metastore/src/tests/metrics.rs +++ b/quickwit/quickwit-metastore/src/tests/metrics.rs @@ -495,10 +495,12 @@ pub async fn test_metastore_publish_metrics_splits_nonexistent< .publish_metrics_splits(publish_request) .await .unwrap_err(); + // File-backed: NotFound. Postgres: FailedPrecondition (count mismatch). assert!( matches!( error, MetastoreError::NotFound(EntityKind::Splits { .. }) + | MetastoreError::FailedPrecondition { .. } ), "expected NotFound(Splits), got: {error:?}" ); @@ -638,15 +640,20 @@ pub async fn test_metastore_delete_metrics_splits_nonexistent< let index_id = append_random_suffix("test-delete-metrics-nonexistent"); let index_uid = create_test_index(&mut metastore, &index_id).await; - // Delete a split_id that doesn't exist — should succeed (idempotent). + // Delete a split_id that doesn't exist. + // File-backed: succeeds silently (idempotent). + // Postgres: may return FailedPrecondition ("not marked for deletion"). + // Both behaviors are acceptable. let delete_request = DeleteMetricsSplitsRequest { index_uid: Some(index_uid.clone()), split_ids: vec!["nonexistent-split".to_string()], }; - metastore - .delete_metrics_splits(delete_request) - .await - .unwrap(); + let result = metastore.delete_metrics_splits(delete_request).await; + match &result { + Ok(_) => {} // file-backed: idempotent success + Err(MetastoreError::FailedPrecondition { .. }) => {} // postgres: not marked + Err(other) => panic!("unexpected error: {other:?}"), + } cleanup_index(&mut metastore, index_uid).await; } diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 4433221a87d..fd384670438 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -22,6 +22,8 @@ use quickwit_proto::metastore::{ DeleteIndexRequest, DeleteSplitsRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreServiceClient, MetastoreServiceGrpcClientAdapter, }; + +use crate::metastore::{ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt}; use quickwit_proto::tonic::transport::Channel; use quickwit_proto::types::IndexUid; @@ -154,6 +156,51 @@ async fn cleanup_index(metastore: &mut dyn MetastoreServiceExt, index_uid: Index .await .unwrap(); } + // Also clean up any metrics splits (they have a separate FK constraint). + let metrics_query = + crate::metastore::ListMetricsSplitsQuery::for_index(index_uid.clone()).with_split_states( + vec![ + "Staged".to_string(), + "Published".to_string(), + "MarkedForDeletion".to_string(), + ], + ); + if let Ok(list_request) = quickwit_proto::metastore::ListMetricsSplitsRequest::try_from_query( + index_uid.clone(), + &metrics_query, + ) { + if let Ok(response) = metastore.list_metrics_splits(list_request).await { + if let Ok(splits) = response.deserialize_splits() { + if !splits.is_empty() { + let split_ids: Vec = splits + .iter() + .map(|s| s.metadata.split_id.as_str().to_string()) + .collect(); + + // Mark for deletion first. + let _ = metastore + .mark_metrics_splits_for_deletion( + quickwit_proto::metastore::MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid.clone()), + split_ids: split_ids.clone(), + }, + ) + .await; + + // Delete. + let _ = metastore + .delete_metrics_splits( + quickwit_proto::metastore::DeleteMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + split_ids, + }, + ) + .await; + } + } + } + } + // Delete index. metastore .delete_index(DeleteIndexRequest { From db51a96e063b019222dfb5d450ab01049cbfae58 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 1 Apr 2026 11:34:09 -0400 Subject: [PATCH 8/9] fix(31): correct error types for non-existent metrics splits - publish_metrics_splits: return NotFound (not FailedPrecondition) when staged splits don't exist - delete_metrics_splits: succeed silently (idempotent) for non-existent splits instead of returning FailedPrecondition - Tests now assert the correct error types on both backends Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/metastore/postgres/metastore.rs | 25 +++++++------------ .../quickwit-metastore/src/tests/metrics.rs | 22 +++++----------- 2 files changed, 15 insertions(+), 32 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 646299ae62d..bd19d8e5f14 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2151,17 +2151,13 @@ impl MetastoreService for PostgresqlMetastore { .await .map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?; - // Verify all staged splits were published + // Verify all staged splits were published. + // If some splits are missing, it means they don't exist (NotFound), + // not that they're in the wrong state (FailedPrecondition). if published_count as usize != staged_split_ids.len() { - let entity = EntityKind::Splits { + return Err(MetastoreError::NotFound(EntityKind::Splits { split_ids: staged_split_ids.clone(), - }; - let message = format!( - "expected to publish {} splits, but only {} were in Staged state", - staged_split_ids.len(), - published_count - ); - return Err(MetastoreError::FailedPrecondition { entity, message }); + })); } // Verify all replaced splits were marked for deletion. @@ -2461,7 +2457,9 @@ impl MetastoreService for PostgresqlMetastore { .await .map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?; - // Check if any splits could not be deleted + // Log if some splits were not deleted (either non-existent or not + // in MarkedForDeletion state). Delete is idempotent — we don't error + // for missing splits. if deleted_split_ids.len() != request.split_ids.len() { let not_deleted: Vec = request .split_ids @@ -2474,13 +2472,8 @@ impl MetastoreService for PostgresqlMetastore { warn!( index_uid = %request.index_uid(), not_deleted = ?not_deleted, - "some metrics splits were not in MarkedForDeletion state" + "some metrics splits were not deleted (non-existent or not marked for deletion)" ); - let entity = EntityKind::Splits { - split_ids: not_deleted, - }; - let message = "splits are not marked for deletion".to_string(); - return Err(MetastoreError::FailedPrecondition { entity, message }); } } diff --git a/quickwit/quickwit-metastore/src/tests/metrics.rs b/quickwit/quickwit-metastore/src/tests/metrics.rs index b44fd8ecc22..f478b386e8a 100644 --- a/quickwit/quickwit-metastore/src/tests/metrics.rs +++ b/quickwit/quickwit-metastore/src/tests/metrics.rs @@ -495,13 +495,8 @@ pub async fn test_metastore_publish_metrics_splits_nonexistent< .publish_metrics_splits(publish_request) .await .unwrap_err(); - // File-backed: NotFound. Postgres: FailedPrecondition (count mismatch). assert!( - matches!( - error, - MetastoreError::NotFound(EntityKind::Splits { .. }) - | MetastoreError::FailedPrecondition { .. } - ), + matches!(error, MetastoreError::NotFound(EntityKind::Splits { .. })), "expected NotFound(Splits), got: {error:?}" ); @@ -640,20 +635,15 @@ pub async fn test_metastore_delete_metrics_splits_nonexistent< let index_id = append_random_suffix("test-delete-metrics-nonexistent"); let index_uid = create_test_index(&mut metastore, &index_id).await; - // Delete a split_id that doesn't exist. - // File-backed: succeeds silently (idempotent). - // Postgres: may return FailedPrecondition ("not marked for deletion"). - // Both behaviors are acceptable. + // Delete a split_id that doesn't exist — should succeed (idempotent). let delete_request = DeleteMetricsSplitsRequest { index_uid: Some(index_uid.clone()), split_ids: vec!["nonexistent-split".to_string()], }; - let result = metastore.delete_metrics_splits(delete_request).await; - match &result { - Ok(_) => {} // file-backed: idempotent success - Err(MetastoreError::FailedPrecondition { .. }) => {} // postgres: not marked - Err(other) => panic!("unexpected error: {other:?}"), - } + metastore + .delete_metrics_splits(delete_request) + .await + .unwrap(); cleanup_index(&mut metastore, index_uid).await; } From 605708eade513a158aced5566e3be4e8a09a92ed Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 1 Apr 2026 14:17:22 -0400 Subject: [PATCH 9/9] style: rustfmt metastore tests and postgres --- .../src/metastore/postgres/metastore.rs | 1 - .../quickwit-metastore/src/tests/metrics.rs | 97 +++++++------------ quickwit/quickwit-metastore/src/tests/mod.rs | 73 +++++++------- 3 files changed, 66 insertions(+), 105 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index bd19d8e5f14..3bbbe3c2f27 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2341,7 +2341,6 @@ impl MetastoreService for PostgresqlMetastore { .into_iter() .filter_map(|row| { use quickwit_parquet_engine::split::{MetricsSplitState, PgMetricsSplit}; - use sqlx::Row as _; let pg_split = PgMetricsSplit { diff --git a/quickwit/quickwit-metastore/src/tests/metrics.rs b/quickwit/quickwit-metastore/src/tests/metrics.rs index f478b386e8a..60d3246a2a4 100644 --- a/quickwit/quickwit-metastore/src/tests/metrics.rs +++ b/quickwit/quickwit-metastore/src/tests/metrics.rs @@ -14,9 +14,7 @@ use quickwit_common::rand::append_random_suffix; use quickwit_config::IndexConfig; -use quickwit_parquet_engine::split::{ - MetricsSplitMetadata, MetricsSplitState, SplitId, TimeRange, TAG_ENV, TAG_HOST, TAG_SERVICE, -}; +use quickwit_parquet_engine::split::{MetricsSplitMetadata, MetricsSplitState, SplitId, TimeRange}; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteMetricsSplitsRequest, EntityKind, ListMetricsSplitsRequest, MarkMetricsSplitsForDeletionRequest, MetastoreError, PublishMetricsSplitsRequest, @@ -32,14 +30,10 @@ use crate::{ }; /// Helper to create a test index and return the actual IndexUid assigned by the metastore. -async fn create_test_index( - metastore: &mut dyn MetastoreServiceExt, - index_id: &str, -) -> IndexUid { +async fn create_test_index(metastore: &mut dyn MetastoreServiceExt, index_id: &str) -> IndexUid { let index_uri = format!("ram:///indexes/{index_id}"); let index_config = IndexConfig::for_test(index_id, &index_uri); - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); metastore .create_index(create_index_request) .await @@ -92,18 +86,15 @@ pub async fn test_metastore_stage_metrics_splits< let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); - let request = StageMetricsSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - &[split_1, split_2], - ) - .unwrap(); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); metastore.stage_metrics_splits(request).await.unwrap(); // Verify both splits are listed in Staged state. let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Staged".to_string()]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 2); @@ -135,11 +126,9 @@ pub async fn test_metastore_stage_metrics_splits_upsert< .parquet_file(format!("{split_id}.parquet")) .build(); - let request = StageMetricsSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - &[split_v1], - ) - .unwrap(); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_v1]) + .unwrap(); metastore.stage_metrics_splits(request).await.unwrap(); // Stage the same split_id again with 200 rows (upsert). @@ -153,18 +142,15 @@ pub async fn test_metastore_stage_metrics_splits_upsert< .parquet_file(format!("{split_id}.parquet")) .build(); - let request = StageMetricsSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - &[split_v2], - ) - .unwrap(); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_v2]) + .unwrap(); metastore.stage_metrics_splits(request).await.unwrap(); // Verify only one split exists and it has the updated num_rows. let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Staged".to_string()]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 1); @@ -186,11 +172,9 @@ pub async fn test_metastore_list_metrics_splits_by_state< let split_id_2 = format!("{index_id}--split-2"); let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); - let request = StageMetricsSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - &[split_1, split_2], - ) - .unwrap(); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); metastore.stage_metrics_splits(request).await.unwrap(); // Publish split_1 only. @@ -278,8 +262,7 @@ pub async fn test_metastore_list_metrics_splits_by_time_range< let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Staged".to_string()]) .with_time_range(1500, 3500); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); // Should match splits whose time range overlaps [1500, 3500]. @@ -341,8 +324,7 @@ pub async fn test_metastore_list_metrics_splits_by_metric_name< let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Staged".to_string()]) .with_metric_names(vec!["cpu.usage".to_string()]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 2); @@ -408,8 +390,7 @@ pub async fn test_metastore_list_metrics_splits_by_compaction_scope< let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Staged".to_string()]) .with_compaction_scope(1700000000, "metric_name|host|timestamp/V2"); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); // Only split_1 matches both window_start and sort_fields. @@ -435,11 +416,9 @@ pub async fn test_metastore_publish_metrics_splits< let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); // Stage both. - let request = StageMetricsSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - &[split_1, split_2], - ) - .unwrap(); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); metastore.stage_metrics_splits(request).await.unwrap(); // Publish both. @@ -456,8 +435,7 @@ pub async fn test_metastore_publish_metrics_splits< // Verify they are now Published. let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Published".to_string()]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 2); @@ -469,8 +447,7 @@ pub async fn test_metastore_publish_metrics_splits< // Verify no Staged splits remain. let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Staged".to_string()]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 0); @@ -516,11 +493,9 @@ pub async fn test_metastore_mark_metrics_splits_for_deletion< let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); // Stage and publish. - let request = StageMetricsSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - &[split_1, split_2], - ) - .unwrap(); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); metastore.stage_metrics_splits(request).await.unwrap(); let publish_request = PublishMetricsSplitsRequest { @@ -546,8 +521,7 @@ pub async fn test_metastore_mark_metrics_splits_for_deletion< // Verify split_1 is MarkedForDeletion. let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["MarkedForDeletion".to_string()]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 1); @@ -556,8 +530,7 @@ pub async fn test_metastore_mark_metrics_splits_for_deletion< // Verify split_2 is still Published. let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) .with_split_states(vec!["Published".to_string()]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 1); @@ -577,11 +550,8 @@ pub async fn test_metastore_delete_metrics_splits< let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); // Stage, publish, mark for deletion. - let request = StageMetricsSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - &[split_1], - ) - .unwrap(); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1]).unwrap(); metastore.stage_metrics_splits(request).await.unwrap(); let publish_request = PublishMetricsSplitsRequest { @@ -619,8 +589,7 @@ pub async fn test_metastore_delete_metrics_splits< "Published".to_string(), "MarkedForDeletion".to_string(), ]); - let list_request = - ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); let response = metastore.list_metrics_splits(list_request).await.unwrap(); let splits = response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 0); diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index fd384670438..7c068d57113 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -22,11 +22,11 @@ use quickwit_proto::metastore::{ DeleteIndexRequest, DeleteSplitsRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreServiceClient, MetastoreServiceGrpcClientAdapter, }; - -use crate::metastore::{ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt}; use quickwit_proto::tonic::transport::Channel; use quickwit_proto::types::IndexUid; +use crate::metastore::{ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt}; + pub(crate) mod delete_task; pub(crate) mod get_identity; pub(crate) mod index; @@ -157,48 +157,41 @@ async fn cleanup_index(metastore: &mut dyn MetastoreServiceExt, index_uid: Index .unwrap(); } // Also clean up any metrics splits (they have a separate FK constraint). - let metrics_query = - crate::metastore::ListMetricsSplitsQuery::for_index(index_uid.clone()).with_split_states( - vec![ - "Staged".to_string(), - "Published".to_string(), - "MarkedForDeletion".to_string(), - ], - ); + let metrics_query = crate::metastore::ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec![ + "Staged".to_string(), + "Published".to_string(), + "MarkedForDeletion".to_string(), + ]); if let Ok(list_request) = quickwit_proto::metastore::ListMetricsSplitsRequest::try_from_query( index_uid.clone(), &metrics_query, - ) { - if let Ok(response) = metastore.list_metrics_splits(list_request).await { - if let Ok(splits) = response.deserialize_splits() { - if !splits.is_empty() { - let split_ids: Vec = splits - .iter() - .map(|s| s.metadata.split_id.as_str().to_string()) - .collect(); - - // Mark for deletion first. - let _ = metastore - .mark_metrics_splits_for_deletion( - quickwit_proto::metastore::MarkMetricsSplitsForDeletionRequest { - index_uid: Some(index_uid.clone()), - split_ids: split_ids.clone(), - }, - ) - .await; + ) && let Ok(response) = metastore.list_metrics_splits(list_request).await + && let Ok(splits) = response.deserialize_splits() + && !splits.is_empty() + { + let split_ids: Vec = splits + .iter() + .map(|s| s.metadata.split_id.as_str().to_string()) + .collect(); - // Delete. - let _ = metastore - .delete_metrics_splits( - quickwit_proto::metastore::DeleteMetricsSplitsRequest { - index_uid: Some(index_uid.clone()), - split_ids, - }, - ) - .await; - } - } - } + // Mark for deletion first. + let _ = metastore + .mark_metrics_splits_for_deletion( + quickwit_proto::metastore::MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid.clone()), + split_ids: split_ids.clone(), + }, + ) + .await; + + // Delete. + let _ = metastore + .delete_metrics_splits(quickwit_proto::metastore::DeleteMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + split_ids, + }) + .await; } // Delete index.