diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.down.sql new file mode 100644 index 00000000000..493ddc2e1cd --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.down.sql @@ -0,0 +1,8 @@ +-- Reverse Phase 31: Remove compaction metadata columns. +DROP INDEX IF EXISTS idx_metrics_splits_compaction_scope; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS zonemap_regexes; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS row_keys; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS num_merge_ops; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS sort_fields; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_duration_secs; +ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_start; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.up.sql new file mode 100644 index 00000000000..a83336e8295 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/27_add-compaction-metadata.up.sql @@ -0,0 +1,14 @@ +-- Phase 31: Add compaction metadata columns to metrics_splits. +-- These columns support time-windowed compaction planning and execution. 
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_start BIGINT; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_duration_secs INTEGER; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS sort_fields TEXT NOT NULL DEFAULT ''; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS num_merge_ops INTEGER NOT NULL DEFAULT 0; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS row_keys BYTEA; +ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS zonemap_regexes JSONB NOT NULL DEFAULT '{}'; + +-- Compaction scope index: supports the compaction planner's primary query pattern +-- "give me all Published splits for a given (index_uid, sort_fields, window_start) triple." +CREATE INDEX IF NOT EXISTS idx_metrics_splits_compaction_scope + ON metrics_splits (index_uid, sort_fields, window_start) + WHERE split_state = 'Published'; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index e35618b99f8..152dc0cd84f 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -979,6 +979,18 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp } } + // Filter by compaction scope + if let Some(ws) = query.window_start + && split.metadata.window_start() != Some(ws) + { + return false; + } + if let Some(ref sf) = query.sort_fields + && split.metadata.sort_fields != *sf + { + return false; + } + true } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index ddfee25afab..b1bc686107a 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -75,6 +75,10 @@ pub struct ListMetricsSplitsQuery { pub tag_region: Option, /// Host tag filter. 
pub tag_host: Option, + /// Window start filter for compaction scope queries. + pub window_start: Option, + /// Sort fields filter for compaction scope queries. + pub sort_fields: Option, /// Limit number of results. pub limit: Option, } @@ -107,6 +111,17 @@ impl ListMetricsSplitsQuery { self.metric_names = names; self } + + /// Filter by compaction scope (window_start + sort_fields). + pub fn with_compaction_scope( + mut self, + window_start: i64, + sort_fields: impl Into, + ) -> Self { + self.window_start = Some(window_start); + self.sort_fields = Some(sort_fields.into()); + self + } } /// Splits batch size returned by the stream splits API diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index bfa08ff3b9d..3bbbe3c2f27 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -1803,6 +1803,13 @@ impl MetastoreService for PostgresqlMetastore { let mut num_rows_list = Vec::with_capacity(splits_metadata.len()); let mut size_bytes_list = Vec::with_capacity(splits_metadata.len()); let mut split_metadata_jsons = Vec::with_capacity(splits_metadata.len()); + let mut window_starts: Vec> = Vec::with_capacity(splits_metadata.len()); + let mut window_duration_secs_list: Vec> = + Vec::with_capacity(splits_metadata.len()); + let mut sort_fields_list: Vec = Vec::with_capacity(splits_metadata.len()); + let mut num_merge_ops_list: Vec = Vec::with_capacity(splits_metadata.len()); + let mut row_keys_list: Vec>> = Vec::with_capacity(splits_metadata.len()); + let mut zonemap_regexes_json_list: Vec = Vec::with_capacity(splits_metadata.len()); for metadata in &splits_metadata { let insertable = @@ -1837,6 +1844,12 @@ impl MetastoreService for PostgresqlMetastore { num_rows_list.push(insertable.num_rows); size_bytes_list.push(insertable.size_bytes); 
split_metadata_jsons.push(insertable.split_metadata_json); + window_starts.push(insertable.window_start); + window_duration_secs_list.push(insertable.window_duration_secs); + sort_fields_list.push(insertable.sort_fields); + num_merge_ops_list.push(insertable.num_merge_ops); + row_keys_list.push(insertable.row_keys); + zonemap_regexes_json_list.push(insertable.zonemap_regexes.to_string()); } info!( @@ -1863,6 +1876,12 @@ impl MetastoreService for PostgresqlMetastore { num_rows, size_bytes, split_metadata_json, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes, create_timestamp, update_timestamp ) @@ -1887,6 +1906,12 @@ impl MetastoreService for PostgresqlMetastore { num_rows, size_bytes, split_metadata_json, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes_json::jsonb, (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'), (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') FROM UNNEST( @@ -1904,7 +1929,13 @@ impl MetastoreService for PostgresqlMetastore { $12::text[], $13::bigint[], $14::bigint[], - $15::text[] + $15::text[], + $16::bigint[], + $17::int[], + $18::text[], + $19::int[], + $20::bytea[], + $21::text[] ) AS staged( split_id, split_state, @@ -1920,7 +1951,13 @@ impl MetastoreService for PostgresqlMetastore { high_cardinality_tag_keys_json, num_rows, size_bytes, - split_metadata_json + split_metadata_json, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes_json ) ON CONFLICT (split_id) DO UPDATE SET @@ -1937,6 +1974,12 @@ impl MetastoreService for PostgresqlMetastore { num_rows = EXCLUDED.num_rows, size_bytes = EXCLUDED.size_bytes, split_metadata_json = EXCLUDED.split_metadata_json, + window_start = EXCLUDED.window_start, + window_duration_secs = EXCLUDED.window_duration_secs, + sort_fields = EXCLUDED.sort_fields, + num_merge_ops = EXCLUDED.num_merge_ops, + row_keys = EXCLUDED.row_keys, + zonemap_regexes = 
EXCLUDED.zonemap_regexes, update_timestamp = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') WHERE metrics_splits.split_state = 'Staged' RETURNING split_id @@ -1962,6 +2005,12 @@ impl MetastoreService for PostgresqlMetastore { .bind(&num_rows_list) .bind(&size_bytes_list) .bind(&split_metadata_jsons) + .bind(&window_starts) + .bind(&window_duration_secs_list) + .bind(&sort_fields_list) + .bind(&num_merge_ops_list) + .bind(&row_keys_list) + .bind(&zonemap_regexes_json_list) .fetch_all(tx.as_mut()) .await .map_err(|sqlx_error| convert_sqlx_err(&index_id_for_err, sqlx_error)) @@ -2102,26 +2151,22 @@ impl MetastoreService for PostgresqlMetastore { .await .map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?; - // Verify all staged splits were published + // Verify all staged splits were published. + // If some splits are missing, it means they don't exist (NotFound), + // not that they're in the wrong state (FailedPrecondition). if published_count as usize != staged_split_ids.len() { - let entity = EntityKind::Splits { + return Err(MetastoreError::NotFound(EntityKind::Splits { split_ids: staged_split_ids.clone(), - }; - let message = format!( - "expected to publish {} splits, but only {} were in Staged state", - staged_split_ids.len(), - published_count - ); - return Err(MetastoreError::FailedPrecondition { entity, message }); + })); } - // Verify all replaced splits were marked for deletion + // Verify all replaced splits were marked for deletion. 
if marked_count as usize != replaced_split_ids.len() { let entity = EntityKind::Splits { split_ids: replaced_split_ids.clone(), }; let message = format!( - "expected to mark {} splits for deletion, but only {} were in Published state", + "expected to replace {} splits, but only {} were in Published state", replaced_split_ids.len(), marked_count ); @@ -2169,7 +2214,13 @@ impl MetastoreService for PostgresqlMetastore { num_rows, size_bytes, split_metadata_json, - EXTRACT(EPOCH FROM update_timestamp)::bigint as update_timestamp + EXTRACT(EPOCH FROM update_timestamp)::bigint as update_timestamp, + window_start, + window_duration_secs, + sort_fields, + num_merge_ops, + row_keys, + zonemap_regexes FROM metrics_splits WHERE index_uid = $1 "#, @@ -2221,6 +2272,16 @@ impl MetastoreService for PostgresqlMetastore { param_idx += 1; } + // Compaction scope filters + if query.window_start.is_some() { + sql.push_str(&format!(" AND window_start = ${}", param_idx)); + param_idx += 1; + } + if query.sort_fields.is_some() { + sql.push_str(&format!(" AND sort_fields = ${}", param_idx)); + param_idx += 1; + } + sql.push_str(" ORDER BY time_range_start ASC"); // Add limit @@ -2229,27 +2290,7 @@ impl MetastoreService for PostgresqlMetastore { } // Execute query with bindings - let mut query_builder = sqlx::query_as::< - _, - ( - String, // split_id - String, // split_state - String, // index_uid - i64, // time_range_start - i64, // time_range_end - Vec, // metric_names - Option>, // tag_service - Option>, // tag_env - Option>, // tag_datacenter - Option>, // tag_region - Option>, // tag_host - Vec, // high_cardinality_tag_keys - i64, // num_rows - i64, // size_bytes - String, // split_metadata_json - i64, // update_timestamp - ), - >(&sql); + let mut query_builder = sqlx::query(&sql); query_builder = query_builder.bind(query.index_uid.to_string()); @@ -2280,6 +2321,12 @@ impl MetastoreService for PostgresqlMetastore { if let Some(ref host) = query.tag_host { query_builder = 
query_builder.bind(host); } + if let Some(ws) = query.window_start { + query_builder = query_builder.bind(ws); + } + if let Some(ref sf) = query.sort_fields { + query_builder = query_builder.bind(sf); + } if let Some(limit) = query.limit { query_builder = query_builder.bind(limit as i64); } @@ -2294,30 +2341,31 @@ impl MetastoreService for PostgresqlMetastore { .into_iter() .filter_map(|row| { use quickwit_parquet_engine::split::{MetricsSplitState, PgMetricsSplit}; + use sqlx::Row as _; let pg_split = PgMetricsSplit { - split_id: row.0, - split_state: row.1, - index_uid: row.2, - time_range_start: row.3, - time_range_end: row.4, - metric_names: row.5, - tag_service: row.6, - tag_env: row.7, - tag_datacenter: row.8, - tag_region: row.9, - tag_host: row.10, - high_cardinality_tag_keys: row.11, - num_rows: row.12, - size_bytes: row.13, - split_metadata_json: row.14, - update_timestamp: row.15, - window_start: None, - window_duration_secs: 0, - sort_fields: String::new(), - num_merge_ops: 0, - row_keys: None, - zonemap_regexes: String::new(), + split_id: row.get("split_id"), + split_state: row.get("split_state"), + index_uid: row.get("index_uid"), + time_range_start: row.get("time_range_start"), + time_range_end: row.get("time_range_end"), + metric_names: row.get("metric_names"), + tag_service: row.get("tag_service"), + tag_env: row.get("tag_env"), + tag_datacenter: row.get("tag_datacenter"), + tag_region: row.get("tag_region"), + tag_host: row.get("tag_host"), + high_cardinality_tag_keys: row.get("high_cardinality_tag_keys"), + num_rows: row.get("num_rows"), + size_bytes: row.get("size_bytes"), + split_metadata_json: row.get("split_metadata_json"), + update_timestamp: row.get("update_timestamp"), + window_start: row.get("window_start"), + window_duration_secs: row.get("window_duration_secs"), + sort_fields: row.get("sort_fields"), + num_merge_ops: row.get("num_merge_ops"), + row_keys: row.get("row_keys"), + zonemap_regexes: row.get("zonemap_regexes"), }; let state = 
pg_split.split_state().unwrap_or(MetricsSplitState::Staged); @@ -2325,7 +2373,7 @@ impl MetastoreService for PostgresqlMetastore { Some(MetricsSplitRecord { state, - update_timestamp: row.15, + update_timestamp: pg_split.update_timestamp, metadata, }) }) @@ -2408,7 +2456,9 @@ impl MetastoreService for PostgresqlMetastore { .await .map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?; - // Check if any splits could not be deleted + // Log if some splits were not deleted (either non-existent or not + // in MarkedForDeletion state). Delete is idempotent — we don't error + // for missing splits. if deleted_split_ids.len() != request.split_ids.len() { let not_deleted: Vec = request .split_ids @@ -2421,13 +2471,8 @@ impl MetastoreService for PostgresqlMetastore { warn!( index_uid = %request.index_uid(), not_deleted = ?not_deleted, - "some metrics splits were not in MarkedForDeletion state" + "some metrics splits were not deleted (non-existent or not marked for deletion)" ); - let entity = EntityKind::Splits { - split_ids: not_deleted, - }; - let message = "splits are not marked for deletion".to_string(); - return Err(MetastoreError::FailedPrecondition { entity, message }); } } diff --git a/quickwit/quickwit-metastore/src/tests/metrics.rs b/quickwit/quickwit-metastore/src/tests/metrics.rs new file mode 100644 index 00000000000..60d3246a2a4 --- /dev/null +++ b/quickwit/quickwit-metastore/src/tests/metrics.rs @@ -0,0 +1,618 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use quickwit_common::rand::append_random_suffix; +use quickwit_config::IndexConfig; +use quickwit_parquet_engine::split::{MetricsSplitMetadata, MetricsSplitState, SplitId, TimeRange}; +use quickwit_proto::metastore::{ + CreateIndexRequest, DeleteMetricsSplitsRequest, EntityKind, ListMetricsSplitsRequest, + MarkMetricsSplitsForDeletionRequest, MetastoreError, PublishMetricsSplitsRequest, + StageMetricsSplitsRequest, +}; +use quickwit_proto::types::IndexUid; + +use super::DefaultForTest; +use crate::tests::cleanup_index; +use crate::{ + CreateIndexRequestExt, ListMetricsSplitsQuery, ListMetricsSplitsRequestExt, + ListMetricsSplitsResponseExt, MetastoreServiceExt, StageMetricsSplitsRequestExt, +}; + +/// Helper to create a test index and return the actual IndexUid assigned by the metastore. +async fn create_test_index(metastore: &mut dyn MetastoreServiceExt, index_id: &str) -> IndexUid { + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(index_id, &index_uri); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone() +} + +/// Build a simple MetricsSplitMetadata for tests. 
+fn build_test_split( + split_id: &str, + index_uid: &IndexUid, + time_range: TimeRange, +) -> MetricsSplitMetadata { + MetricsSplitMetadata::builder() + .split_id(SplitId::new(split_id)) + .index_uid(index_uid.to_string()) + .time_range(time_range) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file(format!("{split_id}.parquet")) + .build() +} + +pub async fn test_metastore_stage_metrics_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-stage-metrics-splits"); + + // Stage on a non-existent index should fail. + { + let fake_uid = IndexUid::new_with_random_ulid("index-not-found"); + let split = build_test_split("split-1", &fake_uid, TimeRange::new(1000, 2000)); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(fake_uid, &[split]).unwrap(); + let error = metastore.stage_metrics_splits(request).await.unwrap_err(); + assert!(matches!( + error, + MetastoreError::NotFound(EntityKind::Index { .. }) + )); + } + + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage two splits. + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Verify both splits are listed in Staged state. 
+ let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + + for split in &splits { + assert_eq!(split.state, MetricsSplitState::Staged); + } + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_stage_metrics_splits_upsert< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-stage-metrics-upsert"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id = format!("{index_id}--split-1"); + + // Stage a split with 100 rows. + let split_v1 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(&split_id)) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file(format!("{split_id}.parquet")) + .build(); + + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_v1]) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Stage the same split_id again with 200 rows (upsert). + let split_v2 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(&split_id)) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(200) + .size_bytes(8192) + .add_metric_name("cpu.usage") + .parquet_file(format!("{split_id}.parquet")) + .build(); + + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_v2]) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Verify only one split exists and it has the updated num_rows. 
+ let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.num_rows, 200); + assert_eq!(splits[0].metadata.size_bytes, 8192); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_state< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-by-state"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage two splits. + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Publish split_1 only. + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + // List only Published splits. 
+ { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_1); + assert_eq!(splits[0].state, MetricsSplitState::Published); + } + + // List only Staged splits. + { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_2); + assert_eq!(splits[0].state, MetricsSplitState::Staged); + } + + // List both states. + { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string(), "Staged".to_string()]); + let list_request = + ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + } + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_time_range< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-time-range"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage splits at different time ranges. 
+ let split_1 = build_test_split( + &format!("{index_id}--split-1"), + &index_uid, + TimeRange::new(1000, 2000), + ); + let split_2 = build_test_split( + &format!("{index_id}--split-2"), + &index_uid, + TimeRange::new(3000, 4000), + ); + let split_3 = build_test_split( + &format!("{index_id}--split-3"), + &index_uid, + TimeRange::new(5000, 6000), + ); + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2, split_3], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Query for time range that overlaps only the first two splits. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]) + .with_time_range(1500, 3500); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + // Should match splits whose time range overlaps [1500, 3500]. + // split_1: [1000,2000) overlaps [1500,3500] => yes + // split_2: [3000,4000) overlaps [1500,3500] => yes + // split_3: [5000,6000) overlaps [1500,3500] => no + assert_eq!(splits.len(), 2); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_metric_name< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-by-name"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage splits with different metric names. 
+ let split_1 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-1"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-1.parquet") + .build(); + + let split_2 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-2"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("memory.used") + .parquet_file("split-2.parquet") + .build(); + + let split_3 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-3"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .add_metric_name("memory.used") + .parquet_file("split-3.parquet") + .build(); + + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2, split_3], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Query for "cpu.usage" should return split_1 and split_3. 
+ let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]) + .with_metric_names(vec!["cpu.usage".to_string()]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_list_metrics_splits_by_compaction_scope< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-list-metrics-compaction"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Stage splits with different compaction scopes. + let split_1 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-1"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-1.parquet") + .window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .build(); + + let split_2 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-2"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-2.parquet") + .window_start_secs(1700003600) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .build(); + + let split_3 = MetricsSplitMetadata::builder() + .split_id(SplitId::new(format!("{index_id}--split-3"))) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(4096) + .add_metric_name("cpu.usage") + .parquet_file("split-3.parquet") + 
.window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("different_schema/V1") + .build(); + + let request = StageMetricsSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[split_1, split_2, split_3], + ) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Query by compaction scope: window_start=1700000000, sort_fields matching split_1. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]) + .with_compaction_scope(1700000000, "metric_name|host|timestamp/V2"); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + // Only split_1 matches both window_start and sort_fields. + assert_eq!(splits.len(), 1); + assert_eq!( + splits[0].metadata.split_id.as_str(), + format!("{index_id}--split-1") + ); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_publish_metrics_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-publish-metrics-splits"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + + // Stage both. + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + // Publish both. 
+ let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + // Verify they are now Published. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string()]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 2); + + for split in &splits { + assert_eq!(split.state, MetricsSplitState::Published); + } + + // Verify no Staged splits remain. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Staged".to_string()]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 0); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_publish_metrics_splits_nonexistent< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-publish-metrics-nonexistent"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Publish a split_id that was never staged. + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec!["nonexistent-split".to_string()], + ..Default::default() + }; + let error = metastore + .publish_metrics_splits(publish_request) + .await + .unwrap_err(); + assert!( + matches!(error, MetastoreError::NotFound(EntityKind::Splits { .. 
})), + "expected NotFound(Splits), got: {error:?}" + ); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_mark_metrics_splits_for_deletion< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-mark-metrics-deletion"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id_1 = format!("{index_id}--split-1"); + let split_id_2 = format!("{index_id}--split-2"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + let split_2 = build_test_split(&split_id_2, &index_uid, TimeRange::new(2000, 3000)); + + // Stage and publish. + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1, split_2]) + .unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + // Mark split_1 for deletion. + let mark_request = MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec![split_id_1.clone()], + }; + metastore + .mark_metrics_splits_for_deletion(mark_request) + .await + .unwrap(); + + // Verify split_1 is MarkedForDeletion. + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["MarkedForDeletion".to_string()]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_1); + + // Verify split_2 is still Published. 
+ let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec!["Published".to_string()]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].metadata.split_id.as_str(), &split_id_2); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_delete_metrics_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-delete-metrics-splits"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + let split_id_1 = format!("{index_id}--split-1"); + let split_1 = build_test_split(&split_id_1, &index_uid, TimeRange::new(1000, 2000)); + + // Stage, publish, mark for deletion. + let request = + StageMetricsSplitsRequest::try_from_splits_metadata(index_uid.clone(), &[split_1]).unwrap(); + metastore.stage_metrics_splits(request).await.unwrap(); + + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_metrics_splits(publish_request) + .await + .unwrap(); + + let mark_request = MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec![split_id_1.clone()], + }; + metastore + .mark_metrics_splits_for_deletion(mark_request) + .await + .unwrap(); + + // Delete. + let delete_request = DeleteMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec![split_id_1.clone()], + }; + metastore + .delete_metrics_splits(delete_request) + .await + .unwrap(); + + // Verify it is gone (list all states). 
+ let query = ListMetricsSplitsQuery::for_index(index_uid.clone()).with_split_states(vec![ + "Staged".to_string(), + "Published".to_string(), + "MarkedForDeletion".to_string(), + ]); + let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap(); + let response = metastore.list_metrics_splits(list_request).await.unwrap(); + let splits = response.deserialize_splits().unwrap(); + assert_eq!(splits.len(), 0); + + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_delete_metrics_splits_nonexistent< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + let index_id = append_random_suffix("test-delete-metrics-nonexistent"); + let index_uid = create_test_index(&mut metastore, &index_id).await; + + // Delete a split_id that doesn't exist — should succeed (idempotent). + let delete_request = DeleteMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + split_ids: vec!["nonexistent-split".to_string()], + }; + metastore + .delete_metrics_splits(delete_request) + .await + .unwrap(); + + cleanup_index(&mut metastore, index_uid).await; +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index d6e549baf25..7c068d57113 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -25,10 +25,13 @@ use quickwit_proto::metastore::{ use quickwit_proto::tonic::transport::Channel; use quickwit_proto::types::IndexUid; +use crate::metastore::{ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt}; + pub(crate) mod delete_task; pub(crate) mod get_identity; pub(crate) mod index; pub(crate) mod list_splits; +pub(crate) mod metrics; pub(crate) mod shard; pub(crate) mod source; pub(crate) mod split; @@ -153,6 +156,44 @@ async fn cleanup_index(metastore: &mut dyn MetastoreServiceExt, index_uid: Index .await .unwrap(); } + // Also clean up 
any metrics splits (they have a separate FK constraint).
+    let metrics_query = crate::metastore::ListMetricsSplitsQuery::for_index(index_uid.clone())
+        .with_split_states(vec![
+            "Staged".to_string(),
+            "Published".to_string(),
+            "MarkedForDeletion".to_string(),
+        ]);
+    if let Ok(list_request) = quickwit_proto::metastore::ListMetricsSplitsRequest::try_from_query(
+        index_uid.clone(),
+        &metrics_query,
+    ) && let Ok(response) = metastore.list_metrics_splits(list_request).await
+        && let Ok(splits) = response.deserialize_splits()
+        && !splits.is_empty()
+    {
+        let split_ids: Vec<String> = splits
+            .iter()
+            .map(|s| s.metadata.split_id.as_str().to_string())
+            .collect();
+
+        // Mark for deletion first.
+        let _ = metastore
+            .mark_metrics_splits_for_deletion(
+                quickwit_proto::metastore::MarkMetricsSplitsForDeletionRequest {
+                    index_uid: Some(index_uid.clone()),
+                    split_ids: split_ids.clone(),
+                },
+            )
+            .await;
+
+        // Delete.
+        let _ = metastore
+            .delete_metrics_splits(quickwit_proto::metastore::DeleteMetricsSplitsRequest {
+                index_uid: Some(index_uid.clone()),
+                split_ids,
+            })
+            .await;
+    }
+
     // Delete index.
     metastore
         .delete_index(DeleteIndexRequest {
@@ -575,6 +616,91 @@ macro_rules!
metastore_test_suite { let _ = tracing_subscriber::fmt::try_init(); $crate::tests::get_identity::test_metastore_get_identity::<$metastore_type>().await; } + + // Metrics Split API tests + // + // - stage_metrics_splits + // - publish_metrics_splits + // - list_metrics_splits + // - mark_metrics_splits_for_deletion + // - delete_metrics_splits + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_stage_metrics_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_stage_metrics_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_stage_metrics_splits_upsert() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_stage_metrics_splits_upsert::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_state() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_state::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_time_range() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_time_range::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_metric_name() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_metric_name::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_metrics_splits_by_compaction_scope() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_list_metrics_splits_by_compaction_scope::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn 
test_metastore_publish_metrics_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_publish_metrics_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_publish_metrics_splits_nonexistent() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_publish_metrics_splits_nonexistent::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_mark_metrics_splits_for_deletion() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_mark_metrics_splits_for_deletion::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_delete_metrics_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_delete_metrics_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_delete_metrics_splits_nonexistent() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::metrics::test_metastore_delete_metrics_splits_nonexistent::<$metastore_type>().await; + } } }; } diff --git a/quickwit/quickwit-parquet-engine/src/index/config.rs b/quickwit/quickwit-parquet-engine/src/index/config.rs index adf59c992be..7712b2545f8 100644 --- a/quickwit/quickwit-parquet-engine/src/index/config.rs +++ b/quickwit/quickwit-parquet-engine/src/index/config.rs @@ -17,6 +17,7 @@ use std::sync::OnceLock; use crate::storage::ParquetWriterConfig; +use crate::table_config::TableConfig; /// Default maximum rows to accumulate before flushing to split. const DEFAULT_MAX_ROWS: usize = 1_000_000; @@ -58,6 +59,8 @@ pub struct ParquetIndexingConfig { pub max_bytes: usize, /// Parquet writer configuration for split creation. pub writer_config: ParquetWriterConfig, + /// Table-level configuration (sort fields, window duration, product type). 
+ pub table_config: TableConfig, } impl Default for ParquetIndexingConfig { @@ -66,6 +69,7 @@ impl Default for ParquetIndexingConfig { max_rows: get_max_rows_from_env(), max_bytes: get_max_bytes_from_env(), writer_config: ParquetWriterConfig::default(), + table_config: TableConfig::default(), } } } diff --git a/quickwit/quickwit-parquet-engine/src/schema/fields.rs b/quickwit/quickwit-parquet-engine/src/schema/fields.rs index 9f46dcf3b8c..b5d149a5b51 100644 --- a/quickwit/quickwit-parquet-engine/src/schema/fields.rs +++ b/quickwit/quickwit-parquet-engine/src/schema/fields.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Parquet field definitions with sort order constants and validation. +//! Parquet field definitions with column metadata, sort order constants, and validation. use anyhow::{Result, bail}; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, Field, Fields}; +use parquet::variant::VariantType; /// Required field names that must exist in every batch. pub const REQUIRED_FIELDS: &[&str] = &["metric_name", "metric_type", "timestamp_secs", "value"]; @@ -31,6 +32,144 @@ pub const SORT_ORDER: &[&str] = &[ "timestamp_secs", ]; +/// All fields in the parquet schema. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ParquetField { + MetricName, + MetricType, + MetricUnit, + TimestampSecs, + StartTimestampSecs, + Value, + TagService, + TagEnv, + TagDatacenter, + TagRegion, + TagHost, + Attributes, + ServiceName, + ResourceAttributes, +} + +impl ParquetField { + /// Field name as stored in Parquet. 
+ pub fn name(&self) -> &'static str { + match self { + Self::MetricName => "metric_name", + Self::MetricType => "metric_type", + Self::MetricUnit => "metric_unit", + Self::TimestampSecs => "timestamp_secs", + Self::StartTimestampSecs => "start_timestamp_secs", + Self::Value => "value", + Self::TagService => "tag_service", + Self::TagEnv => "tag_env", + Self::TagDatacenter => "tag_datacenter", + Self::TagRegion => "tag_region", + Self::TagHost => "tag_host", + Self::Attributes => "attributes", + Self::ServiceName => "service_name", + Self::ResourceAttributes => "resource_attributes", + } + } + + /// Whether this field is nullable. + pub fn nullable(&self) -> bool { + matches!( + self, + Self::MetricUnit + | Self::StartTimestampSecs + | Self::TagService + | Self::TagEnv + | Self::TagDatacenter + | Self::TagRegion + | Self::TagHost + | Self::Attributes + | Self::ResourceAttributes + ) + } + + /// Arrow DataType for this field. + /// Use dictionary encoding for high-cardinality strings. + pub fn arrow_type(&self) -> DataType { + match self { + // Dictionary-encoded strings for high cardinality + Self::MetricName | Self::ServiceName => { + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) + } + // Dictionary-encoded optional tags + Self::TagService + | Self::TagEnv + | Self::TagDatacenter + | Self::TagRegion + | Self::TagHost => { + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) + } + // Enum stored as UInt8 + Self::MetricType => DataType::UInt8, + // Timestamps as UInt64 seconds + Self::TimestampSecs | Self::StartTimestampSecs => DataType::UInt64, + // Metric value + Self::Value => DataType::Float64, + // Plain string for metric unit + Self::MetricUnit => DataType::Utf8, + // VARIANT type for semi-structured attributes + // Uses the Parquet Variant binary encoding format + Self::Attributes | Self::ResourceAttributes => { + // VARIANT is stored as a struct with metadata and value BinaryView fields + // 
VariantArrayBuilder produces BinaryView, not Binary
+                DataType::Struct(Fields::from(vec![
+                    Field::new("metadata", DataType::BinaryView, false),
+                    Field::new("value", DataType::BinaryView, false),
+                ]))
+            }
+        }
+    }
+
+    /// Convert to Arrow Field.
+    pub fn to_arrow_field(&self) -> Field {
+        let field = Field::new(self.name(), self.arrow_type(), self.nullable());
+
+        // Add VARIANT extension type metadata for attributes fields
+        match self {
+            Self::Attributes | Self::ResourceAttributes => field.with_extension_type(VariantType),
+            _ => field,
+        }
+    }
+
+    /// All fields in schema order.
+    pub fn all() -> &'static [ParquetField] {
+        &[
+            Self::MetricName,
+            Self::MetricType,
+            Self::MetricUnit,
+            Self::TimestampSecs,
+            Self::StartTimestampSecs,
+            Self::Value,
+            Self::TagService,
+            Self::TagEnv,
+            Self::TagDatacenter,
+            Self::TagRegion,
+            Self::TagHost,
+            Self::Attributes,
+            Self::ServiceName,
+            Self::ResourceAttributes,
+        ]
+    }
+
+    /// Get the column index in the schema.
+    pub fn column_index(&self) -> usize {
+        Self::all().iter().position(|f| f == self).unwrap()
+    }
+
+    /// Look up a ParquetField by its Parquet column name.
+    ///
+    /// Used by the sort fields resolver to map sort schema column names
+    /// to physical schema columns.
+    pub fn from_name(name: &str) -> Option<Self> {
+        Self::all().iter().find(|f| f.name() == name).copied()
+    }
+}
+
 /// Arrow type for required fields by name.
 pub fn required_field_type(name: &str) -> Option<DataType> {
     match name {
diff --git a/quickwit/quickwit-parquet-engine/src/schema/mod.rs b/quickwit/quickwit-parquet-engine/src/schema/mod.rs
index f9b5c06d9c4..71026ddf9c8 100644
--- a/quickwit/quickwit-parquet-engine/src/schema/mod.rs
+++ b/quickwit/quickwit-parquet-engine/src/schema/mod.rs
@@ -20,5 +20,7 @@
 mod fields;
 mod parquet;
 
-pub use fields::{REQUIRED_FIELDS, SORT_ORDER, required_field_type, validate_required_fields};
+pub use fields::{
+    ParquetField, REQUIRED_FIELDS, SORT_ORDER, required_field_type, validate_required_fields,
+};
 pub use parquet::ParquetSchema;
diff --git a/quickwit/quickwit-parquet-engine/src/split/postgres.rs b/quickwit/quickwit-parquet-engine/src/split/postgres.rs
index 0ea753181bb..6b63f46f17f 100644
--- a/quickwit/quickwit-parquet-engine/src/split/postgres.rs
+++ b/quickwit/quickwit-parquet-engine/src/split/postgres.rs
@@ -72,11 +72,11 @@ pub struct PgMetricsSplit {
     pub split_metadata_json: String,
     pub update_timestamp: i64,
     pub window_start: Option<i64>,
-    pub window_duration_secs: i32,
+    pub window_duration_secs: Option<i32>,
     pub sort_fields: String,
     pub num_merge_ops: i32,
     pub row_keys: Option<Vec<u8>>,
-    pub zonemap_regexes: String,
+    pub zonemap_regexes: serde_json::Value,
 }
 
 /// Insertable row for metrics_splits table.
@@ -99,11 +99,11 @@ pub struct InsertableMetricsSplit {
     pub size_bytes: i64,
     pub split_metadata_json: String,
     pub window_start: Option<i64>,
-    pub window_duration_secs: i32,
+    pub window_duration_secs: Option<i32>,
     pub sort_fields: String,
     pub num_merge_ops: i32,
     pub row_keys: Option<Vec<u8>>,
-    pub zonemap_regexes: String,
+    pub zonemap_regexes: serde_json::Value,
 }
 
 impl InsertableMetricsSplit {
@@ -114,12 +114,6 @@ impl InsertableMetricsSplit {
     ) -> Result<Self> {
         let split_metadata_json = serde_json::to_string(metadata)?;
 
-        let zonemap_regexes_json = if metadata.zonemap_regexes.is_empty() {
-            "{}".to_string()
-        } else {
-            serde_json::to_string(&metadata.zonemap_regexes)?
- }; - Ok(Self { split_id: metadata.split_id.as_str().to_string(), split_state: state.as_str().to_string(), @@ -137,11 +131,15 @@ impl InsertableMetricsSplit { size_bytes: metadata.size_bytes as i64, split_metadata_json, window_start: metadata.window_start(), - window_duration_secs: metadata.window_duration_secs() as i32, + window_duration_secs: { + let dur = metadata.window_duration_secs(); + if dur > 0 { Some(dur as i32) } else { None } + }, sort_fields: metadata.sort_fields.clone(), num_merge_ops: metadata.num_merge_ops as i32, row_keys: metadata.row_keys_proto.clone(), - zonemap_regexes: zonemap_regexes_json, + zonemap_regexes: serde_json::to_value(&metadata.zonemap_regexes) + .unwrap_or_else(|_| serde_json::json!({})), }) } } @@ -165,7 +163,7 @@ impl PgMetricsSplit { debug_assert_eq!(metadata.window_start(), self.window_start); debug_assert_eq!( metadata.window_duration_secs(), - self.window_duration_secs as u32 + self.window_duration_secs.unwrap_or(0) as u32 ); debug_assert_eq!(metadata.sort_fields, self.sort_fields); debug_assert_eq!(metadata.num_merge_ops, self.num_merge_ops as u32); @@ -257,6 +255,61 @@ mod tests { assert_eq!(insertable.size_bytes, 1024 * 1024); } + #[test] + fn test_insertable_from_metadata_with_compaction_fields() { + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("compaction-test")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(500) + .window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .num_merge_ops(2) + .row_keys_proto(vec![0x08, 0x01]) + .add_zonemap_regex("metric_name", "cpu\\..*") + .build(); + + let insertable = + InsertableMetricsSplit::from_metadata(&metadata, MetricsSplitState::Published) + .expect("conversion should succeed"); + + assert_eq!(insertable.window_start, Some(1700000000)); + assert_eq!(insertable.window_duration_secs, Some(3600)); + 
assert_eq!(insertable.sort_fields, "metric_name|host|timestamp/V2"); + assert_eq!(insertable.num_merge_ops, 2); + assert_eq!(insertable.row_keys, Some(vec![0x08, 0x01])); + assert!(insertable.zonemap_regexes.is_object()); + assert_eq!( + insertable.zonemap_regexes["metric_name"], + serde_json::json!("cpu\\..*") + ); + } + + #[test] + fn test_insertable_from_metadata_pre_phase31_defaults() { + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("pre-phase31")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .build(); + + let insertable = + InsertableMetricsSplit::from_metadata(&metadata, MetricsSplitState::Staged) + .expect("conversion should succeed"); + + assert!(insertable.window_start.is_none()); + assert!( + insertable.window_duration_secs.is_none(), + "pre-Phase-31 splits should have NULL window_duration_secs" + ); + assert_eq!(insertable.sort_fields, ""); + assert_eq!(insertable.num_merge_ops, 0); + assert!(insertable.row_keys.is_none()); + assert_eq!(insertable.zonemap_regexes, serde_json::json!({})); + } + #[test] fn test_pg_split_to_metadata_roundtrip() { let original = MetricsSplitMetadata::builder() diff --git a/quickwit/quickwit-parquet-engine/src/storage/writer.rs b/quickwit/quickwit-parquet-engine/src/storage/writer.rs index d45c0a0c9cb..3ec633573bc 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/writer.rs @@ -702,6 +702,89 @@ mod tests { std::fs::remove_file(&path).ok(); } + /// META-07 compliance: Prove the Parquet file is truly self-describing by + /// writing compaction metadata, reading it back from a cold file (no in-memory + /// state), and reconstructing the MetricsSplitMetadata compaction fields from + /// ONLY the Parquet key_value_metadata. 
+    #[test]
+    fn test_meta07_self_describing_parquet_roundtrip() {
+        use std::fs::File;
+
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        use crate::split::{SplitId, TimeRange};
+
+        let sort_schema_str = "metric_name|host|env|timestamp/V2";
+        let window_start_secs: i64 = 1700006400;
+        let window_duration: u32 = 900;
+        let merge_ops: u32 = 7;
+        let row_keys_bytes: Vec<u8> = vec![0x0A, 0x03, 0x63, 0x70, 0x75];
+
+        let original = MetricsSplitMetadata::builder()
+            .split_id(SplitId::new("self-describing-test"))
+            .index_uid("metrics-prod:00000000000000000000000000")
+            .time_range(TimeRange::new(1700006400, 1700007300))
+            .window_start_secs(window_start_secs)
+            .window_duration_secs(window_duration)
+            .sort_fields(sort_schema_str)
+            .num_merge_ops(merge_ops)
+            .row_keys_proto(row_keys_bytes.clone())
+            .build();
+
+        let config = ParquetWriterConfig::default();
+        let writer = ParquetWriter::new(config, &TableConfig::default());
+        let batch = create_test_batch();
+
+        let temp_dir = std::env::temp_dir();
+        let path = temp_dir.join("test_self_describing_roundtrip.parquet");
+        writer
+            .write_to_file_with_metadata(&batch, &path, Some(&original))
+            .unwrap();
+
+        // Read phase: open a cold file and reconstruct fields from kv_metadata.
+ let file = File::open(&path).unwrap(); + let reader = SerializedFileReader::new(file).unwrap(); + let file_metadata = reader.metadata().file_metadata(); + let kv_metadata = file_metadata + .key_value_metadata() + .expect("self-describing file must have kv_metadata"); + + let find_kv = |key: &str| -> Option { + kv_metadata + .iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.clone()) + }; + + let recovered_sort_schema = find_kv(PARQUET_META_SORT_FIELDS) + .expect("self-describing file must contain qh.sort_fields"); + let recovered_window_start: i64 = find_kv(PARQUET_META_WINDOW_START) + .expect("self-describing file must contain qh.window_start") + .parse() + .expect("window_start must be parseable as i64"); + let recovered_window_duration: u32 = find_kv(PARQUET_META_WINDOW_DURATION) + .expect("self-describing file must contain qh.window_duration_secs") + .parse() + .expect("window_duration must be parseable as u32"); + let recovered_merge_ops: u32 = find_kv(PARQUET_META_NUM_MERGE_OPS) + .expect("self-describing file must contain qh.num_merge_ops") + .parse() + .expect("num_merge_ops must be parseable as u32"); + let recovered_row_keys_b64 = + find_kv(PARQUET_META_ROW_KEYS).expect("self-describing file must contain qh.row_keys"); + let recovered_row_keys = BASE64 + .decode(&recovered_row_keys_b64) + .expect("row_keys must be valid base64"); + + assert_eq!(recovered_sort_schema, sort_schema_str); + assert_eq!(recovered_window_start, window_start_secs); + assert_eq!(recovered_window_duration, window_duration); + assert_eq!(recovered_merge_ops, merge_ops); + assert_eq!(recovered_row_keys, row_keys_bytes); + + std::fs::remove_file(&path).ok(); + } + #[test] fn test_build_compaction_kv_metadata_fully_populated() { use crate::split::{SplitId, TimeRange};