diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index c604e260b9..aa9d88b71c 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -2232,6 +2232,63 @@ mod tests { .unwrap() } + async fn write_skewed_fragmented_vector_dataset(uri: &str, dimension: i32) -> Dataset { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "vector", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + dimension, + ), + false, + ), + ])); + let first_fragment_rows = 20_000; + let second_fragment_rows = 100; + let first_values = vec![0.0f32; first_fragment_rows * dimension as usize]; + let second_values = vec![100.0f32; second_fragment_rows * dimension as usize]; + let first_vectors = + FixedSizeListArray::try_new_from_values(Float32Array::from(first_values), dimension) + .unwrap(); + let second_vectors = + FixedSizeListArray::try_new_from_values(Float32Array::from(second_values), dimension) + .unwrap(); + let batches = vec![ + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..first_fragment_rows as i32)), + Arc::new(first_vectors), + ], + ) + .unwrap(), + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values( + first_fragment_rows as i32 + ..(first_fragment_rows + second_fragment_rows) as i32, + )), + Arc::new(second_vectors), + ], + ) + .unwrap(), + ]; + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + Dataset::write( + reader, + uri, + Some(WriteParams { + max_rows_per_group: 1024, + max_rows_per_file: first_fragment_rows, + ..Default::default() + }), + ) + .await + .unwrap() + } + async fn create_segmented_vector_index( dataset: &mut Dataset, index_name: &str, @@ -2435,6 +2492,77 @@ mod tests { ); } + #[tokio::test] + async fn test_segmented_optimize_rebalances_only_one_segment() { + const DIMENSION: i32 = 8; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = 
test_dir.path().to_str().unwrap(); + let mut dataset = write_skewed_fragmented_vector_dataset(test_uri, DIMENSION).await; + create_segmented_vector_index(&mut dataset, "vector_idx", "vector", DIMENSION).await; + + let before_segments = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(before_segments.len(), 2); + let before_by_fragment = before_segments + .iter() + .map(|metadata| { + let fragments = metadata + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::<Vec<_>>(); + (fragments, metadata.uuid) + }) + .collect::<HashMap<_, _>>(); + assert_eq!(before_by_fragment.len(), 2); + + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + + let after_segments = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(after_segments.len(), 2); + let after_by_fragment = after_segments + .iter() + .map(|metadata| { + let fragments = metadata + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::<Vec<_>>(); + (fragments, metadata.uuid) + }) + .collect::<HashMap<_, _>>(); + assert_eq!(after_by_fragment.len(), 2); + + assert_ne!( + before_by_fragment[&vec![0]], + after_by_fragment[&vec![0]], + "expected optimize to replace the oversized segment" + ); + assert_eq!( + before_by_fragment[&vec![1]], + after_by_fragment[&vec![1]], + "expected optimize to leave the smaller segment untouched" + ); + + let logical_index = dataset + .open_logical_vector_index("vector", "vector_idx") + .await + .unwrap(); + let partitions_per_segment = logical_index + .as_ivf() + .unwrap() + .num_partitions_per_segment() + .into_iter() + .collect::<HashMap<_, _>>(); + assert_eq!(partitions_per_segment[&after_by_fragment[&vec![0]]], 3); + assert_eq!(partitions_per_segment[&after_by_fragment[&vec![1]]], 2); + } + #[tokio::test] async fn test_recreate_index() { const DIM: i32 = 8; diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index b088be625f..b93e325d47 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -21,7 
+21,7 @@ use uuid::Uuid; use super::DatasetIndexInternalExt; use super::vector::LogicalVectorIndex; -use super::vector::ivf::optimize_vector_indices; +use super::vector::ivf::{optimize_vector_indices, select_segment_for_single_rebalance}; use crate::dataset::Dataset; use crate::dataset::index::LanceIndexStoreExt; use crate::dataset::rowids::load_row_id_sequences; @@ -108,7 +108,13 @@ pub async fn merge_indices<'a>( }; let unindexed = dataset.unindexed_fragments(&old_indices[0].name).await?; - merge_indices_with_unindexed_frags(dataset, old_indices, &unindexed, options).await + Box::pin(merge_indices_with_unindexed_frags( + dataset, + old_indices, + &unindexed, + options, + )) + .await } /// Merge a list of provided unindexed data, with a specific number of previous indices @@ -145,12 +151,12 @@ pub async fn merge_indices_with_unindexed_frags<'a>( } } - let mut frag_bitmap = RoaringBitmap::new(); + let mut base_unindexed_bitmap = RoaringBitmap::new(); unindexed.iter().for_each(|frag| { - frag_bitmap.insert(frag.id as u32); + base_unindexed_bitmap.insert(frag.id as u32); }); - let (new_uuid, indices_merged, created_index) = if first_is_vector_index { + let (new_uuid, removed_indices, new_fragment_bitmap, created_index) = if first_is_vector_index { let full_logical_index = dataset .open_logical_vector_index(&field_path, &old_indices[0].name) .await?; @@ -180,50 +186,130 @@ pub async fn merge_indices_with_unindexed_frags<'a>( )?; let ivf_view = logical_index.as_ivf()?; - let new_data_stream = if unindexed.is_empty() { - None - } else { - let mut scanner = dataset.scan(); - scanner - .with_fragments(unindexed.to_vec()) - .with_row_id() - .project(&[&field_path])?; - if column.nullable { - let column_expr = lance_datafusion::logical_expr::field_path_to_expr(&field_path)?; - scanner.filter_expr(column_expr.is_not_null()); + let use_single_segment_rebalance = logical_index.num_segments() > 1 + && options.num_indices_to_merge.is_none() + && !options.retrain + && 
unindexed.is_empty(); + + if use_single_segment_rebalance { + let Some(selected_segment_id) = select_segment_for_single_rebalance(&ivf_view)? else { + return Ok(None); + }; + let removed_segment = old_indices + .iter() + .copied() + .find(|metadata| metadata.uuid == selected_segment_id) + .ok_or_else(|| { + Error::index(format!( + "Append index: logical vector index '{}' does not contain selected segment {}", + old_indices[0].name, selected_segment_id + )) + })?; + let (selected_metadata, selected_index) = logical_index + .iter() + .find(|(metadata, _)| metadata.uuid == selected_segment_id) + .map(|(metadata, index)| (metadata.clone(), index.clone())) + .ok_or_else(|| { + Error::index(format!( + "Append index: failed to materialize selected segment {} from logical vector index '{}'", + selected_segment_id, old_indices[0].name + )) + })?; + let selected_logical_index = LogicalVectorIndex::try_new( + old_indices[0].name.clone(), + field_path.clone(), + vec![(selected_metadata, selected_index)], + )?; + let selected_ivf_view = selected_logical_index.as_ivf()?; + let (new_uuid, indices_merged) = Box::pin(optimize_vector_indices( + dataset.as_ref().clone(), + Option::< + lance_io::stream::RecordBatchStreamAdapter< + futures::stream::Empty<datafusion::error::Result<arrow_array::RecordBatch>>, + >, + >::None, + &field_path, + &selected_ivf_view, + options, + )) + .await?; + if indices_merged == 0 { + return Ok(None); + } - Some(scanner.try_into_stream().await?) 
- }; - let (new_uuid, indices_merged) = optimize_vector_indices( - dataset.as_ref().clone(), - new_data_stream, - &field_path, - &ivf_view, - options, - ) - .boxed() - .await?; + let index_dir = dataset.indices_dir().child(new_uuid.to_string()); + let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?; + let new_fragment_bitmap = removed_segment + .effective_fragment_bitmap(&dataset.fragment_bitmap) + .or_else(|| removed_segment.fragment_bitmap.clone()) + .unwrap_or_default(); + + Ok(( + new_uuid, + vec![removed_segment], + new_fragment_bitmap, + CreatedIndex { + index_details: vector_index_details(), + index_version: lance_index::IndexType::Vector.version() as u32, + files: Some(files), + }, + )) + } else { + let mut frag_bitmap = base_unindexed_bitmap.clone(); + + let new_data_stream = if unindexed.is_empty() { + None + } else { + let mut scanner = dataset.scan(); + scanner + .with_fragments(unindexed.to_vec()) + .with_row_id() + .project(&[&field_path])?; + if column.nullable { + let column_expr = + lance_datafusion::logical_expr::field_path_to_expr(&field_path)?; + scanner.filter_expr(column_expr.is_not_null()); + } + Some(scanner.try_into_stream().await?) + }; + + let (new_uuid, indices_merged) = optimize_vector_indices( + dataset.as_ref().clone(), + new_data_stream, + &field_path, + &ivf_view, + options, + ) + .boxed() + .await?; - old_indices[old_indices.len() - indices_merged..] 
- .iter() - .for_each(|idx| { + let removed_indices = old_indices[old_indices.len() - indices_merged..].to_vec(); + removed_indices.iter().for_each(|idx| { frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter()); }); + for removed in removed_indices.iter() { + if let Some(effective) = removed.effective_fragment_bitmap(&dataset.fragment_bitmap) + { + frag_bitmap |= &effective; + } + } - let index_dir = dataset.indices_dir().child(new_uuid.to_string()); - let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?; - - Ok(( - new_uuid, - indices_merged, - CreatedIndex { - index_details: vector_index_details(), - index_version: lance_index::IndexType::Vector.version() as u32, - files: Some(files), - }, - )) + let index_dir = dataset.indices_dir().child(new_uuid.to_string()); + let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?; + + Ok(( + new_uuid, + removed_indices, + frag_bitmap, + CreatedIndex { + index_details: vector_index_details(), + index_version: lance_index::IndexType::Vector.version() as u32, + files: Some(files), + }, + )) + } } else { + let mut frag_bitmap = base_unindexed_bitmap; let mut indices = Vec::with_capacity(old_indices.len()); for idx in old_indices { match dataset @@ -341,7 +427,12 @@ pub async fn merge_indices_with_unindexed_frags<'a>( }; // TODO: don't hard-code index version - Ok((new_uuid, 1, created_index)) + Ok(( + new_uuid, + vec![old_indices[old_indices.len() - 1]], + frag_bitmap, + created_index, + )) } _ => Err(Error::index(format!( "Append index: invalid index type: {:?}", @@ -350,17 +441,10 @@ pub async fn merge_indices_with_unindexed_frags<'a>( } }?; - let removed_indices = old_indices[old_indices.len() - indices_merged..].to_vec(); - for removed in removed_indices.iter() { - if let Some(effective) = removed.effective_fragment_bitmap(&dataset.fragment_bitmap) { - frag_bitmap |= &effective; - } - } - Ok(Some(IndexMergeResults { new_uuid, removed_indices, - 
new_fragment_bitmap: frag_bitmap, + new_fragment_bitmap, new_index_version: created_index.index_version as i32, new_index_details: created_index.index_details, files: created_index.files, diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 2d3594524c..887ae04084 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -157,6 +157,12 @@ impl<'a> LogicalIvfView<'a> { self.logical_index.iter().map(|(_, index)| index) } + pub(crate) fn segments( + &self, + ) -> impl ExactSizeIterator<Item = (&IndexMetadata, &Arc<dyn VectorIndex>)> + '_ { + self.logical_index.iter() + } + /// Returns the partition count for each segment in this IVF index. pub fn num_partitions_per_segment(&self) -> Vec<(Uuid, usize)> { self.logical_index diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 1a4f375db2..1d388236ad 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -8,7 +8,10 @@ use super::{ pq::{PQIndex, build_pq_model}, utils::{filter_finite_training_data, maybe_sample_training_data}, }; -use super::{builder::IvfIndexBuilder, utils::PartitionLoadLock}; +use super::{ + builder::{IvfIndexBuilder, index_type_string}, + utils::PartitionLoadLock, +}; use crate::dataset::index::dataset_format_version; use crate::index::DatasetIndexInternalExt; use crate::index::vector::utils::{get_vector_dim, get_vector_type}; @@ -66,6 +69,7 @@ use lance_index::vector::v3::shuffler::create_ivf_shuffler; use lance_index::vector::v3::subindex::{IvfSubIndex, SubIndexType}; use lance_index::{ INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY, Index, IndexMetadata, IndexType, + MAX_PARTITION_SIZE_FACTOR, MIN_PARTITION_SIZE_PERCENT, optimize::OptimizeOptions, vector::{ Query, VectorIndex, @@ -270,6 +274,95 @@ impl std::fmt::Debug for IVFIndex { } } +#[derive(Clone, Copy, Debug)] +struct SegmentRebalanceCandidate { + segment_id: Uuid, + score: usize, + created_at_ms: i64, +} + +fn candidate_is_better( + candidate: 
SegmentRebalanceCandidate, + current_best: Option<SegmentRebalanceCandidate>, +) -> bool { + match current_best { + None => true, + Some(current_best) => { + candidate.score > current_best.score + || (candidate.score == current_best.score + && (candidate.created_at_ms, candidate.segment_id.as_bytes()) + < ( + current_best.created_at_ms, + current_best.segment_id.as_bytes(), + )) + } + } +} + +fn index_type_for_segmented_optimize(index: &dyn VectorIndex) -> Result<IndexType> { + let (sub_index_type, quantization_type) = index.sub_index_type(); + IndexType::try_from(index_type_string(sub_index_type, quantization_type).as_str()) +} + +pub(crate) fn select_segment_for_single_rebalance( + logical_index: &LogicalIvfView<'_>, +) -> Result<Option<Uuid>> { + let mut best_split = None; + let mut best_join = None; + + for (metadata, index) in logical_index.segments() { + let index_type = index_type_for_segmented_optimize(index.as_ref())?; + let split_threshold = MAX_PARTITION_SIZE_FACTOR * index_type.target_partition_size(); + let join_threshold = MIN_PARTITION_SIZE_PERCENT * index_type.target_partition_size() / 100; + let num_partitions = index.ivf_model().num_partitions(); + if num_partitions == 0 { + continue; + } + + let mut split_partition_count = 0usize; + let mut join_partition_count = 0usize; + for partition_id in 0..num_partitions { + let partition_size = index.partition_size(partition_id); + if partition_size > split_threshold { + split_partition_count += 1; + } + if num_partitions > 1 && partition_size < join_threshold { + join_partition_count += 1; + } + } + + let created_at_ms = metadata + .created_at + .map(|dt| dt.timestamp_millis()) + .unwrap_or(i64::MIN); + + let split_candidate = (split_partition_count > 0).then_some(SegmentRebalanceCandidate { + segment_id: metadata.uuid, + score: split_partition_count, + created_at_ms, + }); + if let Some(candidate) = split_candidate + && candidate_is_better(candidate, best_split) + { + best_split = Some(candidate); + } + + let join_candidate = (join_partition_count > 
0).then_some(SegmentRebalanceCandidate { + segment_id: metadata.uuid, + score: join_partition_count, + created_at_ms, + }); + if let Some(candidate) = join_candidate + && candidate_is_better(candidate, best_join) + { + best_join = Some(candidate); + } + } + + let selected = best_split.or(best_join); + Ok(selected.map(|candidate| candidate.segment_id)) +} + // TODO: move to `lance-index` crate. /// /// Returns (new_uuid, num_indices_merged)