Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] }
tempfile = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-parquet-engine = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
Expand Down
2 changes: 0 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,8 @@ impl IndexingPipeline {
.spawn(parquet_uploader);

// ParquetPackager
let parquet_schema = quickwit_parquet_engine::schema::ParquetSchema::new();
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
parquet_schema,
writer_config,
self.params.indexing_directory.path(),
);
Expand Down
221 changes: 7 additions & 214 deletions quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_metastore::checkpoint::SourceCheckpointDelta;
use quickwit_parquet_engine::ingest::{IngestError, ParquetIngestProcessor};
use quickwit_parquet_engine::schema::ParquetSchema;
use quickwit_proto::types::{IndexId, SourceId};
use serde::Serialize;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -143,8 +142,7 @@ impl ParquetDocProcessor {
source_id: SourceId,
indexer_mailbox: Mailbox<ParquetIndexer>,
) -> Self {
let schema = ParquetSchema::new();
let processor = ParquetIngestProcessor::new(schema);
let processor = ParquetIngestProcessor;
let counters = ParquetDocProcessorCounters::new(index_id.clone(), source_id.clone());

info!(
Expand Down Expand Up @@ -306,7 +304,7 @@ impl Handler<RawDocBatch> for ParquetDocProcessor {
// forever.
if !checkpoint_forwarded && !checkpoint_delta.is_empty() {
let empty_batch =
RecordBatch::new_empty(self.processor.schema().arrow_schema().clone());
RecordBatch::new_empty(std::sync::Arc::new(arrow::datatypes::Schema::empty()));
Comment on lines 306 to +307
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve required schema on checkpoint-only flush batches

Constructing the fallback checkpoint batch with Schema::empty() causes a hard failure when force_commit is true and no valid docs were forwarded (for example, all docs in the raw batch failed parsing): the indexer still flushes this batch, and ParquetWriter::write_to_file now rejects it because required columns are missing. In that path, the packager exits with an error instead of forwarding the checkpoint, which can stall ingestion progress for that shard.

Useful? React with 👍 / 👎.

let processed_batch =
ProcessedParquetBatch::new(empty_batch, checkpoint_delta, force_commit);
ctx.send_message(&self.indexer_mailbox, processed_batch)
Expand Down Expand Up @@ -399,14 +397,8 @@ mod tests {

#[tokio::test]
async fn test_metrics_doc_processor_valid_arrow_ipc() {
use std::sync::Arc as StdArc;
use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;

use arrow::array::{
ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
StructArray, UInt8Array, UInt64Array,
};
use arrow::datatypes::{DataType, Field, Int32Type};
use arrow::record_batch::RecordBatch;
let universe = Universe::with_accelerated_time();

let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
Expand All @@ -419,103 +411,7 @@ mod tests {
let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
universe.spawn_builder().spawn(metrics_doc_processor);

// Create a test batch matching the metrics schema
let schema = ParquetSchema::new();
let num_rows = 3;

// Helper to create dictionary arrays
fn create_dict_array(values: &[&str]) -> ArrayRef {
let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let string_array = StringArray::from(values.to_vec());
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
let keys: Vec<Option<i32>> = values
.iter()
.enumerate()
.map(|(i, v)| v.map(|_| i as i32))
.collect();
let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
let string_array = StringArray::from(string_values);
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(vec![100u64, 101u64, 102u64]));
let start_timestamp_secs: ArrayRef =
StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
let value: ArrayRef = StdArc::new(Float64Array::from(vec![42.0, 43.0, 44.0]));
let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
let tag_datacenter: ArrayRef =
create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);

// Create empty Variant (Struct with metadata and value BinaryView fields)
let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array.clone() as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array.clone() as ArrayRef,
),
]));

let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);

let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array as ArrayRef,
),
]));

let batch = RecordBatch::try_new(
schema.arrow_schema().clone(),
vec![
metric_name,
metric_type,
metric_unit,
timestamp_secs,
start_timestamp_secs,
value,
tag_service,
tag_env,
tag_datacenter,
tag_region,
tag_host,
attributes,
service_name,
resource_attributes,
],
)
.unwrap();

// Serialize to Arrow IPC
let batch = create_test_batch_with_tags(3, &["service"]);
let ipc_bytes = record_batch_to_ipc(&batch).unwrap();

// Create RawDocBatch with the IPC bytes
Expand Down Expand Up @@ -624,13 +520,8 @@ mod tests {
async fn test_metrics_doc_processor_with_indexer() {
use std::sync::Arc as StdArc;

use arrow::array::{
ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
StructArray, UInt8Array, UInt64Array,
};
use arrow::datatypes::{DataType, Field, Int32Type};
use arrow::record_batch::RecordBatch;
use quickwit_parquet_engine::storage::{ParquetSplitWriter, ParquetWriterConfig};
use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;
use quickwit_proto::metastore::MockMetastoreService;
use quickwit_storage::RamStorage;

Expand All @@ -657,9 +548,8 @@ mod tests {
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

// Create ParquetPackager
let parquet_schema = ParquetSchema::new();
let writer_config = ParquetWriterConfig::default();
let split_writer = ParquetSplitWriter::new(parquet_schema, writer_config, temp_dir.path());
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path());
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);

Expand All @@ -681,104 +571,7 @@ mod tests {
let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
universe.spawn_builder().spawn(metrics_doc_processor);

// Create a test batch
let schema = ParquetSchema::new();
let num_rows = 5;

fn create_dict_array(values: &[&str]) -> ArrayRef {
let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let string_array = StringArray::from(values.to_vec());
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
let keys: Vec<Option<i32>> = values
.iter()
.enumerate()
.map(|(i, v)| v.map(|_| i as i32))
.collect();
let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
let string_array = StringArray::from(string_values);
StdArc::new(
DictionaryArray::<Int32Type>::try_new(
Int32Array::from(keys),
StdArc::new(string_array),
)
.unwrap(),
)
}

let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
let timestamps: Vec<u64> = (0..num_rows).map(|i| 100 + i as u64).collect();
let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(timestamps));
let start_timestamp_secs: ArrayRef =
StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
let values: Vec<f64> = (0..num_rows).map(|i| 42.0 + i as f64).collect();
let value: ArrayRef = StdArc::new(Float64Array::from(values));
let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
let tag_datacenter: ArrayRef =
create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);

// Create empty Variant (Struct with metadata and value BinaryView fields)
let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array.clone() as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array.clone() as ArrayRef,
),
]));

let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);

let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
(
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
metadata_array as ArrayRef,
),
(
StdArc::new(Field::new("value", DataType::BinaryView, false)),
value_array as ArrayRef,
),
]));

let batch = RecordBatch::try_new(
schema.arrow_schema().clone(),
vec![
metric_name,
metric_type,
metric_unit,
timestamp_secs,
start_timestamp_secs,
value,
tag_service,
tag_env,
tag_datacenter,
tag_region,
tag_host,
attributes,
service_name,
resource_attributes,
],
)
.unwrap();

// Serialize to Arrow IPC
let batch = create_test_batch_with_tags(5, &["service"]);
let ipc_bytes = record_batch_to_ipc(&batch).unwrap();

// Create RawDocBatch with force_commit to trigger split production
Expand Down
Loading
Loading