From 6aa5e454a59c49692879a5481c4624ac9389d5bc Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Tue, 16 Dec 2025 15:42:58 -0500 Subject: [PATCH] fix hardcoded dtype Signed-off-by: Connor Tsui --- vortex-layout/src/layouts/dict/writer.rs | 146 ++++++++++++++++++++--- 1 file changed, 132 insertions(+), 14 deletions(-) diff --git a/vortex-layout/src/layouts/dict/writer.rs b/vortex-layout/src/layouts/dict/writer.rs index 9b6a714c254..a09a9e636ce 100644 --- a/vortex-layout/src/layouts/dict/writer.rs +++ b/vortex-layout/src/layouts/dict/writer.rs @@ -27,7 +27,6 @@ use vortex_array::builders::dict::DictEncoder; use vortex_array::builders::dict::dict_encoder; use vortex_btrblocks::BtrBlocksCompressor; use vortex_dtype::DType; -use vortex_dtype::Nullability::NonNullable; use vortex_dtype::PType; use vortex_error::VortexError; use vortex_error::VortexResult; @@ -49,10 +48,22 @@ use crate::sequence::SequentialStream; use crate::sequence::SequentialStreamAdapter; use crate::sequence::SequentialStreamExt; +/// Constraints for dictionary layout encoding. +/// +/// Note that [`max_len`](Self::max_len) is limited to `u16` (65,535 entries) by design. Since +/// layout chunks are typically ~8k elements, having more than 64k unique values in a dictionary +/// means dictionary encoding provides little compression benefit. If a column has very high +/// cardinality, the fallback encoding strategy should be used instead. #[derive(Clone)] pub struct DictLayoutConstraints { + /// Maximum size of the dictionary in bytes. pub max_bytes: usize, - // Dict layout codes currently only support u16 codes + /// Maximum dictionary length. Limited to `u16` because dictionaries with more than 64k unique + /// values provide diminishing compression returns given typical chunk sizes (~8k elements). 
+ /// + /// The codes dtype is chosen dynamically based on the actual dictionary size: + /// - [`PType::U8`] when the dictionary has at most 255 entries + /// - [`PType::U16`] when the dictionary has more than 255 entries pub max_len: u16, } @@ -387,7 +398,7 @@ impl Stream for DictionaryTransformer { } match self.input.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(DictionaryChunk::Codes(codes)))) => { + Poll::Ready(Some(Ok(DictionaryChunk::Codes((seq_id, codes))))) => { if self.active_codes_tx.is_none() { // Start a new group let (codes_tx, codes_rx) = kanal::bounded_async::(1); @@ -396,13 +407,17 @@ impl Stream for DictionaryTransformer { self.active_codes_tx = Some(codes_tx.clone()); self.active_values_tx = Some(values_tx); - // Send first codes + let codes_dtype = codes.dtype().clone(); + + // Send first codes. self.pending_send = - Some(Box::pin(async move { codes_tx.send(Ok(codes)).await })); + Some(Box::pin( + async move { codes_tx.send(Ok((seq_id, codes))).await }, + )); - // Create output streams + // Create output streams. 
let codes_stream = SequentialStreamAdapter::new( - DType::Primitive(PType::U16, NonNullable), + codes_dtype, codes_rx.into_stream().boxed(), ) .sendable(); @@ -416,13 +431,13 @@ impl Stream for DictionaryTransformer { .boxed(); return Poll::Ready(Some((codes_stream, values_future))); - } else { - // Continue streaming codes to existing group - if let Some(tx) = &self.active_codes_tx { - let tx = tx.clone(); - self.pending_send = - Some(Box::pin(async move { tx.send(Ok(codes)).await })); - } + } + + // Continue streaming codes to existing group + if let Some(tx) = &self.active_codes_tx { + let tx = tx.clone(); + self.pending_send = + Some(Box::pin(async move { tx.send(Ok((seq_id, codes))).await })); } } Poll::Ready(Some(Ok(DictionaryChunk::Values(values)))) => { @@ -514,3 +529,106 @@ fn encode_chunk(mut encoder: Box, chunk: &dyn Array) -> Encodin fn remainder(array: &dyn Array, encoded_len: usize) -> Option { (encoded_len < array.len()).then(|| array.slice(encoded_len..array.len())) } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use vortex_array::IntoArray; + use vortex_array::arrays::VarBinArray; + use vortex_array::builders::dict::DictConstraints; + use vortex_dtype::DType; + use vortex_dtype::Nullability::NonNullable; + use vortex_dtype::PType; + + use super::DictionaryTransformer; + use super::dict_encode_stream; + use crate::sequence::SequenceId; + use crate::sequence::SequentialStream; + use crate::sequence::SequentialStreamAdapter; + use crate::sequence::SequentialStreamExt; + + /// Regression test for a bug where the codes stream dtype was hardcoded to U16 instead of + /// using the actual codes dtype from the array. When `max_len <= 255`, the dict encoder + /// produces U8 codes, but the stream was incorrectly typed as U16, causing a dtype mismatch + /// assertion failure in [`SequentialStreamAdapter`]. 
+    #[tokio::test]
+    async fn test_dict_transformer_uses_u8_for_small_dictionaries() {
+        // Use max_len = 100 to force U8 codes (since 100 <= 255).
+        let constraints = DictConstraints {
+            max_bytes: 1024 * 1024,
+            max_len: 100,
+        };
+
+        // Create a simple string array with a few unique values.
+        let arr = VarBinArray::from(vec!["hello", "world", "hello", "world"]).into_array();
+
+        // Wrap into a sequential stream.
+        let mut pointer = SequenceId::root();
+        let input_stream = SequentialStreamAdapter::new(
+            arr.dtype().clone(),
+            futures::stream::once(async move { Ok((pointer.advance(), arr)) }),
+        )
+        .sendable();
+
+        // Encode into dict chunks.
+        let dict_stream = dict_encode_stream(input_stream, constraints);
+
+        // Transform into codes/values streams.
+        let mut transformer = DictionaryTransformer::new(dict_stream);
+
+        // Get the first (and only) run.
+        let (codes_stream, _values_fut) = transformer
+            .next()
+            .await
+            .expect("expected at least one dictionary run");
+
+        // The key assertion: codes stream dtype should be U8, not U16.
+        assert_eq!(
+            codes_stream.dtype(),
+            &DType::Primitive(PType::U8, NonNullable),
+            "codes stream should use U8 dtype for small dictionaries, not U16"
+        );
+    }
+
+    /// Test that the codes stream uses U16 dtype when the dictionary has more than 255 entries.
+    #[tokio::test]
+    async fn test_dict_transformer_uses_u16_for_large_dictionaries() {
+        // Use max_len = 1000 to allow U16 codes (since 1000 > 255).
+        let constraints = DictConstraints {
+            max_bytes: 1024 * 1024,
+            max_len: 1000,
+        };
+
+        // Create an array with more than 255 distinct values to force U16 codes.
+        let values: Vec<String> = (0..300).map(|i| format!("value_{i}")).collect();
+        let arr =
+            VarBinArray::from(values.iter().map(|s| s.as_str()).collect::<Vec<&str>>()).into_array();
+
+        // Wrap into a sequential stream.
+ let mut pointer = SequenceId::root(); + let input_stream = SequentialStreamAdapter::new( + arr.dtype().clone(), + futures::stream::once(async move { Ok((pointer.advance(), arr)) }), + ) + .sendable(); + + // Encode into dict chunks. + let dict_stream = dict_encode_stream(input_stream, constraints); + + // Transform into codes/values streams. + let mut transformer = DictionaryTransformer::new(dict_stream); + + // Get the first (and only) run. + let (codes_stream, _values_fut) = transformer + .next() + .await + .expect("expected at least one dictionary run"); + + // Codes stream dtype should be U16 since we have more than 255 distinct values. + assert_eq!( + codes_stream.dtype(), + &DType::Primitive(PType::U16, NonNullable), + "codes stream should use U16 dtype for dictionaries with >255 entries" + ); + } +}