Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct IndexingSchedulerState {
///
/// Scheduling executes the following steps:
/// 1. Builds a [`PhysicalIndexingPlan`] from the list of logical indexing tasks. See
/// [`build_physical_indexing_plan`] for the implementation details.
/// `build_physical_indexing_plan` for the implementation details.
/// 2. Apply the [`PhysicalIndexingPlan`]: for each indexer, the scheduler send the indexing tasks
/// by gRPC. An indexer immediately returns an Ok and apply asynchronously the received plan. Any
/// errors (network) happening in this step are ignored. The scheduler runs a control loop that
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct IndexingSchedulerState {
/// Concretely, it will send the faulty nodes of the plan they are supposed to follow.
//
/// Finally, in order to give the time for each indexer to run their indexing tasks, the control
/// plane will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired
/// plane will wait at least `MIN_DURATION_BETWEEN_SCHEDULING` before comparing the desired
/// plan with the running plan.
pub struct IndexingScheduler {
cluster_id: String,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub enum UploaderType {
/// [`SplitsUpdateMailbox`] wraps either a [`Mailbox<Sequencer<P>>`] or [`Mailbox<P>`].
///
/// It makes it possible to send a splits update either to the [`Sequencer`] or directly
/// to the publisher actor `P`. It is used in combination with [`SplitsUpdateSender`] that
/// to the publisher actor `P`. It is used in combination with `SplitsUpdateSender` that
/// will do the send.
///
/// This is useful as we have different requirements between the indexing pipeline and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! [`FileBackedIndex`] module. It is public so that the crate `quickwit-backward-compat` can
//! import [`FileBackedIndex`] and run backward-compatibility tests. You should not have to import
//! `FileBackedIndex` module. It is public so that the crate `quickwit-backward-compat` can
//! import `FileBackedIndex` and run backward-compatibility tests. You should not have to import
//! anything from here directly.

mod serialize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tracing::error;
use super::file_backed_index::FileBackedIndex;
use super::store_operations::{METASTORE_FILE_NAME, load_index};

/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first
/// Lazy `FileBackedIndex`. It loads a `FileBackedIndex` on demand. When the index is first
/// loaded, it optionally spawns a task to periodically poll the storage and update the index.
pub(crate) struct LazyFileBackedIndex {
index_id: IndexId,
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

//! Module for [`FileBackedMetastore`]. It is public so that the crate `quickwit-backward-compat`
//! can import [`FileBackedIndex`] and run backward-compatibility tests. You should not have to
//! can import `FileBackedIndex` and run backward-compatibility tests. You should not have to
//! import anything from here directly.

pub mod file_backed_index;
Expand Down Expand Up @@ -116,9 +116,9 @@ impl From<bool> for MutationOccurred<()> {
/// into as many files and stores a map of indexes
/// (index_id, index_status) in a dedicated file `manifest.json`.
///
/// A [`LazyIndexStatus`] describes the lifecycle of an index: [`LazyIndexStatus::Creating`] and
/// [`LazyIndexStatus::Deleting`] are transitioning states that indicates that the index is not
/// yet available. On the contrary, the [`LazyIndexStatus::Active`] status indicates the index is
/// A `LazyIndexStatus` describes the lifecycle of an index: `LazyIndexStatus::Creating` and
/// `LazyIndexStatus::Deleting` are transitioning states that indicates that the index is not
/// yet available. On the contrary, the `LazyIndexStatus::Active` status indicates the index is
/// ready to be fetched and updated.
///
/// Transitioning states are useful to track inconsistencies between the in-memory and on-disk data
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-metastore/src/metastore_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl fmt::Debug for MetastoreResolver {
}

impl MetastoreResolver {
/// Creates an empty [`MetastoreResolverBuilder`].
/// Creates an empty `MetastoreResolverBuilder`.
pub fn builder() -> MetastoreResolverBuilder {
MetastoreResolverBuilder::default()
}
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,9 @@ mod tests {
Some("unknown_service")
);
// No metric_unit tag when unit is empty
assert!(dp.tags.get("metric_unit").is_none());
assert!(!dp.tags.contains_key("metric_unit"));
// No start_timestamp_secs tag when start time is 0
assert!(dp.tags.get("start_timestamp_secs").is_none());
assert!(!dp.tags.contains_key("start_timestamp_secs"));
// Only "service" should be in tags (no attributes, no unit, no start time)
assert_eq!(dp.tags.len(), 1);
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-parquet-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
arrow = { workspace = true }
chrono = { workspace = true }
parquet = { workspace = true }
quickwit-common = { workspace = true }
quickwit-proto = { workspace = true }
sea-query = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-parquet-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ pub mod index;
pub mod ingest;
pub mod metrics;
pub mod schema;
pub mod sort_fields;
pub mod split;
pub mod storage;
pub mod table_config;

#[cfg(any(test, feature = "testsuite"))]
pub mod test_helpers;
240 changes: 240 additions & 0 deletions quickwit/quickwit-parquet-engine/src/sort_fields/column_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Column type identification from name suffixes and string names.
//!
//! Type can be specified via Husky-convention suffixes (`__s`, `__i`, `__nf`)
//! or inferred from well-known bare names. The discriminant values match
//! the Go iota exactly for cross-system interoperability.

use std::str::FromStr;

use super::SortFieldsError;

/// Well-known column name for timestamps.
pub const TIMESTAMP: &str = "timestamp";

/// Well-known column name for tiebreaker.
pub const TIEBREAKER: &str = "tiebreaker";

/// Well-known column name for timeseries ID hash.
pub const TIMESERIES_ID: &str = "timeseries_id";

/// Well-known column name for metric value.
pub const METRIC_VALUE: &str = "metric_value";

/// Column type IDs matching Go `types.TypeID` iota values.
///
/// Only the types that appear in sort schemas are included here.
/// The discriminant values MUST match Go exactly for cross-system interop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum ColumnTypeId {
Int64 = 2,
Float64 = 10,
String = 14,
Sketch = 17,
CpcSketch = 20,
ItemSketch = 22,
}

impl ColumnTypeId {
/// The Husky-convention suffix for this column type.
///
/// Used when serializing back to the string format with explicit types.
pub fn suffix(self) -> &'static str {
match self {
Self::Int64 => "__i",
Self::Float64 => "__nf",
Self::String => "__s",
Self::Sketch => "__sk",
Self::CpcSketch => "__cpcsk",
Self::ItemSketch => "__isk",
}
}

/// Human-readable type name matching Go `TypeID.String()`.
pub fn as_str(self) -> &'static str {
match self {
Self::Int64 => "dense-int64",
Self::Float64 => "dense-float64",
Self::String => "dense-string",
Self::Sketch => "dense-sketch",
Self::CpcSketch => "dense-cpc-sketch",
Self::ItemSketch => "dense-item-sketch",
}
}

/// Resolve column type from a column name, stripping any type suffix.
///
/// Returns `(bare_name, type)`. Type resolution order:
/// 1. Explicit suffix (`__s`, `__i`, `__nf`, etc.) — stripped, type from suffix
/// 2. Well-known bare name defaults:
/// - `timestamp`, `tiebreaker`, `timeseries_id` → Int64
/// - `metric_value` → Float64
/// - everything else → String
pub fn from_column_name(name: &str) -> Result<(&str, Self), SortFieldsError> {
// Try explicit suffixes first (longest match first to avoid ambiguity).
if let Some(bare) = name.strip_suffix("__isk") {
return Ok((bare, Self::ItemSketch));
}
if let Some(bare) = name.strip_suffix("__cpcsk") {
return Ok((bare, Self::CpcSketch));
}
if let Some(bare) = name.strip_suffix("__sk") {
return Ok((bare, Self::Sketch));
}
if let Some(bare) = name.strip_suffix("__nf") {
return Ok((bare, Self::Float64));
}
if let Some(bare) = name.strip_suffix("__i") {
return Ok((bare, Self::Int64));
}
if let Some(bare) = name.strip_suffix("__s") {
return Ok((bare, Self::String));
}

// No suffix — use well-known name defaults.
Ok((name, default_type_for_name(name)))
}
}

/// Default column type and sort direction for a bare column name.
///
/// This is the single source of truth for well-known column defaults.
/// Used by the parser (type inference, default direction), display
/// (suffix omission, direction omission), and validation.
pub struct ColumnDefaults {
pub column_type: ColumnTypeId,
/// True if the default sort direction is descending.
pub descending: bool,
}

/// Well-known name → default type and sort direction lookup table.
///
/// Columns not in this table default to String, ascending.
static WELL_KNOWN_COLUMNS: &[(&str, ColumnDefaults)] = &[
(
TIMESTAMP,
ColumnDefaults {
column_type: ColumnTypeId::Int64,
descending: true,
},
),
(
"timestamp_secs",
ColumnDefaults {
column_type: ColumnTypeId::Int64,
descending: true,
},
),
(
TIEBREAKER,
ColumnDefaults {
column_type: ColumnTypeId::Int64,
descending: false,
},
),
(
TIMESERIES_ID,
ColumnDefaults {
column_type: ColumnTypeId::Int64,
descending: false,
},
),
(
METRIC_VALUE,
ColumnDefaults {
column_type: ColumnTypeId::Float64,
descending: false,
},
),
(
"value",
ColumnDefaults {
column_type: ColumnTypeId::Float64,
descending: false,
},
),
];

const DEFAULT_COLUMN: ColumnDefaults = ColumnDefaults {
column_type: ColumnTypeId::String,
descending: false,
};

/// Look up default type and direction for a bare column name.
pub fn column_defaults(name: &str) -> &'static ColumnDefaults {
WELL_KNOWN_COLUMNS
.iter()
.find(|(n, _)| *n == name)
.map(|(_, d)| d)
.unwrap_or(&DEFAULT_COLUMN)
}

/// Default column type for a bare name (convenience wrapper).
pub fn default_type_for_name(name: &str) -> ColumnTypeId {
column_defaults(name).column_type
}

/// Whether this bare name defaults to descending sort.
pub fn default_is_descending(name: &str) -> bool {
column_defaults(name).descending
}

impl std::fmt::Display for ColumnTypeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}

/// Parse a type name string (e.g., "dense-int64") into a `ColumnTypeId`.
impl FromStr for ColumnTypeId {
type Err = SortFieldsError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"dense-int64" => Ok(Self::Int64),
"dense-float64" => Ok(Self::Float64),
"dense-string" => Ok(Self::String),
"dense-sketch" => Ok(Self::Sketch),
"dense-cpc-sketch" => Ok(Self::CpcSketch),
"dense-item-sketch" => Ok(Self::ItemSketch),
_ => Err(SortFieldsError::UnknownColumnType(format!(
"unknown column type '{}'",
s
))),
}
}
}

/// Convert a proto `column_type` u64 back to a `ColumnTypeId`.
impl TryFrom<u64> for ColumnTypeId {
type Error = SortFieldsError;

fn try_from(value: u64) -> Result<Self, Self::Error> {
match value {
2 => Ok(Self::Int64),
10 => Ok(Self::Float64),
14 => Ok(Self::String),
17 => Ok(Self::Sketch),
20 => Ok(Self::CpcSketch),
22 => Ok(Self::ItemSketch),
_ => Err(SortFieldsError::UnknownColumnType(format!(
"unknown column type id: {}",
value
))),
}
}
}
Loading
Loading