Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 49 additions & 21 deletions src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,20 @@ async fn usearch_execute(

if params.physical_filters.is_empty() {
// ── Unfiltered path ───────────────────────────────────────────────
let matches = usearch_search(
&registered.index,
&params.query_vec,
params.k,
params.scalar_kind,
)?;
let matches = {
let _span = tracing::info_span!(
"usearch_hnsw_search",
usearch.k = params.k,
usearch.dims = params.query_vec.len(),
)
.entered();
usearch_search(
&registered.index,
&params.query_vec,
params.k,
params.scalar_kind,
)?
};

if matches.keys.is_empty() {
return Ok(vec![]);
Expand All @@ -358,12 +366,21 @@ async fn usearch_execute(
.map(|(&k, &d)| (k, d))
.collect();

let data_batches = registered
.lookup_provider
.fetch_by_keys(&matches.keys, &params.key_col, None)
.await?;
let fetch_keys_count = matches.keys.len();
let data_batches = async {
registered
.lookup_provider
.fetch_by_keys(&matches.keys, &params.key_col, None)
.await
}
.instrument(tracing::info_span!(
"usearch_sqlite_fetch",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion (non-blocking): The span name "usearch_sqlite_fetch" is baked into lookup_provider.fetch_by_keys(...), but lookup_provider is provider-agnostic — it could be a ParquetLookupProvider. This will appear as usearch_sqlite_fetch in traces even when no SQLite is involved, which may confuse consumers of the telemetry.

Consider "usearch_lookup_fetch" (or similar) here and at the corresponding site in adaptive_filtered_execute (~line 517).

usearch.fetch_keys = fetch_keys_count,
))
.await?;

let key_col_idx = provider_key_col_idx(&registered)?;
let _span = tracing::info_span!("usearch_attach_distances").entered();
attach_distances(data_batches, key_col_idx, &key_to_dist, &params.schema)
} else {
// ── Adaptive filtered path ────────────────────────────────────────
Expand Down Expand Up @@ -490,17 +507,28 @@ async fn adaptive_filtered_execute(
.map(|(&k, &d)| (k, d))
.collect();

let data_batches = registered
.lookup_provider
.fetch_by_keys(&matches.keys, &params.key_col, None)
.await?;

let result_batches = attach_distances(
data_batches,
lookup_key_col_idx,
&key_to_dist,
&params.schema,
)?;
let fetch_keys_count = matches.keys.len();
let data_batches = async {
registered
.lookup_provider
.fetch_by_keys(&matches.keys, &params.key_col, None)
.await
}
.instrument(tracing::info_span!(
"usearch_sqlite_fetch",
usearch.fetch_keys = fetch_keys_count,
))
.await?;

let result_batches = {
let _span = tracing::info_span!("usearch_attach_distances").entered();
attach_distances(
data_batches,
lookup_key_col_idx,
&key_to_dist,
&params.schema,
)?
};

tracing::Span::current().record(
"usearch.result_count",
Expand Down
86 changes: 86 additions & 0 deletions src/sqlite_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,88 @@ fn open_conn(db_path: &str) -> DFResult<Connection> {
Ok(conn)
}

/// Ensure the key column has an index. If the table was created with
/// `INTEGER PRIMARY KEY` the rowid alias already serves as the index and
/// this is a no-op. For tables created without a PK (pre-fix builds) we
/// create a secondary index so point lookups use the B-tree instead of a
/// full table scan.
fn ensure_key_index(conn: &Connection, table_name: &str, key_col: &str) -> DFResult<()> {
// Check if the key column is the INTEGER PRIMARY KEY (rowid alias).
// In that case SQLite already uses the rowid B-tree — no extra index needed.
let is_pk: bool = conn
.query_row(
&format!(
"SELECT pk FROM pragma_table_info({tn}) WHERE name = ?1",
tn = quote_ident(table_name)
),
rusqlite::params![key_col],
|row| row.get::<_, i64>(0),
)
.map(|pk| pk > 0)
.unwrap_or(false);

if is_pk {
return Ok(());
}

// Check if any existing index covers the key column using pragmas
// (avoids brittle SQL text matching against sqlite_master).
let has_index: bool = {
let mut found = false;
let mut idx_stmt = conn
.prepare(&format!(
"SELECT name FROM pragma_index_list({tn})",
tn = quote_ident(table_name)
))
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let idx_names: Vec<String> = idx_stmt
.query_map([], |row| row.get::<_, String>(0))
.map_err(|e| DataFusionError::Execution(e.to_string()))?
.filter_map(|r| r.ok())
.collect();
for idx_name in idx_names {
let col_name: Option<String> = conn
.query_row(
&format!(
"SELECT name FROM pragma_index_info({idx})",
idx = quote_ident(&idx_name)
),
[],
|row| row.get::<_, String>(0),
)
.ok();
if col_name.as_deref() == Some(key_col) {
found = true;
break;
}
}
found
};

if has_index {
return Ok(());
}

tracing::warn!(
"SQLite table '{}': key column '{}' has no index — creating one (one-time migration).",
table_name,
key_col,
);
conn.execute(
&format!(
"CREATE INDEX IF NOT EXISTS {idx} ON {tn}({col})",
idx = quote_ident(&format!("idx_{table_name}_{key_col}")),
tn = quote_ident(table_name),
col = quote_ident(key_col),
),
[],
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — Race condition: missing IF NOT EXISTS

If two processes start simultaneously against the same database (a realistic deployment pattern), both can pass the has_index check above and then both attempt CREATE INDEX "idx_table_key". The second attempt hits SQLITE_ERROR: index already exists and propagates as a DataFusionError, aborting provider initialization for that process.

The same failure occurs if the has_index LIKE check produces a false negative (see below) and an index with the same auto-generated name already exists in the database.

Suggested change
)
conn.execute(
&format!(
"CREATE INDEX IF NOT EXISTS {idx} ON {tn}({col})",
idx = quote_ident(&format!("idx_{table_name}_{key_col}")),
tn = quote_ident(table_name),
col = quote_ident(key_col),
),
[],
)
.map_err(|e| DataFusionError::Execution(format!("failed to create key index: {e}")))?;

.map_err(|e| DataFusionError::Execution(format!("failed to create key index: {e}")))?;

tracing::info!("Created index on '{}'.'{}'", table_name, key_col,);
Ok(())
}

impl SqliteLookupProvider {
/// Open the existing SQLite database at `db_path`, or build it from
/// parquet files on first run. Opens a pool of `pool_size` read
Expand Down Expand Up @@ -152,6 +234,10 @@ impl SqliteLookupProvider {
table_name,
n
);
// Ensure the key column is indexed. Tables built before the
// INTEGER PRIMARY KEY fix may lack any index on the key column,
// turning every point lookup into a full table scan.
ensure_key_index(&conn, table_name, &key_col)?;
} else {
tracing::info!(
"First run: building SQLite table '{}' (one-time).",
Expand Down
Loading