Skip to content
41 changes: 41 additions & 0 deletions protos/ann_ivf.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

syntax = "proto3";

package lance.datafusion;

import "table_identifier.proto";

// Serialized vector query parameters.
message VectorQueryProto {
// Query vector as Arrow IPC bytes (supports Float16, Float32, Float64, UInt8, etc.)
bytes key_arrow_ipc = 1;
string column = 2;
uint32 k = 3;
optional float lower_bound = 4;
optional float upper_bound = 5;
uint32 minimum_nprobes = 6;
optional uint32 maximum_nprobes = 7;
optional uint32 ef = 8;
optional uint32 refine_factor = 9;
// Distance metric type as string ("l2", "cosine", "dot", "hamming").
// Absent means None (use the index's default metric).
optional string metric_type = 10;
bool use_index = 11;
float dist_q_c = 12;
}

message ANNIvfPartitionExecProto {
VectorQueryProto query = 1;
TableIdentifier table = 2;
repeated string index_uuids = 3;
}

message ANNIvfSubIndexExecProto {
VectorQueryProto query = 1;
TableIdentifier table = 2;
// Each element is a prost-encoded lance.table.IndexMetadata message.
// We use raw bytes to avoid a cross-package proto import.
repeated bytes indices = 3;
}
1 change: 1 addition & 0 deletions rust/lance-datafusion/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fn main() -> Result<()> {
&[
"./protos/table_identifier.proto",
"./protos/filtered_read.proto",
"./protos/ann_ivf.proto",
],
&["./protos"],
)?;
Expand Down
9 changes: 8 additions & 1 deletion rust/lance/src/io/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,26 @@ pub mod filtered_read;
pub mod filtered_read_proto;
pub mod fts;
pub(crate) mod knn;
#[cfg(feature = "substrait")]
pub mod knn_proto;
mod optimizer;
mod projection;
mod pushdown_scan;
mod rowids;
pub mod scalar_index;
mod scan;
#[cfg(feature = "substrait")]
pub mod table_identifier;
mod take;
#[cfg(test)]
pub mod testing;
pub mod utils;

pub use filter::LanceFilterExec;
pub use knn::{ANNIvfPartitionExec, ANNIvfSubIndexExec, KNNVectorDistanceExec};
pub use knn::{
ANNIvfPartitionExec, ANNIvfSubIndexExec, KNN_INDEX_SCHEMA, KNN_PARTITION_SCHEMA,
KNNVectorDistanceExec,
};
pub use lance_datafusion::planner::Planner;
pub use lance_index::scalar::expression::FilterPlan;
pub use optimizer::get_physical_optimizer;
Expand Down
63 changes: 3 additions & 60 deletions rust/lance/src/io/exec/filtered_read_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,73 +24,14 @@ use lance_core::utils::mask::RowAddrTreeMap;
use lance_core::{Error, Result};
use lance_datafusion::pb;
use lance_datafusion::substrait::{encode_substrait, parse_substrait, prune_schema_for_substrait};
use lance_io::object_store::StorageOptions;
use lance_table::format::Fragment;
use prost::Message;

use crate::Dataset;
use crate::dataset::builder::DatasetBuilder;

use super::filtered_read::{
FilteredReadExec, FilteredReadOptions, FilteredReadPlan, FilteredReadThreadingMode,
};

// =============================================================================
// TableIdentifier helpers (reusable by other execs)
// =============================================================================

/// Build a [`TableIdentifier`] from a [`Dataset`].
///
/// Default: lightweight mode (uri + version + etag only, no serialized manifest).
/// Includes the dataset's latest storage options (if any) so the remote executor
/// can open or cache the dataset with the correct storage configuration.
pub async fn table_identifier_from_dataset(dataset: &Dataset) -> Result<pb::TableIdentifier> {
Ok(pb::TableIdentifier {
uri: dataset.uri().to_string(),
version: dataset.manifest.version,
manifest_etag: dataset.manifest_location.e_tag.clone(),
serialized_manifest: None,
storage_options: dataset
.latest_storage_options()
.await?
.map(|StorageOptions(m)| m)
.unwrap_or_default(),
})
}

/// Build a [`TableIdentifier`] with serialized manifest bytes included.
///
/// Fast path: remote executor skips manifest read from storage.
pub async fn table_identifier_from_dataset_with_manifest(
dataset: &Dataset,
) -> Result<pb::TableIdentifier> {
let manifest_proto = lance_table::format::pb::Manifest::from(dataset.manifest.as_ref());
Ok(pb::TableIdentifier {
uri: dataset.uri().to_string(),
version: dataset.manifest.version,
manifest_etag: dataset.manifest_location.e_tag.clone(),
serialized_manifest: Some(manifest_proto.encode_to_vec()),
storage_options: dataset
.latest_storage_options()
.await?
.map(|StorageOptions(m)| m)
.unwrap_or_default(),
})
}

/// Open a dataset from a table identifier proto
pub async fn open_dataset_from_table_identifier(
table_id: &pb::TableIdentifier,
) -> Result<Arc<Dataset>> {
let mut builder = DatasetBuilder::from_uri(&table_id.uri).with_version(table_id.version);
if let Some(manifest_bytes) = &table_id.serialized_manifest {
builder = builder.with_serialized_manifest(manifest_bytes)?;
}
if !table_id.storage_options.is_empty() {
builder = builder.with_storage_options(table_id.storage_options.clone());
}
Ok(Arc::new(builder.load().await?))
}
use super::table_identifier::{open_dataset_from_table_identifier, table_identifier_from_dataset};

// =============================================================================
// FilteredReadExec <-> Proto
Expand Down Expand Up @@ -558,9 +499,11 @@ mod tests {
use lance_core::datatypes::OnMissing;
use lance_core::utils::mask::RowAddrTreeMap;
use lance_datagen::{array, gen_batch};
use prost::Message;
use roaring::RoaringBitmap;
use std::collections::HashSet;

use crate::io::exec::table_identifier::table_identifier_from_dataset_with_manifest;
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};

#[test]
Expand Down
20 changes: 20 additions & 0 deletions rust/lance/src/io/exec/knn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,26 @@ pub struct ANNIvfSubIndexExec {
}

impl ANNIvfSubIndexExec {
/// Returns a reference to the vector query.
pub fn query(&self) -> &Query {
&self.query
}

/// Returns a reference to the dataset.
pub fn dataset(&self) -> &Arc<Dataset> {
&self.dataset
}

/// Returns a reference to the index metadata.
pub fn indices(&self) -> &[IndexMetadata] {
&self.indices
}

/// Returns a reference to the prefilter source.
pub fn prefilter_source(&self) -> &PreFilterSource {
&self.prefilter_source
}

pub fn try_new(
input: Arc<dyn ExecutionPlan>,
dataset: Arc<Dataset>,
Expand Down
Loading
Loading