diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index d5d75aeb..5c07c173 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -35,6 +35,7 @@ storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-oss = ["opendal/services-oss"] storage-s3 = ["opendal/services-s3"] +lumina = ["libloading"] [dependencies] url = "2.5.2" @@ -73,6 +74,7 @@ md-5 = "0.10" regex = "1" uuid = { version = "1", features = ["v4"] } urlencoding = "2.1" +libloading = { version = "0.8", optional = true } [dev-dependencies] axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] } diff --git a/crates/paimon/src/globalindex/mod.rs b/crates/paimon/src/globalindex/mod.rs new file mode 100644 index 00000000..04d72215 --- /dev/null +++ b/crates/paimon/src/globalindex/mod.rs @@ -0,0 +1,405 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use roaring::RoaringTreemap; +use std::collections::BinaryHeap; +use std::sync::Arc; + +pub type ScoreGetter = Arc f32 + Send + Sync>; + +pub trait GlobalIndexResult: Send + Sync { + fn results(&self) -> &RoaringTreemap; + + fn offset(&self, start_offset: u64) -> Box { + if start_offset == 0 { + let bitmap = self.results().clone(); + return Box::new(LazyGlobalIndexResult::new_ready(bitmap)); + } + let bitmap = self.results(); + let mut offset_bitmap = RoaringTreemap::new(); + for row_id in bitmap.iter() { + offset_bitmap.insert(row_id + start_offset); + } + Box::new(LazyGlobalIndexResult::new_ready(offset_bitmap)) + } + + fn and(&self, other: &dyn GlobalIndexResult) -> Box { + let result = self.results() & other.results(); + Box::new(LazyGlobalIndexResult::new_ready(result)) + } + + fn or(&self, other: &dyn GlobalIndexResult) -> Box { + let result = self.results() | other.results(); + Box::new(LazyGlobalIndexResult::new_ready(result)) + } + + fn is_empty(&self) -> bool { + self.results().is_empty() + } +} + +pub struct LazyGlobalIndexResult { + bitmap: RoaringTreemap, +} + +impl LazyGlobalIndexResult { + pub fn new_ready(bitmap: RoaringTreemap) -> Self { + Self { bitmap } + } + + pub fn create_empty() -> Self { + Self::new_ready(RoaringTreemap::new()) + } +} + +impl GlobalIndexResult for LazyGlobalIndexResult { + fn results(&self) -> &RoaringTreemap { + &self.bitmap + } +} + +pub trait ScoredGlobalIndexResult: GlobalIndexResult { + fn score_getter(&self) -> &ScoreGetter; + + fn scored_offset(&self, offset: u64) -> Box { + if offset == 0 { + let bitmap = self.results().clone(); + let sg = self.clone_score_getter(); + return Box::new(SimpleScoredGlobalIndexResult::new(bitmap, sg)); + } + let bitmap = self.results(); + let mut offset_bitmap = RoaringTreemap::new(); + for row_id in bitmap.iter() { + offset_bitmap.insert(row_id + offset); + } + let sg = self.clone_score_getter(); + Box::new(SimpleScoredGlobalIndexResult::new( + offset_bitmap, + Arc::new(move |row_id| sg(row_id - offset)), + )) + } + + fn scored_or(&self, other: &dyn ScoredGlobalIndexResult) -> Box { + let this_row_ids = self.results().clone(); + let other_row_ids = other.results().clone(); + let result_or = &this_row_ids | &other_row_ids; + let this_sg = self.clone_score_getter(); + let other_sg = other.clone_score_getter(); + let this_ids = this_row_ids; + Box::new(SimpleScoredGlobalIndexResult::new( + result_or, + Arc::new(move |row_id| { + if this_ids.contains(row_id) { + this_sg(row_id) + } else { + other_sg(row_id) + } + }), + )) + } + + fn top_k(&self, k: usize) -> Box { + let row_ids = self.results(); + if row_ids.len() as usize <= k { + let bitmap = row_ids.clone(); + let sg = self.clone_score_getter(); + return Box::new(SimpleScoredGlobalIndexResult::new(bitmap, sg)); + } + + let score_getter_fn = self.score_getter(); + + #[derive(PartialEq)] + struct ScoredEntry { + row_id: u64, + score: f32, + } + impl Eq for ScoredEntry {} + impl PartialOrd for ScoredEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + impl Ord for ScoredEntry { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other + .score + .partial_cmp(&self.score) + .unwrap_or(std::cmp::Ordering::Equal) + } + } + + let mut min_heap: BinaryHeap = BinaryHeap::with_capacity(k + 1); + for row_id in row_ids.iter() { + let score = score_getter_fn(row_id); + if min_heap.len() < k { + min_heap.push(ScoredEntry { row_id, score }); + } else if let Some(peek) = min_heap.peek() { + if score > peek.score { + min_heap.pop(); + min_heap.push(ScoredEntry { row_id, score }); + } + } + } + + let mut top_k_ids = RoaringTreemap::new(); + for entry in &min_heap { + top_k_ids.insert(entry.row_id); + } + + let sg = self.clone_score_getter(); + Box::new(SimpleScoredGlobalIndexResult::new(top_k_ids, sg)) + } + + fn clone_score_getter(&self) -> ScoreGetter; +} + +pub struct SimpleScoredGlobalIndexResult { + bitmap: RoaringTreemap, + score_getter: ScoreGetter, +} + +impl SimpleScoredGlobalIndexResult { + pub fn new(bitmap: RoaringTreemap, score_getter: ScoreGetter) -> Self { + Self { + bitmap, + score_getter, + } + } + + pub fn create_empty() -> Self { + Self { + bitmap: RoaringTreemap::new(), + score_getter: Arc::new(|_| 0.0), + } + } +} + +impl GlobalIndexResult for SimpleScoredGlobalIndexResult { + fn results(&self) -> &RoaringTreemap { + &self.bitmap + } +} + +impl ScoredGlobalIndexResult for SimpleScoredGlobalIndexResult { + fn score_getter(&self) -> &ScoreGetter { + &self.score_getter + } + + fn clone_score_getter(&self) -> ScoreGetter { + self.score_getter.clone() + } +} + +pub struct DictBasedScoredIndexResult { + id_to_scores: std::collections::HashMap, + bitmap: RoaringTreemap, + score_getter_fn: ScoreGetter, +} + +impl DictBasedScoredIndexResult { + pub fn new(id_to_scores: std::collections::HashMap) -> Self { + let mut bitmap = RoaringTreemap::new(); + for &row_id in id_to_scores.keys() { + bitmap.insert(row_id); + } + let map = id_to_scores.clone(); + let score_getter_fn: ScoreGetter = + Arc::new(move |row_id| map.get(&row_id).copied().unwrap_or(0.0)); + Self { + id_to_scores, + bitmap, + score_getter_fn, + } + } +} + +impl GlobalIndexResult for DictBasedScoredIndexResult { + fn results(&self) -> &RoaringTreemap { + &self.bitmap + } +} + +impl ScoredGlobalIndexResult for DictBasedScoredIndexResult { + fn score_getter(&self) -> &ScoreGetter { + &self.score_getter_fn + } + + fn clone_score_getter(&self) -> ScoreGetter { + let map = self.id_to_scores.clone(); + Arc::new(move |row_id| map.get(&row_id).copied().unwrap_or(0.0)) + } +} + +pub struct VectorSearch { + pub vector: Vec, + pub limit: usize, + pub field_name: String, + pub include_row_ids: Option, +} + +impl VectorSearch { + pub fn new(vector: Vec, limit: usize, field_name: String) -> crate::Result { + if vector.is_empty() { + return Err(crate::Error::DataInvalid { + message: "Search vector cannot be empty".to_string(), + source: None, + }); + } + if limit == 0 { + return Err(crate::Error::DataInvalid { + message: format!("Limit must be positive, got: {}", limit), + source: None, + }); + } + if field_name.is_empty() { + return Err(crate::Error::DataInvalid { + message: "Field name cannot be null or empty".to_string(), + source: None, + }); + } + Ok(Self { + vector, + limit, + field_name, + include_row_ids: None, + }) + } + + pub fn with_include_row_ids(mut self, include_row_ids: RoaringTreemap) -> Self { + self.include_row_ids = Some(include_row_ids); + self + } + + pub fn offset_range(&self, from: u64, to: u64) -> Self { + if let Some(ref include_row_ids) = self.include_row_ids { + let mut range_bitmap = RoaringTreemap::new(); + range_bitmap.insert_range(from..to); + let and_result = include_row_ids & &range_bitmap; + let mut offset_bitmap = RoaringTreemap::new(); + for row_id in and_result.iter() { + offset_bitmap.insert(row_id - from); + } + VectorSearch { + vector: self.vector.clone(), + limit: self.limit, + field_name: self.field_name.clone(), + include_row_ids: Some(offset_bitmap), + } + } else { + VectorSearch { + vector: self.vector.clone(), + limit: self.limit, + field_name: self.field_name.clone(), + include_row_ids: None, + } + } + } +} + +impl std::fmt::Display for VectorSearch { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "VectorSearch(field_name={}, limit={})", + self.field_name, self.limit + ) + } +} + +pub struct GlobalIndexIOMeta { + pub file_path: String, + pub file_size: u64, + pub metadata: Vec, +} + +impl GlobalIndexIOMeta { + pub fn new(file_path: String, file_size: u64, metadata: Vec) -> Self { + Self { + file_path, + file_size, + metadata, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Aligned with Java VectorSearchTest.testVectorSearchOffset + #[test] + fn test_vector_search_offset_range() { + let mut bitmap = RoaringTreemap::new(); + bitmap.insert_range(100..200); + let vs = VectorSearch::new(vec![1.0, 2.0], 10, "vec".to_string()) + .unwrap() + .with_include_row_ids(bitmap); + + let result = vs.offset_range(60, 150); + let ids = result.include_row_ids.unwrap(); + // [100,200) & [60,150) = [100,150), offset by -60 => [40,90) + assert_eq!(ids.len(), 50); + assert!(ids.contains(40)); + assert!(ids.contains(89)); + assert!(!ids.contains(39)); + assert!(!ids.contains(90)); + } + + // Aligned with Java LuminaVectorGlobalIndexTest.testInvalidTopK + #[test] + fn test_invalid_top_k() { + assert!(VectorSearch::new(vec![1.0], 0, "f".to_string()).is_err()); + } + + #[test] + fn test_offset_range_no_filter() { + let vs = VectorSearch::new(vec![1.0], 5, "f".to_string()).unwrap(); + let result = vs.offset_range(100, 200); + assert!(result.include_row_ids.is_none()); + } + + // Scored results: top_k, scored_offset, scored_or — exercised indirectly in Java integration tests + fn make_dict(entries: Vec<(u64, f32)>) -> DictBasedScoredIndexResult { + DictBasedScoredIndexResult::new(entries.into_iter().collect()) + } + + #[test] + fn test_top_k_selects_highest() { + let r = make_dict(vec![(1, 0.1), (2, 0.9), (3, 0.5), (4, 0.8), (5, 0.3)]); + let top = r.top_k(2); + assert_eq!(top.results().len(), 2); + assert!(top.results().contains(2)); + assert!(top.results().contains(4)); + } + + #[test] + fn test_scored_offset_preserves_scores() { + let r = make_dict(vec![(1, 0.5), (2, 0.6)]); + let o = r.scored_offset(100); + assert!(o.results().contains(101)); + assert_eq!(o.score_getter()(101), 0.5); + assert_eq!(o.score_getter()(102), 0.6); + } + + #[test] + fn test_clone_score_getter() { + let r = make_dict(vec![(10, 1.5), (20, 2.5)]); + let cloned = r.clone_score_getter(); + assert_eq!(cloned(10), 1.5); + assert_eq!(cloned(20), 2.5); + } +} diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index fe340f3d..8ee00b6d 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -29,7 +29,10 @@ pub mod arrow; pub mod catalog; mod deletion_vector; pub mod file_index; +pub mod globalindex; pub mod io; +#[cfg(feature = "lumina")] +pub mod lumina; mod predicate_stats; pub mod spec; pub mod table; diff --git a/crates/paimon/src/lumina/ffi.rs b/crates/paimon/src/lumina/ffi.rs new file mode 100644 index 00000000..38d9a3b6 --- /dev/null +++ b/crates/paimon/src/lumina/ffi.rs @@ -0,0 +1,556 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use libloading::{Library, Symbol}; +use std::collections::HashMap; +use std::ffi::{c_char, c_float, c_int, c_void, CStr, CString}; +use std::io::{Read, Seek, SeekFrom}; +use std::sync::OnceLock; + +const ERR_BUF_SIZE: usize = 4096; + +static LIBRARY: OnceLock> = OnceLock::new(); + +fn load_library() -> crate::Result<&'static Library> { + let result = LIBRARY.get_or_init(|| { + let lib_path = std::env::var("LUMINA_LIB_PATH").unwrap_or_else(|_| { + if cfg!(target_os = "macos") { + "liblumina_py.dylib".to_string() + } else { + "liblumina_py.so".to_string() + } + }); + unsafe { + Library::new(&lib_path) + .map_err(|e| format!("Failed to load lumina library from '{}': {}", lib_path, e)) + } + }); + result.as_ref().map_err(|e| crate::Error::DataInvalid { + message: e.clone(), + source: None, + }) +} + +fn check_error(ret: c_int, err_buf: &[u8; ERR_BUF_SIZE]) -> crate::Result<()> { + if ret != 0 { + let c_str = unsafe { CStr::from_ptr(err_buf.as_ptr() as *const c_char) }; + let msg = c_str.to_string_lossy().to_string(); + return Err(crate::Error::DataInvalid { + message: format!("Lumina error: {}", msg), + source: None, + }); + } + Ok(()) +} + +fn options_to_json(options: &HashMap) -> crate::Result { + let json = serde_json::to_string(options).map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to serialize options: {}", e), + source: None, + })?; + CString::new(json).map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to create CString: {}", e), + source: None, + }) +} + +pub struct LuminaSearcher { + handle: *mut c_void, + _stream_ctx: Option>, +} + +unsafe impl Send for LuminaSearcher {} + +impl LuminaSearcher { + pub fn create(options: &HashMap) -> crate::Result { + let lib = load_library()?; + let opts_json = options_to_json(options)?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let handle: *mut c_void = unsafe { + let func: Symbol< + unsafe extern "C" fn(*const c_char, *mut c_char, c_int) -> *mut c_void, + > = lib + .get(b"lumina_searcher_create") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_searcher_create not found: {}", e), + source: None, + })?; + func( + opts_json.as_ptr(), + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + if handle.is_null() { + let c_str = unsafe { CStr::from_ptr(err_buf.as_ptr() as *const c_char) }; + let msg = c_str.to_string_lossy().to_string(); + return Err(crate::Error::DataInvalid { + message: format!("Failed to create Lumina searcher: {}", msg), + source: None, + }); + } + + Ok(Self { + handle, + _stream_ctx: None, + }) + } + + #[allow(clippy::type_complexity)] + pub fn open_stream( + &mut self, + stream: S, + _file_size: u64, + ) -> crate::Result<()> { + let lib = load_library()?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let ctx = Box::new(StreamContext::new(stream)); + let ctx_ptr = &*ctx as *const StreamContext as *mut c_void; + + let ret: c_int = unsafe { + let func: Symbol< + unsafe extern "C" fn( + *mut c_void, + *mut c_void, + unsafe extern "C" fn(*mut c_void, *mut c_char, u64) -> c_int, + unsafe extern "C" fn(*mut c_void, u64) -> c_int, + unsafe extern "C" fn(*mut c_void) -> u64, + unsafe extern "C" fn(*mut c_void) -> u64, + *mut c_char, + c_int, + ) -> c_int, + > = lib + .get(b"lumina_searcher_open_stream") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_searcher_open_stream not found: {}", e), + source: None, + })?; + func( + self.handle, + ctx_ptr, + stream_read_cb, + stream_seek_cb, + stream_tell_cb, + stream_length_cb, + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + check_error(ret, &err_buf)?; + self._stream_ctx = Some(ctx); + Ok(()) + } + + pub fn search( + &self, + query: &[f32], + n: i32, + k: i32, + distances: &mut [f32], + labels: &mut [u64], + options: &HashMap, + ) -> crate::Result<()> { + let lib = load_library()?; + let opts_json = options_to_json(options)?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let ret: c_int = unsafe { + let func: Symbol< + unsafe extern "C" fn( + *mut c_void, + *const c_float, + c_int, + c_int, + *mut c_float, + *mut u64, + *const c_char, + *mut c_char, + c_int, + ) -> c_int, + > = lib + .get(b"lumina_searcher_search") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_searcher_search not found: {}", e), + source: None, + })?; + func( + self.handle, + query.as_ptr(), + n, + k, + distances.as_mut_ptr(), + labels.as_mut_ptr(), + opts_json.as_ptr(), + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + check_error(ret, &err_buf) + } + + #[allow(clippy::too_many_arguments, clippy::type_complexity)] + pub fn search_with_filter( + &self, + query: &[f32], + n: i32, + k: i32, + distances: &mut [f32], + labels: &mut [u64], + filter_ids: &[u64], + options: &HashMap, + ) -> crate::Result<()> { + let lib = load_library()?; + let opts_json = options_to_json(options)?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let ret: c_int = unsafe { + let func: Symbol< + unsafe extern "C" fn( + *mut c_void, + *const c_float, + c_int, + c_int, + *mut c_float, + *mut u64, + *const u64, + u64, + *const c_char, + *mut c_char, + c_int, + ) -> c_int, + > = lib + .get(b"lumina_searcher_search_with_filter") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_searcher_search_with_filter not found: {}", e), + source: None, + })?; + func( + self.handle, + query.as_ptr(), + n, + k, + distances.as_mut_ptr(), + labels.as_mut_ptr(), + filter_ids.as_ptr(), + filter_ids.len() as u64, + opts_json.as_ptr(), + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + check_error(ret, &err_buf) + } + + pub fn get_count(&self) -> crate::Result { + let lib = load_library()?; + unsafe { + let func: Symbol u64> = lib + .get(b"lumina_searcher_get_count") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_searcher_get_count not found: {}", e), + source: None, + })?; + Ok(func(self.handle)) + } + } + + pub fn get_dimension(&self) -> crate::Result { + let lib = load_library()?; + unsafe { + let func: Symbol u32> = lib + .get(b"lumina_searcher_get_dimension") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_searcher_get_dimension not found: {}", e), + source: None, + })?; + Ok(func(self.handle)) + } + } +} + +impl Drop for LuminaSearcher { + fn drop(&mut self) { + if !self.handle.is_null() { + if let Ok(lib) = load_library() { + unsafe { + if let Ok(func) = + lib.get::(b"lumina_searcher_destroy") + { + func(self.handle); + } + } + } + self.handle = std::ptr::null_mut(); + } + } +} + +pub struct LuminaBuilder { + handle: *mut c_void, +} + +unsafe impl Send for LuminaBuilder {} + +impl LuminaBuilder { + pub fn create(options: &HashMap) -> crate::Result { + let lib = load_library()?; + let opts_json = options_to_json(options)?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let handle: *mut c_void = unsafe { + let func: Symbol< + unsafe extern "C" fn(*const c_char, *mut c_char, c_int) -> *mut c_void, + > = lib + .get(b"lumina_builder_create") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_builder_create not found: {}", e), + source: None, + })?; + func( + opts_json.as_ptr(), + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + if handle.is_null() { + let c_str = unsafe { CStr::from_ptr(err_buf.as_ptr() as *const c_char) }; + let msg = c_str.to_string_lossy().to_string(); + return Err(crate::Error::DataInvalid { + message: format!("Failed to create Lumina builder: {}", msg), + source: None, + }); + } + + Ok(Self { handle }) + } + + pub fn pretrain(&self, vectors: &[f32], n: i32, dim: i32) -> crate::Result<()> { + let lib = load_library()?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let ret: c_int = unsafe { + let func: Symbol< + unsafe extern "C" fn( + *mut c_void, + *const c_float, + c_int, + c_int, + *mut c_char, + c_int, + ) -> c_int, + > = lib + .get(b"lumina_builder_pretrain") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_builder_pretrain not found: {}", e), + source: None, + })?; + func( + self.handle, + vectors.as_ptr(), + n, + dim, + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + check_error(ret, &err_buf) + } + + pub fn insert(&self, vectors: &[f32], ids: &[u64], n: i32, dim: i32) -> crate::Result<()> { + let lib = load_library()?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let ret: c_int = unsafe { + let func: Symbol< + unsafe extern "C" fn( + *mut c_void, + *const c_float, + *const u64, + c_int, + c_int, + *mut c_char, + c_int, + ) -> c_int, + > = lib + .get(b"lumina_builder_insert") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_builder_insert not found: {}", e), + source: None, + })?; + func( + self.handle, + vectors.as_ptr(), + ids.as_ptr(), + n, + dim, + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + check_error(ret, &err_buf) + } + + pub fn dump(&self, path: &str) -> crate::Result<()> { + let lib = load_library()?; + let c_path = + CString::new(path).map_err(|e| crate::Error::DataInvalid { + message: format!("Invalid path: {}", e), + source: None, + })?; + let mut err_buf = [0u8; ERR_BUF_SIZE]; + + let ret: c_int = unsafe { + let func: Symbol< + unsafe extern "C" fn(*mut c_void, *const c_char, *mut c_char, c_int) -> c_int, + > = lib + .get(b"lumina_builder_dump") + .map_err(|e| crate::Error::DataInvalid { + message: format!("Symbol lumina_builder_dump not found: {}", e), + source: None, + })?; + func( + self.handle, + c_path.as_ptr(), + err_buf.as_mut_ptr() as *mut c_char, + ERR_BUF_SIZE as c_int, + ) + }; + + check_error(ret, &err_buf) + } +} + +impl Drop for LuminaBuilder { + fn drop(&mut self) { + if !self.handle.is_null() { + if let Ok(lib) = load_library() { + unsafe { + if let Ok(func) = + lib.get::(b"lumina_builder_destroy") + { + func(self.handle); + } + } + } + self.handle = std::ptr::null_mut(); + } + } +} + +struct StreamContext { + inner: std::sync::Mutex>, +} + +trait ReadSeekLen: Read + Seek { + fn length(&self) -> u64; +} + +struct ReadSeekLenImpl { + stream: S, + len: u64, +} + +impl ReadSeekLenImpl { + fn new(mut stream: S) -> Self { + let len = stream.seek(SeekFrom::End(0)).unwrap_or(0); + let _ = stream.seek(SeekFrom::Start(0)); + Self { stream, len } + } +} + +impl Read for ReadSeekLenImpl { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.stream.read(buf) + } +} + +impl Seek for ReadSeekLenImpl { + fn seek(&mut self, pos: SeekFrom) -> std::io::Result { + self.stream.seek(pos) + } +} + +impl ReadSeekLen for ReadSeekLenImpl { + fn length(&self) -> u64 { + self.len + } +} + +impl StreamContext { + fn new(stream: S) -> Self { + Self { + inner: std::sync::Mutex::new(Box::new(ReadSeekLenImpl::new(stream))), + } + } +} + +unsafe extern "C" fn stream_read_cb(ctx: *mut c_void, buf: *mut c_char, size: u64) -> c_int { + let ctx = &*(ctx as *const StreamContext); + let mut guard = match ctx.inner.lock() { + Ok(g) => g, + Err(_) => return -1, + }; + let slice = std::slice::from_raw_parts_mut(buf as *mut u8, size as usize); + let mut total_read = 0usize; + while total_read < size as usize { + match guard.read(&mut slice[total_read..]) { + Ok(0) => break, + Ok(n) => total_read += n, + Err(_) => return -1, + } + } + total_read as c_int +} + +unsafe extern "C" fn stream_seek_cb(ctx: *mut c_void, position: u64) -> c_int { + let ctx = &*(ctx as *const StreamContext); + let mut guard = match ctx.inner.lock() { + Ok(g) => g, + Err(_) => return -1, + }; + match guard.seek(SeekFrom::Start(position)) { + Ok(_) => 0, + Err(_) => -1, + } +} + +unsafe extern "C" fn stream_tell_cb(ctx: *mut c_void) -> u64 { + let ctx = &*(ctx as *const StreamContext); + let mut guard = match ctx.inner.lock() { + Ok(g) => g, + Err(_) => return 0, + }; + guard.seek(SeekFrom::Current(0)).unwrap_or(0) +} + +unsafe extern "C" fn stream_length_cb(ctx: *mut c_void) -> u64 { + let ctx = &*(ctx as *const StreamContext); + let guard = match ctx.inner.lock() { + Ok(g) => g, + Err(_) => return 0, + }; + guard.length() +} diff --git a/crates/paimon/src/lumina/mod.rs b/crates/paimon/src/lumina/mod.rs new file mode 100644 index 00000000..13b3f214 --- /dev/null +++ b/crates/paimon/src/lumina/mod.rs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod ffi; +pub mod reader; + +use std::collections::HashMap; + +pub const LUMINA_VECTOR_ANN_IDENTIFIER: &str = "lumina-vector-ann"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LuminaVectorMetric { + L2, + Cosine, + InnerProduct, +} + +impl LuminaVectorMetric { + pub fn lumina_name(&self) -> &str { + match self { + LuminaVectorMetric::L2 => "l2", + LuminaVectorMetric::Cosine => "cosine", + LuminaVectorMetric::InnerProduct => "inner_product", + } + } + + pub fn from_string(name: &str) -> crate::Result { + match name.to_uppercase().as_str() { + "L2" => Ok(LuminaVectorMetric::L2), + "COSINE" => Ok(LuminaVectorMetric::Cosine), + "INNER_PRODUCT" => Ok(LuminaVectorMetric::InnerProduct), + _ => Err(crate::Error::DataInvalid { + message: format!("Unknown metric name: {}", name), + source: None, + }), + } + } + + pub fn from_lumina_name(lumina_name: &str) -> crate::Result { + match lumina_name { + "l2" => Ok(LuminaVectorMetric::L2), + "cosine" => Ok(LuminaVectorMetric::Cosine), + "inner_product" => Ok(LuminaVectorMetric::InnerProduct), + _ => Err(crate::Error::DataInvalid { + message: format!("Unknown lumina metric name: {}", lumina_name), + source: None, + }), + } + } +} + +const LUMINA_PREFIX: &str = "lumina."; + +const SEARCH_OPTIONS_DEFAULTS: &[(&str, &str)] = &[ + ("lumina.diskann.search.beam_width", "4"), + ("lumina.search.parallel_number", "5"), +]; + +pub struct LuminaVectorIndexOptions { + pub dimension: i32, + pub metric: LuminaVectorMetric, + pub index_type: String, + lumina_options: HashMap, +} + +impl LuminaVectorIndexOptions { + pub fn new(paimon_options: &HashMap) -> crate::Result { + let dimension_str = paimon_options + .get("lumina.index.dimension") + .map(|s| s.as_str()) + .unwrap_or("128"); + let dimension: i32 = dimension_str + .parse() + .map_err(|_| crate::Error::DataInvalid { + message: format!("Invalid dimension: {}", dimension_str), + source: None, + })?; + if dimension <= 0 { + return Err(crate::Error::DataInvalid { + message: format!( + "Invalid value for 'lumina.index.dimension': {}. Must be a positive integer.", + dimension + ), + source: None, + }); + } + + let metric_str = paimon_options + .get("lumina.distance.metric") + .map(|s| s.as_str()) + .unwrap_or("inner_product"); + let metric = LuminaVectorMetric::from_lumina_name(metric_str) + .or_else(|_| LuminaVectorMetric::from_string(metric_str))?; + + let index_type = paimon_options + .get("lumina.index.type") + .cloned() + .unwrap_or_else(|| "diskann".to_string()); + + let lumina_options = strip_lumina_options(paimon_options); + + Ok(Self { + dimension, + metric, + index_type, + lumina_options, + }) + } + + pub fn to_lumina_options(&self) -> HashMap { + self.lumina_options.clone() + } +} + +pub fn strip_lumina_options(paimon_options: &HashMap) -> HashMap { + let mut result = HashMap::new(); + + for &(paimon_key, default_value) in SEARCH_OPTIONS_DEFAULTS { + let native_key = &paimon_key[LUMINA_PREFIX.len()..]; + result.insert(native_key.to_string(), default_value.to_string()); + } + + for (key, value) in paimon_options { + if let Some(native_key) = key.strip_prefix(LUMINA_PREFIX) { + result.insert(native_key.to_string(), value.to_string()); + } + } + + result +} + +pub const KEY_DIMENSION: &str = "index.dimension"; +pub const KEY_DISTANCE_METRIC: &str = "distance.metric"; +pub const KEY_INDEX_TYPE: &str = "index.type"; + +pub struct LuminaIndexMeta { + options: HashMap, +} + +impl LuminaIndexMeta { + pub fn new(options: HashMap) -> Self { + Self { options } + } + + pub fn options(&self) -> &HashMap { + &self.options + } + + pub fn dim(&self) -> crate::Result { + let val = self + .options + .get(KEY_DIMENSION) + .ok_or_else(|| crate::Error::DataInvalid { + message: format!("Missing required key: {}", KEY_DIMENSION), + source: None, + })?; + val.parse::().map_err(|_| crate::Error::DataInvalid { + message: format!("Invalid dimension value: {}", val), + source: None, + }) + } + + pub fn distance_metric(&self) -> &str { + self.options + .get(KEY_DISTANCE_METRIC) + .map(String::as_str) + .unwrap_or("") + } + + pub fn metric(&self) -> crate::Result { + LuminaVectorMetric::from_lumina_name(self.distance_metric()) + } + + pub fn index_type(&self) -> &str { + self.options + .get(KEY_INDEX_TYPE) + .map(String::as_str) + .unwrap_or("diskann") + } + + pub fn serialize(&self) -> crate::Result> { + serde_json::to_vec(&self.options).map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to serialize LuminaIndexMeta: {}", e), + source: None, + }) + } + + pub fn deserialize(data: &[u8]) -> crate::Result { + let options: HashMap = + serde_json::from_slice(data).map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to deserialize LuminaIndexMeta: {}", e), + source: None, + })?; + if !options.contains_key(KEY_DIMENSION) { + return Err(crate::Error::DataInvalid { + message: format!( + "Missing required key in Lumina index metadata: {}", + KEY_DIMENSION + ), + source: None, + }); + } + if !options.contains_key(KEY_DISTANCE_METRIC) { + return Err(crate::Error::DataInvalid { + message: format!( + "Missing required key in Lumina index metadata: {}", + KEY_DISTANCE_METRIC + ), + source: None, + }); + } + Ok(Self { options }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Aligned with Java: metric conversions tested indirectly in testDifferentMetrics + #[test] + fn test_metric_roundtrip() { + for metric in [ + LuminaVectorMetric::L2, + LuminaVectorMetric::Cosine, + LuminaVectorMetric::InnerProduct, + ] { + let name = metric.lumina_name(); + assert_eq!(LuminaVectorMetric::from_lumina_name(name).unwrap(), metric); + assert_eq!( + LuminaVectorMetric::from_string(&name.to_uppercase()).unwrap(), + metric + ); + } + assert!(LuminaVectorMetric::from_string("hamming").is_err()); + } + + // Aligned with Java: testReaderMetaOptionsOverrideDefaultOptions + #[test] + fn test_index_meta_serialize_deserialize() { + let mut options = HashMap::new(); + options.insert(KEY_DIMENSION.to_string(), "128".to_string()); + options.insert(KEY_DISTANCE_METRIC.to_string(), "l2".to_string()); + options.insert(KEY_INDEX_TYPE.to_string(), "diskann".to_string()); + let meta = LuminaIndexMeta::new(options); + + let bytes = meta.serialize().unwrap(); + let meta2 = LuminaIndexMeta::deserialize(&bytes).unwrap(); + assert_eq!(meta2.dim().unwrap(), 128); + assert_eq!(meta2.distance_metric(), "l2"); + assert_eq!(meta2.index_type(), "diskann"); + } + + #[test] + fn test_index_meta_deserialize_missing_fields() { + // missing dimension + let mut opts = HashMap::new(); + opts.insert(KEY_DISTANCE_METRIC.to_string(), "l2".to_string()); + assert!(LuminaIndexMeta::deserialize(&serde_json::to_vec(&opts).unwrap()).is_err()); + + // missing metric + let mut opts = HashMap::new(); + opts.insert(KEY_DIMENSION.to_string(), "128".to_string()); + assert!(LuminaIndexMeta::deserialize(&serde_json::to_vec(&opts).unwrap()).is_err()); + + // invalid json + assert!(LuminaIndexMeta::deserialize(b"not json").is_err()); + } + + // Aligned with Java: testDimensionMismatch (dim validation) + #[test] + fn test_dim_error_on_invalid() { + let mut opts = HashMap::new(); + opts.insert(KEY_DIMENSION.to_string(), "abc".to_string()); + opts.insert(KEY_DISTANCE_METRIC.to_string(), "l2".to_string()); + assert!(LuminaIndexMeta::new(opts).dim().is_err()); + } + + // Aligned with Java: testPQWithCosineRejected (options validation) + #[test] + fn test_index_options_invalid_dimension() { + let mut opts = HashMap::new(); + opts.insert("lumina.index.dimension".to_string(), "-1".to_string()); + assert!(LuminaVectorIndexOptions::new(&opts).is_err()); + } + + #[test] + fn test_strip_lumina_options() { + let mut opts = HashMap::new(); + opts.insert("lumina.index.dimension".to_string(), "128".to_string()); + opts.insert("lumina.diskann.search.beam_width".to_string(), "8".to_string()); + opts.insert("non_lumina_key".to_string(), "ignored".to_string()); + let result = strip_lumina_options(&opts); + assert_eq!(result.get("index.dimension").unwrap(), "128"); + assert_eq!(result.get("diskann.search.beam_width").unwrap(), "8"); // overrides default + assert!(!result.contains_key("non_lumina_key")); + } +} diff --git a/crates/paimon/src/lumina/reader.rs b/crates/paimon/src/lumina/reader.rs new file mode 100644 index 00000000..103b5404 --- /dev/null +++ b/crates/paimon/src/lumina/reader.rs @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::globalindex::{ + DictBasedScoredIndexResult, GlobalIndexIOMeta, ScoredGlobalIndexResult, VectorSearch, +}; +use crate::lumina::ffi::LuminaSearcher; +use crate::lumina::{strip_lumina_options, LuminaIndexMeta, LuminaVectorMetric}; +use std::collections::HashMap; +use std::io::{Read, Seek}; + +const MIN_SEARCH_LIST_SIZE: usize = 16; +const SENTINEL: u64 = 0xFFFFFFFFFFFFFFFF; + +fn ensure_search_list_size(search_options: &mut HashMap, top_k: usize) { + if !search_options.contains_key("diskann.search.list_size") { + let list_size = std::cmp::max((top_k as f64 * 1.5) as usize, MIN_SEARCH_LIST_SIZE); + search_options.insert( + "diskann.search.list_size".to_string(), + list_size.to_string(), + ); + } +} + +fn convert_distance_to_score(distance: f32, metric: LuminaVectorMetric) -> f32 { + match metric { + LuminaVectorMetric::L2 => 1.0 / (1.0 + distance), + LuminaVectorMetric::Cosine => 1.0 - distance, + LuminaVectorMetric::InnerProduct => distance, + } +} + +fn filter_search_options(options: &HashMap) -> HashMap { + options + .iter() + .filter(|(k, _)| k.starts_with("search.") || k.starts_with("diskann.search.")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() +} + +pub struct LuminaVectorGlobalIndexReader { + io_meta: GlobalIndexIOMeta, + options: HashMap, + searcher: Option, + index_meta: Option, + search_options: Option>, +} + +impl LuminaVectorGlobalIndexReader { + pub fn new( + io_metas: Vec, + options: HashMap, + ) -> crate::Result { + if io_metas.len() != 1 { + return Err(crate::Error::DataInvalid { + message: "Expected exactly one index file per shard".to_string(), + source: None, + }); + } + let mut metas = io_metas; + let io_meta = metas.remove(0); + Ok(Self { + io_meta, + options, + searcher: None, + index_meta: None, + search_options: None, + }) + } + + pub fn visit_vector_search( + &mut self, + vector_search: &VectorSearch, + stream_fn: impl FnOnce(&str) -> crate::Result, + ) -> crate::Result>> { + self.ensure_loaded(stream_fn)?; + self.search(vector_search) + } + + fn search( + &self, + vector_search: &VectorSearch, + ) -> crate::Result>> { + let index_meta = self.index_meta.as_ref().unwrap(); + let searcher = self.searcher.as_ref().unwrap(); + let search_options_base = self.search_options.as_ref().unwrap(); + + let expected_dim = index_meta.dim()? as usize; + if vector_search.vector.len() != expected_dim { + return Err(crate::Error::DataInvalid { + message: format!( + "Query vector dimension mismatch: index expects {}, but got {}", + expected_dim, + vector_search.vector.len() + ), + source: None, + }); + } + + let limit = vector_search.limit; + let index_metric = index_meta.metric()?; + let count = searcher.get_count()? as usize; + let effective_k = std::cmp::min(limit, count); + if effective_k == 0 { + return Ok(None); + } + + let include_row_ids = &vector_search.include_row_ids; + + let (distances, labels) = if let Some(ref include_ids) = include_row_ids { + let filter_id_list: Vec = include_ids.iter().collect(); + if filter_id_list.is_empty() { + return Ok(None); + } + let ek = std::cmp::min(effective_k, filter_id_list.len()); + let mut distances = vec![0.0f32; ek]; + let mut labels = vec![0u64; ek]; + let mut search_opts: HashMap = search_options_base.clone(); + search_opts.insert("search.thread_safe_filter".to_string(), "true".to_string()); + ensure_search_list_size(&mut search_opts, ek); + let filtered_opts = filter_search_options(&search_opts); + searcher.search_with_filter( + &vector_search.vector, + 1, + ek as i32, + &mut distances, + &mut labels, + &filter_id_list, + &filtered_opts, + )?; + (distances, labels) + } else { + let mut distances = vec![0.0f32; effective_k]; + let mut labels = vec![0u64; effective_k]; + let mut search_opts: HashMap = search_options_base.clone(); + ensure_search_list_size(&mut search_opts, effective_k); + let filtered_opts = filter_search_options(&search_opts); + searcher.search( + &vector_search.vector, + 1, + effective_k as i32, + &mut distances, + &mut labels, + &filtered_opts, + )?; + (distances, labels) + }; + + let mut id_to_scores: HashMap = HashMap::new(); + for i in 0..labels.len() { + let row_id = labels[i]; + if row_id == SENTINEL { + continue; + } + let score = convert_distance_to_score(distances[i], index_metric); + id_to_scores.insert(row_id, score); + } + + Ok(Some(Box::new(DictBasedScoredIndexResult::new( + id_to_scores, + )))) + } + + fn ensure_loaded( + &mut self, + stream_fn: impl FnOnce(&str) -> crate::Result, + ) -> crate::Result<()> { + if self.searcher.is_some() { + return Ok(()); + } + + let index_meta = LuminaIndexMeta::deserialize(&self.io_meta.metadata)?; + + let mut searcher_options = strip_lumina_options(&self.options); + for (k, v) in index_meta.options().iter() { + searcher_options.insert(k.to_string(), v.to_string()); + } + + let searcher_opts_map: HashMap = searcher_options.into_iter().collect(); + let mut searcher = LuminaSearcher::create(&searcher_opts_map)?; + + let stream = stream_fn(&self.io_meta.file_path)?; + searcher.open_stream(stream, self.io_meta.file_size)?; + + self.search_options = Some(searcher_opts_map); + self.index_meta = Some(index_meta); + self.searcher = Some(searcher); + Ok(()) + } + + pub fn close(&mut self) { + self.searcher = None; + self.index_meta = None; + self.search_options = None; + } +} + +impl Drop for LuminaVectorGlobalIndexReader { + fn drop(&mut self) { + self.close(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::globalindex::GlobalIndexIOMeta; + + // Aligned with Java: testDifferentMetrics — score conversion per metric + #[test] + fn test_convert_distance_to_score() { + assert_eq!(convert_distance_to_score(0.0, LuminaVectorMetric::L2), 1.0); + assert_eq!(convert_distance_to_score(1.0, LuminaVectorMetric::L2), 0.5); + assert_eq!(convert_distance_to_score(0.0, LuminaVectorMetric::Cosine), 1.0); + assert_eq!(convert_distance_to_score(1.0, LuminaVectorMetric::Cosine), 0.0); + assert_eq!(convert_distance_to_score(0.75, LuminaVectorMetric::InnerProduct), 0.75); + } + + #[test] + fn test_ensure_search_list_size() { + let mut opts = HashMap::new(); + ensure_search_list_size(&mut opts, 10); + assert_eq!(opts.get("diskann.search.list_size").unwrap(), "16"); // max(15, 16) + + let mut opts = HashMap::new(); + ensure_search_list_size(&mut opts, 100); + assert_eq!(opts.get("diskann.search.list_size").unwrap(), "150"); // 100*1.5 + + // does not override existing + let mut opts = HashMap::new(); + opts.insert("diskann.search.list_size".to_string(), "999".to_string()); + ensure_search_list_size(&mut opts, 100); + assert_eq!(opts.get("diskann.search.list_size").unwrap(), "999"); + } + + #[test] + fn test_filter_search_options() { + let mut opts = HashMap::new(); + opts.insert("search.beam_width".to_string(), "4".to_string()); + opts.insert("diskann.search.list_size".to_string(), "16".to_string()); + opts.insert("index.dimension".to_string(), "128".to_string()); + let filtered = filter_search_options(&opts); + assert_eq!(filtered.len(), 2); + assert!(!filtered.contains_key("index.dimension")); + } + + #[test] + fn test_reader_requires_exactly_one_meta() { + assert!(LuminaVectorGlobalIndexReader::new(vec![], HashMap::new()).is_err()); + let m1 = GlobalIndexIOMeta::new("a".into(), 100, vec![]); + let m2 = GlobalIndexIOMeta::new("b".into(), 200, vec![]); + assert!(LuminaVectorGlobalIndexReader::new(vec![m1, m2], HashMap::new()).is_err()); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 481c757b..a36dff25 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -26,6 +26,8 @@ mod source; mod stats_filter; mod table_scan; mod tag_manager; +#[cfg(feature = "lumina")] +pub mod vector_search_builder; use crate::Result; use arrow_array::RecordBatch; diff --git a/crates/paimon/src/table/vector_search_builder.rs b/crates/paimon/src/table/vector_search_builder.rs new file mode 100644 index 00000000..1221013c --- /dev/null +++ b/crates/paimon/src/table/vector_search_builder.rs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::globalindex::{ + GlobalIndexIOMeta, ScoredGlobalIndexResult, SimpleScoredGlobalIndexResult, VectorSearch, +}; +use crate::lumina::reader::LuminaVectorGlobalIndexReader; +use crate::lumina::LUMINA_VECTOR_ANN_IDENTIFIER; +use std::collections::HashMap; + +pub struct VectorSearchBuilder { + options: HashMap, + limit: usize, + vector_column: Option, + query_vector: Option>, +} + +impl VectorSearchBuilder { + pub fn new(options: HashMap) -> Self { + Self { + options, + limit: 0, + vector_column: None, + query_vector: None, + } + } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } + + pub fn with_vector_column(mut self, name: String) -> Self { + self.vector_column = Some(name); + self + } + + pub fn with_query_vector(mut self, vector: Vec) -> Self { + self.query_vector = Some(vector); + self + } + + pub fn execute_local( + &self, + io_metas: Vec, + index_type: &str, + stream_fn: F, + ) -> crate::Result> + where + F: FnOnce(&str) -> crate::Result, + S: std::io::Read + std::io::Seek + Send + 'static, + { + if self.limit == 0 { + return Err(crate::Error::DataInvalid { + message: "Limit must be positive, set via with_limit()".to_string(), + source: None, + }); + } + let vector_column = + self.vector_column + .as_ref() + .ok_or_else(|| crate::Error::DataInvalid { + message: "Vector column must be set via with_vector_column()".to_string(), + source: None, + })?; + let query_vector = self + .query_vector + .as_ref() + .ok_or_else(|| crate::Error::DataInvalid { + message: "Query vector must be set via with_query_vector()".to_string(), + source: None, + })?; + + let vector_search = + VectorSearch::new(query_vector.clone(), self.limit, vector_column.clone())?; + + if index_type != LUMINA_VECTOR_ANN_IDENTIFIER { + return Err(crate::Error::Unsupported { + message: format!("Unsupported vector index type: '{}'", index_type), + }); + } + + let mut reader = LuminaVectorGlobalIndexReader::new(io_metas, self.options.clone())?; + let result = reader.visit_vector_search(&vector_search, stream_fn)?; + match result { + Some(r) => Ok(r), + None => Ok(Box::new(SimpleScoredGlobalIndexResult::create_empty())), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn dummy_stream_fn(_path: &str) -> crate::Result>> { + Ok(std::io::Cursor::new(vec![])) + } + + #[test] + fn execute_local_missing_limit() { + let builder = VectorSearchBuilder::new(HashMap::new()) + .with_vector_column("emb".to_string()) + .with_query_vector(vec![1.0]); + let result = builder.execute_local(vec![], "any", dummy_stream_fn); + assert!(result.is_err()); + } + + #[test] + fn execute_local_missing_vector_column() { + let builder = VectorSearchBuilder::new(HashMap::new()) + .with_limit(10) + .with_query_vector(vec![1.0]); + let result = builder.execute_local(vec![], "any", dummy_stream_fn); + assert!(result.is_err()); + } + + #[test] + fn execute_local_missing_query_vector() { + let builder = VectorSearchBuilder::new(HashMap::new()) + .with_limit(10) + .with_vector_column("emb".to_string()); + let result = builder.execute_local(vec![], "any", dummy_stream_fn); + assert!(result.is_err()); + } + + #[test] + fn execute_local_unsupported_index_type() { + let builder = VectorSearchBuilder::new(HashMap::new()) + .with_limit(10) + .with_vector_column("emb".to_string()) + .with_query_vector(vec![1.0, 2.0]); + let result = builder.execute_local(vec![], "unknown-index", dummy_stream_fn); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/tests/lumina_vector_index_test.rs b/crates/paimon/tests/lumina_vector_index_test.rs new file mode 100644 index 00000000..d930feac --- /dev/null +++ b/crates/paimon/tests/lumina_vector_index_test.rs @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for Lumina vector index, aligned with +//! pypaimon/tests/lumina_vector_index_test.py + +#[cfg(feature = "lumina")] +mod lumina_tests { + use paimon::globalindex::{GlobalIndexIOMeta, VectorSearch}; + use paimon::lumina::ffi::LuminaBuilder; + use paimon::lumina::reader::LuminaVectorGlobalIndexReader; + use paimon::lumina::LuminaIndexMeta; + use roaring::RoaringTreemap; + use std::collections::HashMap; + use std::fs; + + const DIM: i32 = 4; + const N: i32 = 100; + + fn build_options() -> HashMap { + [ + ("index.dimension", DIM.to_string()), + ("index.type", "diskann".to_string()), + ("distance.metric", "l2".to_string()), + ("encoding.type", "rawf32".to_string()), + ("diskann.build.ef_construction", "64".to_string()), + ("diskann.build.neighbor_count", "32".to_string()), + ("diskann.build.thread_count", "2".to_string()), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect() + } + + fn generate_vectors(n: usize, dim: usize, seed: u64) -> Vec { + let mut vectors = Vec::with_capacity(n * dim); + let mut val = seed as f32; + for _ in 0..(n * dim) { + // Simple deterministic pseudo-random + val = (val * 1.1 + 0.3) % 100.0; + vectors.push(val); + } + vectors + } + + fn build_index(index_path: &str, seed: u64) -> HashMap { + let opts = build_options(); + let vectors = generate_vectors(N as usize, DIM as usize, seed); + let ids: Vec = (0..N as u64).collect(); + + let builder = LuminaBuilder::create(&opts).expect("Failed to create builder"); + builder + .pretrain(&vectors, N, DIM) + .expect("Failed to pretrain"); + builder + .insert(&vectors, &ids, N, DIM) + .expect("Failed to insert"); + builder.dump(index_path).expect("Failed to dump"); + + opts + } + + fn make_index_meta(opts: &HashMap) -> Vec { + let meta = LuminaIndexMeta::new(opts.clone()); + meta.serialize().expect("Failed to serialize meta") + } + + /// Aligned with Python test_build_and_read + #[test] + fn test_build_and_read() { + let dir = tempfile::tempdir().expect("Failed to create temp dir"); + let index_path = dir.path().join("index.lmi"); + let index_path_str = index_path.to_str().unwrap(); + + let opts = build_index(index_path_str, 42); + let file_size = fs::metadata(index_path_str).unwrap().len(); + let meta_bytes = make_index_meta(&opts); + + let io_meta = GlobalIndexIOMeta::new(index_path_str.to_string(), file_size, meta_bytes); + let mut reader = + LuminaVectorGlobalIndexReader::new(vec![io_meta], HashMap::new()).unwrap(); + + // Query with the first vector + let vectors = generate_vectors(N as usize, DIM as usize, 42); + let query: Vec = vectors[..DIM as usize].to_vec(); + let vs = VectorSearch::new(query, 5, "vec".to_string()).unwrap(); + + let result = reader + .visit_vector_search(&vs, |path| { + let file = fs::File::open(path).map_err(|e| paimon::Error::DataInvalid { + message: format!("Failed to open index file: {}", e), + source: None, + })?; + Ok(std::io::BufReader::new(file)) + }) + .expect("Search failed"); + + let result = result.expect("Expected non-empty result"); + assert!(result.results().len() > 0, "Expected results"); + assert!(result.results().contains(0), "Expected row 0 (self-match)"); + let score = result.score_getter()(0); + assert!(score.is_finite(), "Score should be finite"); + } + + /// Aligned with Python test_filtered_search + #[test] + fn test_filtered_search() { + let dir = tempfile::tempdir().expect("Failed to create temp dir"); + let index_path = dir.path().join("index.lmi"); + let index_path_str = index_path.to_str().unwrap(); + + let opts = build_index(index_path_str, 99); + let file_size = fs::metadata(index_path_str).unwrap().len(); + let meta_bytes = make_index_meta(&opts); + + let io_meta = GlobalIndexIOMeta::new(index_path_str.to_string(), file_size, meta_bytes); + let mut reader = + LuminaVectorGlobalIndexReader::new(vec![io_meta], HashMap::new()).unwrap(); + + // Only include even row IDs + let mut include_ids = RoaringTreemap::new(); + for i in (0..N as u64).step_by(2) { + include_ids.insert(i); + } + + let vectors = generate_vectors(N as usize, DIM as usize, 99); + let query: Vec = vectors[..DIM as usize].to_vec(); + let vs = VectorSearch::new(query, 3, "vec".to_string()) + .unwrap() + .with_include_row_ids(include_ids); + + let result = reader + .visit_vector_search(&vs, |path| { + let file = fs::File::open(path).map_err(|e| paimon::Error::DataInvalid { + message: format!("Failed to open index file: {}", e), + source: None, + })?; + Ok(std::io::BufReader::new(file)) + }) + .expect("Search failed"); + + let result = result.expect("Expected non-empty result"); + for row_id in result.results().iter() { + assert_eq!(row_id % 2, 0, "Expected only even row IDs, got {}", row_id); + } + } +}