Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,63 @@ mod tests {
.unwrap()
}

/// Writes a two-fragment vector dataset with deliberately skewed fragment
/// sizes: a large first fragment (20,000 rows of 0.0 vectors) followed by a
/// tiny second fragment (100 rows of 100.0 vectors). The `max_rows_per_file`
/// cap equals the large fragment's row count so each batch lands in its own
/// fragment, producing the unbalanced layout the rebalance tests exercise.
async fn write_skewed_fragmented_vector_dataset(uri: &str, dimension: i32) -> Dataset {
    const LARGE_FRAGMENT_ROWS: usize = 20_000;
    const SMALL_FRAGMENT_ROWS: usize = 100;

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new(
            "vector",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        ),
    ]));

    // Builds one batch of `rows` sequential ids starting at `id_offset`,
    // each paired with a constant-valued vector of the given fill.
    let make_batch = |id_offset: usize, rows: usize, fill: f32| {
        let values = vec![fill; rows * dimension as usize];
        let vectors =
            FixedSizeListArray::try_new_from_values(Float32Array::from(values), dimension)
                .unwrap();
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(
                    id_offset as i32..(id_offset + rows) as i32,
                )),
                Arc::new(vectors),
            ],
        )
        .unwrap()
    };

    let batches = vec![
        make_batch(0, LARGE_FRAGMENT_ROWS, 0.0),
        make_batch(LARGE_FRAGMENT_ROWS, SMALL_FRAGMENT_ROWS, 100.0),
    ];
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);

    Dataset::write(
        reader,
        uri,
        Some(WriteParams {
            max_rows_per_group: 1024,
            // Force the second batch into its own (small) fragment.
            max_rows_per_file: LARGE_FRAGMENT_ROWS,
            ..Default::default()
        }),
    )
    .await
    .unwrap()
}

async fn create_segmented_vector_index(
dataset: &mut Dataset,
index_name: &str,
Expand Down Expand Up @@ -2435,6 +2492,77 @@ mod tests {
);
}

/// Verifies that optimizing a segmented vector index on a skewed dataset
/// rewrites only the oversized segment (fragment 0) while leaving the small
/// segment (fragment 1) untouched, and that partition counts end up
/// rebalanced across the two segments afterwards.
#[tokio::test]
async fn test_segmented_optimize_rebalances_only_one_segment() {
    const DIMENSION: i32 = 8;

    let test_dir = tempfile::tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();
    let mut dataset = write_skewed_fragmented_vector_dataset(test_uri, DIMENSION).await;
    create_segmented_vector_index(&mut dataset, "vector_idx", "vector", DIMENSION).await;

    // Snapshot the pre-optimize layout: covered-fragment set -> segment uuid.
    let before_segments = dataset.load_indices_by_name("vector_idx").await.unwrap();
    assert_eq!(before_segments.len(), 2);
    let mut before_by_fragment = HashMap::new();
    for metadata in before_segments.iter() {
        let fragments = metadata
            .fragment_bitmap
            .as_ref()
            .unwrap()
            .iter()
            .collect::<Vec<_>>();
        before_by_fragment.insert(fragments, metadata.uuid);
    }
    assert_eq!(before_by_fragment.len(), 2);

    dataset
        .optimize_indices(&OptimizeOptions::default())
        .await
        .unwrap();

    // Snapshot the post-optimize layout the same way.
    let after_segments = dataset.load_indices_by_name("vector_idx").await.unwrap();
    assert_eq!(after_segments.len(), 2);
    let mut after_by_fragment = HashMap::new();
    for metadata in after_segments.iter() {
        let fragments = metadata
            .fragment_bitmap
            .as_ref()
            .unwrap()
            .iter()
            .collect::<Vec<_>>();
        after_by_fragment.insert(fragments, metadata.uuid);
    }
    assert_eq!(after_by_fragment.len(), 2);

    // The segment over fragment 0 (the big one) must be replaced; the
    // segment over fragment 1 (the small one) must keep its uuid.
    assert_ne!(
        before_by_fragment[&vec![0]],
        after_by_fragment[&vec![0]],
        "expected optimize to replace the oversized segment"
    );
    assert_eq!(
        before_by_fragment[&vec![1]],
        after_by_fragment[&vec![1]],
        "expected optimize to leave the smaller segment untouched"
    );

    // After rebalancing, the rewritten large segment should hold more
    // partitions (3) than the untouched small segment (2).
    let logical_index = dataset
        .open_logical_vector_index("vector", "vector_idx")
        .await
        .unwrap();
    let partitions_per_segment: HashMap<_, _> = logical_index
        .as_ivf()
        .unwrap()
        .num_partitions_per_segment()
        .into_iter()
        .collect();
    assert_eq!(partitions_per_segment[&after_by_fragment[&vec![0]]], 3);
    assert_eq!(partitions_per_segment[&after_by_fragment[&vec![1]]], 2);
}

#[tokio::test]
async fn test_recreate_index() {
const DIM: i32 = 8;
Expand Down
186 changes: 135 additions & 51 deletions rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use uuid::Uuid;

use super::DatasetIndexInternalExt;
use super::vector::LogicalVectorIndex;
use super::vector::ivf::optimize_vector_indices;
use super::vector::ivf::{optimize_vector_indices, select_segment_for_single_rebalance};
use crate::dataset::Dataset;
use crate::dataset::index::LanceIndexStoreExt;
use crate::dataset::rowids::load_row_id_sequences;
Expand Down Expand Up @@ -108,7 +108,13 @@ pub async fn merge_indices<'a>(
};

let unindexed = dataset.unindexed_fragments(&old_indices[0].name).await?;
merge_indices_with_unindexed_frags(dataset, old_indices, &unindexed, options).await
Box::pin(merge_indices_with_unindexed_frags(
dataset,
old_indices,
&unindexed,
options,
))
.await
}

/// Merge a list of provided unindexed data, with a specific number of previous indices
Expand Down Expand Up @@ -145,12 +151,12 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
}
}

let mut frag_bitmap = RoaringBitmap::new();
let mut base_unindexed_bitmap = RoaringBitmap::new();
unindexed.iter().for_each(|frag| {
frag_bitmap.insert(frag.id as u32);
base_unindexed_bitmap.insert(frag.id as u32);
});

let (new_uuid, indices_merged, created_index) = if first_is_vector_index {
let (new_uuid, removed_indices, new_fragment_bitmap, created_index) = if first_is_vector_index {
let full_logical_index = dataset
.open_logical_vector_index(&field_path, &old_indices[0].name)
.await?;
Expand Down Expand Up @@ -180,50 +186,130 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
)?;
let ivf_view = logical_index.as_ivf()?;

let new_data_stream = if unindexed.is_empty() {
None
} else {
let mut scanner = dataset.scan();
scanner
.with_fragments(unindexed.to_vec())
.with_row_id()
.project(&[&field_path])?;
if column.nullable {
let column_expr = lance_datafusion::logical_expr::field_path_to_expr(&field_path)?;
scanner.filter_expr(column_expr.is_not_null());
let use_single_segment_rebalance = logical_index.num_segments() > 1
&& options.num_indices_to_merge.is_none()
&& !options.retrain
&& unindexed.is_empty();

if use_single_segment_rebalance {
let Some(selected_segment_id) = select_segment_for_single_rebalance(&ivf_view)? else {
return Ok(None);
};
let removed_segment = old_indices
.iter()
.copied()
.find(|metadata| metadata.uuid == selected_segment_id)
.ok_or_else(|| {
Error::index(format!(
"Append index: logical vector index '{}' does not contain selected segment {}",
old_indices[0].name, selected_segment_id
))
})?;
let (selected_metadata, selected_index) = logical_index
.iter()
.find(|(metadata, _)| metadata.uuid == selected_segment_id)
.map(|(metadata, index)| (metadata.clone(), index.clone()))
.ok_or_else(|| {
Error::index(format!(
"Append index: failed to materialize selected segment {} from logical vector index '{}'",
selected_segment_id, old_indices[0].name
))
})?;
let selected_logical_index = LogicalVectorIndex::try_new(
old_indices[0].name.clone(),
field_path.clone(),
vec![(selected_metadata, selected_index)],
)?;
let selected_ivf_view = selected_logical_index.as_ivf()?;
let (new_uuid, indices_merged) = Box::pin(optimize_vector_indices(
dataset.as_ref().clone(),
Option::<
lance_io::stream::RecordBatchStreamAdapter<
futures::stream::Empty<lance_core::Result<arrow_array::RecordBatch>>,
>,
>::None,
&field_path,
&selected_ivf_view,
options,
))
.await?;
if indices_merged == 0 {
return Ok(None);
}
Some(scanner.try_into_stream().await?)
};

let (new_uuid, indices_merged) = optimize_vector_indices(
dataset.as_ref().clone(),
new_data_stream,
&field_path,
&ivf_view,
options,
)
.boxed()
.await?;
let index_dir = dataset.indices_dir().child(new_uuid.to_string());
let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?;
let new_fragment_bitmap = removed_segment
.effective_fragment_bitmap(&dataset.fragment_bitmap)
.or_else(|| removed_segment.fragment_bitmap.clone())
.unwrap_or_default();

Ok((
new_uuid,
vec![removed_segment],
new_fragment_bitmap,
CreatedIndex {
index_details: vector_index_details(),
index_version: lance_index::IndexType::Vector.version() as u32,
files: Some(files),
},
))
} else {
let mut frag_bitmap = base_unindexed_bitmap.clone();

let new_data_stream = if unindexed.is_empty() {
None
} else {
let mut scanner = dataset.scan();
scanner
.with_fragments(unindexed.to_vec())
.with_row_id()
.project(&[&field_path])?;
if column.nullable {
let column_expr =
lance_datafusion::logical_expr::field_path_to_expr(&field_path)?;
scanner.filter_expr(column_expr.is_not_null());
}
Some(scanner.try_into_stream().await?)
};

let (new_uuid, indices_merged) = optimize_vector_indices(
dataset.as_ref().clone(),
new_data_stream,
&field_path,
&ivf_view,
options,
)
.boxed()
.await?;

old_indices[old_indices.len() - indices_merged..]
.iter()
.for_each(|idx| {
let removed_indices = old_indices[old_indices.len() - indices_merged..].to_vec();
removed_indices.iter().for_each(|idx| {
frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter());
});
for removed in removed_indices.iter() {
if let Some(effective) = removed.effective_fragment_bitmap(&dataset.fragment_bitmap)
{
frag_bitmap |= &effective;
}
}

let index_dir = dataset.indices_dir().child(new_uuid.to_string());
let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?;

Ok((
new_uuid,
indices_merged,
CreatedIndex {
index_details: vector_index_details(),
index_version: lance_index::IndexType::Vector.version() as u32,
files: Some(files),
},
))
let index_dir = dataset.indices_dir().child(new_uuid.to_string());
let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?;

Ok((
new_uuid,
removed_indices,
frag_bitmap,
CreatedIndex {
index_details: vector_index_details(),
index_version: lance_index::IndexType::Vector.version() as u32,
files: Some(files),
},
))
}
} else {
let mut frag_bitmap = base_unindexed_bitmap;
let mut indices = Vec::with_capacity(old_indices.len());
for idx in old_indices {
match dataset
Expand Down Expand Up @@ -341,7 +427,12 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
};

// TODO: don't hard-code index version
Ok((new_uuid, 1, created_index))
Ok((
new_uuid,
vec![old_indices[old_indices.len() - 1]],
frag_bitmap,
created_index,
))
}
_ => Err(Error::index(format!(
"Append index: invalid index type: {:?}",
Expand All @@ -350,17 +441,10 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
}
}?;

let removed_indices = old_indices[old_indices.len() - indices_merged..].to_vec();
for removed in removed_indices.iter() {
if let Some(effective) = removed.effective_fragment_bitmap(&dataset.fragment_bitmap) {
frag_bitmap |= &effective;
}
}

Ok(Some(IndexMergeResults {
new_uuid,
removed_indices,
new_fragment_bitmap: frag_bitmap,
new_fragment_bitmap,
new_index_version: created_index.index_version as i32,
new_index_details: created_index.index_details,
files: created_index.files,
Expand Down
6 changes: 6 additions & 0 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ impl<'a> LogicalIvfView<'a> {
self.logical_index.iter().map(|(_, index)| index)
}

    /// Returns an iterator over every segment of this logical index as
    /// `(metadata, index)` pairs, exposing the per-segment metadata alongside
    /// the opened vector index so callers can inspect segment bookkeeping.
    pub(crate) fn segments(
        &self,
    ) -> impl ExactSizeIterator<Item = (&IndexMetadata, &Arc<dyn VectorIndex>)> + '_ {
        self.logical_index.iter()
    }

/// Returns the partition count for each segment in this IVF index.
pub fn num_partitions_per_segment(&self) -> Vec<(Uuid, usize)> {
self.logical_index
Expand Down
Loading
Loading