Skip to content

Commit afdec4e

Browse files
committed
review: parquet_file singular, proto doc link, fix metastore accessor
1 parent: fc5eaef · commit: afdec4e

4 files changed

Lines changed: 75 additions & 39 deletions

File tree

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -981,7 +981,7 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp
981981

982982
// Filter by compaction scope
983983
if let Some(ws) = query.window_start
984-
&& split.metadata.window_start != Some(ws)
984+
&& split.metadata.window_start() != Some(ws)
985985
{
986986
return false;
987987
}

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 61 additions & 26 deletions
Original file line number · Diff line number · Diff line change
@@ -2441,45 +2441,80 @@ impl MetastoreService for PostgresqlMetastore {
24412441
);
24422442

24432443
// Only delete splits that are marked for deletion
2444+
// Match the non-metrics delete_splits pattern: distinguish
2445+
// "not found" (warn + succeed) from "not deletable" (FailedPrecondition).
24442446
const DELETE_SPLITS_QUERY: &str = r#"
2445-
DELETE FROM metrics_splits
2446-
WHERE
2447-
index_uid = $1
2448-
AND split_id = ANY($2)
2449-
AND split_state = 'MarkedForDeletion'
2450-
RETURNING split_id
2447+
WITH input_splits AS (
2448+
SELECT input_splits.split_id, metrics_splits.split_state
2449+
FROM UNNEST($2::text[]) AS input_splits(split_id)
2450+
LEFT JOIN metrics_splits
2451+
ON metrics_splits.index_uid = $1
2452+
AND metrics_splits.split_id = input_splits.split_id
2453+
),
2454+
deleted AS (
2455+
DELETE FROM metrics_splits
2456+
USING input_splits
2457+
WHERE
2458+
metrics_splits.index_uid = $1
2459+
AND metrics_splits.split_id = input_splits.split_id
2460+
AND NOT EXISTS (
2461+
SELECT 1 FROM input_splits
2462+
WHERE split_state IN ('Staged', 'Published')
2463+
)
2464+
RETURNING metrics_splits.split_id
2465+
)
2466+
SELECT
2467+
(SELECT COUNT(*) FROM input_splits WHERE split_state IS NOT NULL) as num_found,
2468+
(SELECT COUNT(*) FROM deleted) as num_deleted,
2469+
COALESCE(
2470+
(SELECT ARRAY_AGG(split_id) FROM input_splits
2471+
WHERE split_state IN ('Staged', 'Published')),
2472+
ARRAY[]::text[]
2473+
) as not_deletable,
2474+
COALESCE(
2475+
(SELECT ARRAY_AGG(split_id) FROM input_splits
2476+
WHERE split_state IS NULL),
2477+
ARRAY[]::text[]
2478+
) as not_found
24512479
"#;
24522480

2453-
let deleted_split_ids: Vec<String> = sqlx::query_scalar(DELETE_SPLITS_QUERY)
2481+
let (num_found, num_deleted, not_deletable_ids, not_found_ids): (
2482+
i64,
2483+
i64,
2484+
Vec<String>,
2485+
Vec<String>,
2486+
) = sqlx::query_as(DELETE_SPLITS_QUERY)
24542487
.bind(request.index_uid())
24552488
.bind(&request.split_ids)
2456-
.fetch_all(&self.connection_pool)
2489+
.fetch_one(&self.connection_pool)
24572490
.await
24582491
.map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?;
24592492

2460-
// Log if some splits were not deleted (either non-existent or not
2461-
// in MarkedForDeletion state). Delete is idempotent — we don't error
2462-
// for missing splits.
2463-
if deleted_split_ids.len() != request.split_ids.len() {
2464-
let not_deleted: Vec<String> = request
2465-
.split_ids
2466-
.iter()
2467-
.filter(|id| !deleted_split_ids.contains(id))
2468-
.cloned()
2469-
.collect();
2493+
if !not_deletable_ids.is_empty() {
2494+
let message = format!(
2495+
"splits `{}` are not deletable",
2496+
not_deletable_ids.join(", ")
2497+
);
2498+
let entity = EntityKind::Splits {
2499+
split_ids: not_deletable_ids,
2500+
};
2501+
return Err(MetastoreError::FailedPrecondition { entity, message });
2502+
}
24702503

2471-
if !not_deleted.is_empty() {
2472-
warn!(
2473-
index_uid = %request.index_uid(),
2474-
not_deleted = ?not_deleted,
2475-
"some metrics splits were not deleted (non-existent or not marked for deletion)"
2476-
);
2477-
}
2504+
if !not_found_ids.is_empty() {
2505+
warn!(
2506+
index_uid = %request.index_uid(),
2507+
not_found = ?not_found_ids,
2508+
"{} metrics splits were not found and could not be deleted",
2509+
not_found_ids.len()
2510+
);
24782511
}
24792512

2513+
let _ = (num_found, num_deleted); // used by the CTE logic
2514+
24802515
info!(
24812516
index_uid = %request.index_uid(),
2482-
deleted_count = deleted_split_ids.len(),
2517+
num_deleted,
24832518
"deleted metrics splits successfully"
24842519
);
24852520
Ok(EmptyResponse {})

quickwit/quickwit-parquet-engine/src/split/metadata.rs

Lines changed: 12 additions & 11 deletions
Original file line number · Diff line number · Diff line change
@@ -160,8 +160,8 @@ pub struct MetricsSplitMetadata {
160160
/// When this split was created.
161161
pub created_at: SystemTime,
162162

163-
/// Parquet file path(s) relative to storage root.
164-
pub parquet_files: Vec<String>,
163+
/// Parquet file path relative to storage root.
164+
pub parquet_file: String,
165165

166166
/// Time window as `[start, start + duration)` in epoch seconds.
167167
/// None for pre-Phase-31 splits (backward compat).
@@ -175,7 +175,8 @@ pub struct MetricsSplitMetadata {
175175
/// 0 for newly ingested splits.
176176
pub num_merge_ops: u32,
177177

178-
/// RowKeys (sort-key min/max boundaries) as proto bytes.
178+
/// RowKeys (sort-key min/max boundaries) as serialized proto bytes
179+
/// ([`sortschema::RowKeys`](../../quickwit-proto/protos/event_store_sortschema/event_store_sortschema.proto)).
179180
/// None for pre-Phase-31 splits or splits without sort schema.
180181
pub row_keys_proto: Option<Vec<u8>>,
181182

@@ -198,7 +199,7 @@ struct MetricsSplitMetadataSerde {
198199
low_cardinality_tags: HashMap<String, HashSet<String>>,
199200
high_cardinality_tag_keys: HashSet<String>,
200201
created_at: SystemTime,
201-
parquet_files: Vec<String>,
202+
parquet_file: String,
202203

203204
#[serde(default, skip_serializing_if = "Option::is_none")]
204205
window_start: Option<i64>,
@@ -235,7 +236,7 @@ impl From<MetricsSplitMetadataSerde> for MetricsSplitMetadata {
235236
low_cardinality_tags: s.low_cardinality_tags,
236237
high_cardinality_tag_keys: s.high_cardinality_tag_keys,
237238
created_at: s.created_at,
238-
parquet_files: s.parquet_files,
239+
parquet_file: s.parquet_file,
239240
window,
240241
sort_fields: s.sort_fields,
241242
num_merge_ops: s.num_merge_ops,
@@ -261,7 +262,7 @@ impl From<MetricsSplitMetadata> for MetricsSplitMetadataSerde {
261262
low_cardinality_tags: m.low_cardinality_tags,
262263
high_cardinality_tag_keys: m.high_cardinality_tag_keys,
263264
created_at: m.created_at,
264-
parquet_files: m.parquet_files,
265+
parquet_file: m.parquet_file,
265266
window_start,
266267
window_duration_secs,
267268
sort_fields: m.sort_fields,
@@ -351,7 +352,7 @@ pub struct MetricsSplitMetadataBuilder {
351352
metric_names: HashSet<String>,
352353
low_cardinality_tags: HashMap<String, HashSet<String>>,
353354
high_cardinality_tag_keys: HashSet<String>,
354-
parquet_files: Vec<String>,
355+
parquet_file: String,
355356
window_start: Option<i64>,
356357
window_duration_secs: u32,
357358
sort_fields: String,
@@ -425,8 +426,8 @@ impl MetricsSplitMetadataBuilder {
425426
self
426427
}
427428

428-
pub fn add_parquet_file(mut self, path: impl Into<String>) -> Self {
429-
self.parquet_files.push(path.into());
429+
pub fn parquet_file(mut self, path: impl Into<String>) -> Self {
430+
self.parquet_file = path.into();
430431
self
431432
}
432433

@@ -500,7 +501,7 @@ impl MetricsSplitMetadataBuilder {
500501
low_cardinality_tags: self.low_cardinality_tags,
501502
high_cardinality_tag_keys: self.high_cardinality_tag_keys,
502503
created_at: SystemTime::now(),
503-
parquet_files: self.parquet_files,
504+
parquet_file: self.parquet_file,
504505
window,
505506
sort_fields: self.sort_fields,
506507
num_merge_ops: self.num_merge_ops,
@@ -626,7 +627,7 @@ mod tests {
626627
"low_cardinality_tags": {},
627628
"high_cardinality_tag_keys": [],
628629
"created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
629-
"parquet_files": ["split1.parquet"]
630+
"parquet_file": "split1.parquet"
630631
}"#;
631632

632633
let metadata: MetricsSplitMetadata =

quickwit/quickwit-parquet-engine/src/storage/split_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -122,7 +122,7 @@ impl ParquetSplitWriter {
122122
.size_bytes(0)
123123
.sort_fields(self.writer.sort_fields_string())
124124
.window_duration_secs(window_duration)
125-
.add_parquet_file(filename);
125+
.parquet_file(filename);
126126

127127
if let Some(ws) = window_start_secs {
128128
builder = builder.window_start_secs(ws);

0 commit comments

Comments (0)