Skip to content

Commit 745d07f

Browse files
committed
feat: optimize one segmented vector segment per run
1 parent fe78601 commit 745d07f

4 files changed

Lines changed: 363 additions & 50 deletions

File tree

rust/lance/src/index.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2232,6 +2232,63 @@ mod tests {
22322232
.unwrap()
22332233
}
22342234

2235+
async fn write_skewed_fragmented_vector_dataset(uri: &str, dimension: i32) -> Dataset {
2236+
let schema = Arc::new(Schema::new(vec![
2237+
Field::new("id", DataType::Int32, false),
2238+
Field::new(
2239+
"vector",
2240+
DataType::FixedSizeList(
2241+
Arc::new(Field::new("item", DataType::Float32, true)),
2242+
dimension,
2243+
),
2244+
false,
2245+
),
2246+
]));
2247+
let first_fragment_rows = 20_000;
2248+
let second_fragment_rows = 100;
2249+
let first_values = vec![0.0f32; first_fragment_rows * dimension as usize];
2250+
let second_values = vec![100.0f32; second_fragment_rows * dimension as usize];
2251+
let first_vectors =
2252+
FixedSizeListArray::try_new_from_values(Float32Array::from(first_values), dimension)
2253+
.unwrap();
2254+
let second_vectors =
2255+
FixedSizeListArray::try_new_from_values(Float32Array::from(second_values), dimension)
2256+
.unwrap();
2257+
let batches = vec![
2258+
RecordBatch::try_new(
2259+
schema.clone(),
2260+
vec![
2261+
Arc::new(Int32Array::from_iter_values(0..first_fragment_rows as i32)),
2262+
Arc::new(first_vectors),
2263+
],
2264+
)
2265+
.unwrap(),
2266+
RecordBatch::try_new(
2267+
schema.clone(),
2268+
vec![
2269+
Arc::new(Int32Array::from_iter_values(
2270+
first_fragment_rows as i32
2271+
..(first_fragment_rows + second_fragment_rows) as i32,
2272+
)),
2273+
Arc::new(second_vectors),
2274+
],
2275+
)
2276+
.unwrap(),
2277+
];
2278+
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
2279+
Dataset::write(
2280+
reader,
2281+
uri,
2282+
Some(WriteParams {
2283+
max_rows_per_group: 1024,
2284+
max_rows_per_file: first_fragment_rows,
2285+
..Default::default()
2286+
}),
2287+
)
2288+
.await
2289+
.unwrap()
2290+
}
2291+
22352292
async fn create_segmented_vector_index(
22362293
dataset: &mut Dataset,
22372294
index_name: &str,
@@ -2435,6 +2492,77 @@ mod tests {
24352492
);
24362493
}
24372494

2495+
#[tokio::test]
2496+
async fn test_segmented_optimize_rebalances_only_one_segment() {
2497+
const DIMENSION: i32 = 8;
2498+
2499+
let test_dir = tempfile::tempdir().unwrap();
2500+
let test_uri = test_dir.path().to_str().unwrap();
2501+
let mut dataset = write_skewed_fragmented_vector_dataset(test_uri, DIMENSION).await;
2502+
create_segmented_vector_index(&mut dataset, "vector_idx", "vector", DIMENSION).await;
2503+
2504+
let before_segments = dataset.load_indices_by_name("vector_idx").await.unwrap();
2505+
assert_eq!(before_segments.len(), 2);
2506+
let before_by_fragment = before_segments
2507+
.iter()
2508+
.map(|metadata| {
2509+
let fragments = metadata
2510+
.fragment_bitmap
2511+
.as_ref()
2512+
.unwrap()
2513+
.iter()
2514+
.collect::<Vec<_>>();
2515+
(fragments, metadata.uuid)
2516+
})
2517+
.collect::<HashMap<_, _>>();
2518+
assert_eq!(before_by_fragment.len(), 2);
2519+
2520+
dataset
2521+
.optimize_indices(&OptimizeOptions::default())
2522+
.await
2523+
.unwrap();
2524+
2525+
let after_segments = dataset.load_indices_by_name("vector_idx").await.unwrap();
2526+
assert_eq!(after_segments.len(), 2);
2527+
let after_by_fragment = after_segments
2528+
.iter()
2529+
.map(|metadata| {
2530+
let fragments = metadata
2531+
.fragment_bitmap
2532+
.as_ref()
2533+
.unwrap()
2534+
.iter()
2535+
.collect::<Vec<_>>();
2536+
(fragments, metadata.uuid)
2537+
})
2538+
.collect::<HashMap<_, _>>();
2539+
assert_eq!(after_by_fragment.len(), 2);
2540+
2541+
assert_ne!(
2542+
before_by_fragment[&vec![0]],
2543+
after_by_fragment[&vec![0]],
2544+
"expected optimize to replace the oversized segment"
2545+
);
2546+
assert_eq!(
2547+
before_by_fragment[&vec![1]],
2548+
after_by_fragment[&vec![1]],
2549+
"expected optimize to leave the smaller segment untouched"
2550+
);
2551+
2552+
let logical_index = dataset
2553+
.open_logical_vector_index("vector", "vector_idx")
2554+
.await
2555+
.unwrap();
2556+
let partitions_per_segment = logical_index
2557+
.as_ivf()
2558+
.unwrap()
2559+
.num_partitions_per_segment()
2560+
.into_iter()
2561+
.collect::<HashMap<_, _>>();
2562+
assert_eq!(partitions_per_segment[&after_by_fragment[&vec![0]]], 3);
2563+
assert_eq!(partitions_per_segment[&after_by_fragment[&vec![1]]], 2);
2564+
}
2565+
24382566
#[tokio::test]
24392567
async fn test_recreate_index() {
24402568
const DIM: i32 = 8;

rust/lance/src/index/append.rs

Lines changed: 128 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use uuid::Uuid;
2121

2222
use super::DatasetIndexInternalExt;
2323
use super::vector::LogicalVectorIndex;
24-
use super::vector::ivf::optimize_vector_indices;
24+
use super::vector::ivf::{optimize_vector_indices, select_segment_for_single_rebalance};
2525
use crate::dataset::Dataset;
2626
use crate::dataset::index::LanceIndexStoreExt;
2727
use crate::dataset::rowids::load_row_id_sequences;
@@ -145,12 +145,12 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
145145
}
146146
}
147147

148-
let mut frag_bitmap = RoaringBitmap::new();
148+
let mut base_unindexed_bitmap = RoaringBitmap::new();
149149
unindexed.iter().for_each(|frag| {
150-
frag_bitmap.insert(frag.id as u32);
150+
base_unindexed_bitmap.insert(frag.id as u32);
151151
});
152152

153-
let (new_uuid, indices_merged, created_index) = if first_is_vector_index {
153+
let (new_uuid, removed_indices, new_fragment_bitmap, created_index) = if first_is_vector_index {
154154
let full_logical_index = dataset
155155
.open_logical_vector_index(&field_path, &old_indices[0].name)
156156
.await?;
@@ -180,50 +180,130 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
180180
)?;
181181
let ivf_view = logical_index.as_ivf()?;
182182

183-
let new_data_stream = if unindexed.is_empty() {
184-
None
185-
} else {
186-
let mut scanner = dataset.scan();
187-
scanner
188-
.with_fragments(unindexed.to_vec())
189-
.with_row_id()
190-
.project(&[&field_path])?;
191-
if column.nullable {
192-
let column_expr = lance_datafusion::logical_expr::field_path_to_expr(&field_path)?;
193-
scanner.filter_expr(column_expr.is_not_null());
183+
let use_single_segment_rebalance = logical_index.num_segments() > 1
184+
&& options.num_indices_to_merge.is_none()
185+
&& !options.retrain
186+
&& unindexed.is_empty();
187+
188+
if use_single_segment_rebalance {
189+
let Some(selected_segment_id) = select_segment_for_single_rebalance(&ivf_view)? else {
190+
return Ok(None);
191+
};
192+
let removed_segment = old_indices
193+
.iter()
194+
.copied()
195+
.find(|metadata| metadata.uuid == selected_segment_id)
196+
.ok_or_else(|| {
197+
Error::index(format!(
198+
"Append index: logical vector index '{}' does not contain selected segment {}",
199+
old_indices[0].name, selected_segment_id
200+
))
201+
})?;
202+
let (selected_metadata, selected_index) = logical_index
203+
.iter()
204+
.find(|(metadata, _)| metadata.uuid == selected_segment_id)
205+
.map(|(metadata, index)| (metadata.clone(), index.clone()))
206+
.ok_or_else(|| {
207+
Error::index(format!(
208+
"Append index: failed to materialize selected segment {} from logical vector index '{}'",
209+
selected_segment_id, old_indices[0].name
210+
))
211+
})?;
212+
let selected_logical_index = LogicalVectorIndex::try_new(
213+
old_indices[0].name.clone(),
214+
field_path.clone(),
215+
vec![(selected_metadata, selected_index)],
216+
)?;
217+
let selected_ivf_view = selected_logical_index.as_ivf()?;
218+
let (new_uuid, indices_merged) = optimize_vector_indices(
219+
dataset.as_ref().clone(),
220+
Option::<
221+
lance_io::stream::RecordBatchStreamAdapter<
222+
futures::stream::Empty<lance_core::Result<arrow_array::RecordBatch>>,
223+
>,
224+
>::None,
225+
&field_path,
226+
&selected_ivf_view,
227+
options,
228+
)
229+
.await?;
230+
if indices_merged == 0 {
231+
return Ok(None);
194232
}
195-
Some(scanner.try_into_stream().await?)
196-
};
197233

198-
let (new_uuid, indices_merged) = optimize_vector_indices(
199-
dataset.as_ref().clone(),
200-
new_data_stream,
201-
&field_path,
202-
&ivf_view,
203-
options,
204-
)
205-
.boxed()
206-
.await?;
234+
let index_dir = dataset.indices_dir().child(new_uuid.to_string());
235+
let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?;
236+
let new_fragment_bitmap = removed_segment
237+
.effective_fragment_bitmap(&dataset.fragment_bitmap)
238+
.or_else(|| removed_segment.fragment_bitmap.clone())
239+
.unwrap_or_default();
240+
241+
Ok((
242+
new_uuid,
243+
vec![removed_segment],
244+
new_fragment_bitmap,
245+
CreatedIndex {
246+
index_details: vector_index_details(),
247+
index_version: lance_index::IndexType::Vector.version() as u32,
248+
files: Some(files),
249+
},
250+
))
251+
} else {
252+
let mut frag_bitmap = base_unindexed_bitmap.clone();
253+
254+
let new_data_stream = if unindexed.is_empty() {
255+
None
256+
} else {
257+
let mut scanner = dataset.scan();
258+
scanner
259+
.with_fragments(unindexed.to_vec())
260+
.with_row_id()
261+
.project(&[&field_path])?;
262+
if column.nullable {
263+
let column_expr =
264+
lance_datafusion::logical_expr::field_path_to_expr(&field_path)?;
265+
scanner.filter_expr(column_expr.is_not_null());
266+
}
267+
Some(scanner.try_into_stream().await?)
268+
};
269+
270+
let (new_uuid, indices_merged) = optimize_vector_indices(
271+
dataset.as_ref().clone(),
272+
new_data_stream,
273+
&field_path,
274+
&ivf_view,
275+
options,
276+
)
277+
.boxed()
278+
.await?;
207279

208-
old_indices[old_indices.len() - indices_merged..]
209-
.iter()
210-
.for_each(|idx| {
280+
let removed_indices = old_indices[old_indices.len() - indices_merged..].to_vec();
281+
removed_indices.iter().for_each(|idx| {
211282
frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter());
212283
});
284+
for removed in removed_indices.iter() {
285+
if let Some(effective) = removed.effective_fragment_bitmap(&dataset.fragment_bitmap)
286+
{
287+
frag_bitmap |= &effective;
288+
}
289+
}
213290

214-
let index_dir = dataset.indices_dir().child(new_uuid.to_string());
215-
let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?;
216-
217-
Ok((
218-
new_uuid,
219-
indices_merged,
220-
CreatedIndex {
221-
index_details: vector_index_details(),
222-
index_version: lance_index::IndexType::Vector.version() as u32,
223-
files: Some(files),
224-
},
225-
))
291+
let index_dir = dataset.indices_dir().child(new_uuid.to_string());
292+
let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?;
293+
294+
Ok((
295+
new_uuid,
296+
removed_indices,
297+
frag_bitmap,
298+
CreatedIndex {
299+
index_details: vector_index_details(),
300+
index_version: lance_index::IndexType::Vector.version() as u32,
301+
files: Some(files),
302+
},
303+
))
304+
}
226305
} else {
306+
let mut frag_bitmap = base_unindexed_bitmap;
227307
let mut indices = Vec::with_capacity(old_indices.len());
228308
for idx in old_indices {
229309
match dataset
@@ -341,7 +421,12 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
341421
};
342422

343423
// TODO: don't hard-code index version
344-
Ok((new_uuid, 1, created_index))
424+
Ok((
425+
new_uuid,
426+
vec![old_indices[old_indices.len() - 1]],
427+
frag_bitmap,
428+
created_index,
429+
))
345430
}
346431
_ => Err(Error::index(format!(
347432
"Append index: invalid index type: {:?}",
@@ -350,17 +435,10 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
350435
}
351436
}?;
352437

353-
let removed_indices = old_indices[old_indices.len() - indices_merged..].to_vec();
354-
for removed in removed_indices.iter() {
355-
if let Some(effective) = removed.effective_fragment_bitmap(&dataset.fragment_bitmap) {
356-
frag_bitmap |= &effective;
357-
}
358-
}
359-
360438
Ok(Some(IndexMergeResults {
361439
new_uuid,
362440
removed_indices,
363-
new_fragment_bitmap: frag_bitmap,
441+
new_fragment_bitmap,
364442
new_index_version: created_index.index_version as i32,
365443
new_index_details: created_index.index_details,
366444
files: created_index.files,

0 commit comments

Comments (0)