Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions java/lance-jni/src/blocking_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,41 @@ fn inner_blob_read_up_to<'local>(
Ok(arr)
}

#[unsafe(no_mangle)]
// JNI entry point backing `org.lance.BlobFile.nativeReadRange(offset, len)`.
// Delegates the real work to `inner_blob_read_range`; on error the
// `ok_or_throw_with_return!` macro throws a Java exception into `env` and
// returns a default `JByteArray` handle instead of unwinding across the FFI
// boundary.
pub extern "system" fn Java_org_lance_BlobFile_nativeReadRange<'local>(
    mut env: JNIEnv<'local>,
    jblob: JObject,
    offset: jlong,
    len: jint,
) -> jbyteArray {
    ok_or_throw_with_return!(
        env,
        inner_blob_read_range(&mut env, jblob, offset, len).map(|arr| arr.into_raw()),
        JByteArray::default().into_raw()
    )
}

fn inner_blob_read_range<'local>(
env: &mut JNIEnv<'local>,
jblob: JObject,
offset: jlong,
len: jint,
) -> Result<JByteArray<'local>> {
let end = (offset as u64)
.checked_add(len as u64)
.ok_or_else(|| lance_core::Error::invalid_input("offset + len overflowed".to_string()))?;
let bytes = {
let blob = unsafe { env.get_rust_field::<_, _, BlockingBlobFile>(jblob, NATIVE_BLOB) }?;
RT.block_on(blob.inner.read_range(offset as u64..end))?
};
let arr = env.new_byte_array(bytes.len() as jint)?;
let u8_slice: &[u8] = bytes.as_ref();
let i8_slice: &[i8] = unsafe { transmute(u8_slice) };

env.set_byte_array_region(&arr, 0, i8_slice)?;
Ok(arr)
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_BlobFile_nativeSeek(
mut env: JNIEnv,
Expand Down
7 changes: 7 additions & 0 deletions java/src/main/java/org/lance/BlobFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public byte[] readUpTo(int len) throws IOException {
return nativeReadUpTo(len);
}

/**
 * Read {@code len} bytes starting at blob-local {@code offset}, leaving the
 * current cursor position untouched.
 *
 * @throws IllegalArgumentException if {@code offset} or {@code len} is negative
 */
public byte[] readRange(long offset, int len) throws IOException {
    if (offset < 0) {
        throw new IllegalArgumentException("offset must be non-negative");
    }
    if (len < 0) {
        throw new IllegalArgumentException("len must be non-negative");
    }
    return nativeReadRange(offset, len);
}

/** Seek to a new cursor position. */
public void seek(long newCursor) throws IOException {
if (newCursor < 0) throw new IllegalArgumentException("newCursor must be non-negative");
Expand Down
4 changes: 4 additions & 0 deletions python/python/lance/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ def size(self) -> int:
def readall(self) -> bytes:
return self.inner.readall()

def read_range(self, offset: int, length: int) -> bytes:
    """Read a blob-local byte range without changing the current cursor.

    Parameters
    ----------
    offset : int
        Byte offset within the blob at which to start reading.
    length : int
        Number of bytes to read.

    Returns
    -------
    bytes
        The bytes read from the requested range.
    """
    return self.inner.read_range(offset, length)

def readinto(self, b: bytearray) -> int:
return self.inner.read_into(b)

Expand Down
95 changes: 84 additions & 11 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,23 @@ def _is_null_blob_description(description: Any) -> bool:
return False


def _resolve_blob_selection(
ids: Optional[Union[List[int], pa.Array]],
addresses: Optional[Union[List[int], pa.Array]],
indices: Optional[Union[List[int], pa.Array]],
) -> Tuple[str, Union[List[int], pa.Array]]:
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
raise ValueError("Exactly one of ids, indices, or addresses must be specified")

if ids is not None:
return "ids", ids
if addresses is not None:
return "addresses", addresses
if indices is not None:
return "indices", indices
raise ValueError("Either ids, addresses, or indices must be specified")


class MergeInsertBuilder(_MergeInsertBuilder):
def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
"""Executes the merge insert operation
Expand Down Expand Up @@ -1910,21 +1927,77 @@ def take_blobs(
-------
blob_files : List[BlobFile]
"""
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
raise ValueError(
"Exactly one of ids, indices, or addresses must be specified"
)
selection_kind, selection_values = _resolve_blob_selection(
ids, addresses, indices
)

if ids is not None:
lance_blob_files = self._ds.take_blobs(ids, blob_column)
elif addresses is not None:
lance_blob_files = self._ds.take_blobs_by_addresses(addresses, blob_column)
elif indices is not None:
lance_blob_files = self._ds.take_blobs_by_indices(indices, blob_column)
if selection_kind == "ids":
lance_blob_files = self._ds.take_blobs(selection_values, blob_column)
elif selection_kind == "addresses":
lance_blob_files = self._ds.take_blobs_by_addresses(
selection_values, blob_column
)
else:
raise ValueError("Either ids, addresses, or indices must be specified")
lance_blob_files = self._ds.take_blobs_by_indices(
selection_values, blob_column
)
return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files]

def read_blobs(
    self,
    blob_column: str,
    ids: Optional[Union[List[int], pa.Array]] = None,
    addresses: Optional[Union[List[int], pa.Array]] = None,
    indices: Optional[Union[List[int], pa.Array]] = None,
    *,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]:
    """
    Read blobs directly into memory using Lance's planned blob reader.

    Unlike :py:meth:`take_blobs`, which returns file-like
    :py:class:`lance.BlobFile` handles for random access, this API plans and
    executes batched reads and returns materialized blob payloads.

    Exactly one of ids, addresses, or indices must be specified.

    Parameters
    ----------
    blob_column : str
        The name of the blob column to read.
    ids : Integer Array or array-like
        Row IDs to read in the dataset.
    addresses : Integer Array or array-like
        The (unstable) row addresses to read in the dataset.
    indices : Integer Array or array-like
        The offset / indices of the row in the dataset.
    io_buffer_size : int, optional
        Override the scheduler I/O buffer size used while materializing blobs.
    preserve_order : bool, optional
        If True, returned rows follow the requested selection order.

    Returns
    -------
    blobs : List[Tuple[int, bytes]]
        A list of ``(row_address, blob_bytes)`` pairs.
    """
    selection_kind, selection_values = _resolve_blob_selection(
        ids, addresses, indices
    )

    # Dispatch on the selector kind; each native reader shares the same
    # (selection, column, **options) calling convention.
    readers = {
        "ids": self._ds.read_blobs,
        "addresses": self._ds.read_blobs_by_addresses,
        "indices": self._ds.read_blobs_by_indices,
    }
    return readers[selection_kind](
        selection_values,
        blob_column,
        io_buffer_size=io_buffer_size,
        preserve_order=preserve_order,
    )

def head(self, num_rows, **kwargs):
"""
Load the first N rows of the dataset.
Expand Down
21 changes: 21 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,27 @@ class _Dataset:
row_indices: List[int],
blob_column: str,
) -> List[LanceBlobFile]: ...
# Materialize blobs selected by stable row ID; yields (row_address, bytes)
# pairs.
def read_blobs(
    self,
    row_ids: List[int],
    blob_column: str,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
# Materialize blobs selected by (unstable) row address; yields
# (row_address, bytes) pairs.
def read_blobs_by_addresses(
    self,
    row_addresses: List[int],
    blob_column: str,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
# Materialize blobs selected by dataset offset/index; yields
# (row_address, bytes) pairs.
def read_blobs_by_indices(
    self,
    row_indices: List[int],
    blob_column: str,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
def take_scan(
self,
row_slices: Iterable[Tuple[int, int]],
Expand Down
162 changes: 162 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@
lance_dataset_module = importlib.import_module("lance.dataset")


def _blob_row_ids(dataset):
    # Collect the stable row ID of every row in the dataset.
    table = dataset.to_table(columns=[], with_row_id=True)
    return table.column("_rowid").to_pylist()


def _blob_row_addresses(dataset):
    # Collect the (unstable) row address of every row in the dataset.
    table = dataset.to_table(columns=["idx"], with_row_address=True)
    return table.column("_rowaddr").to_pylist()


def _out_of_order_blob_selection(dataset_with_blobs, selection_kind):
    """Build an out-of-order (row 4, then row 0) selection for the fixture.

    Returns a ``(selection_values, expected)`` pair, where ``expected`` is the
    list of ``(row_address, payload)`` tuples the selection should produce.
    """
    addresses = _blob_row_addresses(dataset_with_blobs)
    expected = [(addresses[4], b"quux"), (addresses[0], b"foo")]

    if selection_kind == "ids":
        # Scan for the row IDs once instead of calling _blob_row_ids twice
        # (each call is a full dataset scan).
        row_ids = _blob_row_ids(dataset_with_blobs)
        return [row_ids[4], row_ids[0]], expected
    if selection_kind == "addresses":
        return [addresses[4], addresses[0]], expected
    return [4, 0], expected


def test_blob_read_from_binary():
values = [b"foo", b"bar", b"baz"]
data = pa.table(
Expand Down Expand Up @@ -251,6 +277,134 @@ def test_blob_by_indices(tmp_path, dataset_with_blobs):
assert f1.read() == f2.read()


@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        ("ids", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("addresses", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("indices", [0, 4], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
    ],
)
def test_read_blobs(dataset_with_blobs, selection_kind, selection_values, expected):
    # Exercise each selector kind through the planned reader with explicit
    # planner options.
    result = dataset_with_blobs.read_blobs(
        "blobs",
        io_buffer_size=1024,
        preserve_order=True,
        **{selection_kind: selection_values},
    )

    assert result == expected


def test_read_blobs_requires_single_selector(dataset_with_blobs):
    # Passing two selectors at once must be rejected.
    expected_message = "Exactly one of ids, indices, or addresses must be specified"
    with pytest.raises(ValueError, match=expected_message):
        dataset_with_blobs.read_blobs("blobs", ids=[0], indices=[0])


def test_read_blobs_requires_selector(dataset_with_blobs):
    # Passing no selector at all must also be rejected.
    expected_message = "Exactly one of ids, indices, or addresses must be specified"
    with pytest.raises(ValueError, match=expected_message):
        dataset_with_blobs.read_blobs("blobs")


def test_read_blobs_rejects_non_blob_column(dataset_with_blobs):
    # "idx" is an ordinary column, so the blob reader must refuse it.
    with pytest.raises(ValueError, match="not a blob column"):
        dataset_with_blobs.read_blobs("idx", indices=[0])


@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        (
            "ids",
            pa.array([0, (1 << 32) + 1], type=pa.uint64()),
            [(0, b"foo"), ((1 << 32) + 1, b"quux")],
        ),
        (
            "addresses",
            pa.array([0, (1 << 32) + 1], type=pa.uint64()),
            [(0, b"foo"), ((1 << 32) + 1, b"quux")],
        ),
        (
            "indices",
            pa.array([0, 4], type=pa.uint64()),
            [(0, b"foo"), ((1 << 32) + 1, b"quux")],
        ),
    ],
)
def test_read_blobs_accepts_arrow_array_selectors(
    dataset_with_blobs, selection_kind, selection_values, expected
):
    # Arrow arrays should work anywhere a plain list selector does.
    result = dataset_with_blobs.read_blobs(
        "blobs", **{selection_kind: selection_values}
    )

    assert result == expected


@pytest.mark.parametrize(
    ("selection_kind", "selection_values"),
    [
        ("ids", []),
        ("addresses", []),
        ("indices", []),
        ("ids", pa.array([], type=pa.uint64())),
        ("addresses", pa.array([], type=pa.uint64())),
        ("indices", pa.array([], type=pa.uint64())),
    ],
)
def test_read_blobs_accepts_empty_selection(
    dataset_with_blobs, selection_kind, selection_values
):
    # An empty selection is valid and simply yields no blobs.
    result = dataset_with_blobs.read_blobs(
        "blobs", **{selection_kind: selection_values}
    )

    assert result == []


@pytest.mark.parametrize(
    ("planner_kwargs", "error_message"),
    [
        ({"io_buffer_size": 0}, "io_buffer_size must be greater than 0"),
    ],
)
def test_read_blobs_rejects_invalid_planner_options(
    dataset_with_blobs, planner_kwargs, error_message
):
    # Invalid planner options surface as a ValueError with a clear message.
    with pytest.raises(ValueError, match=error_message):
        dataset_with_blobs.read_blobs("blobs", indices=[0], **planner_kwargs)


@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
def test_read_blobs_preserves_input_order(dataset_with_blobs, selection_kind):
    # With preserve_order=True, rows must come back in the requested order.
    selection_values, expected = _out_of_order_blob_selection(
        dataset_with_blobs, selection_kind
    )

    result = dataset_with_blobs.read_blobs(
        "blobs", preserve_order=True, **{selection_kind: selection_values}
    )

    assert result == expected


@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
def test_read_blobs_without_preserve_order_returns_same_rows(
    dataset_with_blobs, selection_kind
):
    # Without order preservation the same rows are returned; ordering is
    # unspecified, so compare as sorted multisets.
    selection_values, expected = _out_of_order_blob_selection(
        dataset_with_blobs, selection_kind
    )

    result = dataset_with_blobs.read_blobs(
        "blobs", preserve_order=False, **{selection_kind: selection_values}
    )

    assert sorted(result) == sorted(expected)


def test_blob_file_seek(tmp_path, dataset_with_blobs):
row_ids = (
dataset_with_blobs.to_table(columns=[], with_row_id=True)
Expand Down Expand Up @@ -466,6 +620,12 @@ def test_blob_extension_write_external_slice(tmp_path):
with blob_file as f:
assert f.read() == expected

assert ds.read_blobs("blob", indices=[0, 1, 2]) == [
(0, b"alpha"),
(1, b"bravo"),
(2, b"charlie"),
]


def test_blob_extension_write_external_slice_ingest(tmp_path):
tar_path = tmp_path / "container.tar"
Expand Down Expand Up @@ -548,6 +708,8 @@ def test_blob_extension_take_blobs_multi_base(payload, is_dataset_root, tmp_path
with blobs[0] as f:
assert f.read() == payload

assert ds.read_blobs("blob", indices=[0]) == [(0, payload)]


@pytest.fixture
def dataset_for_pandas_blob_tests(tmp_path):
Expand Down
Loading
Loading