diff --git a/.gitignore b/.gitignore index 8a124db..96d7afd 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ tmp/ .env .env.* coverage/ +bench/ diff --git a/claude-code/bundle/capture.js b/claude-code/bundle/capture.js index 91f8ec8..053d9b9 100755 --- a/claude-code/bundle/capture.js +++ b/claude-code/bundle/capture.js @@ -72,6 +72,36 @@ function sqlStr(value) { // dist/src/deeplake-api.js var log2 = (msg) => log("sdk", msg); +var RETRYABLE_CODES = /* @__PURE__ */ new Set([429, 500, 502, 503, 504]); +var MAX_RETRIES = 3; +var BASE_DELAY_MS = 500; +var MAX_CONCURRENCY = 5; +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} +var Semaphore = class { + max; + waiting = []; + active = 0; + constructor(max) { + this.max = max; + } + async acquire() { + if (this.active < this.max) { + this.active++; + return; + } + await new Promise((resolve) => this.waiting.push(resolve)); + } + release() { + this.active--; + const next = this.waiting.shift(); + if (next) { + this.active++; + next(); + } + } +}; var DeeplakeApi = class { token; apiUrl; @@ -79,6 +109,7 @@ var DeeplakeApi = class { workspaceId; tableName; _pendingRows = []; + _sem = new Semaphore(MAX_CONCURRENCY); constructor(token, apiUrl, orgId, workspaceId, tableName) { this.token = token; this.apiUrl = apiUrl; @@ -86,25 +117,55 @@ var DeeplakeApi = class { this.workspaceId = workspaceId; this.tableName = tableName; } - /** Execute SQL and return results as row-objects. */ + /** Execute SQL with retry on transient errors and bounded concurrency. */ async query(sql) { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { - method: "POST", - headers: { - Authorization: `Bearer ${this.token}`, - "Content-Type": "application/json", - "X-Activeloop-Org-Id": this.orgId - }, - body: JSON.stringify({ query: sql }) - }); - if (!resp.ok) { + await this._sem.acquire(); + try { + return await this._queryWithRetry(sql); + } finally { + this._sem.release(); + } + } + async _queryWithRetry(sql) { + let lastError; + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + let resp; + try { + resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { + method: "POST", + headers: { + Authorization: `Bearer ${this.token}`, + "Content-Type": "application/json", + "X-Activeloop-Org-Id": this.orgId + }, + body: JSON.stringify({ query: sql }) + }); + } catch (e) { + lastError = e instanceof Error ? e : new Error(String(e)); + if (attempt < MAX_RETRIES) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (fetch error: ${lastError.message}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } + throw lastError; + } + if (resp.ok) { + const raw = await resp.json(); + if (!raw?.rows || !raw?.columns) + return []; + return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i) => [col, row[i]]))); + } const text = await resp.text().catch(() => ""); + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (${resp.status}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } throw new Error(`Query failed: ${resp.status}: ${text.slice(0, 200)}`); } - const raw = await resp.json(); - if (!raw?.rows || !raw?.columns) - return []; - return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i) => [col, row[i]]))); + throw lastError ?? new Error("Query failed: max retries exceeded"); } // ── Writes ────────────────────────────────────────────────────────────────── /** Queue rows for writing. Call commit() to flush. */ @@ -162,18 +223,34 @@ var DeeplakeApi = class { async createIndex(column) { await this.query(`CREATE INDEX IF NOT EXISTS idx_${sqlStr(column)}_bm25 ON "${this.tableName}" USING deeplake_index ("${column}")`); } - /** List all tables in the workspace. */ + /** List all tables in the workspace (with retry). */ async listTables() { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { - headers: { - Authorization: `Bearer ${this.token}`, - "X-Activeloop-Org-Id": this.orgId + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { + headers: { + Authorization: `Bearer ${this.token}`, + "X-Activeloop-Org-Id": this.orgId + } + }); + if (resp.ok) { + const data = await resp.json(); + return (data.tables ?? []).map((t) => t.table_name); + } + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200); + continue; + } + return []; + } catch { + if (attempt < MAX_RETRIES) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt)); + continue; + } + return []; } - }); - if (!resp.ok) - return []; - const data = await resp.json(); - return (data.tables ?? []).map((t) => t.table_name); + } + return []; } /** Create the memory table if it doesn't already exist. Migrate columns on existing tables. */ async ensureTable(name) { diff --git a/claude-code/bundle/pre-tool-use.js b/claude-code/bundle/pre-tool-use.js index 3538674..b391c1b 100755 --- a/claude-code/bundle/pre-tool-use.js +++ b/claude-code/bundle/pre-tool-use.js @@ -80,6 +80,36 @@ function sqlStr(value) { // dist/src/deeplake-api.js var log2 = (msg) => log("sdk", msg); +var RETRYABLE_CODES = /* @__PURE__ */ new Set([429, 500, 502, 503, 504]); +var MAX_RETRIES = 3; +var BASE_DELAY_MS = 500; +var MAX_CONCURRENCY = 5; +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} +var Semaphore = class { + max; + waiting = []; + active = 0; + constructor(max) { + this.max = max; + } + async acquire() { + if (this.active < this.max) { + this.active++; + return; + } + await new Promise((resolve) => this.waiting.push(resolve)); + } + release() { + this.active--; + const next = this.waiting.shift(); + if (next) { + this.active++; + next(); + } + } +}; var DeeplakeApi = class { token; apiUrl; @@ -87,6 +117,7 @@ var DeeplakeApi = class { workspaceId; tableName; _pendingRows = []; + _sem = new Semaphore(MAX_CONCURRENCY); constructor(token, apiUrl, orgId, workspaceId, tableName) { this.token = token; this.apiUrl = apiUrl; @@ -94,25 +125,55 @@ var DeeplakeApi = class { this.workspaceId = workspaceId; this.tableName = tableName; } - /** Execute SQL and return results as row-objects. */ + /** Execute SQL with retry on transient errors and bounded concurrency. */ async query(sql) { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { - method: "POST", - headers: { - Authorization: `Bearer ${this.token}`, - "Content-Type": "application/json", - "X-Activeloop-Org-Id": this.orgId - }, - body: JSON.stringify({ query: sql }) - }); - if (!resp.ok) { + await this._sem.acquire(); + try { + return await this._queryWithRetry(sql); + } finally { + this._sem.release(); + } + } + async _queryWithRetry(sql) { + let lastError; + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + let resp; + try { + resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { + method: "POST", + headers: { + Authorization: `Bearer ${this.token}`, + "Content-Type": "application/json", + "X-Activeloop-Org-Id": this.orgId + }, + body: JSON.stringify({ query: sql }) + }); + } catch (e) { + lastError = e instanceof Error ? e : new Error(String(e)); + if (attempt < MAX_RETRIES) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (fetch error: ${lastError.message}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } + throw lastError; + } + if (resp.ok) { + const raw = await resp.json(); + if (!raw?.rows || !raw?.columns) + return []; + return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i) => [col, row[i]]))); + } const text = await resp.text().catch(() => ""); + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (${resp.status}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } throw new Error(`Query failed: ${resp.status}: ${text.slice(0, 200)}`); } - const raw = await resp.json(); - if (!raw?.rows || !raw?.columns) - return []; - return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i) => [col, row[i]]))); + throw lastError ?? new Error("Query failed: max retries exceeded"); } // ── Writes ────────────────────────────────────────────────────────────────── /** Queue rows for writing. Call commit() to flush. */ @@ -170,18 +231,34 @@ var DeeplakeApi = class { async createIndex(column) { await this.query(`CREATE INDEX IF NOT EXISTS idx_${sqlStr(column)}_bm25 ON "${this.tableName}" USING deeplake_index ("${column}")`); } - /** List all tables in the workspace. */ + /** List all tables in the workspace (with retry). */ async listTables() { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { - headers: { - Authorization: `Bearer ${this.token}`, - "X-Activeloop-Org-Id": this.orgId + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { + headers: { + Authorization: `Bearer ${this.token}`, + "X-Activeloop-Org-Id": this.orgId + } + }); + if (resp.ok) { + const data = await resp.json(); + return (data.tables ?? []).map((t) => t.table_name); + } + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200); + continue; + } + return []; + } catch { + if (attempt < MAX_RETRIES) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt)); + continue; + } + return []; } - }); - if (!resp.ok) - return []; - const data = await resp.json(); - return (data.tables ?? []).map((t) => t.table_name); + } + return []; } /** Create the memory table if it doesn't already exist. Migrate columns on existing tables. */ async ensureTable(name) { @@ -462,16 +539,15 @@ async function main() { const pattern = input.tool_input.pattern ?? ""; const ignoreCase = !!input.tool_input["-i"]; log3(`direct grep: ${pattern}`); - const pathRows = await api.query(`SELECT path FROM "${table}" WHERE summary ${ignoreCase ? "ILIKE" : "LIKE"} '%${sqlStr(pattern)}%' LIMIT 10`); - if (pathRows.length > 0) { + const rows = await api.query(`SELECT path, summary FROM "${table}" WHERE summary ${ignoreCase ? "ILIKE" : "LIKE"} '%${sqlStr(pattern)}%' LIMIT 5`); + if (rows.length > 0) { const allResults = []; - for (const pr of pathRows.slice(0, 5)) { - const p = pr["path"]; - const contentRows = await api.query(`SELECT summary FROM "${table}" WHERE path = '${sqlStr(p)}' LIMIT 1`); - if (!contentRows[0]?.["summary"]) + const re = new RegExp(pattern.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"), ignoreCase ? "i" : ""); + for (const row of rows) { + const p = row["path"]; + const text = row["summary"]; + if (!text) continue; - const text = contentRows[0]["summary"]; - const re = new RegExp(pattern.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"), ignoreCase ? "i" : ""); const matches = text.split("\n").filter((line) => re.test(line)).slice(0, 5).map((line) => `${p}:${line.slice(0, 300)}`); allResults.push(...matches); } diff --git a/claude-code/bundle/session-start.js b/claude-code/bundle/session-start.js index 1e1975b..eb2cfc2 100755 --- a/claude-code/bundle/session-start.js +++ b/claude-code/bundle/session-start.js @@ -84,6 +84,36 @@ function sqlStr(value) { // dist/src/deeplake-api.js var log2 = (msg) => log("sdk", msg); +var RETRYABLE_CODES = /* @__PURE__ */ new Set([429, 500, 502, 503, 504]); +var MAX_RETRIES = 3; +var BASE_DELAY_MS = 500; +var MAX_CONCURRENCY = 5; +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} +var Semaphore = class { + max; + waiting = []; + active = 0; + constructor(max) { + this.max = max; + } + async acquire() { + if (this.active < this.max) { + this.active++; + return; + } + await new Promise((resolve) => this.waiting.push(resolve)); + } + release() { + this.active--; + const next = this.waiting.shift(); + if (next) { + this.active++; + next(); + } + } +}; var DeeplakeApi = class { token; apiUrl; @@ -91,6 +121,7 @@ var DeeplakeApi = class { workspaceId; tableName; _pendingRows = []; + _sem = new Semaphore(MAX_CONCURRENCY); constructor(token, apiUrl, orgId, workspaceId, tableName) { this.token = token; this.apiUrl = apiUrl; @@ -98,25 +129,55 @@ var DeeplakeApi = class { this.workspaceId = workspaceId; this.tableName = tableName; } - /** Execute SQL and return results as row-objects. */ + /** Execute SQL with retry on transient errors and bounded concurrency. */ async query(sql) { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { - method: "POST", - headers: { - Authorization: `Bearer ${this.token}`, - "Content-Type": "application/json", - "X-Activeloop-Org-Id": this.orgId - }, - body: JSON.stringify({ query: sql }) - }); - if (!resp.ok) { + await this._sem.acquire(); + try { + return await this._queryWithRetry(sql); + } finally { + this._sem.release(); + } + } + async _queryWithRetry(sql) { + let lastError; + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + let resp; + try { + resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { + method: "POST", + headers: { + Authorization: `Bearer ${this.token}`, + "Content-Type": "application/json", + "X-Activeloop-Org-Id": this.orgId + }, + body: JSON.stringify({ query: sql }) + }); + } catch (e) { + lastError = e instanceof Error ? e : new Error(String(e)); + if (attempt < MAX_RETRIES) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (fetch error: ${lastError.message}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } + throw lastError; + } + if (resp.ok) { + const raw = await resp.json(); + if (!raw?.rows || !raw?.columns) + return []; + return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i) => [col, row[i]]))); + } const text = await resp.text().catch(() => ""); + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (${resp.status}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } throw new Error(`Query failed: ${resp.status}: ${text.slice(0, 200)}`); } - const raw = await resp.json(); - if (!raw?.rows || !raw?.columns) - return []; - return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i) => [col, row[i]]))); + throw lastError ?? new Error("Query failed: max retries exceeded"); } // ── Writes ────────────────────────────────────────────────────────────────── /** Queue rows for writing. Call commit() to flush. */ @@ -174,18 +235,34 @@ var DeeplakeApi = class { async createIndex(column) { await this.query(`CREATE INDEX IF NOT EXISTS idx_${sqlStr(column)}_bm25 ON "${this.tableName}" USING deeplake_index ("${column}")`); } - /** List all tables in the workspace. */ + /** List all tables in the workspace (with retry). */ async listTables() { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { - headers: { - Authorization: `Bearer ${this.token}`, - "X-Activeloop-Org-Id": this.orgId + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { + headers: { + Authorization: `Bearer ${this.token}`, + "X-Activeloop-Org-Id": this.orgId + } + }); + if (resp.ok) { + const data = await resp.json(); + return (data.tables ?? []).map((t) => t.table_name); + } + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200); + continue; + } + return []; + } catch { + if (attempt < MAX_RETRIES) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt)); + continue; + } + return []; } - }); - if (!resp.ok) - return []; - const data = await resp.json(); - return (data.tables ?? []).map((t) => t.table_name); + } + return []; } /** Create the memory table if it doesn't already exist. Migrate columns on existing tables. */ async ensureTable(name) { diff --git a/claude-code/bundle/shell/deeplake-shell.js b/claude-code/bundle/shell/deeplake-shell.js index fe5f9b0..50633c5 100755 --- a/claude-code/bundle/shell/deeplake-shell.js +++ b/claude-code/bundle/shell/deeplake-shell.js @@ -66775,6 +66775,36 @@ function sqlStr(value) { // dist/src/deeplake-api.js var log2 = (msg) => log("sdk", msg); +var RETRYABLE_CODES = /* @__PURE__ */ new Set([429, 500, 502, 503, 504]); +var MAX_RETRIES = 3; +var BASE_DELAY_MS = 500; +var MAX_CONCURRENCY = 5; +function sleep(ms3) { + return new Promise((resolve5) => setTimeout(resolve5, ms3)); +} +var Semaphore = class { + max; + waiting = []; + active = 0; + constructor(max) { + this.max = max; + } + async acquire() { + if (this.active < this.max) { + this.active++; + return; + } + await new Promise((resolve5) => this.waiting.push(resolve5)); + } + release() { + this.active--; + const next = this.waiting.shift(); + if (next) { + this.active++; + next(); + } + } +}; var DeeplakeApi = class { token; apiUrl; @@ -66782,6 +66812,7 @@ var DeeplakeApi = class { workspaceId; tableName; _pendingRows = []; + _sem = new Semaphore(MAX_CONCURRENCY); constructor(token, apiUrl, orgId, workspaceId, tableName) { this.token = token; this.apiUrl = apiUrl; @@ -66789,25 +66820,55 @@ var DeeplakeApi = class { this.workspaceId = workspaceId; this.tableName = tableName; } - /** Execute SQL and return results as row-objects. */ + /** Execute SQL with retry on transient errors and bounded concurrency. */ async query(sql) { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { - method: "POST", - headers: { - Authorization: `Bearer ${this.token}`, - "Content-Type": "application/json", - "X-Activeloop-Org-Id": this.orgId - }, - body: JSON.stringify({ query: sql }) - }); - if (!resp.ok) { + await this._sem.acquire(); + try { + return await this._queryWithRetry(sql); + } finally { + this._sem.release(); + } + } + async _queryWithRetry(sql) { + let lastError; + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + let resp; + try { + resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { + method: "POST", + headers: { + Authorization: `Bearer ${this.token}`, + "Content-Type": "application/json", + "X-Activeloop-Org-Id": this.orgId + }, + body: JSON.stringify({ query: sql }) + }); + } catch (e6) { + lastError = e6 instanceof Error ? e6 : new Error(String(e6)); + if (attempt < MAX_RETRIES) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (fetch error: ${lastError.message}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } + throw lastError; + } + if (resp.ok) { + const raw = await resp.json(); + if (!raw?.rows || !raw?.columns) + return []; + return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i11) => [col, row[i11]]))); + } const text = await resp.text().catch(() => ""); + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log2(`query retry ${attempt + 1}/${MAX_RETRIES} (${resp.status}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } throw new Error(`Query failed: ${resp.status}: ${text.slice(0, 200)}`); } - const raw = await resp.json(); - if (!raw?.rows || !raw?.columns) - return []; - return raw.rows.map((row) => Object.fromEntries(raw.columns.map((col, i11) => [col, row[i11]]))); + throw lastError ?? new Error("Query failed: max retries exceeded"); } // ── Writes ────────────────────────────────────────────────────────────────── /** Queue rows for writing. Call commit() to flush. */ @@ -66865,18 +66926,34 @@ var DeeplakeApi = class { async createIndex(column) { await this.query(`CREATE INDEX IF NOT EXISTS idx_${sqlStr(column)}_bm25 ON "${this.tableName}" USING deeplake_index ("${column}")`); } - /** List all tables in the workspace. */ + /** List all tables in the workspace (with retry). */ async listTables() { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { - headers: { - Authorization: `Bearer ${this.token}`, - "X-Activeloop-Org-Id": this.orgId + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { + headers: { + Authorization: `Bearer ${this.token}`, + "X-Activeloop-Org-Id": this.orgId + } + }); + if (resp.ok) { + const data = await resp.json(); + return (data.tables ?? []).map((t6) => t6.table_name); + } + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200); + continue; + } + return []; + } catch { + if (attempt < MAX_RETRIES) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt)); + continue; + } + return []; } - }); - if (!resp.ok) - return []; - const data = await resp.json(); - return (data.tables ?? []).map((t6) => t6.table_name); + } + return []; } /** Create the memory table if it doesn't already exist. Migrate columns on existing tables. */ async ensureTable(name) { @@ -66995,31 +67072,37 @@ var DeeplakeFs = class _DeeplakeFs { const fs3 = new _DeeplakeFs(client, table, mount); fs3.sessionsTable = sessionsTable ?? null; await client.ensureTable(); - await client.query(`SELECT deeplake_sync_table('${table}')`); - const sql = `SELECT path, size_bytes, mime_type FROM "${table}" ORDER BY path`; - try { - let rows; + let sessionSyncOk = false; + const syncPromises = [ + client.query(`SELECT deeplake_sync_table('${table}')`) + ]; + if (sessionsTable) { + syncPromises.push(client.query(`SELECT deeplake_sync_table('${sessionsTable}')`).then(() => { + sessionSyncOk = true; + }).catch(() => { + })); + } + await Promise.all(syncPromises); + const memoryBootstrap = (async () => { + const sql = `SELECT path, size_bytes, mime_type FROM "${table}" ORDER BY path`; try { - rows = await client.query(sql); + const rows = await client.query(sql); + for (const row of rows) { + const p22 = row["path"]; + fs3.files.set(p22, null); + fs3.meta.set(p22, { + size: Number(row["size_bytes"] ?? 0), + mime: row["mime_type"] ?? "application/octet-stream", + mtime: /* @__PURE__ */ new Date() + }); + fs3.addToTree(p22); + fs3.flushed.add(p22); + } } catch { - rows = await client.query(sql); - } - for (const row of rows) { - const p22 = row["path"]; - fs3.files.set(p22, null); - fs3.meta.set(p22, { - size: Number(row["size_bytes"] ?? 0), - mime: row["mime_type"] ?? "application/octet-stream", - mtime: /* @__PURE__ */ new Date() - }); - fs3.addToTree(p22); - fs3.flushed.add(p22); } - } catch { - } - if (sessionsTable) { + })(); + const sessionsBootstrap = sessionsTable && sessionSyncOk ? (async () => { try { - await client.query(`SELECT deeplake_sync_table('${sessionsTable}')`); const sessionRows = await client.query(`SELECT path, SUM(size_bytes) as total_size FROM "${sessionsTable}" GROUP BY path ORDER BY path`); for (const row of sessionRows) { const p22 = row["path"]; @@ -67036,7 +67119,8 @@ var DeeplakeFs = class _DeeplakeFs { } } catch { } - } + })() : Promise.resolve(); + await Promise.all([memoryBootstrap, sessionsBootstrap]); return fs3; } // ── tree management ─────────────────────────────────────────────────────── @@ -67079,31 +67163,44 @@ var DeeplakeFs = class _DeeplakeFs { } const rows = [...this.pending.values()]; this.pending.clear(); - for (const r10 of rows) { - const hex = r10.content.toString("hex"); - const text = sqlStr(r10.contentText); - const p22 = sqlStr(r10.path); - const fname = sqlStr(r10.filename); - const mime = sqlStr(r10.mimeType); - const ts3 = (/* @__PURE__ */ new Date()).toISOString(); - const cd = r10.creationDate ?? ts3; - const lud = r10.lastUpdateDate ?? ts3; - if (this.flushed.has(r10.path)) { - let setClauses = `filename = '${fname}', content = E'\\\\x${hex}', summary = E'${text}', mime_type = '${mime}', size_bytes = ${r10.sizeBytes}, last_update_date = '${sqlStr(lud)}'`; - if (r10.project !== void 0) - setClauses += `, project = '${sqlStr(r10.project)}'`; - if (r10.description !== void 0) - setClauses += `, description = '${sqlStr(r10.description)}'`; - await this.client.query(`UPDATE "${this.table}" SET ${setClauses} WHERE path = '${p22}'`); - } else { - const id = randomUUID2(); - const cols = "id, path, filename, content, summary, mime_type, size_bytes, creation_date, last_update_date" + (r10.project !== void 0 ? ", project" : "") + (r10.description !== void 0 ? ", description" : ""); - const vals = `'${id}', '${p22}', '${fname}', E'\\\\x${hex}', E'${text}', '${mime}', ${r10.sizeBytes}, '${sqlStr(cd)}', '${sqlStr(lud)}'` + (r10.project !== void 0 ? `, '${sqlStr(r10.project)}'` : "") + (r10.description !== void 0 ? `, '${sqlStr(r10.description)}'` : ""); - await this.client.query(`INSERT INTO "${this.table}" (${cols}) VALUES (${vals})`); - this.flushed.add(r10.path); + const results = await Promise.allSettled(rows.map((r10) => this.upsertRow(r10))); + let failures = 0; + for (let i11 = 0; i11 < results.length; i11++) { + if (results[i11].status === "rejected") { + if (!this.pending.has(rows[i11].path)) { + this.pending.set(rows[i11].path, rows[i11]); + } + failures++; } } await this.client.query(`SELECT deeplake_sync_table('${this.table}')`); + if (failures > 0) { + throw new Error(`flush: ${failures}/${rows.length} writes failed and were re-queued`); + } + } + async upsertRow(r10) { + const hex = r10.content.toString("hex"); + const text = sqlStr(r10.contentText); + const p22 = sqlStr(r10.path); + const fname = sqlStr(r10.filename); + const mime = sqlStr(r10.mimeType); + const ts3 = (/* @__PURE__ */ new Date()).toISOString(); + const cd = r10.creationDate ?? ts3; + const lud = r10.lastUpdateDate ?? ts3; + if (this.flushed.has(r10.path)) { + let setClauses = `filename = '${fname}', content = E'\\\\x${hex}', summary = E'${text}', mime_type = '${mime}', size_bytes = ${r10.sizeBytes}, last_update_date = '${sqlStr(lud)}'`; + if (r10.project !== void 0) + setClauses += `, project = '${sqlStr(r10.project)}'`; + if (r10.description !== void 0) + setClauses += `, description = '${sqlStr(r10.description)}'`; + await this.client.query(`UPDATE "${this.table}" SET ${setClauses} WHERE path = '${p22}'`); + } else { + const id = randomUUID2(); + const cols = "id, path, filename, content, summary, mime_type, size_bytes, creation_date, last_update_date" + (r10.project !== void 0 ? ", project" : "") + (r10.description !== void 0 ? ", description" : ""); + const vals = `'${id}', '${p22}', '${fname}', E'\\\\x${hex}', E'${text}', '${mime}', ${r10.sizeBytes}, '${sqlStr(cd)}', '${sqlStr(lud)}'` + (r10.project !== void 0 ? `, '${sqlStr(r10.project)}'` : "") + (r10.description !== void 0 ? `, '${sqlStr(r10.description)}'` : ""); + await this.client.query(`INSERT INTO "${this.table}" (${cols}) VALUES (${vals})`); + this.flushed.add(r10.path); + } } // ── Virtual index.md generation ──────────────────────────────────────────── async generateVirtualIndex() { @@ -67133,6 +67230,40 @@ var DeeplakeFs = class _DeeplakeFs { lines.push(""); return lines.join("\n"); } + // ── batch prefetch ──────────────────────────────────────────────────────── + /** + * Prefetch multiple files into the content cache with a single SQL query. + * Skips paths that are already cached, pending, or session-backed. + * After this call, subsequent readFile() calls for these paths hit cache. + */ + async prefetch(paths) { + const uncached = []; + for (const raw of paths) { + const p22 = normPath(raw); + if (this.files.get(p22) !== null && this.files.get(p22) !== void 0) + continue; + if (this.pending.has(p22)) + continue; + if (this.sessionPaths.has(p22)) + continue; + if (!this.files.has(p22)) + continue; + uncached.push(p22); + } + if (uncached.length === 0) + return; + const inList = uncached.map((p22) => `'${sqlStr(p22)}'`).join(", "); + const rows = await this.client.query(`SELECT path, summary, content FROM "${this.table}" WHERE path IN (${inList})`); + for (const row of rows) { + const p22 = row["path"]; + const text = row["summary"]; + if (text && text.length > 0) { + this.files.set(p22, Buffer.from(text, "utf-8")); + } else if (row["content"] != null) { + this.files.set(p22, decodeContent(row["content"])); + } + } + } // ── IFileSystem: reads ──────────────────────────────────────────────────── async readFileBuffer(path2) { const p22 = normPath(path2); @@ -67180,6 +67311,9 @@ var DeeplakeFs = class _DeeplakeFs { } if (!this.files.has(p22)) throw fsErr("ENOENT", "no such file or directory", p22); + const cached = this.files.get(p22); + if (cached !== null && cached !== void 0) + return cached.toString("utf-8"); const pend = this.pending.get(p22); if (pend) return pend.contentText || pend.content.toString("utf-8"); @@ -67262,6 +67396,7 @@ var DeeplakeFs = class _DeeplakeFs { const addHex = Buffer.from(add, "utf-8").toString("hex"); const ts3 = (/* @__PURE__ */ new Date()).toISOString(); await this.client.query(`UPDATE "${this.table}" SET summary = summary || E'${sqlStr(add)}', content = content || E'\\\\x${addHex}', size_bytes = size_bytes + ${Buffer.byteLength(add, "utf-8")}, last_update_date = '${ts3}' WHERE path = '${sqlStr(p22)}'`); + this.files.set(p22, null); const m26 = this.meta.get(p22); if (m26) { m26.size += Buffer.byteLength(add, "utf-8"); @@ -68455,7 +68590,7 @@ function createGrepCommand(client, fs3, table) { candidates = fs3.getAllPaths().filter((p22) => !p22.endsWith("/")); } candidates = candidates.filter((c15) => targets.some((t6) => t6 === "/" || c15 === t6 || c15.startsWith(t6 + "/"))); - await Promise.all(candidates.map((p22) => fs3.readFile(p22).catch(() => null))); + await fs3.prefetch(candidates); const fixedString = parsed.F || parsed["fixed-strings"]; const ignoreCase = parsed.i || parsed["ignore-case"]; const showLine = parsed.n || parsed["line-number"]; diff --git a/claude-code/tests/deeplake-fs.test.ts b/claude-code/tests/deeplake-fs.test.ts index 8736762..1b0620d 100644 --- a/claude-code/tests/deeplake-fs.test.ts +++ b/claude-code/tests/deeplake-fs.test.ts @@ -39,11 +39,22 @@ function makeClient(seed: Record = {}) { return row ? [{ content: `\\x${row.content.toString("hex")}` }] : []; } // Read: SELECT summary, content FROM ... WHERE path = '...' - if (sql.includes("SELECT summary, content")) { + if (sql.includes("SELECT summary, content") && !sql.includes("IN (")) { const match = sql.match(/path = '([^']+)'/); const row = match ? rows.find(r => r.path === match[1]) : undefined; return row ? [{ summary: row.summary, content: `\\x${row.content.toString("hex")}` }] : []; } + // Prefetch: SELECT path, summary, content FROM ... WHERE path IN (...) + if (sql.includes("SELECT path, summary, content") && sql.includes("IN (")) { + const inMatch = sql.match(/IN \(([^)]+)\)/); + if (inMatch) { + const paths = inMatch[1].split(",").map(s => s.trim().replace(/^'|'$/g, "")); + return rows + .filter(r => paths.includes(r.path)) + .map(r => ({ path: r.path, summary: r.summary, content: `\\x${r.content.toString("hex")}` })); + } + return []; + } // Virtual index: SELECT path, project, description, creation_date, last_update_date FROM ... WHERE path LIKE '/summaries/%' if (sql.includes("SELECT path, project, description, creation_date, last_update_date")) { return rows @@ -527,6 +538,92 @@ describe("getAllPaths", () => { }); }); +// ── prefetch ──────────────────────────────────────────────────────────────── +describe("prefetch", () => { + it("loads multiple uncached files in a single query", async () => { + const { fs, client } = await makeFs({ + "/memory/a.txt": "alpha", + "/memory/b.txt": "bravo", + "/memory/c.txt": "charlie", + }); + client.query.mockClear(); + + await fs.prefetch(["/memory/a.txt", "/memory/b.txt", "/memory/c.txt"]); + + // Should issue exactly one SELECT ... WHERE path IN (...) query + const prefetchCalls = (client.query.mock.calls as [string][]).filter( + c => c[0].includes("SELECT path, summary, content") && c[0].includes("IN (") + ); + expect(prefetchCalls.length).toBe(1); + expect(prefetchCalls[0][0]).toContain("/memory/a.txt"); + expect(prefetchCalls[0][0]).toContain("/memory/b.txt"); + expect(prefetchCalls[0][0]).toContain("/memory/c.txt"); + + // Subsequent readFile and readFileBuffer calls should hit cache (no more queries) + client.query.mockClear(); + expect(await fs.readFile("/memory/a.txt")).toBe("alpha"); + expect(await fs.readFile("/memory/b.txt")).toBe("bravo"); + expect(await fs.readFile("/memory/c.txt")).toBe("charlie"); + expect(client.query).not.toHaveBeenCalled(); + }); + + it("skips already-cached files", async () => { + const { fs, client } = await makeFs({ "/memory/a.txt": "alpha", "/memory/b.txt": "bravo" }); + // Read a.txt to cache it + await fs.readFile("/memory/a.txt"); + client.query.mockClear(); + + await fs.prefetch(["/memory/a.txt", "/memory/b.txt"]); + + // Only b.txt should be in the IN list + const prefetchCalls = (client.query.mock.calls as [string][]).filter( + c => c[0].includes("SELECT path, summary, content") && c[0].includes("IN (") + ); + expect(prefetchCalls.length).toBe(1); + expect(prefetchCalls[0][0]).not.toContain("/memory/a.txt"); + expect(prefetchCalls[0][0]).toContain("/memory/b.txt"); + }); + + it("skips pending (unflushed) files", async () => { + const { fs, client } = await makeFs({}); + await fs.writeFile("/memory/new.txt", "pending content"); + client.query.mockClear(); + + await fs.prefetch(["/memory/new.txt"]); + + // No query should be issued — file is in pending batch + const prefetchCalls = (client.query.mock.calls as [string][]).filter( + c => c[0].includes("SELECT path, summary, content") + ); + expect(prefetchCalls.length).toBe(0); + }); + + it("skips unknown paths not in the file tree", async () => { + const { fs, client } = await makeFs({ "/memory/a.txt": "alpha" }); + client.query.mockClear(); + + await fs.prefetch(["/memory/a.txt", "/memory/nonexistent.txt"]); + + // Only a.txt should be queried, nonexistent is not in the tree + const prefetchCalls = (client.query.mock.calls as [string][]).filter( + c => c[0].includes("SELECT path, summary, content") && c[0].includes("IN (") + ); + expect(prefetchCalls.length).toBe(1); + expect(prefetchCalls[0][0]).toContain("/memory/a.txt"); + expect(prefetchCalls[0][0]).not.toContain("nonexistent"); + }); + + it("is a no-op when all files are cached", async () => { + const { fs, client } = await makeFs({ "/memory/a.txt": "alpha" }); + await fs.readFile("/memory/a.txt"); // cache it + client.query.mockClear(); + + await fs.prefetch(["/memory/a.txt"]); + + expect(client.query).not.toHaveBeenCalled(); + }); +}); + // ── Upsert: id stability & dates ───────────────────────────────────────────── describe("flush upsert", () => { it("INSERT for new file sets id, creation_date and last_update_date", async () => { diff --git a/claude-code/tests/grep-interceptor.test.ts b/claude-code/tests/grep-interceptor.test.ts index 77fa02c..7d100f5 100644 --- a/claude-code/tests/grep-interceptor.test.ts +++ b/claude-code/tests/grep-interceptor.test.ts @@ -127,16 +127,23 @@ describe("grep interceptor", () => { expect(result.stdout).not.toContain("remove match"); }); - it("prefetches candidates into fs content cache before matching", async () => { - const client = makeClient([{ path: "/memory/a.txt" }]); + it("uses batch prefetch instead of per-file reads", async () => { + const client = makeClient([ + { path: "/memory/a.txt" }, + { path: "/memory/b.txt" }, + ]); const fs = await DeeplakeFs.create(client as never, "test", "/memory"); - await fs.writeFile("/memory/a.txt", "cached content"); + await fs.writeFile("/memory/a.txt", "hello world"); + await fs.writeFile("/memory/b.txt", "hello there"); - const readFileSpy = vi.spyOn(fs, "readFile"); + const prefetchSpy = vi.spyOn(fs, "prefetch"); const cmd = createGrepCommand(client as never, fs, "test"); - await cmd.execute(["cached", "/memory"], makeCtx(fs) as never); + await cmd.execute(["hello", "/memory"], makeCtx(fs) as never); - // readFile should have been called for the candidate (prefetch + match) - expect(readFileSpy).toHaveBeenCalledWith("/memory/a.txt"); + // Should call prefetch once with all candidates, not individual readFile calls + expect(prefetchSpy).toHaveBeenCalledTimes(1); + expect(prefetchSpy).toHaveBeenCalledWith( + expect.arrayContaining(["/memory/a.txt", "/memory/b.txt"]) + ); }); }); diff --git a/src/deeplake-api.ts b/src/deeplake-api.ts index 92d8b65..e658cbf 100644 --- a/src/deeplake-api.ts +++ b/src/deeplake-api.ts @@ -4,6 +4,34 @@ import { sqlStr } from "./utils/sql.js"; const log = (msg: string) => _log("sdk", msg); +// ── Retry & concurrency primitives ────────────────────────────────────────── + +const RETRYABLE_CODES = new Set([429, 500, 502, 503, 504]); +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 500; +const MAX_CONCURRENCY = 5; + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +class Semaphore { + private waiting: (() => void)[] = []; + private active = 0; + constructor(private max: number) {} + + async acquire(): Promise { + if (this.active < this.max) { this.active++; return; } + await new Promise(resolve => this.waiting.push(resolve)); + } + + release(): void { + this.active--; + const next = this.waiting.shift(); + if (next) { this.active++; next(); } + } +} + // ── SDK-backed client (ManagedClient for all reads/writes) ─────────────────── export interface WriteRow { @@ -21,6 +49,7 @@ export interface WriteRow { export class DeeplakeApi { private _pendingRows: WriteRow[] = []; + private _sem = new Semaphore(MAX_CONCURRENCY); constructor( private token: string, @@ -30,26 +59,58 @@ export class DeeplakeApi { readonly tableName: string, ) {} - /** Execute SQL and return results as row-objects. */ + /** Execute SQL with retry on transient errors and bounded concurrency. */ async query(sql: string): Promise[]> { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { - method: "POST", - headers: { - Authorization: `Bearer ${this.token}`, - "Content-Type": "application/json", - "X-Activeloop-Org-Id": this.orgId, - }, - body: JSON.stringify({ query: sql }), - }); - if (!resp.ok) { + await this._sem.acquire(); + try { + return await this._queryWithRetry(sql); + } finally { + this._sem.release(); + } + } + + private async _queryWithRetry(sql: string): Promise[]> { + let lastError: Error | undefined; + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + let resp: Response; + try { + resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables/query`, { + method: "POST", + headers: { + Authorization: `Bearer ${this.token}`, + "Content-Type": "application/json", + "X-Activeloop-Org-Id": this.orgId, + }, + body: JSON.stringify({ query: sql }), + }); + } catch (e: unknown) { + // Network-level failure (DNS, TCP reset, timeout, etc.) + lastError = e instanceof Error ? e : new Error(String(e)); + if (attempt < MAX_RETRIES) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log(`query retry ${attempt + 1}/${MAX_RETRIES} (fetch error: ${lastError.message}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } + throw lastError; + } + if (resp.ok) { + const raw = await resp.json() as { columns?: string[]; rows?: unknown[][]; row_count?: number } | null; + if (!raw?.rows || !raw?.columns) return []; + return raw.rows.map(row => + Object.fromEntries(raw.columns!.map((col, i) => [col, row[i]])) + ); + } const text = await resp.text().catch(() => ""); + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + const delay = BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200; + log(`query retry ${attempt + 1}/${MAX_RETRIES} (${resp.status}) in ${delay.toFixed(0)}ms`); + await sleep(delay); + continue; + } throw new Error(`Query failed: ${resp.status}: ${text.slice(0, 200)}`); } - const raw = await resp.json() as { columns?: string[]; rows?: unknown[][]; row_count?: number } | null; - if (!raw?.rows || !raw?.columns) return []; - return raw.rows.map(row => - Object.fromEntries(raw.columns!.map((col, i) => [col, row[i]])) - ); + throw lastError ?? new Error("Query failed: max retries exceeded"); } // ── Writes ────────────────────────────────────────────────────────────────── @@ -118,17 +179,34 @@ export class DeeplakeApi { await this.query(`CREATE INDEX IF NOT EXISTS idx_${sqlStr(column)}_bm25 ON "${this.tableName}" USING deeplake_index ("${column}")`); } - /** List all tables in the workspace. */ + /** List all tables in the workspace (with retry). */ async listTables(): Promise { - const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { - headers: { - Authorization: `Bearer ${this.token}`, - "X-Activeloop-Org-Id": this.orgId, - }, - }); - if (!resp.ok) return []; - const data = await resp.json() as { tables?: { table_name: string }[] }; - return (data.tables ?? []).map(t => t.table_name); + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const resp = await fetch(`${this.apiUrl}/workspaces/${this.workspaceId}/tables`, { + headers: { + Authorization: `Bearer ${this.token}`, + "X-Activeloop-Org-Id": this.orgId, + }, + }); + if (resp.ok) { + const data = await resp.json() as { tables?: { table_name: string }[] }; + return (data.tables ?? []).map(t => t.table_name); + } + if (attempt < MAX_RETRIES && RETRYABLE_CODES.has(resp.status)) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 200); + continue; + } + return []; + } catch { + if (attempt < MAX_RETRIES) { + await sleep(BASE_DELAY_MS * Math.pow(2, attempt)); + continue; + } + return []; + } + } + return []; } /** Create the memory table if it doesn't already exist. Migrate columns on existing tables. */ diff --git a/src/hooks/pre-tool-use.ts b/src/hooks/pre-tool-use.ts index bc51820..b38dc5f 100644 --- a/src/hooks/pre-tool-use.ts +++ b/src/hooks/pre-tool-use.ts @@ -217,21 +217,17 @@ async function main(): Promise { const pattern = (input.tool_input.pattern as string) ?? ""; const ignoreCase = !!input.tool_input["-i"]; log(`direct grep: ${pattern}`); - // Only fetch paths first (fast), then fetch content for matches - const pathRows = await api.query( - `SELECT path FROM "${table}" WHERE summary ${ignoreCase ? "ILIKE" : "LIKE"} '%${sqlStr(pattern)}%' LIMIT 10` + // Single query: fetch path + content together (avoids N+1 round-trips) + const rows = await api.query( + `SELECT path, summary FROM "${table}" WHERE summary ${ignoreCase ? "ILIKE" : "LIKE"} '%${sqlStr(pattern)}%' LIMIT 5` ); - if (pathRows.length > 0) { - // Fetch content for matched files and extract matching lines + if (rows.length > 0) { const allResults: string[] = []; - for (const pr of pathRows.slice(0, 5)) { - const p = pr["path"] as string; - const contentRows = await api.query( - `SELECT summary FROM "${table}" WHERE path = '${sqlStr(p)}' LIMIT 1` - ); - if (!contentRows[0]?.["summary"]) continue; - const text = contentRows[0]["summary"] as string; - const re = new RegExp(pattern.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"), ignoreCase ? "i" : ""); + const re = new RegExp(pattern.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"), ignoreCase ? "i" : ""); + for (const row of rows) { + const p = row["path"] as string; + const text = row["summary"] as string; + if (!text) continue; const matches = text.split("\n") .filter(line => re.test(line)) .slice(0, 5) diff --git a/src/shell/deeplake-fs.ts b/src/shell/deeplake-fs.ts index b1793a3..4191fa9 100644 --- a/src/shell/deeplake-fs.ts +++ b/src/shell/deeplake-fs.ts @@ -114,36 +114,45 @@ export class DeeplakeFs implements IFileSystem { fs.sessionsTable = sessionsTable ?? null; // Ensure the table exists before bootstrapping. await client.ensureTable(); - // Sync table to ensure query engine sees latest writes. - await client.query(`SELECT deeplake_sync_table('${table}')`); - // Bootstrap: load path metadata from memory table. - const sql = `SELECT path, size_bytes, mime_type FROM "${table}" ORDER BY path`; - try { - let rows: Record[]; + + // Sync both tables in parallel before bootstrap queries. + // Track whether session sync succeeded — skip session bootstrap if it failed. + let sessionSyncOk = false; + const syncPromises: Promise[] = [ + client.query(`SELECT deeplake_sync_table('${table}')`), + ]; + if (sessionsTable) { + syncPromises.push( + client.query(`SELECT deeplake_sync_table('${sessionsTable}')`) + .then(() => { sessionSyncOk = true; }) + .catch(() => { /* sessions table may not exist yet */ }) + ); + } + await Promise.all(syncPromises); + + // Bootstrap memory + sessions metadata in parallel. + const memoryBootstrap = (async () => { + const sql = `SELECT path, size_bytes, mime_type FROM "${table}" ORDER BY path`; try { - rows = await client.query(sql); + const rows = await client.query(sql); + for (const row of rows) { + const p = row["path"] as string; + fs.files.set(p, null); + fs.meta.set(p, { + size: Number(row["size_bytes"] ?? 0), + mime: (row["mime_type"] as string) ?? "application/octet-stream", + mtime: new Date(), + }); + fs.addToTree(p); + fs.flushed.add(p); + } } catch { - rows = await client.query(sql); + // Table may not exist yet — start empty } - for (const row of rows) { - const p = row["path"] as string; - fs.files.set(p, null); - fs.meta.set(p, { - size: Number(row["size_bytes"] ?? 0), - mime: (row["mime_type"] as string) ?? "application/octet-stream", - mtime: new Date(), - }); - fs.addToTree(p); - fs.flushed.add(p); - } - } catch { - // Table may not exist yet — start empty - } + })(); - // Bootstrap: load session paths from sessions table (distinct paths only). - if (sessionsTable) { + const sessionsBootstrap = (sessionsTable && sessionSyncOk) ? (async () => { try { - await client.query(`SELECT deeplake_sync_table('${sessionsTable}')`); const sessionRows = await client.query( `SELECT path, SUM(size_bytes) as total_size FROM "${sessionsTable}" GROUP BY path ORDER BY path` ); @@ -163,7 +172,9 @@ export class DeeplakeFs implements IFileSystem { } catch { // Sessions table may not exist yet } - } + })() : Promise.resolve(); + + await Promise.all([memoryBootstrap, sessionsBootstrap]); return fs; } @@ -207,40 +218,56 @@ export class DeeplakeFs implements IFileSystem { const rows = [...this.pending.values()]; this.pending.clear(); - // Upsert: UPDATE existing rows, INSERT new ones. Preserves stable id. - for (const r of rows) { - const hex = r.content.toString("hex"); - const text = esc(r.contentText); - const p = esc(r.path); - const fname = esc(r.filename); - const mime = esc(r.mimeType); - const ts = new Date().toISOString(); - const cd = r.creationDate ?? ts; - const lud = r.lastUpdateDate ?? ts; - if (this.flushed.has(r.path)) { - let setClauses = `filename = '${fname}', content = E'\\\\x${hex}', summary = E'${text}', ` + - `mime_type = '${mime}', size_bytes = ${r.sizeBytes}, last_update_date = '${esc(lud)}'`; - if (r.project !== undefined) setClauses += `, project = '${esc(r.project)}'`; - if (r.description !== undefined) setClauses += `, description = '${esc(r.description)}'`; - await this.client.query( - `UPDATE "${this.table}" SET ${setClauses} WHERE path = '${p}'` - ); - } else { - const id = randomUUID(); - const cols = "id, path, filename, content, summary, mime_type, size_bytes, creation_date, last_update_date" + - (r.project !== undefined ? ", project" : "") + - (r.description !== undefined ? ", description" : ""); - const vals = `'${id}', '${p}', '${fname}', E'\\\\x${hex}', E'${text}', '${mime}', ${r.sizeBytes}, '${esc(cd)}', '${esc(lud)}'` + - (r.project !== undefined ? `, '${esc(r.project)}'` : "") + - (r.description !== undefined ? `, '${esc(r.description)}'` : ""); - await this.client.query( - `INSERT INTO "${this.table}" (${cols}) VALUES (${vals})` - ); - this.flushed.add(r.path); + // Upsert in parallel — the semaphore in DeeplakeApi.query() handles concurrency. + // Re-queue any rows that failed so they are retried on the next flush. + const results = await Promise.allSettled(rows.map(r => this.upsertRow(r))); + let failures = 0; + for (let i = 0; i < results.length; i++) { + if (results[i].status === "rejected") { + // Re-queue for next flush — don't overwrite if the caller wrote a newer version + if (!this.pending.has(rows[i].path)) { + this.pending.set(rows[i].path, rows[i]); + } + failures++; } } - // Sync so subsequent reads see the new data. + // Sync so subsequent reads see the successfully written data. await this.client.query(`SELECT deeplake_sync_table('${this.table}')`); + if (failures > 0) { + throw new Error(`flush: ${failures}/${rows.length} writes failed and were re-queued`); + } + } + + private async upsertRow(r: PendingRow): Promise { + const hex = r.content.toString("hex"); + const text = esc(r.contentText); + const p = esc(r.path); + const fname = esc(r.filename); + const mime = esc(r.mimeType); + const ts = new Date().toISOString(); + const cd = r.creationDate ?? ts; + const lud = r.lastUpdateDate ?? ts; + if (this.flushed.has(r.path)) { + let setClauses = `filename = '${fname}', content = E'\\\\x${hex}', summary = E'${text}', ` + + `mime_type = '${mime}', size_bytes = ${r.sizeBytes}, last_update_date = '${esc(lud)}'`; + if (r.project !== undefined) setClauses += `, project = '${esc(r.project)}'`; + if (r.description !== undefined) setClauses += `, description = '${esc(r.description)}'`; + await this.client.query( + `UPDATE "${this.table}" SET ${setClauses} WHERE path = '${p}'` + ); + } else { + const id = randomUUID(); + const cols = "id, path, filename, content, summary, mime_type, size_bytes, creation_date, last_update_date" + + (r.project !== undefined ? ", project" : "") + + (r.description !== undefined ? ", description" : ""); + const vals = `'${id}', '${p}', '${fname}', E'\\\\x${hex}', E'${text}', '${mime}', ${r.sizeBytes}, '${esc(cd)}', '${esc(lud)}'` + + (r.project !== undefined ? `, '${esc(r.project)}'` : "") + + (r.description !== undefined ? `, '${esc(r.description)}'` : ""); + await this.client.query( + `INSERT INTO "${this.table}" (${cols}) VALUES (${vals})` + ); + this.flushed.add(r.path); + } } // ── Virtual index.md generation ──────────────────────────────────────────── @@ -276,6 +303,40 @@ export class DeeplakeFs implements IFileSystem { return lines.join("\n"); } + // ── batch prefetch ──────────────────────────────────────────────────────── + + /** + * Prefetch multiple files into the content cache with a single SQL query. + * Skips paths that are already cached, pending, or session-backed. + * After this call, subsequent readFile() calls for these paths hit cache. + */ + async prefetch(paths: string[]): Promise { + const uncached: string[] = []; + for (const raw of paths) { + const p = normPath(raw); + if (this.files.get(p) !== null && this.files.get(p) !== undefined) continue; + if (this.pending.has(p)) continue; + if (this.sessionPaths.has(p)) continue; + if (!this.files.has(p)) continue; // unknown path + uncached.push(p); + } + if (uncached.length === 0) return; + + const inList = uncached.map(p => `'${esc(p)}'`).join(", "); + const rows = await this.client.query( + `SELECT path, summary, content FROM "${this.table}" WHERE path IN (${inList})` + ); + for (const row of rows) { + const p = row["path"] as string; + const text = row["summary"] as string; + if (text && text.length > 0) { + this.files.set(p, Buffer.from(text, "utf-8")); + } else if (row["content"] != null) { + this.files.set(p, decodeContent(row["content"])); + } + } + } + // ── IFileSystem: reads ──────────────────────────────────────────────────── async readFileBuffer(path: string): Promise { @@ -335,6 +396,10 @@ export class DeeplakeFs implements IFileSystem { if (!this.files.has(p)) throw fsErr("ENOENT", "no such file or directory", p); + // Content cache (populated by prefetch or prior reads) + const cached = this.files.get(p); + if (cached !== null && cached !== undefined) return cached.toString("utf-8"); + // Pending batch const pend = this.pending.get(p); if (pend) return pend.contentText || pend.content.toString("utf-8"); @@ -434,7 +499,8 @@ export class DeeplakeFs implements IFileSystem { `last_update_date = '${ts}' ` + `WHERE path = '${esc(p)}'` ); - // Update local metadata + // Invalidate content cache so next read fetches fresh data from SQL + this.files.set(p, null); const m = this.meta.get(p); if (m) { m.size += Buffer.byteLength(add, "utf-8"); m.mtime = new Date(ts); } } else { diff --git a/src/shell/grep-interceptor.ts b/src/shell/grep-interceptor.ts index a61483c..354f9a3 100644 --- a/src/shell/grep-interceptor.ts +++ b/src/shell/grep-interceptor.ts @@ -73,8 +73,8 @@ export function createGrepCommand( targets.some(t => t === "/" || c === t || c.startsWith(t + "/")) ); - // ── Phase 2: prefetch into content cache (parallel) ───────────────────── - await Promise.all(candidates.map(p => fs.readFile(p).catch(() => null))); + // ── Phase 2: prefetch into content cache (single batch query) ─────────── + await fs.prefetch(candidates); // ── Phase 3: fine-grained in-memory match ──────────────────────────────── const fixedString = parsed.F || parsed["fixed-strings"];