Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 132 additions & 14 deletions vortex-layout/src/layouts/dict/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use vortex_array::builders::dict::DictEncoder;
use vortex_array::builders::dict::dict_encoder;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_dtype::DType;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::PType;
use vortex_error::VortexError;
use vortex_error::VortexResult;
Expand All @@ -49,10 +48,22 @@ use crate::sequence::SequentialStream;
use crate::sequence::SequentialStreamAdapter;
use crate::sequence::SequentialStreamExt;

/// Constraints for dictionary layout encoding.
///
/// Note that [`max_len`](Self::max_len) is limited to `u16` (65,535 entries) by design. Since
/// layout chunks are typically ~8k elements, having more than 64k unique values in a dictionary
/// means dictionary encoding provides little compression benefit. If a column has very high
/// cardinality, the fallback encoding strategy should be used instead.
#[derive(Clone)]
pub struct DictLayoutConstraints {
    /// Maximum size of the dictionary in bytes.
    pub max_bytes: usize,
    /// Maximum dictionary length. Limited to `u16` because dictionaries with more than 64k unique
    /// values provide diminishing compression returns given typical chunk sizes (~8k elements).
    ///
    /// The codes dtype is chosen dynamically based on the actual dictionary size:
    /// - [`PType::U8`] when the dictionary has at most 255 entries
    /// - [`PType::U16`] when the dictionary has more than 255 entries
    pub max_len: u16,
}

Expand Down Expand Up @@ -387,7 +398,7 @@ impl Stream for DictionaryTransformer {
}

match self.input.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(DictionaryChunk::Codes(codes)))) => {
Poll::Ready(Some(Ok(DictionaryChunk::Codes((seq_id, codes))))) => {
if self.active_codes_tx.is_none() {
// Start a new group
let (codes_tx, codes_rx) = kanal::bounded_async::<SequencedChunk>(1);
Expand All @@ -396,13 +407,17 @@ impl Stream for DictionaryTransformer {
self.active_codes_tx = Some(codes_tx.clone());
self.active_values_tx = Some(values_tx);

// Send first codes
let codes_dtype = codes.dtype().clone();

// Send first codes.
self.pending_send =
Some(Box::pin(async move { codes_tx.send(Ok(codes)).await }));
Some(Box::pin(
async move { codes_tx.send(Ok((seq_id, codes))).await },
));

// Create output streams
// Create output streams.
let codes_stream = SequentialStreamAdapter::new(
DType::Primitive(PType::U16, NonNullable),
codes_dtype,
codes_rx.into_stream().boxed(),
)
.sendable();
Expand All @@ -416,13 +431,13 @@ impl Stream for DictionaryTransformer {
.boxed();

return Poll::Ready(Some((codes_stream, values_future)));
} else {
// Continue streaming codes to existing group
if let Some(tx) = &self.active_codes_tx {
let tx = tx.clone();
self.pending_send =
Some(Box::pin(async move { tx.send(Ok(codes)).await }));
}
}

// Continue streaming codes to existing group
if let Some(tx) = &self.active_codes_tx {
let tx = tx.clone();
self.pending_send =
Some(Box::pin(async move { tx.send(Ok((seq_id, codes))).await }));
}
}
Poll::Ready(Some(Ok(DictionaryChunk::Values(values)))) => {
Expand Down Expand Up @@ -514,3 +529,106 @@ fn encode_chunk(mut encoder: Box<dyn DictEncoder>, chunk: &dyn Array) -> Encodin
/// Returns the portion of `array` that was not consumed by the encoder, i.e. the slice
/// starting at `encoded_len`, or `None` when the entire array was encoded.
fn remainder(array: &dyn Array, encoded_len: usize) -> Option<ArrayRef> {
    let total_len = array.len();
    if encoded_len < total_len {
        Some(array.slice(encoded_len..total_len))
    } else {
        None
    }
}

#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use vortex_array::IntoArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::builders::dict::DictConstraints;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType;

    use super::DictionaryTransformer;
    use super::dict_encode_stream;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialStream;
    use crate::sequence::SequentialStreamAdapter;
    use crate::sequence::SequentialStreamExt;

    /// Regression test: the codes stream dtype used to be hardcoded to U16 rather than taken
    /// from the actual codes array. With `max_len <= 255` the dict encoder emits U8 codes, so
    /// the hardcoded U16 stream dtype tripped the dtype assertion in
    /// [`SequentialStreamAdapter`].
    #[tokio::test]
    async fn test_dict_transformer_uses_u8_for_small_dictionaries() {
        // A small cap (100 <= 255) forces the encoder to produce U8 codes.
        let dict_constraints = DictConstraints {
            max_bytes: 1024 * 1024,
            max_len: 100,
        };

        // A tiny string array with only two distinct values.
        let array = VarBinArray::from(vec!["hello", "world", "hello", "world"]).into_array();

        // Lift the array into a single-chunk sequential stream.
        let mut seq = SequenceId::root();
        let source = SequentialStreamAdapter::new(
            array.dtype().clone(),
            futures::stream::once(async move { Ok((seq.advance(), array)) }),
        )
        .sendable();

        // Dict-encode, then split into codes/values streams via the transformer.
        let encoded = dict_encode_stream(source, dict_constraints);
        let mut xform = DictionaryTransformer::new(encoded);

        // Pull the first (and only) dictionary run.
        let (codes_stream, _values_fut) = xform
            .next()
            .await
            .expect("expected at least one dictionary run");

        // Core assertion: small dictionaries must yield a U8-typed codes stream, not U16.
        assert_eq!(
            codes_stream.dtype(),
            &DType::Primitive(PType::U8, NonNullable),
            "codes stream should use U8 dtype for small dictionaries, not U16"
        );
    }

    /// Verifies the codes stream is U16-typed once the dictionary exceeds 255 entries.
    #[tokio::test]
    async fn test_dict_transformer_uses_u16_for_large_dictionaries() {
        // A cap above 255 (1000) lets the encoder emit U16 codes.
        let dict_constraints = DictConstraints {
            max_bytes: 1024 * 1024,
            max_len: 1000,
        };

        // More than 255 distinct strings guarantees the dictionary outgrows U8 codes.
        let values: Vec<String> = (0..300).map(|i| format!("value_{i}")).collect();
        let array =
            VarBinArray::from(values.iter().map(|s| s.as_str()).collect::<Vec<_>>()).into_array();

        // Lift the array into a single-chunk sequential stream.
        let mut seq = SequenceId::root();
        let source = SequentialStreamAdapter::new(
            array.dtype().clone(),
            futures::stream::once(async move { Ok((seq.advance(), array)) }),
        )
        .sendable();

        // Dict-encode, then split into codes/values streams via the transformer.
        let encoded = dict_encode_stream(source, dict_constraints);
        let mut xform = DictionaryTransformer::new(encoded);

        // Pull the first (and only) dictionary run.
        let (codes_stream, _values_fut) = xform
            .next()
            .await
            .expect("expected at least one dictionary run");

        // With >255 distinct values, the codes stream must carry the U16 dtype.
        assert_eq!(
            codes_stream.dtype(),
            &DType::Primitive(PType::U16, NonNullable),
            "codes stream should use U16 dtype for dictionaries with >255 entries"
        );
    }
}
Loading