Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 28 additions & 30 deletions crates/codegraph-core/src/ast_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,35 +74,42 @@ pub fn bulk_insert_ast_nodes(db_path: String, batches: Vec<FileAstBatch>) -> u32
}

let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let mut conn = match Connection::open_with_flags(&db_path, flags) {
let conn = match Connection::open_with_flags(&db_path, flags) {
Ok(c) => c,
Err(_) => return 0,
};

// Match the JS-side performance pragmas (including busy_timeout for WAL contention)
let _ = conn.execute_batch(
"PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000",
);
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");

do_insert_ast_nodes(&conn, &batches).unwrap_or(0)
}

/// Internal implementation: insert AST nodes using an existing connection.
/// Used by both the standalone `bulk_insert_ast_nodes` function and `NativeDatabase`.
pub(crate) fn do_insert_ast_nodes(
conn: &Connection,
batches: &[FileAstBatch],
) -> rusqlite::Result<u32> {
if batches.is_empty() {
return Ok(0);
}

// Bail out if the ast_nodes table doesn't exist (schema too old)
let has_table: bool = conn
.prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='ast_nodes'")
.and_then(|mut s| s.query_row([], |_| Ok(true)))
.unwrap_or(false);
if !has_table {
return 0;
return Ok(0);
}

// ── Phase 1: Pre-fetch node definitions for parent resolution ────────
let mut file_defs: HashMap<String, Vec<NodeDef>> = HashMap::new();
{
let Ok(mut stmt) =
conn.prepare("SELECT id, line, end_line FROM nodes WHERE file = ?1")
else {
return 0;
};
let mut stmt = conn.prepare("SELECT id, line, end_line FROM nodes WHERE file = ?1")?;

for batch in &batches {
for batch in batches {
if batch.nodes.is_empty() || file_defs.contains_key(&batch.file) {
continue;
}
Expand All @@ -118,48 +125,39 @@ pub fn bulk_insert_ast_nodes(db_path: String, batches: Vec<FileAstBatch>) -> u32
.unwrap_or_default();
file_defs.insert(batch.file.clone(), defs);
}
} // `stmt` dropped — releases the immutable borrow on `conn`
}

// ── Phase 2: Bulk insert in a single transaction ─────────────────────
let Ok(tx) = conn.transaction() else {
return 0;
};
let tx = conn.unchecked_transaction()?;

let mut total = 0u32;
{
let Ok(mut insert_stmt) = tx.prepare(
let mut insert_stmt = tx.prepare(
"INSERT INTO ast_nodes (file, line, kind, name, text, receiver, parent_node_id) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
) else {
return 0;
};
)?;

for batch in &batches {
for batch in batches {
let empty = Vec::new();
let defs = file_defs.get(&batch.file).unwrap_or(&empty);

for node in &batch.nodes {
let parent_id = find_parent_id(defs, node.line);

match insert_stmt.execute(params![
insert_stmt.execute(params![
&batch.file,
node.line,
&node.kind,
&node.name,
&node.text,
&node.receiver,
parent_id,
]) {
Ok(_) => total += 1,
Err(_) => return 0, // abort; tx rolls back on drop
}
])?;
total += 1;
}
}
} // `insert_stmt` dropped

if tx.commit().is_err() {
return 0;
}

total
tx.commit()?;
Ok(total)
}
8 changes: 4 additions & 4 deletions crates/codegraph-core/src/edges_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ pub fn bulk_insert_edges(db_path: String, edges: Vec<EdgeRow>) -> bool {
return true;
}
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let mut conn = match Connection::open_with_flags(&db_path, flags) {
let conn = match Connection::open_with_flags(&db_path, flags) {
Ok(c) => c,
Err(_) => return false,
};
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
do_insert(&mut conn, &edges).is_ok()
do_insert_edges(&conn, &edges).is_ok()
}

/// 199 rows × 5 params = 995 bind parameters per statement, safely under
/// the legacy `SQLITE_MAX_VARIABLE_NUMBER` default of 999.
const CHUNK: usize = 199;

fn do_insert(conn: &mut Connection, edges: &[EdgeRow]) -> rusqlite::Result<()> {
let tx = conn.transaction()?;
pub(crate) fn do_insert_edges(conn: &Connection, edges: &[EdgeRow]) -> rusqlite::Result<()> {
let tx = conn.unchecked_transaction()?;

for chunk in edges.chunks(CHUNK) {
let placeholders: Vec<String> = (0..chunk.len())
Expand Down
10 changes: 5 additions & 5 deletions crates/codegraph-core/src/insert_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ pub fn bulk_insert_nodes(
removed_files: Vec<String>,
) -> bool {
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let mut conn = match Connection::open_with_flags(&db_path, flags) {
let conn = match Connection::open_with_flags(&db_path, flags) {
Ok(c) => c,
Err(_) => return false,
};

let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");

do_insert(&mut conn, &batches, &file_hashes, &removed_files).is_ok()
do_insert_nodes(&conn, &batches, &file_hashes, &removed_files).is_ok()
}

// ── Internal implementation ─────────────────────────────────────────
Expand All @@ -108,13 +108,13 @@ fn query_node_ids(
Ok(map)
}

fn do_insert(
conn: &mut Connection,
pub(crate) fn do_insert_nodes(
conn: &Connection,
batches: &[InsertNodesBatch],
file_hashes: &[FileHashEntry],
removed_files: &[String],
) -> rusqlite::Result<()> {
let tx = conn.transaction()?;
let tx = conn.unchecked_transaction()?;

// ── Phase 1: Insert file nodes + definitions + export nodes ──────
{
Expand Down
119 changes: 119 additions & 0 deletions crates/codegraph-core/src/native_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ use napi_derive::napi;
use rusqlite::{params, Connection, OpenFlags};
use send_wrapper::SendWrapper;

use crate::ast_db::{self, FileAstBatch};
use crate::edges_db::{self, EdgeRow};
use crate::insert_nodes::{self, FileHashEntry, InsertNodesBatch};
use crate::roles_db::{self, RoleSummary};

// ── Migration DDL (mirrored from src/db/migrations.ts) ──────────────────

struct Migration {
Expand Down Expand Up @@ -543,6 +548,120 @@ impl NativeDatabase {
.map_err(|e| napi::Error::from_reason(format!("commit setBuildMeta failed: {e}")))?;
Ok(())
}

// ── Phase 6.15: Build pipeline write operations ─────────────────────

/// Write node batches, file hashes, and removed-file bookkeeping through the
/// persistent connection held by this `NativeDatabase`, rather than opening a
/// fresh connection per call.
///
/// The actual work is delegated to `insert_nodes::do_insert_nodes`.
///
/// Returns `Ok(true)` when the insert succeeded and `Ok(false)` when it
/// failed; in the failure case the underlying error is printed to stderr
/// first. An `Err` is only produced when `self.conn()` itself fails.
#[napi]
pub fn bulk_insert_nodes(
    &self,
    batches: Vec<InsertNodesBatch>,
    file_hashes: Vec<FileHashEntry>,
    removed_files: Vec<String>,
) -> napi::Result<bool> {
    let conn = self.conn()?;
    let outcome = insert_nodes::do_insert_nodes(conn, &batches, &file_hashes, &removed_files);
    // Surface the reason on stderr before it is collapsed into a plain `false`.
    if let Err(e) = &outcome {
        eprintln!("[NativeDatabase] bulk_insert_nodes failed: {e}");
    }
    Ok(outcome.is_ok())
}

/// Bulk-insert edge rows using chunked multi-value INSERT statements.
/// Returns `true` on success, `false` on failure.
Comment on lines +570 to +571
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Errors silently swallowed in bulk insert methods

Both bulk_insert_nodes and bulk_insert_edges convert Err results to Ok(false) via .is_ok(), permanently discarding the rusqlite error. The TypeScript caller falls back to the JS path on false, which handles correctness, but debugging a silent failure (e.g. a schema mismatch or WAL lock timeout) becomes very difficult.

Consider at minimum logging the error before dropping it:

Ok(insert_nodes::do_insert(conn, &batches, &file_hashes, &removed_files)
    .inspect_err(|e| eprintln!("[NativeDatabase] bulk_insert_nodes failed: {e}"))
    .is_ok())

The same pattern applies to bulk_insert_edges at the corresponding line.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — added .inspect_err(|e| eprintln!(...)) before .is_ok() in both bulk_insert_nodes and bulk_insert_edges. Errors will now be logged to stderr before being converted to false, making silent failures diagnosable.

#[napi]
pub fn bulk_insert_edges(&self, edges: Vec<EdgeRow>) -> napi::Result<bool> {
    // An empty batch is trivially successful; return before touching the connection.
    if edges.is_empty() {
        return Ok(true);
    }
    let conn = self.conn()?;
    match edges_db::do_insert_edges(conn, &edges) {
        Ok(()) => Ok(true),
        Err(e) => {
            // Report the cause on stderr, then signal failure with `false`
            // instead of raising.
            eprintln!("[NativeDatabase] bulk_insert_edges failed: {e}");
            Ok(false)
        }
    }
}

/// Bulk-insert AST nodes, resolving parent_node_id from the nodes table.
///
/// Runs on the persistent connection held by this `NativeDatabase` and
/// delegates to `ast_db::do_insert_ast_nodes`.
///
/// Returns the number of rows inserted. A failed insert is reported as `0`,
/// and the underlying rusqlite error is written to stderr first so that a
/// failure can be told apart from "nothing was inserted" when debugging.
/// An `Err` is only produced when `self.conn()` itself fails.
#[napi]
pub fn bulk_insert_ast_nodes(&self, batches: Vec<FileAstBatch>) -> napi::Result<u32> {
    let conn = self.conn()?;
    Ok(ast_db::do_insert_ast_nodes(conn, &batches)
        // Log before collapsing to 0, matching the other bulk-insert methods.
        .inspect_err(|e| eprintln!("[NativeDatabase] bulk_insert_ast_nodes failed: {e}"))
        .unwrap_or(0))
}

/// Full role classification: queries all nodes, computes fan-in/fan-out,
/// classifies roles, and batch-updates the `role` column.
///
/// Runs on the persistent connection held by this `NativeDatabase` and
/// delegates to `roles_db::do_classify_full`.
///
/// Returns `Ok(Some(summary))` on success. When classification fails the
/// result is `Ok(None)`, and the underlying rusqlite error is written to
/// stderr first so the failure is diagnosable. An `Err` is only produced
/// when `self.conn()` itself fails.
#[napi]
pub fn classify_roles_full(&self) -> napi::Result<Option<RoleSummary>> {
    let conn = self.conn()?;
    Ok(roles_db::do_classify_full(conn)
        // Log before discarding the error, matching the bulk-insert methods.
        .inspect_err(|e| eprintln!("[NativeDatabase] classify_roles_full failed: {e}"))
        .ok())
}

/// Incremental role classification: only reclassifies nodes from changed
/// files plus their immediate edge neighbours.
///
/// Runs on the persistent connection held by this `NativeDatabase` and
/// delegates to `roles_db::do_classify_incremental`.
///
/// Returns `Ok(Some(summary))` on success. When classification fails the
/// result is `Ok(None)`, and the underlying rusqlite error is written to
/// stderr first so the failure is diagnosable. An `Err` is only produced
/// when `self.conn()` itself fails.
#[napi]
pub fn classify_roles_incremental(
    &self,
    changed_files: Vec<String>,
) -> napi::Result<Option<RoleSummary>> {
    let conn = self.conn()?;
    Ok(roles_db::do_classify_incremental(conn, &changed_files)
        // Log before discarding the error, matching the bulk-insert methods.
        .inspect_err(|e| eprintln!("[NativeDatabase] classify_roles_incremental failed: {e}"))
        .ok())
}

/// Cascade-delete all graph data for the specified files across all tables.
/// Order: dependent tables first (embeddings, cfg, dataflow, complexity,
/// metrics, ast_nodes), then edges, then nodes, then optionally file_hashes.
#[napi]
pub fn purge_files_data(
&self,
files: Vec<String>,
purge_hashes: Option<bool>,
) -> napi::Result<()> {
if files.is_empty() {
return Ok(());
}
let conn = self.conn()?;
let purge_hashes = purge_hashes.unwrap_or(true);

let tx = conn
.unchecked_transaction()
.map_err(|e| napi::Error::from_reason(format!("purge transaction failed: {e}")))?;

// Purge each file across all tables. Optional tables are silently
// skipped if they don't exist. Order: dependents → edges → nodes → hashes.
let purge_sql: &[(&str, bool)] = &[
("DELETE FROM embeddings WHERE node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
("DELETE FROM cfg_edges WHERE function_node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
("DELETE FROM cfg_blocks WHERE function_node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
("DELETE FROM dataflow WHERE source_id IN (SELECT id FROM nodes WHERE file = ?1) OR target_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
("DELETE FROM function_complexity WHERE node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
("DELETE FROM node_metrics WHERE node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
("DELETE FROM ast_nodes WHERE file = ?1", false),
// Core tables — errors propagated
("DELETE FROM edges WHERE source_id IN (SELECT id FROM nodes WHERE file = ?1) OR target_id IN (SELECT id FROM nodes WHERE file = ?1)", true),
("DELETE FROM nodes WHERE file = ?1", true),
];

for file in &files {
for &(sql, required) in purge_sql {
match tx.execute(sql, params![file]) {
Ok(_) => {}
Err(e) if required => {
return Err(napi::Error::from_reason(format!(
"purge failed for \"{file}\": {e}"
)));
}
Err(_) => {} // optional table missing — skip
}
}
if purge_hashes {
let _ = tx.execute("DELETE FROM file_hashes WHERE file = ?1", params![file]);
}
}

tx.commit()
.map_err(|e| napi::Error::from_reason(format!("purge commit failed: {e}")))?;
Ok(())
}
}

// ── Private helpers ─────────────────────────────────────────────────────
Expand Down
18 changes: 9 additions & 9 deletions crates/codegraph-core/src/roles_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ pub struct RoleSummary {
#[napi]
pub fn classify_roles_full(db_path: String) -> Option<RoleSummary> {
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let mut conn = Connection::open_with_flags(&db_path, flags).ok()?;
let conn = Connection::open_with_flags(&db_path, flags).ok()?;
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
do_classify_full(&mut conn).ok()
do_classify_full(&conn).ok()
}

/// Incremental role classification: only reclassifies nodes from changed files
Expand All @@ -88,9 +88,9 @@ pub fn classify_roles_incremental(
changed_files: Vec<String>,
) -> Option<RoleSummary> {
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let mut conn = Connection::open_with_flags(&db_path, flags).ok()?;
let conn = Connection::open_with_flags(&db_path, flags).ok()?;
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
do_classify_incremental(&mut conn, &changed_files).ok()
do_classify_incremental(&conn, &changed_files).ok()
}

// ── Shared helpers ───────────────────────────────────────────────────
Expand Down Expand Up @@ -228,8 +228,8 @@ fn batch_update_roles(

// ── Full classification ──────────────────────────────────────────────

fn do_classify_full(conn: &mut Connection) -> rusqlite::Result<RoleSummary> {
let tx = conn.transaction()?;
pub(crate) fn do_classify_full(conn: &Connection) -> rusqlite::Result<RoleSummary> {
let tx = conn.unchecked_transaction()?;
let mut summary = RoleSummary::default();

// 1. Leaf kinds → dead-leaf (skip expensive fan-in/fan-out JOINs)
Expand Down Expand Up @@ -351,11 +351,11 @@ fn do_classify_full(conn: &mut Connection) -> rusqlite::Result<RoleSummary> {

// ── Incremental classification ───────────────────────────────────────

fn do_classify_incremental(
conn: &mut Connection,
pub(crate) fn do_classify_incremental(
conn: &Connection,
changed_files: &[String],
) -> rusqlite::Result<RoleSummary> {
let tx = conn.transaction()?;
let tx = conn.unchecked_transaction()?;
let mut summary = RoleSummary::default();

// Build placeholders for changed files
Expand Down
1 change: 1 addition & 0 deletions src/domain/graph/builder/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ function initializeEngine(ctx: PipelineContext): void {
engine: ctx.opts.engine || 'auto',
dataflow: ctx.opts.dataflow !== false,
ast: ctx.opts.ast !== false,
nativeDb: ctx.nativeDb,
};
const { name: engineName, version: engineVersion } = getActiveEngine(ctx.engineOpts);
ctx.engineName = engineName as 'native' | 'wasm';
Expand Down
15 changes: 11 additions & 4 deletions src/domain/graph/builder/stages/build-edges.ts
Original file line number Diff line number Diff line change
Expand Up @@ -673,23 +673,30 @@ export async function buildEdges(ctx: PipelineContext): Promise<void> {

// When using native edge insert, skip JS insert here — do it after tx commits.
// Otherwise insert edges within this transaction for atomicity.
if (!native?.bulkInsertEdges) {
const useNativeEdgeInsert = !!(ctx.nativeDb?.bulkInsertEdges || native?.bulkInsertEdges);
if (!useNativeEdgeInsert) {
batchInsertEdges(db, allEdgeRows);
}
});
computeEdgesTx();

// Phase 2: Native rusqlite bulk insert (outside better-sqlite3 transaction
// since rusqlite opens its own connection — avoids SQLITE_BUSY contention)
if (native?.bulkInsertEdges && allEdgeRows.length > 0) {
// to avoid SQLITE_BUSY contention). Prefer NativeDatabase persistent
// connection (6.15), fall back to standalone function (6.12).
if ((ctx.nativeDb?.bulkInsertEdges || native?.bulkInsertEdges) && allEdgeRows.length > 0) {
const nativeEdges = allEdgeRows.map((r) => ({
sourceId: r[0],
targetId: r[1],
kind: r[2],
confidence: r[3],
dynamic: r[4],
}));
const ok = native.bulkInsertEdges(db.name, nativeEdges);
let ok: boolean;
if (ctx.nativeDb?.bulkInsertEdges) {
ok = ctx.nativeDb.bulkInsertEdges(nativeEdges);
} else {
ok = native!.bulkInsertEdges(db.name, nativeEdges);
}
if (!ok) {
debug('Native bulkInsertEdges failed — falling back to JS batchInsertEdges');
batchInsertEdges(db, allEdgeRows);
Expand Down
Loading
Loading