Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,19 @@ pub fn is_metrics_index(index_id: &str) -> bool {
index_id.starts_with("otel-metrics") || index_id.starts_with("metrics-")
}

/// Returns whether the given index ID corresponds to a sketches index.
///
/// Sketches indexes use the Parquet/DataFusion pipeline with sketch-specific
/// processors and writers.
pub fn is_sketches_index(index_id: &str) -> bool {
    // A sketches index is identified purely by its `sketches-` ID prefix.
    index_id.strip_prefix("sketches-").is_some()
}

/// Returns whether the given index ID uses the Parquet/DataFusion pipeline.
///
/// This holds for both metrics indexes and sketches indexes.
pub fn is_parquet_pipeline_index(index_id: &str) -> bool {
    if is_metrics_index(index_id) {
        return true;
    }
    is_sketches_index(index_id)
}

#[macro_export]
macro_rules! ignore_error_kind {
($kind:path, $expr:expr) => {
Expand Down Expand Up @@ -444,6 +457,21 @@ mod tests {
assert!(!is_metrics_index("my-metrics-index")); // Not prefixed
}

#[test]
fn test_is_sketches_index() {
    // Table-driven check: (index_id, expected).
    let cases = [
        ("sketches-default", true),
        ("otel-metrics", false),
        ("my-index", false),
    ];
    for (index_id, expected) in cases {
        assert_eq!(is_sketches_index(index_id), expected, "index_id: {index_id}");
    }
}

#[test]
fn test_is_parquet_pipeline_index() {
    // Table-driven check: (index_id, expected).
    let cases = [
        ("otel-metrics", true),
        ("sketches-default", true),
        ("otel-logs-v0_7", false),
        ("my-index", false),
    ];
    for (index_id, expected) in cases {
        assert_eq!(
            is_parquet_pipeline_index(index_id),
            expected,
            "index_id: {index_id}"
        );
    }
}

#[test]
fn test_parse_bool_lenient() {
assert_eq!(parse_bool_lenient("true"), Some(true));
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::{Duration, Instant};
use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use quickwit_common::is_metrics_index;
use quickwit_common::is_parquet_pipeline_index;
use quickwit_common::pretty::PrettySample;
use quickwit_config::{FileSourceParams, SourceParams, indexing_pipeline_params_fingerprint};
use quickwit_proto::indexing::{
Expand Down Expand Up @@ -218,7 +218,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
SourceParams::IngestApi => {
// Metrics indexes should use IngestV2 only, not IngestV1.
// The ParquetSourceLoader doesn't support IngestV1.
if is_metrics_index(&source_uid.index_uid.index_id) {
if is_parquet_pipeline_index(&source_uid.index_uid.index_id) {
continue;
}
// TODO ingest v1 is scheduled differently
Expand Down
46 changes: 38 additions & 8 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use quickwit_actors::{
use quickwit_common::metrics::OwnedGaugeGuard;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::{KillSwitch, is_metrics_index};
use quickwit_common::{KillSwitch, is_parquet_pipeline_index, is_sketches_index};
use quickwit_config::{IndexingSettings, RetentionPolicy, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_ingest::IngesterPool;
Expand Down Expand Up @@ -409,9 +409,12 @@ impl IndexingPipeline {

let index_id = &self.params.pipeline_id.index_uid.index_id;

// Route metrics indexes to the Parquet/DataFusion pipeline
if is_metrics_index(index_id) {
return self.spawn_parquet_pipeline(ctx).await;
// Route metrics and sketches indexes to the Parquet/DataFusion pipeline
if is_parquet_pipeline_index(index_id) {
let use_sketch_processors = is_sketches_index(index_id);
return self
.spawn_parquet_pipeline(ctx, use_sketch_processors)
.await;
}

let source_id = &self.params.pipeline_id.source_id;
Expand Down Expand Up @@ -585,16 +588,21 @@ impl IndexingPipeline {
index=%self.params.pipeline_id.index_uid.index_id,
r#gen=self.generation()
))]
async fn spawn_parquet_pipeline(&mut self, ctx: &ActorContext<Self>) -> anyhow::Result<()> {
async fn spawn_parquet_pipeline(
&mut self,
ctx: &ActorContext<Self>,
use_sketch_processors: bool,
) -> anyhow::Result<()> {
let index_id = &self.params.pipeline_id.index_uid.index_id;
let source_id = &self.params.pipeline_id.source_id;

info!(
index_id,
source_id,
use_sketch_processors,
pipeline_uid=%self.params.pipeline_id.pipeline_uid,
root_dir=%self.params.indexing_directory.path().display(),
"spawning parquet indexing pipeline for metrics",
"spawning parquet indexing pipeline",
);

let (source_mailbox, source_inbox) = ctx
Expand Down Expand Up @@ -638,11 +646,23 @@ impl IndexingPipeline {

// ParquetPackager
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
let split_kind = if use_sketch_processors {
quickwit_parquet_engine::split::ParquetSplitKind::Sketches
} else {
quickwit_parquet_engine::split::ParquetSplitKind::Metrics
};
let sort_order = if use_sketch_processors {
quickwit_parquet_engine::schema::SKETCH_SORT_ORDER
} else {
quickwit_parquet_engine::schema::SORT_ORDER
};
let split_writer_kind = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
split_kind,
writer_config,
sort_order,
self.params.indexing_directory.path(),
);
let parquet_packager = ParquetPackager::new(split_writer, parquet_uploader_mailbox);
let parquet_packager = ParquetPackager::new(split_writer_kind, parquet_uploader_mailbox);
let (parquet_packager_mailbox, parquet_packager_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
Expand All @@ -664,7 +684,17 @@ impl IndexingPipeline {
.spawn(parquet_indexer);

// ParquetDocProcessor
let processor = if use_sketch_processors {
crate::actors::parquet_doc_processor::IngestProcessor::Sketches(
quickwit_parquet_engine::ingest::SketchParquetIngestProcessor::new(),
)
} else {
crate::actors::parquet_doc_processor::IngestProcessor::Metrics(
quickwit_parquet_engine::ingest::ParquetIngestProcessor,
)
};
let parquet_doc_processor = ParquetDocProcessor::new(
processor,
index_id.to_string(),
source_id.to_string(),
parquet_indexer_mailbox,
Expand Down
42 changes: 34 additions & 8 deletions quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu
use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_metastore::checkpoint::SourceCheckpointDelta;
use quickwit_parquet_engine::ingest::{IngestError, ParquetIngestProcessor};
use quickwit_parquet_engine::ingest::{
IngestError, ParquetIngestProcessor, SketchParquetIngestProcessor,
};
use quickwit_proto::types::{IndexId, SourceId};
use serde::Serialize;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -116,17 +118,32 @@ pub enum ParquetDocProcessorError {
Ingest(#[from] IngestError),
}

/// ParquetDocProcessor actor that routes Arrow IPC batches to the metrics engine.
/// Enum wrapping the ingest processor variant for metrics vs sketches.
pub enum IngestProcessor {
    /// Ingest processor for metrics indexes.
    Metrics(ParquetIngestProcessor),
    /// Ingest processor for sketches indexes.
    Sketches(SketchParquetIngestProcessor),
}

impl IngestProcessor {
fn process_ipc(&self, ipc_bytes: &[u8]) -> Result<RecordBatch, IngestError> {
match self {
Self::Metrics(p) => p.process_ipc(ipc_bytes),
Self::Sketches(p) => p.process_ipc(ipc_bytes),
}
}
}

/// ParquetDocProcessor actor that routes Arrow IPC batches to the parquet engine.
///
/// This actor receives RawDocBatch messages containing Arrow IPC data and converts
/// them to RecordBatch using ParquetIngestProcessor. The resulting batches are
/// forwarded to ParquetIndexer for accumulation and split production.
/// them to RecordBatch using the configured ingest processor. The resulting batches
/// are forwarded to ParquetIndexer for accumulation and split production.
///
/// Unlike DocProcessor which converts to Tantivy documents, this actor works
/// exclusively with Arrow RecordBatch for high-throughput metrics ingestion.
/// exclusively with Arrow RecordBatch for high-throughput metrics/sketch ingestion.
pub struct ParquetDocProcessor {
/// Processor for converting Arrow IPC to RecordBatch.
processor: ParquetIngestProcessor,
processor: IngestProcessor,
/// Processing counters.
counters: ParquetDocProcessorCounters,
/// Publish lock for coordinating with sources.
Expand All @@ -138,11 +155,11 @@ pub struct ParquetDocProcessor {
impl ParquetDocProcessor {
/// Creates a new ParquetDocProcessor.
pub fn new(
processor: IngestProcessor,
index_id: IndexId,
source_id: SourceId,
indexer_mailbox: Mailbox<ParquetIndexer>,
) -> Self {
let processor = ParquetIngestProcessor;
let counters = ParquetDocProcessorCounters::new(index_id.clone(), source_id.clone());

info!(
Expand Down Expand Up @@ -403,6 +420,7 @@ mod tests {

let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
let metrics_doc_processor = ParquetDocProcessor::new(
IngestProcessor::Metrics(ParquetIngestProcessor),
"test-metrics-index".to_string(),
"test-source".to_string(),
indexer_mailbox,
Expand Down Expand Up @@ -444,6 +462,7 @@ mod tests {

let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
let metrics_doc_processor = ParquetDocProcessor::new(
IngestProcessor::Metrics(ParquetIngestProcessor),
"test-metrics-index".to_string(),
"test-source".to_string(),
indexer_mailbox,
Expand Down Expand Up @@ -479,6 +498,7 @@ mod tests {

let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
let metrics_doc_processor = ParquetDocProcessor::new(
IngestProcessor::Metrics(ParquetIngestProcessor),
"test-metrics-index".to_string(),
"test-source".to_string(),
indexer_mailbox,
Expand Down Expand Up @@ -549,7 +569,12 @@ mod tests {

// Create ParquetPackager
let writer_config = ParquetWriterConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path());
let split_writer = ParquetSplitWriter::new(
quickwit_parquet_engine::split::ParquetSplitKind::Metrics,
writer_config,
quickwit_parquet_engine::schema::SORT_ORDER,
temp_dir.path(),
);
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);

Expand All @@ -564,6 +589,7 @@ mod tests {
let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer);

let metrics_doc_processor = ParquetDocProcessor::new(
IngestProcessor::Metrics(ParquetIngestProcessor),
"test-index".to_string(),
"test-source".to_string(),
indexer_mailbox,
Expand Down
Loading
Loading