From e49e0d1a4e92f83a03e6a6b4cda732109be340a5 Mon Sep 17 00:00:00 2001
From: Anoop Narang
Date: Wed, 18 Mar 2026 16:16:13 +0530
Subject: [PATCH 1/2] fix(sqlite-provider): use caller-provided key column name

The SqliteLookupProvider previously hardcoded "row_idx" as the key
column name in CREATE TABLE and WHERE clauses. This caused errors when
callers used a different key column name (e.g. "_key").

Now derives the key column name from the first field in the provided
schema, making the provider work with any key column name.
---
 src/sqlite_provider.rs | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs
index 2273ced..19572df 100644
--- a/src/sqlite_provider.rs
+++ b/src/sqlite_provider.rs
@@ -2,9 +2,12 @@
 //
 // Stores all non-embedding columns in a local SQLite database (bundled libsqlite3).
 // Scalar columns map to INTEGER/TEXT/REAL; list columns are serialised as JSON TEXT.
-// Lookups use `WHERE row_idx IN (?, ...)` against the INTEGER PRIMARY KEY B-tree.
+// Lookups use `WHERE <key_col> IN (?, ...)` against the INTEGER PRIMARY KEY B-tree.
 //
-// Schema: row_idx INTEGER PRIMARY KEY, <col> TEXT/INTEGER/REAL, ...
+// Schema: <key_col> INTEGER PRIMARY KEY, <col> TEXT/INTEGER/REAL, ...
+//
+// The key column name is caller-provided (e.g. "_key") and must match the first
+// field in the schema passed to `open_or_build`.
 //
 // Persistence: the database is written once to the given path and reused on
 // subsequent runs. The first build reads all parquet files and inserts rows
@@ -42,6 +45,7 @@ use crate::lookup::PointLookupProvider;
 pub struct SqliteLookupProvider {
     schema: SchemaRef,
     table_name: String,
+    key_col: String,
     pool: Arc<Mutex<Vec<Connection>>>,
     sem: Arc<Semaphore>,
 }
@@ -117,6 +121,8 @@ impl SqliteLookupProvider {
         schema: SchemaRef,
         parquet_col_indices: &[usize],
     ) -> DFResult<Self> {
+        // The first field in the schema is the key column (INTEGER PRIMARY KEY).
+        let key_col = schema.field(0).name().clone();
         if pool_size == 0 {
             return Err(DataFusionError::Execution(
                 "pool_size must be at least 1".into(),
@@ -167,6 +173,7 @@
         Ok(Self {
             schema,
             table_name: table_name.to_string(),
+            key_col,
             pool: Arc::new(Mutex::new(conns)),
             sem: Arc::new(Semaphore::new(pool_size)),
         })
@@ -202,6 +209,7 @@ impl PointLookupProvider for SqliteLookupProvider {
         let keys_vec = keys.to_vec();
         let pool = self.pool.clone();
         let table_name = self.table_name.clone();
+        let key_col = self.key_col.clone();
 
         // Acquire a semaphore permit to bound concurrency to the pool size,
         // then run the synchronous SQLite query on a blocking thread.
@@ -227,6 +235,7 @@
                     &keys_vec,
                     &out_schema,
                     &table_name,
+                    &key_col,
                 );
                 drop(guard); // explicit but not required — Drop handles it
                 res
@@ -243,6 +252,7 @@ fn execute_query_sync(
     keys: &[u64],
     out_schema: &SchemaRef,
     table_name: &str,
+    key_col: &str,
 ) -> DFResult<Vec<RecordBatch>> {
     let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
     // Select only the columns in out_schema (already projection-applied by the
@@ -253,8 +263,9 @@
         .map(|f| quote_ident(f.name()))
         .collect::<Vec<_>>()
         .join(", ");
+    let qk = quote_ident(key_col);
     let sql = format!(
-        "SELECT {col_list} FROM {tn} WHERE row_idx IN ({placeholders}) ORDER BY row_idx",
+        "SELECT {col_list} FROM {tn} WHERE {qk} IN ({placeholders}) ORDER BY {qk}",
         tn = quote_ident(table_name)
     );
 
@@ -586,14 +597,16 @@ fn build_table(
     schema: &SchemaRef,
     parquet_col_indices: &[usize],
 ) -> DFResult<()> {
+    // The first field is the key column (INTEGER PRIMARY KEY).
+    let key_col_name = schema.field(0).name();
     let col_defs = schema
         .fields()
         .iter()
         .map(|f| {
-            let sql_type = arrow_type_to_sql(f.data_type());
-            if f.name() == "row_idx" {
-                "row_idx INTEGER PRIMARY KEY".to_string()
+            if f.name() == key_col_name {
+                format!("{} INTEGER PRIMARY KEY", quote_ident(f.name()))
             } else {
+                let sql_type = arrow_type_to_sql(f.data_type());
                 format!("{} {}", quote_ident(f.name()), sql_type)
             }
         })

From 6934d3aefb8b5cae4adedc2f9d57d83c431f91fd Mon Sep 17 00:00:00 2001
From: Anoop Narang
Date: Wed, 18 Mar 2026 16:31:04 +0530
Subject: [PATCH 2/2] test(sqlite-provider): add test for custom key column name

Exercises SqliteLookupProvider with "_key" as the key column (the
scenario used by runtimedb), verifying both fetch_by_keys and
projection work with non-default key column names.
---
 tests/sqlite_provider_test.rs | 75 +++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs
index f14781d..77ee09b 100644
--- a/tests/sqlite_provider_test.rs
+++ b/tests/sqlite_provider_test.rs
@@ -204,3 +204,78 @@
     let batches = provider.fetch_by_keys(&[0], "row_idx", None).await.unwrap();
     assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
 }
+
+/// Verify that a non-default key column name (e.g. "_key") works correctly.
+/// This is the scenario used by runtimedb where Parquet files have a `_key` column.
+#[tokio::test]
+async fn test_custom_key_column_name() {
+    let dir = tempdir().unwrap();
+
+    let parquet_schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
+
+    // Provider schema uses "_key" instead of the default "row_idx".
+    let provider_schema = Arc::new(Schema::new(vec![
+        Field::new("_key", DataType::UInt64, false),
+        Field::new("name", DataType::Utf8, true),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        parquet_schema.clone(),
+        vec![Arc::new(StringArray::from(vec![
+            Some("alice"),
+            Some("bob"),
+            Some("carol"),
+        ]))],
+    )
+    .unwrap();
+
+    let parquet_path = dir.path().join("test.parquet");
+    let file = std::fs::File::create(&parquet_path).unwrap();
+    let mut writer = ArrowWriter::try_new(file, parquet_schema, None).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+
+    let db_path = dir.path().join("test_key.db");
+    let provider = SqliteLookupProvider::open_or_build(
+        db_path.to_str().unwrap(),
+        "vectors",
+        2,
+        &[parquet_path.to_str().unwrap().to_string()],
+        provider_schema,
+        &[0],
+    )
+    .unwrap();
+
+    // fetch_by_keys should work with the custom key column
+    let batches = provider.fetch_by_keys(&[0, 2], "_key", None).await.unwrap();
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 2);
+
+    let names: Vec<String> = batches
+        .iter()
+        .flat_map(|b| {
+            b.column_by_name("name")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap()
+                .iter()
+                .map(|v| v.unwrap().to_string())
+                .collect::<Vec<_>>()
+        })
+        .collect();
+    assert_eq!(names, vec!["alice", "carol"]);
+
+    // projection to only the key column should also work
+    let batches = provider
+        .fetch_by_keys(&[1], "_key", Some(&[0]))
+        .await
+        .unwrap();
+    assert_eq!(batches[0].schema().field(0).name(), "_key");
+    let key_col = batches[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .unwrap();
+    assert_eq!(key_col.value(0), 1);
+}