From f2be9cc3580c4cf1f987bca7ae0f41d6ed9cdaf5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Apr 2026 02:01:12 +0800 Subject: [PATCH 1/8] python: add explicit cuvs accelerator path --- python/python/lance/cuvs.py | 238 +++++++++++++++++++++++ python/python/lance/dataset.py | 142 +++++++++----- python/python/lance/indices/builder.py | 25 +++ python/python/tests/test_vector_index.py | 129 ++++++++++++ 4 files changed, 480 insertions(+), 54 deletions(-) create mode 100644 python/python/lance/cuvs.py diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py new file mode 100644 index 0000000000..ab46eb8a43 --- /dev/null +++ b/python/python/lance/cuvs.py @@ -0,0 +1,238 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +from __future__ import annotations + +from importlib import import_module +from typing import Tuple + +import pyarrow as pa + +from .dependencies import numpy as np + + +def is_cuvs_accelerator(accelerator: object) -> bool: + return accelerator == "cuvs" + + +def _require_cuvs(): + try: + return import_module("cuvs.neighbors.ivf_pq") + except ModuleNotFoundError as exc: + raise ModuleNotFoundError( + "accelerator='cuvs' requires the 'cuvs' package to be installed" + ) from exc + + +def _optional_cupy(): + try: + return import_module("cupy") + except ModuleNotFoundError: + return None + + +def _metric_to_cuvs(metric_type: str) -> str: + metric_type = metric_type.lower() + if metric_type in {"l2", "euclidean"}: + return "sqeuclidean" + if metric_type == "dot": + return "inner_product" + if metric_type == "cosine": + return "cosine" + raise ValueError(f"Metric '{metric_type}' is not supported by cuVS IVF_PQ") + + +def _column_to_numpy(table: pa.Table, column: str) -> np.ndarray: + array = table.column(column).combine_chunks() + values = array.to_pylist() + if len(values) == 0: + raise ValueError("cuVS training requires at least one training vector") + matrix = np.asarray(values) + if matrix.ndim != 2: + raise ValueError( + f"Expected a 2D training matrix for column '{column}', got {matrix.shape}" + ) + if matrix.dtype == np.float64: + matrix = matrix.astype(np.float32) + elif matrix.dtype not in (np.float16, np.float32): + matrix = matrix.astype(np.float32) + return matrix + + +def _as_numpy(array_like) -> np.ndarray: + if isinstance(array_like, np.ndarray): + return array_like + try: + array = np.asarray(array_like) + if isinstance(array, np.ndarray): + return array + except Exception: + pass + + if hasattr(array_like, "get"): + return np.asarray(array_like.get()) + + cupy = _optional_cupy() + if cupy is not None: + return cupy.asnumpy(array_like) + + raise TypeError("Unable to convert cuVS output to numpy") + + +def _normalize_centroids(index, num_partitions: int, dimension: int) -> np.ndarray: + centroids = _as_numpy(index.centers) + if centroids.shape != (num_partitions, dimension): + raise ValueError( + "cuVS returned incompatible IVF centroids shape: " + f"expected {(num_partitions, dimension)}, got {centroids.shape}" + ) + return centroids + + +def _normalize_pq_codebook( + index, num_sub_vectors: int, num_bits: int, dimension: int +) -> np.ndarray: + pq_book_size = 1 << num_bits + subvector_dim = dimension // num_sub_vectors + pq_centers = _as_numpy(index.pq_centers) + + expected_shapes = { + (num_sub_vectors, subvector_dim, pq_book_size): (0, 2, 1), + (num_sub_vectors, pq_book_size, subvector_dim): None, + } + transpose = expected_shapes.get(pq_centers.shape) + if transpose is None and pq_centers.shape not in expected_shapes: + raise ValueError( + "cuVS returned incompatible PQ codebook shape: expected one of " + f"{list(expected_shapes.keys())}, got {pq_centers.shape}" + ) + if transpose is not None: + pq_centers = np.transpose(pq_centers, transpose) + return pq_centers + + +def _estimate_trainset_fraction( + num_rows: int, num_partitions: int, sample_rate: int +) -> float: + if num_rows <= 0: + raise ValueError("cuVS training requires a non-empty dataset") + desired_rows = max(num_partitions * sample_rate, 256 * 256) + return min(1.0, desired_rows / num_rows) + + +def train_ivf_pq_on_cuvs( + dataset, + column: str, + num_partitions: int, + metric_type: str, + accelerator: str, + num_sub_vectors: int, + *, + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, + filter_nan: bool = True, +) -> Tuple[np.ndarray, np.ndarray]: + if accelerator != "cuvs": + raise ValueError("cuVS acceleration only supports accelerator='cuvs'") + if num_bits != 8: + raise ValueError("cuVS IVF_PQ integration currently supports only num_bits=8") + + dimension = dataset.schema.field(column).type.list_size + if dimension % num_sub_vectors != 0: + raise ValueError( + "cuVS IVF_PQ integration requires vector dimension to be divisible by " + "num_sub_vectors" + ) + + if dataset.schema.field(column).nullable and filter_nan: + filt = f"{column} is not null" + else: + filt = None + + num_rows = dataset.count_rows(filter=filt) + if num_rows == 0: + raise ValueError("cuVS training requires at least one non-null training vector") + + train_rows = max(1, min(num_rows, max(num_partitions * sample_rate, 256 * 256))) + trainset = dataset.sample( + train_rows, + columns=[column], + filter=filt, + randomize_order=True, + ) + matrix = _column_to_numpy(trainset, column) + + ivf_pq = _require_cuvs() + build_params = ivf_pq.IndexParams( + n_lists=num_partitions, + metric=_metric_to_cuvs(metric_type), + kmeans_n_iters=max_iters, + kmeans_trainset_fraction=_estimate_trainset_fraction( + matrix.shape[0], num_partitions, sample_rate + ), + pq_bits=num_bits, + pq_dim=num_sub_vectors, + codebook_kind="subspace", + force_random_rotation=False, + add_data_on_build=False, + ) + + index = ivf_pq.build(build_params, matrix) + + centroids = _normalize_centroids(index, num_partitions, dimension) + pq_codebook = _normalize_pq_codebook(index, num_sub_vectors, num_bits, dimension) + return centroids, pq_codebook + + +def one_pass_train_ivf_pq_on_cuvs( + dataset, + column: str, + num_partitions: int, + metric_type: str, + accelerator: str, + num_sub_vectors: int, + *, + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, + filter_nan: bool = True, +): + return train_ivf_pq_on_cuvs( + dataset, + column, + num_partitions, + metric_type, + accelerator, + num_sub_vectors, + sample_rate=sample_rate, + max_iters=max_iters, + num_bits=num_bits, + filter_nan=filter_nan, + ) + + +def prepare_global_ivf_pq_on_cuvs( + dataset, + column: str, + num_partitions: int, + num_sub_vectors: int, + *, + distance_type: str = "l2", + accelerator: str = "cuvs", + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, +): + centroids, pq_codebook = train_ivf_pq_on_cuvs( + dataset, + column, + num_partitions, + distance_type, + accelerator, + num_sub_vectors, + sample_rate=sample_rate, + max_iters=max_iters, + num_bits=num_bits, + ) + return {"ivf_centroids": centroids, "pq_codebook": pq_codebook} diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7496746285..a5e1681b25 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -39,6 +39,7 @@ from lance.log import LOGGER from .blob import BlobFile +from .cuvs import is_cuvs_accelerator from .dependencies import ( _check_for_numpy, _check_for_torch, @@ -2899,20 +2900,24 @@ def _create_index_impl( # Handle timing for various parts of accelerated builds timers = {} + use_cuvs = is_cuvs_accelerator(accelerator) if accelerator is not None and index_type != "IVF_PQ": + if use_cuvs: + raise ValueError( + f"accelerator='{accelerator}' only supports IVF_PQ index builds" + ) LOGGER.warning( "Index type %s does not support GPU acceleration; falling back to CPU", index_type, ) accelerator = None + use_cuvs = False # IMPORTANT: Distributed indexing is CPU-only. Enforce single-node when - # accelerator or torch-related paths are detected. - torch_detected = False + # any Python-side accelerator path is selected. + accelerated_build_detected = accelerator is not None try: - if accelerator is not None: - torch_detected = True - else: + if accelerator is None: impl = kwargs.get("implementation") use_torch_flag = kwargs.get("use_torch") is True one_pass_flag = kwargs.get("one_pass_ivfpq") is True @@ -2925,16 +2930,16 @@ def _create_index_impl( or torch_centroids or torch_codebook ): - torch_detected = True + accelerated_build_detected = True except Exception: # Be conservative: if detection fails, do not modify behavior pass - if torch_detected: + if accelerated_build_detected: if require_commit: if fragment_ids is not None or index_uuid is not None: LOGGER.info( - "Torch detected; " + "Accelerated build detected; " "enforce single-node indexing (distributed is CPU-only)." ) fragment_ids = None @@ -2942,63 +2947,92 @@ def _create_index_impl( else: if index_uuid is not None: LOGGER.info( - "Torch detected; " + "Accelerated build detected; " "enforce single-node indexing (distributed is CPU-only)." ) index_uuid = None if accelerator is not None: - from .vector import ( - one_pass_assign_ivf_pq_on_accelerator, - one_pass_train_ivf_pq_on_accelerator, - ) - - LOGGER.info("Doing one-pass ivfpq accelerated computations") if num_partitions is None: num_rows = self.count_rows() num_partitions = _target_partition_size_to_num_partitions( num_rows, target_partition_size ) - timers["ivf+pq_train:start"] = time.time() - ( - ivf_centroids, - ivf_kmeans, - pq_codebook, - pq_kmeans_list, - ) = one_pass_train_ivf_pq_on_accelerator( - self, - column[0], - num_partitions, - metric, - accelerator, - num_sub_vectors=num_sub_vectors, - batch_size=20480, - filter_nan=filter_nan, - ) - timers["ivf+pq_train:end"] = time.time() - ivfpq_train_time = timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] - LOGGER.info("ivf+pq training time: %ss", ivfpq_train_time) - timers["ivf+pq_assign:start"] = time.time() - shuffle_output_dir, shuffle_buffers = one_pass_assign_ivf_pq_on_accelerator( - self, - column[0], - metric, - accelerator, - ivf_kmeans, - pq_kmeans_list, - batch_size=20480, - filter_nan=filter_nan, - ) - timers["ivf+pq_assign:end"] = time.time() - ivfpq_assign_time = ( - timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] - ) - LOGGER.info("ivf+pq transform time: %ss", ivfpq_assign_time) - kwargs["precomputed_shuffle_buffers"] = shuffle_buffers - kwargs["precomputed_shuffle_buffers_path"] = os.path.join( - shuffle_output_dir, "data" - ) + if use_cuvs: + from .cuvs import one_pass_train_ivf_pq_on_cuvs + + LOGGER.info("Doing one-pass ivfpq cuVS training") + timers["ivf+pq_train:start"] = time.time() + ivf_centroids, pq_codebook = one_pass_train_ivf_pq_on_cuvs( + self, + column[0], + num_partitions, + metric, + accelerator, + num_sub_vectors=num_sub_vectors, + sample_rate=kwargs.get("sample_rate", 256), + max_iters=kwargs.get("max_iters", 50), + num_bits=kwargs.get("num_bits", 8), + filter_nan=filter_nan, + ) + timers["ivf+pq_train:end"] = time.time() + ivfpq_train_time = ( + timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] + ) + LOGGER.info("cuVS ivf+pq training time: %ss", ivfpq_train_time) + else: + from .vector import ( + one_pass_assign_ivf_pq_on_accelerator, + one_pass_train_ivf_pq_on_accelerator, + ) + + LOGGER.info("Doing one-pass ivfpq accelerated computations") + timers["ivf+pq_train:start"] = time.time() + ( + ivf_centroids, + ivf_kmeans, + pq_codebook, + pq_kmeans_list, + ) = one_pass_train_ivf_pq_on_accelerator( + self, + column[0], + num_partitions, + metric, + accelerator, + num_sub_vectors=num_sub_vectors, + batch_size=20480, + filter_nan=filter_nan, + ) + timers["ivf+pq_train:end"] = time.time() + ivfpq_train_time = ( + timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] + ) + LOGGER.info("ivf+pq training time: %ss", ivfpq_train_time) + timers["ivf+pq_assign:start"] = time.time() + ( + shuffle_output_dir, + shuffle_buffers, + ) = one_pass_assign_ivf_pq_on_accelerator( + self, + column[0], + metric, + accelerator, + ivf_kmeans, + pq_kmeans_list, + batch_size=20480, + filter_nan=filter_nan, + ) + timers["ivf+pq_assign:end"] = time.time() + ivfpq_assign_time = ( + timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] + ) + LOGGER.info("ivf+pq transform time: %ss", ivfpq_assign_time) + + kwargs["precomputed_shuffle_buffers"] = shuffle_buffers + kwargs["precomputed_shuffle_buffers_path"] = os.path.join( + shuffle_output_dir, "data" + ) if index_type.startswith("IVF"): if (ivf_centroids is not None) and (ivf_centroids_file is not None): raise ValueError( diff --git a/python/python/lance/indices/builder.py b/python/python/lance/indices/builder.py index c31ea0a7a0..00591ead93 100644 --- a/python/python/lance/indices/builder.py +++ b/python/python/lance/indices/builder.py @@ -9,6 +9,7 @@ import numpy as np import pyarrow as pa +from lance.cuvs import is_cuvs_accelerator, prepare_global_ivf_pq_on_cuvs from lance.indices.ivf import IvfModel from lance.indices.pq import PqModel @@ -115,6 +116,11 @@ def train_ivf( self._verify_ivf_sample_rate(sample_rate, num_partitions, num_rows) distance_type = self._normalize_distance_type(distance_type) self._verify_ivf_params(num_partitions) + if is_cuvs_accelerator(accelerator): + raise NotImplementedError( + "IndicesBuilder.train_ivf does not support accelerator='cuvs'; " + "use prepare_global_ivf_pq instead" + ) if accelerator is None: from lance.lance import indices @@ -250,6 +256,25 @@ def prepare_global_ivf_pq( `IndicesBuilder.train_pq` (indices.train_pq_model). No public method names elsewhere are changed. """ + if is_cuvs_accelerator(accelerator): + if fragment_ids is not None: + raise NotImplementedError( + "fragment_ids is not supported with accelerator='cuvs'" + ) + num_rows = self._count_rows() + num_partitions = self._determine_num_partitions(num_partitions, num_rows) + num_subvectors = self._normalize_pq_params(num_subvectors, self.dimension) + return prepare_global_ivf_pq_on_cuvs( + self.dataset, + self.column[0], + num_partitions, + num_subvectors, + distance_type=distance_type, + accelerator=accelerator, + sample_rate=sample_rate, + max_iters=max_iters, + ) + # Global IVF training ivf_model = self.train_ivf( num_partitions, diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index b20ffc8cf7..54c6003c27 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -9,10 +9,12 @@ import string import tempfile import time +from importlib import import_module from pathlib import Path from typing import Optional import lance +import lance.cuvs as lance_cuvs import numpy as np import pyarrow as pa import pyarrow.compute as pc @@ -505,6 +507,15 @@ def test_create_index_unsupported_accelerator(tmp_path): accelerator="cuda:abc", ) + with pytest.raises(ValueError): + dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=4, + num_sub_vectors=16, + accelerator="cuvs:0", + ) + def test_create_index_accelerator_fallback(tmp_path, caplog): tbl = create_table() @@ -526,6 +537,124 @@ def test_create_index_accelerator_fallback(tmp_path, caplog): ) +def test_create_index_cuvs_dispatch(tmp_path, monkeypatch): + tbl = create_table(nvec=512, ndim=128) + dataset = lance.write_dataset(tbl, tmp_path) + calls = {} + + def fake_train( + dataset_arg, + column, + num_partitions, + metric_type, + accelerator, + num_sub_vectors, + *, + sample_rate, + max_iters, + num_bits, + filter_nan, + ): + calls["dataset"] = dataset_arg + calls["column"] = column + calls["num_partitions"] = num_partitions + calls["metric_type"] = metric_type + calls["accelerator"] = accelerator + calls["num_sub_vectors"] = num_sub_vectors + calls["sample_rate"] = sample_rate + calls["max_iters"] = max_iters + calls["num_bits"] = num_bits + calls["filter_nan"] = filter_nan + return ( + np.random.randn(num_partitions, 128).astype(np.float32), + np.random.randn(num_sub_vectors, 256, 128 // num_sub_vectors).astype( + np.float32 + ), + ) + + monkeypatch.setattr(lance_cuvs, "one_pass_train_ivf_pq_on_cuvs", fake_train) + + dataset = dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=4, + num_sub_vectors=16, + accelerator="cuvs", + ) + + assert calls["column"] == "vector" + assert calls["num_partitions"] == 4 + assert calls["metric_type"] == "L2" + assert calls["accelerator"] == "cuvs" + assert calls["num_sub_vectors"] == 16 + assert dataset.stats.index_stats("vector_idx")["index_type"] == "IVF_PQ" + + +def test_create_index_cuvs_rejects_non_ivf_pq(tmp_path): + tbl = create_table() + dataset = lance.write_dataset(tbl, tmp_path) + + with pytest.raises(ValueError, match="only supports IVF_PQ"): + dataset.create_index( + "vector", + index_type="IVF_FLAT", + num_partitions=4, + accelerator="cuvs", + ) + + +def test_prepare_global_ivf_pq_cuvs_dispatch(tmp_path, monkeypatch): + ds = _make_sample_dataset_base(tmp_path, "cuvs_prepare_ds", 512, 128) + builder = IndicesBuilder(ds, "vector") + builder_module = import_module("lance.indices.builder") + calls = {} + + def fake_prepare( + dataset_arg, + column, + num_partitions, + num_sub_vectors, + *, + distance_type, + accelerator, + sample_rate, + max_iters, + ): + calls["dataset"] = dataset_arg + calls["column"] = column + calls["num_partitions"] = num_partitions + calls["num_sub_vectors"] = num_sub_vectors + calls["distance_type"] = distance_type + calls["accelerator"] = accelerator + calls["sample_rate"] = sample_rate + calls["max_iters"] = max_iters + return { + "ivf_centroids": np.random.randn(num_partitions, 128).astype(np.float32), + "pq_codebook": np.random.randn( + num_sub_vectors, 256, 128 // num_sub_vectors + ).astype(np.float32), + } + + monkeypatch.setattr(builder_module, "prepare_global_ivf_pq_on_cuvs", fake_prepare) + + prepared = builder.prepare_global_ivf_pq( + num_partitions=4, + num_subvectors=16, + distance_type="l2", + accelerator="cuvs", + sample_rate=7, + max_iters=20, + ) + + assert calls["column"] == "vector" + assert calls["num_partitions"] == 4 + assert calls["num_sub_vectors"] == 16 + assert calls["distance_type"] == "l2" + assert calls["accelerator"] == "cuvs" + assert prepared["ivf_centroids"].shape == (4, 128) + assert prepared["pq_codebook"].shape == (16, 256, 8) + + def test_use_index(dataset, tmp_path): ann_ds = lance.write_dataset(dataset.to_table(), tmp_path / "indexed.lance") ann_ds = ann_ds.create_index( From 2f071f6a28fa399df7e19cd551adcb5dc663f5f3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Apr 2026 02:05:54 +0800 Subject: [PATCH 2/8] python: document cuvs installation requirements --- python/DEVELOPMENT.md | 16 ++++++++++++++++ python/python/lance/cuvs.py | 4 +++- python/python/lance/dataset.py | 20 ++++++++++++++------ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/python/DEVELOPMENT.md b/python/DEVELOPMENT.md index 12c5654960..21dba0bddd 100644 --- a/python/DEVELOPMENT.md +++ b/python/DEVELOPMENT.md @@ -8,6 +8,22 @@ uv sync --extra tests --extra dev Add extras such as `benchmarks`, `torch`, or `geo` only when you need them. After the environment is initialized, either activate it or use `uv run ...` for commands. +`accelerator="cuvs"` does not have a normal project extra today. cuVS Python +packages are published per CUDA major version and are typically installed from +NVIDIA's package index, for example: + +```shell +uv pip install --extra-index-url https://pypi.nvidia.com cuvs-cu12 +``` + +or: + +```shell +uv pip install --extra-index-url https://pypi.nvidia.com cuvs-cu13 +``` + +Pick the package that matches the CUDA version in your environment. + `uv sync` is not just downloading Python packages here. It also builds the local `pylance` Rust extension as part of the editable environment, so the first run, cache misses, or Rust dependency changes can make it noticeably slow. This is expected; let the build finish instead of interrupting it and switching to a different environment setup. ## Building the project diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py index ab46eb8a43..6c0a4085c5 100644 --- a/python/python/lance/cuvs.py +++ b/python/python/lance/cuvs.py @@ -20,7 +20,9 @@ def _require_cuvs(): return import_module("cuvs.neighbors.ivf_pq") except ModuleNotFoundError as exc: raise ModuleNotFoundError( - "accelerator='cuvs' requires the 'cuvs' package to be installed" + "accelerator='cuvs' requires cuVS Python bindings to be installed. " + "Install a CUDA-matched package such as 'cuvs-cu12' or 'cuvs-cu13' " + "from https://pypi.nvidia.com." ) from exc diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index a5e1681b25..cda142f5bb 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3263,7 +3263,12 @@ def create_index( The number of sub-vectors for PQ (Product Quantization). accelerator : str or ``torch.Device``, optional If set, use an accelerator to speed up the training process. - Accepted accelerator: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU). + Accepted accelerator: + + - "cuda" or ``torch.device(...)`` for the existing torch-based path + - "mps" for Apple Silicon GPU + - "cuvs" for the explicit cuVS-based IVF_PQ training path + If not set, use the CPU. index_cache_size : int, optional The size of the index cache in number of entries. Default value is 256. @@ -3372,8 +3377,10 @@ def create_index( Experimental Accelerator (GPU) support: - *accelerate*: use GPU to train IVF partitions. - Only supports CUDA (Nvidia) or MPS (Apple) currently. - Requires PyTorch being installed. + `accelerator="cuda"` and `accelerator="mps"` use the existing torch path. + `accelerator="cuvs"` uses cuVS for IVF_PQ training only. + The torch path requires PyTorch. The cuVS path requires the cuVS Python + bindings to be installed separately. .. code-block:: python @@ -3388,9 +3395,10 @@ def create_index( accelerator="cuda" ) - Note: GPU acceleration is currently supported only for the ``IVF_PQ`` index - type. Providing an accelerator for other index types will fall back to CPU - index building. + Note: accelerator support is currently limited to the ``IVF_PQ`` index type. + Providing ``accelerator="cuda"`` for other index types will fall back to CPU + index building. Providing ``accelerator="cuvs"`` for other index types will + raise an error. References ---------- From 1a6c44b7a1630d6ab8fe14aeea8dcce2d6c8bf99 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Apr 2026 14:17:05 +0800 Subject: [PATCH 3/8] python: fix cuvs training on real datasets --- python/python/lance/cuvs.py | 26 ++++++++++--- python/python/tests/test_vector_index.py | 48 ++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py index 6c0a4085c5..0bfa910cb2 100644 --- a/python/python/lance/cuvs.py +++ b/python/python/lance/cuvs.py @@ -7,6 +7,7 @@ from typing import Tuple import pyarrow as pa +import pyarrow.compute as pc from .dependencies import numpy as np @@ -64,6 +65,10 @@ def _column_to_numpy(table: pa.Table, column: str) -> np.ndarray: def _as_numpy(array_like) -> np.ndarray: if isinstance(array_like, np.ndarray): return array_like + + if hasattr(array_like, "copy_to_host"): + return np.asarray(array_like.copy_to_host()) + try: array = np.asarray(array_like) if isinstance(array, np.ndarray): @@ -122,6 +127,20 @@ def _estimate_trainset_fraction( return min(1.0, desired_rows / num_rows) +def _sample_training_table(dataset, column: str, train_rows: int, filt: str | None) -> pa.Table: + if filt is None: + return dataset.sample(train_rows, columns=[column], randomize_order=True) + + total_rows = dataset.count_rows() + sample_rows = min(total_rows, max(train_rows * 2, train_rows + 1024)) + trainset = dataset.sample(sample_rows, columns=[column], randomize_order=True) + trainset = trainset.filter(pc.is_valid(trainset.column(column))) + if len(trainset) >= train_rows or sample_rows == total_rows: + return trainset.slice(0, min(train_rows, len(trainset))) + + return dataset.to_table(columns=[column], filter=filt, limit=train_rows) + + def train_ivf_pq_on_cuvs( dataset, column: str, @@ -157,12 +176,7 @@ def train_ivf_pq_on_cuvs( raise ValueError("cuVS training requires at least one non-null training vector") train_rows = max(1, min(num_rows, max(num_partitions * sample_rate, 256 * 256))) - trainset = dataset.sample( - train_rows, - columns=[column], - filter=filt, - randomize_order=True, - ) + trainset = _sample_training_table(dataset, column, train_rows, filt) matrix = _column_to_numpy(trainset, column) ivf_pq = _require_cuvs() diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 54c6003c27..b92952e5f3 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -655,6 +655,54 @@ def fake_prepare( assert prepared["pq_codebook"].shape == (16, 256, 8) +def test_train_ivf_pq_on_cuvs_nullable_vectors(tmp_path, monkeypatch): + tbl = create_table(nvec=32, ndim=16, nullify=True) + dataset = lance.write_dataset(tbl, tmp_path) + + class FakeIndex: + centers = np.random.randn(4, 16).astype(np.float32) + pq_centers = np.random.randn(4, 256, 4).astype(np.float32) + + class FakeIvfPqModule: + class IndexParams: + def __init__(self, **kwargs): + self.kwargs = kwargs + + @staticmethod + def build(build_params, matrix): + assert build_params.kwargs["n_lists"] == 4 + assert matrix.shape[1] == 16 + assert matrix.dtype == np.float32 + return FakeIndex() + + monkeypatch.setattr(lance_cuvs, "_require_cuvs", lambda: FakeIvfPqModule()) + + centroids, pq_codebook = lance_cuvs.train_ivf_pq_on_cuvs( + dataset, + "vector", + 4, + "L2", + "cuvs", + 4, + sample_rate=4, + ) + + assert centroids.shape == (4, 16) + assert pq_codebook.shape == (4, 256, 4) + + +def test_cuvs_as_numpy_prefers_copy_to_host(): + class FakeDeviceTensor: + def copy_to_host(self): + return np.arange(6, dtype=np.float32).reshape(2, 3) + + array = lance_cuvs._as_numpy(FakeDeviceTensor()) + + assert isinstance(array, np.ndarray) + assert array.shape == (2, 3) + assert array.dtype == np.float32 + + def test_use_index(dataset, tmp_path): ann_ds = lance.write_dataset(dataset.to_table(), tmp_path / "indexed.lance") ann_ds = ann_ds.create_index( From 76995c0357d5f3e6c1dc55c4cfc26ac52a012dd7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Apr 2026 17:03:47 +0800 Subject: [PATCH 4/8] python: format cuvs helper --- python/python/lance/cuvs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py index 0bfa910cb2..5c58af9552 100644 --- a/python/python/lance/cuvs.py +++ b/python/python/lance/cuvs.py @@ -127,7 +127,9 @@ def _estimate_trainset_fraction( return min(1.0, desired_rows / num_rows) -def _sample_training_table(dataset, column: str, train_rows: int, filt: str | None) -> pa.Table: +def _sample_training_table( + dataset, column: str, train_rows: int, filt: str | None +) -> pa.Table: if filt is None: return dataset.sample(train_rows, columns=[column], randomize_order=True) From fbe0f50faf39f3c92365b534697f00dca0c4fbe6 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Apr 2026 17:12:33 +0800 Subject: [PATCH 5/8] python: clarify accelerator hardware requirements --- python/python/lance/dataset.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index cda142f5bb..baa9890daf 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3266,8 +3266,13 @@ def create_index( Accepted accelerator: - "cuda" or ``torch.device(...)`` for the existing torch-based path + on NVIDIA GPUs - "mps" for Apple Silicon GPU - - "cuvs" for the explicit cuVS-based IVF_PQ training path + - "cuvs" for the explicit cuVS-based IVF_PQ training path on NVIDIA + GPUs + + The cuVS path also requires the cuVS Python bindings to be installed + separately. If not set, use the CPU. index_cache_size : int, optional @@ -3377,10 +3382,13 @@ def create_index( Experimental Accelerator (GPU) support: - *accelerate*: use GPU to train IVF partitions. - `accelerator="cuda"` and `accelerator="mps"` use the existing torch path. - `accelerator="cuvs"` uses cuVS for IVF_PQ training only. - The torch path requires PyTorch. The cuVS path requires the cuVS Python - bindings to be installed separately. + `accelerator="cuda"` and `accelerator="mps"` use the existing torch + path. `accelerator="cuda"` runs on NVIDIA GPUs and `accelerator="mps"` + runs on Apple Silicon GPUs. `accelerator="cuvs"` uses cuVS for IVF_PQ + training only and requires an NVIDIA GPU. + + The torch path requires PyTorch. The cuVS path requires the cuVS + Python bindings to be installed separately. .. code-block:: python From f00d0783e221b27e05a8a4331f2af4d76ea93c2a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Apr 2026 19:15:25 +0800 Subject: [PATCH 6/8] python: add cuvs one-pass ivfpq assignment --- python/python/lance/cuvs.py | 248 ++++++++++++++++++++++- python/python/lance/dataset.py | 28 ++- python/python/tests/test_vector_index.py | 70 +++++++ 3 files changed, 337 insertions(+), 9 deletions(-) diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py index 5c58af9552..45142b6b77 100644 --- a/python/python/lance/cuvs.py +++ b/python/python/lance/cuvs.py @@ -3,13 +3,20 @@ from __future__ import annotations +import re +import tempfile from importlib import import_module -from typing import Tuple +from typing import TYPE_CHECKING, Iterator, Tuple import pyarrow as pa import pyarrow.compute as pc from .dependencies import numpy as np +from .log import LOGGER +from .util import _normalize_metric_type + +if TYPE_CHECKING: + from pathlib import Path def is_cuvs_accelerator(accelerator: object) -> bool: @@ -34,8 +41,33 @@ def _optional_cupy(): return None +def _xp_module(): + cupy = _optional_cupy() + return cupy if cupy is not None else np + + +def _make_progress(total: int): + try: + from tqdm.auto import tqdm + + return tqdm(total=total) + except ModuleNotFoundError: + + class _NoOpProgress: + def set_description(self, _description: str): + return None + + def update(self, _count: int): + return None + + def close(self): + return None + + return _NoOpProgress() + + def _metric_to_cuvs(metric_type: str) -> str: - metric_type = metric_type.lower() + metric_type = _normalize_metric_type(metric_type).lower() if metric_type in {"l2", "euclidean"}: return "sqeuclidean" if metric_type == "dot": @@ -45,12 +77,7 @@ def _metric_to_cuvs(metric_type: str) -> str: raise ValueError(f"Metric '{metric_type}' is not supported by cuVS IVF_PQ") -def _column_to_numpy(table: pa.Table, column: str) -> np.ndarray: - array = table.column(column).combine_chunks() - values = array.to_pylist() - if len(values) == 0: - raise ValueError("cuVS training requires at least one training vector") - matrix = np.asarray(values) +def _coerce_float_matrix(matrix: np.ndarray, *, column: str) -> np.ndarray: if matrix.ndim != 2: raise ValueError( f"Expected a 2D training matrix for column '{column}', got {matrix.shape}" @@ -62,6 +89,22 @@ def _column_to_numpy(table: pa.Table, column: str) -> np.ndarray: return matrix +def _column_to_numpy(table: pa.Table | pa.RecordBatch, column: str) -> np.ndarray: + array = table.column(column) + if isinstance(array, pa.ChunkedArray): + array = array.combine_chunks() + if len(array) == 0: + raise ValueError("cuVS training requires at least one training vector") + + if pa.types.is_fixed_size_list(array.type): + values = array.values.to_numpy(zero_copy_only=False) + matrix = values.reshape(len(array), array.type.list_size) + return _coerce_float_matrix(matrix, column=column) + + values = array.to_pylist() + return _coerce_float_matrix(np.asarray(values), column=column) + + def _as_numpy(array_like) -> np.ndarray: if isinstance(array_like, np.ndarray): return array_like @@ -143,6 +186,195 @@ def _sample_training_table( return dataset.to_table(columns=[column], filter=filt, limit=train_rows) +def _normalize_metric(metric_type: str) -> str: + return _normalize_metric_type(metric_type).lower() + + +def _backend_asarray(array_like, xp): + if xp is np: + return np.asarray(array_like) + return xp.asarray(array_like) + + +def _backend_to_numpy(array_like, xp) -> np.ndarray: + if xp is np: + return np.asarray(array_like) + return xp.asnumpy(array_like) + + +def _normalize_rows(matrix, xp): + eps = xp.finfo(matrix.dtype).eps + norms = xp.linalg.norm(matrix, axis=1, keepdims=True) + return matrix / xp.maximum(norms, eps) + + +def _argmin_distance(vectors, centroids, metric_type: str, xp): + if vectors.shape[0] == 0: + return xp.empty((0,), dtype=xp.int32) + + metric_type = _normalize_metric(metric_type) + if metric_type in {"l2", "euclidean"}: + vec_norms = xp.sum(vectors * vectors, axis=1, keepdims=True) + ctr_norms = xp.sum(centroids * centroids, axis=1, keepdims=False) + distances = vec_norms + ctr_norms - 2 * vectors @ centroids.T + return xp.argmin(distances, axis=1).astype(xp.int32, copy=False) + + if metric_type == "dot": + scores = vectors @ centroids.T + return xp.argmax(scores, axis=1).astype(xp.int32, copy=False) + + if metric_type == "cosine": + scores = _normalize_rows(vectors, xp) @ _normalize_rows(centroids, xp).T + return xp.argmax(scores, axis=1).astype(xp.int32, copy=False) + + raise ValueError(f"Metric '{metric_type}' is not supported by cuVS IVF_PQ") + + +def _encode_pq_codes(residuals, pq_codebook, metric_type: str, xp) -> np.ndarray: + num_rows, num_sub_vectors, _ = residuals.shape + codes = np.empty((num_rows, num_sub_vectors), dtype=np.uint8) + for subvector_idx in range(num_sub_vectors): + sub_vectors = residuals[:, subvector_idx, :] + sub_codebook = pq_codebook[subvector_idx] + nearest = _argmin_distance(sub_vectors, sub_codebook, metric_type, xp) + codes[:, subvector_idx] = _backend_to_numpy(nearest, xp).astype( + np.uint8, copy=False + ) + return codes + + +def _make_shuffle_batch( + row_ids: np.ndarray, + partitions: np.ndarray, + pq_codes: np.ndarray, + num_sub_vectors: int, +) -> pa.RecordBatch: + pq_values = pa.array(pq_codes.reshape(-1)) + pq_code_array = pa.FixedSizeListArray.from_arrays(pq_values, num_sub_vectors) + return pa.RecordBatch.from_arrays( + [ + pa.array(row_ids, type=pa.uint64()), + pa.array(partitions, type=pa.uint32()), + pq_code_array, + ], + schema=pa.schema( + [ + pa.field("row_id", pa.uint64()), + pa.field("__ivf_part_id", pa.uint32()), + pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), + ] + ), + ) + + +def one_pass_assign_ivf_pq_on_cuvs( + dataset, + column: str, + metric_type: str, + accelerator: str, + ivf_centroids: np.ndarray, + pq_codebook: np.ndarray, + dst_dataset_uri: str | Path | None = None, + batch_size: int = 1024 * 10 * 4, + *, + filter_nan: bool = True, +): + if accelerator != "cuvs": + raise ValueError("cuVS acceleration only supports accelerator='cuvs'") + + num_rows = dataset.count_rows() + if dataset.schema.field(column).nullable and filter_nan: + filt = f"{column} is not null" + else: + filt = None + + num_sub_vectors = pq_codebook.shape[0] + subvector_size = pq_codebook.shape[2] + dim = ivf_centroids.shape[1] + if dim != num_sub_vectors * subvector_size: + raise ValueError( + "cuVS returned incompatible IVF/PQ dimensions: " + f"centroids dim {dim} != {num_sub_vectors} * {subvector_size}" + ) + + xp = _xp_module() + backend_centroids = _backend_asarray(ivf_centroids, xp) + backend_codebook = _backend_asarray(pq_codebook, xp) + + progress = _make_progress(num_rows) + progress.set_description("Assigning partitions and computing pq codes") + + def _partition_and_pq_codes_assignment() -> Iterator[pa.RecordBatch]: + for batch in dataset.to_batches( + columns=[column], + filter=filt, + with_row_id=True, + batch_size=batch_size, + ): + vectors = _column_to_numpy(batch, column) + row_ids = batch.column("_rowid").to_numpy() + valid_mask = np.isfinite(vectors).all(axis=1) + if not np.all(valid_mask): + LOGGER.warning( + "%s vectors are ignored during partition assignment", + len(valid_mask) - int(valid_mask.sum()), + ) + row_ids = row_ids[valid_mask] + vectors = vectors[valid_mask] + if len(row_ids) == 0: + continue + backend_vectors = _backend_asarray(vectors, xp) + + partitions = _argmin_distance( + backend_vectors, backend_centroids, metric_type, xp + ) + selected_centroids = backend_centroids[partitions] + residuals = backend_vectors - selected_centroids + residuals = residuals.reshape(-1, num_sub_vectors, subvector_size) + pq_codes = _encode_pq_codes(residuals, backend_codebook, metric_type, xp) + + partition_batch = _make_shuffle_batch( + row_ids, + _backend_to_numpy(partitions, xp), + pq_codes, + num_sub_vectors, + ) + progress.update(partition_batch.num_rows) + yield partition_batch + + output_schema = pa.schema( + [ + pa.field("row_id", pa.uint64()), + pa.field("__ivf_part_id", pa.uint32()), + pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), + ] + ) + rbr = pa.RecordBatchReader.from_batches( + output_schema, _partition_and_pq_codes_assignment() + ) + if dst_dataset_uri is None: + dst_dataset_uri = tempfile.mkdtemp() + if re.search(r".:\\", dst_dataset_uri) is not None: + dst_dataset_uri = dst_dataset_uri.replace("\\", "/", 1) + + from . import write_dataset + + ds = write_dataset( + rbr, + dst_dataset_uri, + schema=output_schema, + data_storage_version="legacy", + ) + + progress.close() + LOGGER.info("Saved precomputed pq_codes to %s", dst_dataset_uri) + + shuffle_buffers = [ + data_file.path for frag in ds.get_fragments() for data_file in frag.data_files() + ] + return str(dst_dataset_uri), shuffle_buffers + + def train_ivf_pq_on_cuvs( dataset, column: str, diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index baa9890daf..485f442569 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2960,7 +2960,10 @@ def _create_index_impl( ) if use_cuvs: - from .cuvs import one_pass_train_ivf_pq_on_cuvs + from .cuvs import ( + one_pass_assign_ivf_pq_on_cuvs, + one_pass_train_ivf_pq_on_cuvs, + ) LOGGER.info("Doing one-pass ivfpq cuVS training") timers["ivf+pq_train:start"] = time.time() @@ -2981,6 +2984,29 @@ def _create_index_impl( timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] ) LOGGER.info("cuVS ivf+pq training time: %ss", ivfpq_train_time) + timers["ivf+pq_assign:start"] = time.time() + ( + shuffle_output_dir, + shuffle_buffers, + ) = one_pass_assign_ivf_pq_on_cuvs( + self, + column[0], + metric, + accelerator, + ivf_centroids, + pq_codebook, + batch_size=20480, + filter_nan=filter_nan, + ) + timers["ivf+pq_assign:end"] = time.time() + ivfpq_assign_time = ( + timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] + ) + LOGGER.info("cuVS ivf+pq transform time: %ss", ivfpq_assign_time) + kwargs["precomputed_shuffle_buffers"] = shuffle_buffers + kwargs["precomputed_shuffle_buffers_path"] = os.path.join( + shuffle_output_dir, "data" + ) else: from .vector import ( one_pass_assign_ivf_pq_on_accelerator, diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index b92952e5f3..7b49137e7a 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -572,7 +572,48 @@ def fake_train( ), ) + def fake_assign( + dataset_arg, + column, + metric_type, + accelerator, + ivf_centroids, + pq_codebook, + dst_dataset_uri=None, + batch_size=20480, + *, + filter_nan, + ): + calls["assign_dataset"] = dataset_arg + calls["assign_column"] = column + calls["assign_metric_type"] = metric_type + calls["assign_accelerator"] = accelerator + calls["assign_batch_size"] = batch_size + calls["assign_filter_nan"] = filter_nan + + row_ids = dataset_arg.to_table(columns=[], with_row_id=True)[ + "_rowid" + ].to_numpy() + part_ids = pa.array(np.zeros(len(row_ids), dtype=np.uint32)) + pq_values = pa.array(np.zeros(len(row_ids) * 16, dtype=np.uint8)) + pq_codes = pa.FixedSizeListArray.from_arrays(pq_values, 16) + shuffle_ds_uri = str(tmp_path / "cuvs_shuffle_buffers") + shuffle_ds = lance.write_dataset( + pa.Table.from_arrays( + [pa.array(row_ids), part_ids, pq_codes], + names=["row_id", "__ivf_part_id", "__pq_code"], + ), + shuffle_ds_uri, + ) + shuffle_buffers = [ + data_file.path + for frag in shuffle_ds.get_fragments() + for data_file in frag.data_files() + ] + return shuffle_ds_uri, shuffle_buffers + monkeypatch.setattr(lance_cuvs, "one_pass_train_ivf_pq_on_cuvs", fake_train) + monkeypatch.setattr(lance_cuvs, "one_pass_assign_ivf_pq_on_cuvs", fake_assign) dataset = dataset.create_index( "vector", @@ -587,6 +628,9 @@ def fake_train( assert calls["metric_type"] == "L2" assert calls["accelerator"] == "cuvs" assert calls["num_sub_vectors"] == 16 + assert calls["assign_column"] == "vector" + assert calls["assign_metric_type"] == "L2" + assert calls["assign_accelerator"] == "cuvs" assert dataset.stats.index_stats("vector_idx")["index_type"] == "IVF_PQ" @@ -703,6 +747,32 @@ def copy_to_host(self): assert array.dtype == np.float32 +def test_one_pass_assign_ivf_pq_on_cuvs_writes_shuffle_buffers(tmp_path): + tbl = create_table(nvec=32, ndim=16) + dataset = lance.write_dataset(tbl, tmp_path / "cuvs_assign_src") + + ivf_centroids = np.random.randn(4, 16).astype(np.float32) + pq_codebook = np.random.randn(4, 256, 4).astype(np.float32) + + shuffle_uri, shuffle_buffers = lance_cuvs.one_pass_assign_ivf_pq_on_cuvs( + dataset, + "vector", + "l2", + "cuvs", + ivf_centroids, + pq_codebook, + batch_size=8, + ) + + shuffle_ds = lance.dataset(shuffle_uri) + batch = next(shuffle_ds.to_batches()) + + assert len(shuffle_buffers) > 0 + assert batch.column("row_id").type == pa.uint64() + assert batch.column("__ivf_part_id").type == pa.uint32() + assert batch.column("__pq_code").type == pa.list_(pa.uint8(), 4) + + def test_use_index(dataset, tmp_path): ann_ds = lance.write_dataset(dataset.to_table(), tmp_path / "indexed.lance") ann_ds = ann_ds.create_index( From 7f7e6e2e3829b45c9324ae7014c314489879eaab Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Apr 2026 23:31:24 +0800 Subject: [PATCH 7/8] python: use cuvs transform for full ivf pq build --- python/python/lance/cuvs.py | 219 ++++++++++------------- python/python/lance/dataset.py | 5 +- python/python/tests/test_vector_index.py | 45 ++++- 3 files changed, 142 insertions(+), 127 deletions(-) diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py index 45142b6b77..8498be7720 100644 --- a/python/python/lance/cuvs.py +++ b/python/python/lance/cuvs.py @@ -41,11 +41,6 @@ def _optional_cupy(): return None -def _xp_module(): - cupy = _optional_cupy() - return cupy if cupy is not None else np - - def _make_progress(total: int): try: from tqdm.auto import tqdm @@ -129,6 +124,16 @@ def _as_numpy(array_like) -> np.ndarray: raise TypeError("Unable to convert cuVS output to numpy") +def _to_cuvs_transform_input(matrix: np.ndarray): + cupy = _optional_cupy() + if cupy is None: + raise ModuleNotFoundError( + "accelerator='cuvs' full index build requires the 'cupy' package " + "to pass transform batches in device memory" + ) + return cupy.asarray(matrix) + + def _normalize_centroids(index, num_partitions: int, dimension: int) -> np.ndarray: centroids = _as_numpy(index.centers) if centroids.shape != (num_partitions, dimension): @@ -186,63 +191,6 @@ def _sample_training_table( return dataset.to_table(columns=[column], filter=filt, limit=train_rows) -def _normalize_metric(metric_type: str) -> str: - return _normalize_metric_type(metric_type).lower() - - -def _backend_asarray(array_like, xp): - if xp is np: - return np.asarray(array_like) - return xp.asarray(array_like) - - -def _backend_to_numpy(array_like, xp) -> np.ndarray: - if xp is np: - return np.asarray(array_like) - return xp.asnumpy(array_like) - - -def _normalize_rows(matrix, xp): - eps = xp.finfo(matrix.dtype).eps - norms = xp.linalg.norm(matrix, axis=1, keepdims=True) - return matrix / xp.maximum(norms, eps) - - -def _argmin_distance(vectors, centroids, metric_type: str, xp): - if vectors.shape[0] == 0: - return xp.empty((0,), dtype=xp.int32) - - metric_type = _normalize_metric(metric_type) - if metric_type in {"l2", "euclidean"}: - vec_norms = xp.sum(vectors * vectors, axis=1, keepdims=True) - ctr_norms = xp.sum(centroids * centroids, axis=1, keepdims=False) - distances = vec_norms + ctr_norms - 2 * vectors @ centroids.T - return xp.argmin(distances, axis=1).astype(xp.int32, copy=False) - - if metric_type == "dot": - scores = vectors @ centroids.T - return xp.argmax(scores, axis=1).astype(xp.int32, copy=False) - - if metric_type == "cosine": - scores = _normalize_rows(vectors, xp) @ _normalize_rows(centroids, xp).T - return xp.argmax(scores, axis=1).astype(xp.int32, copy=False) - - raise ValueError(f"Metric '{metric_type}' is not supported by cuVS IVF_PQ") - - -def _encode_pq_codes(residuals, pq_codebook, metric_type: str, xp) -> np.ndarray: - num_rows, num_sub_vectors, _ = residuals.shape - codes = np.empty((num_rows, num_sub_vectors), dtype=np.uint8) - for subvector_idx in range(num_sub_vectors): - sub_vectors = residuals[:, subvector_idx, :] - sub_codebook = pq_codebook[subvector_idx] - nearest = _argmin_distance(sub_vectors, sub_codebook, metric_type, xp) - codes[:, subvector_idx] = _backend_to_numpy(nearest, xp).astype( - np.uint8, copy=False - ) - return codes - - def _make_shuffle_batch( row_ids: np.ndarray, partitions: np.ndarray, @@ -267,6 +215,65 @@ def _make_shuffle_batch( ) +def _train_ivf_pq_index_on_cuvs( + dataset, + column: str, + num_partitions: int, + metric_type: str, + accelerator: str, + num_sub_vectors: int, + *, + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, + filter_nan: bool = True, +): + if accelerator != "cuvs": + raise ValueError("cuVS acceleration only supports accelerator='cuvs'") + if num_bits != 8: + raise ValueError("cuVS IVF_PQ integration currently supports only num_bits=8") + + dimension = dataset.schema.field(column).type.list_size + if dimension % num_sub_vectors != 0: + raise ValueError( + "cuVS IVF_PQ integration requires vector dimension to be divisible by " + "num_sub_vectors" + ) + + if dataset.schema.field(column).nullable and filter_nan: + filt = f"{column} is not null" + else: + filt = None + + num_rows = dataset.count_rows(filter=filt) + if num_rows == 0: + raise ValueError("cuVS training requires at least one non-null training vector") + + train_rows = max(1, min(num_rows, max(num_partitions * sample_rate, 256 * 256))) + trainset = _sample_training_table(dataset, column, train_rows, filt) + matrix = _column_to_numpy(trainset, column) + + ivf_pq = _require_cuvs() + build_params = ivf_pq.IndexParams( + n_lists=num_partitions, + metric=_metric_to_cuvs(metric_type), + kmeans_n_iters=max_iters, + kmeans_trainset_fraction=_estimate_trainset_fraction( + matrix.shape[0], num_partitions, sample_rate + ), + pq_bits=num_bits, + pq_dim=num_sub_vectors, + codebook_kind="subspace", + force_random_rotation=False, + add_data_on_build=False, + ) + + index = ivf_pq.build(build_params, matrix) + centroids = _normalize_centroids(index, num_partitions, dimension) + pq_codebook = _normalize_pq_codebook(index, num_sub_vectors, num_bits, dimension) + return index, centroids, pq_codebook + + def one_pass_assign_ivf_pq_on_cuvs( dataset, column: str, @@ -274,6 +281,7 @@ def one_pass_assign_ivf_pq_on_cuvs( accelerator: str, ivf_centroids: np.ndarray, pq_codebook: np.ndarray, + trained_index=None, dst_dataset_uri: str | Path | None = None, batch_size: int = 1024 * 10 * 4, *, @@ -289,18 +297,14 @@ def one_pass_assign_ivf_pq_on_cuvs( filt = None num_sub_vectors = pq_codebook.shape[0] - subvector_size = pq_codebook.shape[2] - dim = ivf_centroids.shape[1] - if dim != num_sub_vectors * subvector_size: + ivf_pq = _require_cuvs() + + if trained_index is None: raise ValueError( - "cuVS returned incompatible IVF/PQ dimensions: " - f"centroids dim {dim} != {num_sub_vectors} * {subvector_size}" + "one_pass_assign_ivf_pq_on_cuvs requires a trained cuVS index for " + "single-node transform" ) - xp = _xp_module() - backend_centroids = _backend_asarray(ivf_centroids, xp) - backend_codebook = _backend_asarray(pq_codebook, xp) - progress = _make_progress(num_rows) progress.set_description("Assigning partitions and computing pq codes") @@ -323,19 +327,20 @@ def _partition_and_pq_codes_assignment() -> Iterator[pa.RecordBatch]: vectors = vectors[valid_mask] if len(row_ids) == 0: continue - backend_vectors = _backend_asarray(vectors, xp) - - partitions = _argmin_distance( - backend_vectors, backend_centroids, metric_type, xp + partitions, pq_codes = ivf_pq.transform( + trained_index, _to_cuvs_transform_input(vectors) ) - selected_centroids = backend_centroids[partitions] - residuals = backend_vectors - selected_centroids - residuals = residuals.reshape(-1, num_sub_vectors, subvector_size) - pq_codes = _encode_pq_codes(residuals, backend_codebook, metric_type, xp) + partitions = _as_numpy(partitions).astype(np.uint32, copy=False) + pq_codes = _as_numpy(pq_codes).astype(np.uint8, copy=False) + if pq_codes.shape != (len(row_ids), num_sub_vectors): + raise ValueError( + "cuVS transform returned incompatible PQ codes shape: " + f"expected {(len(row_ids), num_sub_vectors)}, got {pq_codes.shape}" + ) partition_batch = _make_shuffle_batch( row_ids, - _backend_to_numpy(partitions, xp), + partitions, pq_codes, num_sub_vectors, ) @@ -388,50 +393,18 @@ def train_ivf_pq_on_cuvs( num_bits: int = 8, filter_nan: bool = True, ) -> Tuple[np.ndarray, np.ndarray]: - if accelerator != "cuvs": - raise ValueError("cuVS acceleration only supports accelerator='cuvs'") - if num_bits != 8: - raise ValueError("cuVS IVF_PQ integration currently supports only num_bits=8") - - dimension = dataset.schema.field(column).type.list_size - if dimension % num_sub_vectors != 0: - raise ValueError( - "cuVS IVF_PQ integration requires vector dimension to be divisible by " - "num_sub_vectors" - ) - - if dataset.schema.field(column).nullable and filter_nan: - filt = f"{column} is not null" - else: - filt = None - - num_rows = dataset.count_rows(filter=filt) - if num_rows == 0: - raise ValueError("cuVS training requires at least one non-null training vector") - - train_rows = max(1, min(num_rows, max(num_partitions * sample_rate, 256 * 256))) - trainset = _sample_training_table(dataset, column, train_rows, filt) - matrix = _column_to_numpy(trainset, column) - - ivf_pq = _require_cuvs() - build_params = ivf_pq.IndexParams( - n_lists=num_partitions, - metric=_metric_to_cuvs(metric_type), - kmeans_n_iters=max_iters, - kmeans_trainset_fraction=_estimate_trainset_fraction( - matrix.shape[0], num_partitions, sample_rate - ), - pq_bits=num_bits, - pq_dim=num_sub_vectors, - codebook_kind="subspace", - force_random_rotation=False, - add_data_on_build=False, + _, centroids, pq_codebook = _train_ivf_pq_index_on_cuvs( + dataset, + column, + num_partitions, + metric_type, + accelerator, + num_sub_vectors, + sample_rate=sample_rate, + max_iters=max_iters, + num_bits=num_bits, + filter_nan=filter_nan, ) - - index = ivf_pq.build(build_params, matrix) - - centroids = _normalize_centroids(index, num_partitions, dimension) - pq_codebook = _normalize_pq_codebook(index, num_sub_vectors, num_bits, dimension) return centroids, pq_codebook diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 485f442569..1b24962a70 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2961,13 +2961,13 @@ def _create_index_impl( if use_cuvs: from .cuvs import ( + _train_ivf_pq_index_on_cuvs, one_pass_assign_ivf_pq_on_cuvs, - one_pass_train_ivf_pq_on_cuvs, ) LOGGER.info("Doing one-pass ivfpq cuVS training") timers["ivf+pq_train:start"] = time.time() - ivf_centroids, pq_codebook = one_pass_train_ivf_pq_on_cuvs( + trained_index, ivf_centroids, pq_codebook = _train_ivf_pq_index_on_cuvs( self, column[0], num_partitions, @@ -2995,6 +2995,7 @@ def _create_index_impl( accelerator, ivf_centroids, pq_codebook, + trained_index=trained_index, batch_size=20480, filter_nan=filter_nan, ) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 7b49137e7a..a54475e1ae 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -542,6 +542,10 @@ def test_create_index_cuvs_dispatch(tmp_path, monkeypatch): dataset = lance.write_dataset(tbl, tmp_path) calls = {} + class FakeIndex: + pq_dim = 16 + pq_bits = 8 + def fake_train( dataset_arg, column, @@ -566,6 +570,7 @@ def fake_train( calls["num_bits"] = num_bits calls["filter_nan"] = filter_nan return ( + FakeIndex(), np.random.randn(num_partitions, 128).astype(np.float32), np.random.randn(num_sub_vectors, 256, 128 // num_sub_vectors).astype( np.float32 @@ -579,6 +584,7 @@ def fake_assign( accelerator, ivf_centroids, pq_codebook, + trained_index=None, dst_dataset_uri=None, batch_size=20480, *, @@ -588,6 +594,7 @@ def fake_assign( calls["assign_column"] = column calls["assign_metric_type"] = metric_type calls["assign_accelerator"] = accelerator + calls["assign_trained_index"] = trained_index calls["assign_batch_size"] = batch_size calls["assign_filter_nan"] = filter_nan @@ -612,7 +619,7 @@ def fake_assign( ] return shuffle_ds_uri, shuffle_buffers - monkeypatch.setattr(lance_cuvs, "one_pass_train_ivf_pq_on_cuvs", fake_train) + monkeypatch.setattr(lance_cuvs, "_train_ivf_pq_index_on_cuvs", fake_train) monkeypatch.setattr(lance_cuvs, "one_pass_assign_ivf_pq_on_cuvs", fake_assign) dataset = dataset.create_index( @@ -631,6 +638,7 @@ def fake_assign( assert calls["assign_column"] == "vector" assert calls["assign_metric_type"] == "L2" assert calls["assign_accelerator"] == "cuvs" + assert isinstance(calls["assign_trained_index"], FakeIndex) assert dataset.stats.index_stats("vector_idx")["index_type"] == "IVF_PQ" @@ -747,13 +755,45 @@ def copy_to_host(self): assert array.dtype == np.float32 -def test_one_pass_assign_ivf_pq_on_cuvs_writes_shuffle_buffers(tmp_path): +def test_one_pass_assign_ivf_pq_on_cuvs_writes_shuffle_buffers(tmp_path, monkeypatch): tbl = create_table(nvec=32, ndim=16) dataset = lance.write_dataset(tbl, tmp_path / "cuvs_assign_src") ivf_centroids = np.random.randn(4, 16).astype(np.float32) pq_codebook = np.random.randn(4, 256, 4).astype(np.float32) + class FakeDeviceTensor: + def __init__(self, array): + self._array = array + + def copy_to_host(self): + return self._array + + class FakeCupyArray: + def __init__(self, array): + self.array = array + + class FakeCupyModule: + @staticmethod + def asarray(array): + return FakeCupyArray(array) + + class FakeIndex: + pq_dim = 4 + pq_bits = 8 + + class FakeIvfPqModule: + @staticmethod + def transform(index, vectors): + assert isinstance(index, FakeIndex) + assert isinstance(vectors, FakeCupyArray) + labels = np.arange(len(vectors.array), dtype=np.uint32) % 4 + pq_codes = np.full((len(vectors.array), 4), 7, dtype=np.uint8) + return FakeDeviceTensor(labels), FakeDeviceTensor(pq_codes) + + monkeypatch.setattr(lance_cuvs, "_require_cuvs", lambda: FakeIvfPqModule()) + monkeypatch.setattr(lance_cuvs, "_optional_cupy", lambda: FakeCupyModule()) + shuffle_uri, shuffle_buffers = lance_cuvs.one_pass_assign_ivf_pq_on_cuvs( dataset, "vector", @@ -761,6 +801,7 @@ def test_one_pass_assign_ivf_pq_on_cuvs_writes_shuffle_buffers(tmp_path): "cuvs", ivf_centroids, pq_codebook, + trained_index=FakeIndex(), batch_size=8, ) From f9c5d03d8f34e680bb54f03a626d219bcde62e8f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 2 Apr 2026 17:04:49 +0800 Subject: [PATCH 8/8] python: route cuvs precomputed shuffle to v3 files --- python/python/lance/cuvs.py | 135 ++++++++++---- python/python/lance/dataset.py | 6 +- python/python/tests/test_vector_index.py | 16 +- rust/lance-index/src/vector/v3/shuffler.rs | 203 ++++++++++++++++++++- rust/lance/src/index/vector/builder.rs | 70 ++++++- 5 files changed, 376 insertions(+), 54 deletions(-) diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py index 8498be7720..ed8cad8390 100644 --- a/python/python/lance/cuvs.py +++ b/python/python/lance/cuvs.py @@ -3,6 +3,8 @@ from __future__ import annotations +import json +import os import re import tempfile from importlib import import_module @@ -12,6 +14,7 @@ import pyarrow.compute as pc from .dependencies import numpy as np +from .file import LanceFileWriter from .log import LOGGER from .util import _normalize_metric_type @@ -195,24 +198,108 @@ def _make_shuffle_batch( row_ids: np.ndarray, partitions: np.ndarray, pq_codes: np.ndarray, + num_partitions: int, num_sub_vectors: int, -) -> pa.RecordBatch: - pq_values = pa.array(pq_codes.reshape(-1)) +) -> tuple[pa.RecordBatch, pa.RecordBatch]: + sort_indices = np.argsort(partitions, kind="stable") + row_ids = row_ids[sort_indices] + partitions = partitions[sort_indices] + pq_codes = pq_codes[sort_indices] + + pq_values = pa.array(pq_codes.reshape(-1), type=pa.uint8()) pq_code_array = pa.FixedSizeListArray.from_arrays(pq_values, num_sub_vectors) - return pa.RecordBatch.from_arrays( + partition_counts = np.bincount(partitions, minlength=num_partitions).astype( + np.uint64, copy=False + ) + offsets = np.cumsum(partition_counts, dtype=np.uint64) + data_batch = pa.RecordBatch.from_arrays( [ pa.array(row_ids, type=pa.uint64()), - pa.array(partitions, type=pa.uint32()), pq_code_array, ], schema=pa.schema( [ - pa.field("row_id", pa.uint64()), - pa.field("__ivf_part_id", pa.uint32()), + pa.field("_rowid", pa.uint64()), pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), ] ), ) + offsets_batch = pa.RecordBatch.from_arrays( + [pa.array(offsets, type=pa.uint64())], + schema=pa.schema([pa.field("offset", pa.uint64())]), + ) + return data_batch, offsets_batch + + +def _shuffle_metadata( + num_partitions: int, num_batches: int, partition_counts +) -> dict[str, str]: + return { + "lance:shuffle:num_partitions": str(num_partitions), + "lance:shuffle:num_batches": str(num_batches), + "lance:shuffle:partition_counts": json.dumps(list(partition_counts)), + "lance:shuffle:total_loss": "0.0", + } + + +def _write_v3_shuffle_files( + output_root: str, + batches: Iterator[tuple[pa.RecordBatch, pa.RecordBatch]], + *, + num_partitions: int, + num_sub_vectors: int, +) -> list[str]: + os.makedirs(output_root, exist_ok=True) + data_path = os.path.join(output_root, "shuffle_data.lance") + offsets_path = os.path.join(output_root, "shuffle_offsets.lance") + + data_schema = pa.schema( + [ + pa.field("_rowid", pa.uint64()), + pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), + ] + ) + offsets_schema = pa.schema([pa.field("offset", pa.uint64())]) + + data_writer = None + offsets_writer = LanceFileWriter(offsets_path, offsets_schema) + total_partition_counts = np.zeros(num_partitions, dtype=np.uint64) + global_row_count = np.uint64(0) + num_batches = 0 + + for data_batch, offsets_batch in batches: + if data_writer is None: + data_writer = LanceFileWriter(data_path, data_batch.schema) + data_writer.write_batch(data_batch) + + offsets = offsets_batch.column(0).to_numpy() + adjusted_offsets = offsets + global_row_count + offsets_writer.write_batch( + pa.RecordBatch.from_arrays( + [pa.array(adjusted_offsets, type=pa.uint64())], + schema=offsets_schema, + ) + ) + last_offset = np.uint64(0) + for idx, offset in enumerate(offsets): + total_partition_counts[idx] += np.uint64(offset) - last_offset + last_offset = np.uint64(offset) + global_row_count += np.uint64(data_batch.num_rows) + num_batches += 1 + + if data_writer is None: + data_writer = LanceFileWriter(data_path, data_schema) + + metadata = _shuffle_metadata( + num_partitions, num_batches, total_partition_counts.tolist() + ) + for key, value in metadata.items(): + data_writer.add_schema_metadata(key, value) + offsets_writer.add_schema_metadata(key, value) + + data_writer.close() + offsets_writer.close() + return ["shuffle_data.lance", "shuffle_offsets.lance"] def _train_ivf_pq_index_on_cuvs( @@ -283,7 +370,7 @@ def one_pass_assign_ivf_pq_on_cuvs( pq_codebook: np.ndarray, trained_index=None, dst_dataset_uri: str | Path | None = None, - batch_size: int = 1024 * 10 * 4, + batch_size: int = 1024 * 128, *, filter_nan: bool = True, ): @@ -308,7 +395,9 @@ def one_pass_assign_ivf_pq_on_cuvs( progress = _make_progress(num_rows) progress.set_description("Assigning partitions and computing pq codes") - def _partition_and_pq_codes_assignment() -> Iterator[pa.RecordBatch]: + def _partition_and_pq_codes_assignment() -> Iterator[ + tuple[pa.RecordBatch, pa.RecordBatch] + ]: for batch in dataset.to_batches( columns=[column], filter=filt, @@ -342,41 +431,25 @@ def _partition_and_pq_codes_assignment() -> Iterator[pa.RecordBatch]: row_ids, partitions, pq_codes, + ivf_centroids.shape[0], num_sub_vectors, ) - progress.update(partition_batch.num_rows) + progress.update(len(row_ids)) yield partition_batch - output_schema = pa.schema( - [ - pa.field("row_id", pa.uint64()), - pa.field("__ivf_part_id", pa.uint32()), - pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), - ] - ) - rbr = pa.RecordBatchReader.from_batches( - output_schema, _partition_and_pq_codes_assignment() - ) if dst_dataset_uri is None: dst_dataset_uri = tempfile.mkdtemp() if re.search(r".:\\", dst_dataset_uri) is not None: dst_dataset_uri = dst_dataset_uri.replace("\\", "/", 1) - - from . import write_dataset - - ds = write_dataset( - rbr, - dst_dataset_uri, - schema=output_schema, - data_storage_version="legacy", + shuffle_buffers = _write_v3_shuffle_files( + str(dst_dataset_uri), + _partition_and_pq_codes_assignment(), + num_partitions=ivf_centroids.shape[0], + num_sub_vectors=num_sub_vectors, ) progress.close() LOGGER.info("Saved precomputed pq_codes to %s", dst_dataset_uri) - - shuffle_buffers = [ - data_file.path for frag in ds.get_fragments() for data_file in frag.data_files() - ] return str(dst_dataset_uri), shuffle_buffers diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 1b24962a70..5dee776791 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2996,7 +2996,7 @@ def _create_index_impl( ivf_centroids, pq_codebook, trained_index=trained_index, - batch_size=20480, + batch_size=1024 * 128, filter_nan=filter_nan, ) timers["ivf+pq_assign:end"] = time.time() @@ -3005,9 +3005,7 @@ def _create_index_impl( ) LOGGER.info("cuVS ivf+pq transform time: %ss", ivfpq_assign_time) kwargs["precomputed_shuffle_buffers"] = shuffle_buffers - kwargs["precomputed_shuffle_buffers_path"] = os.path.join( - shuffle_output_dir, "data" - ) + kwargs["precomputed_shuffle_buffers_path"] = shuffle_output_dir else: from .vector import ( one_pass_assign_ivf_pq_on_accelerator, diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index a54475e1ae..cf0bfe6e2b 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -805,13 +805,17 @@ def transform(index, vectors): batch_size=8, ) - shuffle_ds = lance.dataset(shuffle_uri) - batch = next(shuffle_ds.to_batches()) + from lance.file import LanceFileReader - assert len(shuffle_buffers) > 0 - assert batch.column("row_id").type == pa.uint64() - assert batch.column("__ivf_part_id").type == pa.uint32() - assert batch.column("__pq_code").type == pa.list_(pa.uint8(), 4) + data_reader = LanceFileReader(str(Path(shuffle_uri) / "shuffle_data.lance")) + offsets_reader = LanceFileReader(str(Path(shuffle_uri) / "shuffle_offsets.lance")) + data_batch = next(data_reader.read_all(batch_size=1024).to_batches()) + offsets_batch = next(offsets_reader.read_all(batch_size=1024).to_batches()) + + assert shuffle_buffers == ["shuffle_data.lance", "shuffle_offsets.lance"] + assert data_batch.column("_rowid").type == pa.uint64() + assert data_batch.column("__pq_code").type == pa.list_(pa.uint8(), 4) + assert offsets_batch.column("offset").type == pa.uint64() def test_use_index(dataset, tmp_path): diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index 0bf714df23..45c719d523 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -4,6 +4,7 @@ //! Shuffler is a component that takes a stream of record batches and shuffles them into //! the corresponding IVF partitions. +use std::collections::HashMap; use std::ops::Range; use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; @@ -36,6 +37,13 @@ use object_store::path::Path; use crate::vector::{LOSS_METADATA_KEY, PART_ID_COLUMN}; +const SHUFFLE_NUM_PARTITIONS_METADATA_KEY: &str = "lance:shuffle:num_partitions"; +const SHUFFLE_NUM_BATCHES_METADATA_KEY: &str = "lance:shuffle:num_batches"; +const SHUFFLE_PARTITION_COUNTS_METADATA_KEY: &str = "lance:shuffle:partition_counts"; +const SHUFFLE_TOTAL_LOSS_METADATA_KEY: &str = "lance:shuffle:total_loss"; +pub const SHUFFLE_DATA_FILE_NAME: &str = "shuffle_data.lance"; +pub const SHUFFLE_OFFSETS_FILE_NAME: &str = "shuffle_offsets.lance"; + #[async_trait::async_trait] /// A reader that can read the shuffled partitions. pub trait ShuffleReader: Send + Sync { @@ -435,7 +443,7 @@ impl Shuffler for TwoFileShuffler { ); // Create data file writer - let data_path = self.output_dir.child("shuffle_data.lance"); + let data_path = self.output_dir.child(SHUFFLE_DATA_FILE_NAME); let spill_path = self.output_dir.child("shuffle_data.spill"); let writer = self.object_store.create(&data_path).await?; let mut file_writer = FileWriter::try_new( @@ -446,7 +454,7 @@ impl Shuffler for TwoFileShuffler { .with_page_metadata_spill(self.object_store.clone(), spill_path); // Create offsets file writer - let offsets_path = self.output_dir.child("shuffle_offsets.lance"); + let offsets_path = self.output_dir.child(SHUFFLE_OFFSETS_FILE_NAME); let spill_path = self.output_dir.child("shuffle_offsets.spill"); let writer = self.object_store.create(&offsets_path).await?; let mut offsets_writer = FileWriter::try_new( @@ -527,13 +535,37 @@ impl Shuffler for TwoFileShuffler { .await?; } + let partition_counts_json = serde_json::to_string(&partition_counts).map_err(|e| { + Error::invalid_input(format!("Failed to serialize shuffle partition counts: {e}")) + })?; + let num_partitions_str = num_partitions.to_string(); + let num_batches_str = num_batches + .load(std::sync::atomic::Ordering::Relaxed) + .to_string(); + let total_loss_str = total_loss.lock().unwrap().to_string(); + for writer in [&mut file_writer, &mut offsets_writer] { + writer.add_schema_metadata( + SHUFFLE_NUM_PARTITIONS_METADATA_KEY, + num_partitions_str.clone(), + ); + writer.add_schema_metadata(SHUFFLE_NUM_BATCHES_METADATA_KEY, num_batches_str.clone()); + writer.add_schema_metadata( + SHUFFLE_PARTITION_COUNTS_METADATA_KEY, + partition_counts_json.clone(), + ); + writer.add_schema_metadata(SHUFFLE_TOTAL_LOSS_METADATA_KEY, total_loss_str.clone()); + } + // Finish files file_writer.finish().await?; offsets_writer.finish().await?; - let num_batches = num_batches.load(std::sync::atomic::Ordering::Relaxed); - - let total_loss_val = *total_loss.lock().unwrap(); + let num_batches = num_batches_str + .parse::() + .expect("num_batches string was produced from u64"); + let total_loss_val = total_loss_str + .parse::() + .expect("total_loss string was produced from f64"); TwoFileShuffleReader::try_new( self.object_store.clone(), @@ -558,6 +590,46 @@ pub struct TwoFileShuffleReader { } impl TwoFileShuffleReader { + pub async fn try_open_existing( + object_store: Arc, + output_dir: Path, + data_file: impl AsRef, + offsets_file: impl AsRef, + ) -> Result> { + let scheduler_config = SchedulerConfig::max_bandwidth(&object_store); + let scheduler = ScanScheduler::new(object_store, scheduler_config); + + let file_reader = FileReader::try_open( + scheduler + .open_file( + &output_dir.child(data_file.as_ref()), + &CachedFileSize::unknown(), + ) + .await?, + None, + Arc::::default(), + &LanceCache::no_cache(), + FileReaderOptions::default(), + ) + .await?; + + let offsets_reader = FileReader::try_open( + scheduler + .open_file( + &output_dir.child(offsets_file.as_ref()), + &CachedFileSize::unknown(), + ) + .await?, + None, + Arc::::default(), + &LanceCache::no_cache(), + FileReaderOptions::default(), + ) + .await?; + + Self::from_existing_readers(scheduler, file_reader, offsets_reader) + } + async fn try_new( object_store: Arc, output_dir: Path, @@ -573,7 +645,7 @@ impl TwoFileShuffleReader { let scheduler_config = SchedulerConfig::max_bandwidth(&object_store); let scheduler = ScanScheduler::new(object_store, scheduler_config); - let data_path = output_dir.child("shuffle_data.lance"); + let data_path = output_dir.child(SHUFFLE_DATA_FILE_NAME); let file_reader = FileReader::try_open( scheduler .open_file(&data_path, &CachedFileSize::unknown()) @@ -585,7 +657,7 @@ impl TwoFileShuffleReader { ) .await?; - let offsets_path = output_dir.child("shuffle_offsets.lance"); + let offsets_path = output_dir.child(SHUFFLE_OFFSETS_FILE_NAME); let offsets_reader = FileReader::try_open( scheduler .open_file(&offsets_path, &CachedFileSize::unknown()) @@ -608,6 +680,87 @@ impl TwoFileShuffleReader { })) } + fn from_existing_readers( + scheduler: Arc, + file_reader: FileReader, + offsets_reader: FileReader, + ) -> Result> { + let metadata: &HashMap = &offsets_reader.schema().metadata; + + let num_partitions = metadata + .get(SHUFFLE_NUM_PARTITIONS_METADATA_KEY) + .ok_or_else(|| { + Error::invalid_input(format!( + "Missing required metadata key {SHUFFLE_NUM_PARTITIONS_METADATA_KEY} in precomputed V3 shuffle offsets file" + )) + })? + .parse::() + .map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_NUM_PARTITIONS_METADATA_KEY}: {e}" + )) + })?; + let num_batches = metadata + .get(SHUFFLE_NUM_BATCHES_METADATA_KEY) + .ok_or_else(|| { + Error::invalid_input(format!( + "Missing required metadata key {SHUFFLE_NUM_BATCHES_METADATA_KEY} in precomputed V3 shuffle offsets file" + )) + })? + .parse::() + .map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_NUM_BATCHES_METADATA_KEY}: {e}" + )) + })?; + let partition_counts = serde_json::from_str::>( + metadata + .get(SHUFFLE_PARTITION_COUNTS_METADATA_KEY) + .ok_or_else(|| { + Error::invalid_input(format!( + "Missing required metadata key {SHUFFLE_PARTITION_COUNTS_METADATA_KEY} in precomputed V3 shuffle offsets file" + )) + })?, + ) + .map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_PARTITION_COUNTS_METADATA_KEY}: {e}" + )) + })?; + if partition_counts.len() != num_partitions { + return Err(Error::invalid_input(format!( + "Precomputed V3 shuffle partition count length {} does not match num_partitions {}", + partition_counts.len(), + num_partitions + ))); + } + let total_loss = metadata + .get(SHUFFLE_TOTAL_LOSS_METADATA_KEY) + .map(|value| { + value.parse::().map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_TOTAL_LOSS_METADATA_KEY}: {e}" + )) + }) + }) + .transpose()? + .unwrap_or(0.0); + + if num_batches == 0 { + return Ok(Box::new(EmptyReader)); + } + + Ok(Box::new(Self { + _scheduler: scheduler, + file_reader, + offsets_reader, + num_partitions, + num_batches, + partition_counts, + total_loss, + })) + } + async fn partition_ranges(&self, partition_id: usize) -> Result>> { let mut positions = Vec::with_capacity(self.num_batches as usize * 2); for batch_idx in 0..self.num_batches { @@ -844,6 +997,42 @@ mod tests { assert!((loss - 4.25).abs() < 1e-10, "expected 4.25, got {}", loss); } + #[tokio::test] + async fn test_two_file_shuffler_reopen_existing_files() { + let dir = TempStrDir::default(); + let output_dir = Path::from(dir.as_ref()); + let num_partitions = 3; + + let batch1 = make_batch(&[0, 1, 2], &[10, 20, 30], Some(1.5)); + let batch2 = make_batch(&[2, 0, 1, 0], &[40, 50, 60, 70], Some(2.0)); + + let shuffler = TwoFileShuffler::new(output_dir.clone(), num_partitions); + let stream = batches_to_stream(vec![batch1, batch2]); + let _ = shuffler.shuffle(stream).await.unwrap(); + + let reopened = TwoFileShuffleReader::try_open_existing( + Arc::new(ObjectStore::local()), + output_dir, + SHUFFLE_DATA_FILE_NAME, + SHUFFLE_OFFSETS_FILE_NAME, + ) + .await + .unwrap(); + + assert_eq!(reopened.partition_size(0).unwrap(), 3); + assert_eq!(reopened.partition_size(1).unwrap(), 2); + assert_eq!(reopened.partition_size(2).unwrap(), 2); + + let p0 = collect_partition(reopened.as_ref(), 0).await.unwrap(); + let vals: &Int32Array = p0.column_by_name("val").unwrap().as_primitive(); + let mut v: Vec = vals.iter().map(|x| x.unwrap()).collect(); + v.sort(); + assert_eq!(v, vec![10, 50, 70]); + + let loss = reopened.total_loss().unwrap(); + assert!((loss - 3.5).abs() < 1e-10, "expected 3.5, got {}", loss); + } + #[tokio::test] async fn test_two_file_shuffler_single_batch() { let dir = TempStrDir::default(); diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 24298cbba1..9a7001834d 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::future; +use std::path::Path as StdPath; use std::sync::Arc; use std::{collections::HashMap, pin::Pin}; @@ -44,7 +45,10 @@ use lance_index::vector::quantizer::{QuantizerMetadata, QuantizerStorage}; use lance_index::vector::shared::{SupportedIvfIndexType, write_unified_ivf_and_index_metadata}; use lance_index::vector::storage::STORAGE_METADATA_KEY; use lance_index::vector::transform::Flatten; -use lance_index::vector::v3::shuffler::{EmptyReader, IvfShufflerReader}; +use lance_index::vector::v3::shuffler::{ + EmptyReader, IvfShufflerReader, SHUFFLE_DATA_FILE_NAME, SHUFFLE_OFFSETS_FILE_NAME, + TwoFileShuffleReader, +}; use lance_index::vector::v3::subindex::SubIndexType; use lance_index::vector::{LOSS_METADATA_KEY, PART_ID_COLUMN, PQ_CODE_COLUMN, VectorIndex}; use lance_index::vector::{PART_ID_FIELD, ivf::storage::IvfModel}; @@ -141,6 +145,43 @@ type BuildStream = Pin::Storage, S, f64)>>> + Send>>; impl IvfIndexBuilder { + async fn try_open_precomputed_v3_shuffle_reader( + &self, + root: &Path, + files: &[String], + ) -> Result>> { + if files.len() != 2 { + return Ok(None); + } + + let mut data_file = None; + let mut offsets_file = None; + for file in files { + let Some(file_name) = StdPath::new(file).file_name() else { + return Ok(None); + }; + match file_name.to_string_lossy().as_ref() { + SHUFFLE_DATA_FILE_NAME => data_file = Some(SHUFFLE_DATA_FILE_NAME), + SHUFFLE_OFFSETS_FILE_NAME => offsets_file = Some(SHUFFLE_OFFSETS_FILE_NAME), + _ => return Ok(None), + } + } + let (Some(data_file), Some(offsets_file)) = (data_file, offsets_file) else { + return Ok(None); + }; + + Ok(Some( + TwoFileShuffleReader::try_open_existing( + Arc::new(ObjectStore::local()), + root.clone(), + data_file, + offsets_file, + ) + .await? + .into(), + )) + } + #[allow(clippy::too_many_arguments)] pub fn new( dataset: Dataset, @@ -528,13 +569,30 @@ impl IvfIndexBuilder .as_ref() .and_then(|p| p.precomputed_shuffle_buffers.as_ref()) { - Some((uri, _)) => { + Some((uri, files)) => { + if let Some(reader) = self + .try_open_precomputed_v3_shuffle_reader(uri, files) + .await? + { + log::info!("shuffle with precomputed V3 shuffle files from {}", uri); + self.shuffle_reader = Some(reader); + return Ok(()); + } + let uri = to_local_path(uri); - // the uri points to data directory, - // so need to trim the "data" suffix for reading the dataset - let uri = uri.trim_end_matches("data"); + let uri = if StdPath::new(&uri) + .file_name() + .is_some_and(|name| name == "data") + { + StdPath::new(&uri) + .parent() + .map(|path| path.to_string_lossy().to_string()) + .unwrap_or(uri) + } else { + uri + }; log::info!("shuffle with precomputed shuffle buffers from {}", uri); - let ds = Dataset::open(uri).await?; + let ds = Dataset::open(&uri).await?; ds.scan().try_into_stream().await? } _ => {