fix: list columns fail in v2.0 blocking decode path#6407
Open
frankliee wants to merge 1 commit intolance-format:mainfrom
Open
fix: list columns fail in v2.0 blocking decode path#6407frankliee wants to merge 1 commit intolance-format:mainfrom
frankliee wants to merge 1 commit intolance-format:mainfrom
Conversation
Summary
Reading List, LargeList, Binary, or LargeBinary columns via read_stream_projected_blocking (used by the Java JNI LanceFileReader) fails with:
Caused by: java.io.IOException: LanceError(Internal):
drain was called on primitive field decoder for data type Int32 on column 2
but the decoder was never awaited,
/app/rust/lance-encoding/src/previous/encodings/logical/primitive.rs:348:27
Full Rust-side error:
thread 'tokio-runtime-worker' panicked at
rust/lance-encoding/src/previous/encodings/logical/primitive.rs:348:27:
Internal error: drain was called on primitive field decoder for data type Int32
on column 4 but the decoder was never awaited
Stack:
lance_encoding::previous::encodings::logical::primitive::PrimitiveFieldDecoder::drain
lance_encoding::previous::encodings::logical::list::ListPageDecoder::drain
lance_encoding::previous::encodings::logical::struct::SimpleStructDecoder::drain
lance_encoding::decoder::BatchDecodeIterator::next_batch_task
lance_file::reader::FileReader::read_stream_projected_blocking
lance_jni::file_reader::BlockingFileReader::open_stream
Java_org_lance_file_LanceFileReader_readAllNative
Schema:
col_a: int64
col_b: list<int32>
This is caused by two independent bugs:
Bug 1: tokio::spawn in list scheduler is incompatible with the blocking decode path.
ListFieldSchedulingJob::schedule_next uses tokio::spawn to run indirect_schedule_task concurrently. This works in the async path where an active tokio runtime drives the spawned task.
However, in the blocking path (schedule_and_decode_blocking), the scheduling runs outside any tokio runtime context, so the spawned task is never executed, and the JoinHandle cannot be
awaited from a different runtime.
Bug 2: BatchDecodeIterator skips wait when all rows are already scheduled.
For list columns, all rows are marked as "scheduled" after the first schedule_ranges call (because the ListPageDecoder message is sent synchronously), even though the item data has not
been loaded yet. Starting from the second batch, scheduled_need == 0, so wait_for_io() is skipped entirely. The subsequent drain then encounters PrimitiveFieldDecoder instances whose
physical decoders were never awaited.
Changes
- list.rs: Replace tokio::spawn(indirect_schedule_task(...)) with an inline BoxFuture via .boxed(). This removes the dependency on a tokio runtime during scheduling and makes the
indirect scheduling work in both async and blocking contexts. The ListPageDecoder.unloaded field type changes from JoinHandle to BoxFuture, and the JoinError handling in
wait_for_loaded is simplified accordingly.
- decoder.rs: Add an else branch in BatchDecodeIterator::next_batch_task to call self.root_decoder.wait(loaded_need, ...) even when scheduled_need == 0. This ensures that list item
data is fully loaded before draining, regardless of the scheduling status.
- reader.rs: Add test_project_list_int32 regression test covering projected reads of list<int32> columns on both V2_0 and V2_1 file formats.
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Reading List, LargeList, Binary, or LargeBinary columns via read_stream_projected_blocking (used by the Java JNI LanceFileReader) fails with:
Caused by: java.io.IOException: LanceError(Internal):
drain was called on primitive field decoder for data type Int32 on column 2
but the decoder was never awaited,
/app/rust/lance-encoding/src/previous/encodings/logical/primitive.rs:348:27
Full Rust-side error:
thread 'tokio-runtime-worker' panicked at
rust/lance-encoding/src/previous/encodings/logical/primitive.rs:348:27:
Internal error: drain was called on primitive field decoder for data type Int32
on column 4 but the decoder was never awaited
Stack:
Schema:
col_a: int64
col_b: list
This is caused by two independent bugs:
Bug 1: tokio::spawn in list scheduler is incompatible with the blocking decode path.
ListFieldSchedulingJob::schedule_next uses tokio::spawn to run indirect_schedule_task concurrently. This works in the async path where an active tokio runtime drives the spawned task.
However, in the blocking path (schedule_and_decode_blocking), the scheduling runs outside any tokio runtime context, so the spawned task is never executed, and the JoinHandle cannot be
awaited from a different runtime.
Bug 2: BatchDecodeIterator skips wait when all rows are already scheduled.
For list columns, all rows are marked as "scheduled" after the first schedule_ranges call (because the ListPageDecoder message is sent synchronously), even though the item data has not
been loaded yet. Starting from the second batch, scheduled_need == 0, so wait_for_io() is skipped entirely. The subsequent drain then encounters PrimitiveFieldDecoder instances whose
physical decoders were never awaited.
Changes