Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Reverse Phase 31: Remove compaction metadata columns.
-- Drop the partial index first, then the columns it references, so no
-- statement ever targets a column that is still indexed.
DROP INDEX IF EXISTS idx_metrics_splits_compaction_scope;
-- Columns are dropped in reverse order of their addition in the up migration.
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS zonemap_regexes;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS row_keys;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS num_merge_ops;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS sort_fields;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_duration_secs;
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_start;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Phase 31: Add compaction metadata columns to metrics_splits.
-- These columns support time-windowed compaction planning and execution.
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_start BIGINT;
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_duration_secs INTEGER;
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS sort_fields TEXT NOT NULL DEFAULT '';
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS num_merge_ops INTEGER NOT NULL DEFAULT 0;
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS row_keys BYTEA;
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS zonemap_regexes JSONB NOT NULL DEFAULT '{}';

-- Compaction scope index: supports the compaction planner's primary query pattern
-- "give me all Published splits for a given (index_uid, sort_fields, window_start) triple."
-- NOTE(review): the log-side merge planner additionally narrows candidates via a
-- maturity_timestamp column (splits must be Published AND immature). Consider
-- mirroring that filter here for consistency -- TODO confirm whether metrics
-- splits track maturity at all; the planner works without it.
CREATE INDEX IF NOT EXISTS idx_metrics_splits_compaction_scope
    ON metrics_splits (index_uid, sort_fields, window_start)
    WHERE split_state = 'Published';
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,18 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp
}
}

// Filter by compaction scope
if let Some(ws) = query.window_start
&& split.metadata.window_start() != Some(ws)
{
return false;
}
if let Some(ref sf) = query.sort_fields
&& split.metadata.sort_fields != *sf
{
return false;
}

true
}

Expand Down
15 changes: 15 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ pub struct ListMetricsSplitsQuery {
pub tag_region: Option<String>,
/// Host tag filter.
pub tag_host: Option<String>,
/// Window start filter for compaction scope queries.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok, so if I understand correctly, this query object exists both for compaction and for the read path.

For the read path we use range_start/range_end, for compaction we use window_start.
Do we ever change the window duration in reality?

Could we drop window_start and instead query for [time_range_start .. time_range_start + 15 min)?

pub window_start: Option<i64>,
/// Sort fields filter for compaction scope queries.
pub sort_fields: Option<String>,
/// Limit number of results.
pub limit: Option<usize>,
}
Expand Down Expand Up @@ -107,6 +111,17 @@ impl ListMetricsSplitsQuery {
self.metric_names = names;
self
}

/// Filter by compaction scope (window_start + sort_fields).
pub fn with_compaction_scope(
mut self,
window_start: i64,
sort_fields: impl Into<String>,
) -> Self {
self.window_start = Some(window_start);
self.sort_fields = Some(sort_fields.into());
self
}
}

/// Splits batch size returned by the stream splits API
Expand Down
175 changes: 110 additions & 65 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,13 @@ impl MetastoreService for PostgresqlMetastore {
let mut num_rows_list = Vec::with_capacity(splits_metadata.len());
let mut size_bytes_list = Vec::with_capacity(splits_metadata.len());
let mut split_metadata_jsons = Vec::with_capacity(splits_metadata.len());
let mut window_starts: Vec<Option<i64>> = Vec::with_capacity(splits_metadata.len());
let mut window_duration_secs_list: Vec<Option<i32>> =
Vec::with_capacity(splits_metadata.len());
let mut sort_fields_list: Vec<String> = Vec::with_capacity(splits_metadata.len());
let mut num_merge_ops_list: Vec<i32> = Vec::with_capacity(splits_metadata.len());
let mut row_keys_list: Vec<Option<Vec<u8>>> = Vec::with_capacity(splits_metadata.len());
let mut zonemap_regexes_json_list: Vec<String> = Vec::with_capacity(splits_metadata.len());

for metadata in &splits_metadata {
let insertable =
Expand Down Expand Up @@ -1837,6 +1844,12 @@ impl MetastoreService for PostgresqlMetastore {
num_rows_list.push(insertable.num_rows);
size_bytes_list.push(insertable.size_bytes);
split_metadata_jsons.push(insertable.split_metadata_json);
window_starts.push(insertable.window_start);
window_duration_secs_list.push(insertable.window_duration_secs);
sort_fields_list.push(insertable.sort_fields);
num_merge_ops_list.push(insertable.num_merge_ops);
row_keys_list.push(insertable.row_keys);
zonemap_regexes_json_list.push(insertable.zonemap_regexes.to_string());
}

info!(
Expand All @@ -1863,6 +1876,12 @@ impl MetastoreService for PostgresqlMetastore {
num_rows,
size_bytes,
split_metadata_json,
window_start,
window_duration_secs,
sort_fields,
num_merge_ops,
row_keys,
zonemap_regexes,
create_timestamp,
update_timestamp
)
Expand All @@ -1887,6 +1906,12 @@ impl MetastoreService for PostgresqlMetastore {
num_rows,
size_bytes,
split_metadata_json,
window_start,
window_duration_secs,
sort_fields,
num_merge_ops,
row_keys,
zonemap_regexes_json::jsonb,
(CURRENT_TIMESTAMP AT TIME ZONE 'UTC'),
(CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
FROM UNNEST(
Expand All @@ -1904,7 +1929,13 @@ impl MetastoreService for PostgresqlMetastore {
$12::text[],
$13::bigint[],
$14::bigint[],
$15::text[]
$15::text[],
$16::bigint[],
$17::int[],
$18::text[],
$19::int[],
$20::bytea[],
$21::text[]
) AS staged(
split_id,
split_state,
Expand All @@ -1920,7 +1951,13 @@ impl MetastoreService for PostgresqlMetastore {
high_cardinality_tag_keys_json,
num_rows,
size_bytes,
split_metadata_json
split_metadata_json,
window_start,
window_duration_secs,
sort_fields,
num_merge_ops,
row_keys,
zonemap_regexes_json
)
ON CONFLICT (split_id) DO UPDATE
SET
Expand All @@ -1937,6 +1974,12 @@ impl MetastoreService for PostgresqlMetastore {
num_rows = EXCLUDED.num_rows,
size_bytes = EXCLUDED.size_bytes,
split_metadata_json = EXCLUDED.split_metadata_json,
window_start = EXCLUDED.window_start,
window_duration_secs = EXCLUDED.window_duration_secs,
sort_fields = EXCLUDED.sort_fields,
num_merge_ops = EXCLUDED.num_merge_ops,
row_keys = EXCLUDED.row_keys,
zonemap_regexes = EXCLUDED.zonemap_regexes,
update_timestamp = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
WHERE metrics_splits.split_state = 'Staged'
RETURNING split_id
Expand All @@ -1962,6 +2005,12 @@ impl MetastoreService for PostgresqlMetastore {
.bind(&num_rows_list)
.bind(&size_bytes_list)
.bind(&split_metadata_jsons)
.bind(&window_starts)
.bind(&window_duration_secs_list)
.bind(&sort_fields_list)
.bind(&num_merge_ops_list)
.bind(&row_keys_list)
.bind(&zonemap_regexes_json_list)
.fetch_all(tx.as_mut())
.await
.map_err(|sqlx_error| convert_sqlx_err(&index_id_for_err, sqlx_error))
Expand Down Expand Up @@ -2102,26 +2151,22 @@ impl MetastoreService for PostgresqlMetastore {
.await
.map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?;

// Verify all staged splits were published
// Verify all staged splits were published.
// If some splits are missing, it means they don't exist (NotFound),
// not that they're in the wrong state (FailedPrecondition).
if published_count as usize != staged_split_ids.len() {
let entity = EntityKind::Splits {
return Err(MetastoreError::NotFound(EntityKind::Splits {
split_ids: staged_split_ids.clone(),
};
let message = format!(
"expected to publish {} splits, but only {} were in Staged state",
staged_split_ids.len(),
published_count
);
return Err(MetastoreError::FailedPrecondition { entity, message });
}));
}

// Verify all replaced splits were marked for deletion
// Verify all replaced splits were marked for deletion.
if marked_count as usize != replaced_split_ids.len() {
let entity = EntityKind::Splits {
split_ids: replaced_split_ids.clone(),
};
let message = format!(
"expected to mark {} splits for deletion, but only {} were in Published state",
"expected to replace {} splits, but only {} were in Published state",
replaced_split_ids.len(),
marked_count
);
Expand Down Expand Up @@ -2169,7 +2214,13 @@ impl MetastoreService for PostgresqlMetastore {
num_rows,
size_bytes,
split_metadata_json,
EXTRACT(EPOCH FROM update_timestamp)::bigint as update_timestamp
EXTRACT(EPOCH FROM update_timestamp)::bigint as update_timestamp,
window_start,
window_duration_secs,
sort_fields,
num_merge_ops,
row_keys,
zonemap_regexes
FROM metrics_splits
WHERE index_uid = $1
"#,
Expand Down Expand Up @@ -2221,6 +2272,16 @@ impl MetastoreService for PostgresqlMetastore {
param_idx += 1;
}

// Compaction scope filters
if query.window_start.is_some() {
sql.push_str(&format!(" AND window_start = ${}", param_idx));
param_idx += 1;
}
if query.sort_fields.is_some() {
sql.push_str(&format!(" AND sort_fields = ${}", param_idx));
param_idx += 1;
}

sql.push_str(" ORDER BY time_range_start ASC");

// Add limit
Expand All @@ -2229,27 +2290,7 @@ impl MetastoreService for PostgresqlMetastore {
}

// Execute query with bindings
let mut query_builder = sqlx::query_as::<
_,
(
String, // split_id
String, // split_state
String, // index_uid
i64, // time_range_start
i64, // time_range_end
Vec<String>, // metric_names
Option<Vec<String>>, // tag_service
Option<Vec<String>>, // tag_env
Option<Vec<String>>, // tag_datacenter
Option<Vec<String>>, // tag_region
Option<Vec<String>>, // tag_host
Vec<String>, // high_cardinality_tag_keys
i64, // num_rows
i64, // size_bytes
String, // split_metadata_json
i64, // update_timestamp
),
>(&sql);
let mut query_builder = sqlx::query(&sql);

query_builder = query_builder.bind(query.index_uid.to_string());

Expand Down Expand Up @@ -2280,6 +2321,12 @@ impl MetastoreService for PostgresqlMetastore {
if let Some(ref host) = query.tag_host {
query_builder = query_builder.bind(host);
}
if let Some(ws) = query.window_start {
query_builder = query_builder.bind(ws);
}
if let Some(ref sf) = query.sort_fields {
query_builder = query_builder.bind(sf);
}
if let Some(limit) = query.limit {
query_builder = query_builder.bind(limit as i64);
}
Expand All @@ -2294,38 +2341,39 @@ impl MetastoreService for PostgresqlMetastore {
.into_iter()
.filter_map(|row| {
use quickwit_parquet_engine::split::{MetricsSplitState, PgMetricsSplit};
use sqlx::Row as _;

let pg_split = PgMetricsSplit {
split_id: row.0,
split_state: row.1,
index_uid: row.2,
time_range_start: row.3,
time_range_end: row.4,
metric_names: row.5,
tag_service: row.6,
tag_env: row.7,
tag_datacenter: row.8,
tag_region: row.9,
tag_host: row.10,
high_cardinality_tag_keys: row.11,
num_rows: row.12,
size_bytes: row.13,
split_metadata_json: row.14,
update_timestamp: row.15,
window_start: None,
window_duration_secs: 0,
sort_fields: String::new(),
num_merge_ops: 0,
row_keys: None,
zonemap_regexes: String::new(),
split_id: row.get("split_id"),
split_state: row.get("split_state"),
index_uid: row.get("index_uid"),
time_range_start: row.get("time_range_start"),
time_range_end: row.get("time_range_end"),
metric_names: row.get("metric_names"),
tag_service: row.get("tag_service"),
tag_env: row.get("tag_env"),
tag_datacenter: row.get("tag_datacenter"),
tag_region: row.get("tag_region"),
tag_host: row.get("tag_host"),
high_cardinality_tag_keys: row.get("high_cardinality_tag_keys"),
num_rows: row.get("num_rows"),
size_bytes: row.get("size_bytes"),
split_metadata_json: row.get("split_metadata_json"),
update_timestamp: row.get("update_timestamp"),
window_start: row.get("window_start"),
window_duration_secs: row.get("window_duration_secs"),
sort_fields: row.get("sort_fields"),
num_merge_ops: row.get("num_merge_ops"),
row_keys: row.get("row_keys"),
zonemap_regexes: row.get("zonemap_regexes"),
};

let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged);
let metadata = pg_split.to_metadata().ok()?;

Some(MetricsSplitRecord {
state,
update_timestamp: row.15,
update_timestamp: pg_split.update_timestamp,
metadata,
})
})
Expand Down Expand Up @@ -2408,7 +2456,9 @@ impl MetastoreService for PostgresqlMetastore {
.await
.map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?;

// Check if any splits could not be deleted
// Log if some splits were not deleted (either non-existent or not
// in MarkedForDeletion state). Delete is idempotent — we don't error
// for missing splits.
if deleted_split_ids.len() != request.split_ids.len() {
let not_deleted: Vec<String> = request
.split_ids
Expand All @@ -2421,13 +2471,8 @@ impl MetastoreService for PostgresqlMetastore {
warn!(
index_uid = %request.index_uid(),
not_deleted = ?not_deleted,
"some metrics splits were not in MarkedForDeletion state"
"some metrics splits were not deleted (non-existent or not marked for deletion)"
);
let entity = EntityKind::Splits {
split_ids: not_deleted,
};
let message = "splits are not marked for deletion".to_string();
return Err(MetastoreError::FailedPrecondition { entity, message });
}
}

Expand Down
Loading
Loading