feat(index): multi-threaded parallel indexing via worker_threads #21
Conversation
- Remove all DSR-related descriptions from README.md
- Update architecture diagram to reflect current implementation
- Remove DSR tools from skill templates and references
- Update AGENTS.md files across codebase to remove DSR references
- Simplify core capabilities section to focus on vector + graph retrieval
- Update comparison table to highlight repo-map with PageRank
- Add worker thread pool for CPU-bound parse+embed+quantize operations
- LanceDB: parallel writes per language table (Promise.all)
- Incremental indexer: Promise concurrency + optional worker pool
- Config: useWorkerThreads, workerThreadsMinFiles (default 50)
- Fallback to single-threaded when pool unavailable or file count < threshold

Co-authored-by: Cursor <cursoragent@cursor.com>
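A minimal sketch of how the two new config knobs might be surfaced (field names come from the description above; the surrounding config shape is an assumption, not the PR's actual `src/core/indexing/config.ts`):

```ts
// Sketch only — the real IndexingConfig in this PR may differ in shape and defaults.
export interface IndexingThreadConfig {
  useWorkerThreads: boolean;      // gate the worker-pool path entirely
  workerThreadsMinFiles: number;  // below this file count, stay single-threaded
}

export const defaultThreadConfig: IndexingThreadConfig = {
  useWorkerThreads: true,
  workerThreadsMinFiles: 50, // default stated in the PR description
};
```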
Pull request overview
This PR introduces true multi-threaded indexing using Node.js worker_threads to speed up CPU-bound parsing/embedding/quantization, along with parallelized LanceDB writes. It also updates multiple docs/templates to remove DSR-related references and reflect the current tool/command set.
Changes:
- Add a worker_threads worker entry + a worker pool, and wire it into runParallelIndexing with config gating (useWorkerThreads, workerThreadsMinFiles).
- Update incremental and full indexers to perform LanceDB writes per-language in parallel.
- Refresh docs/templates/AGENTS to remove DSR references and emphasize repo-map + graph/vector retrieval.
Reviewed changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 9 comments.
Summary per file:
| File | Description |
|---|---|
| templates/agents/common/skills/git-ai-code-search/references/tools.md | Removes DSR tool documentation from templates. |
| templates/agents/common/skills/git-ai-code-search/references/constraints.md | Removes DSR-related constraints/rules from templates. |
| templates/agents/common/skills/git-ai-code-search/SKILL.md | Updates skill description/rules to remove DSR history references. |
| skills/git-ai-code-search/references/tools.md | Removes DSR tool documentation from shipped skill refs. |
| skills/git-ai-code-search/references/constraints.md | Removes DSR-related constraints/rules from shipped skill refs. |
| skills/git-ai-code-search/SKILL.md | Updates skill description/rules to remove DSR history references. |
| src/core/indexing/worker.ts | New worker entrypoint implementing CPU-bound indexing per file. |
| src/core/indexing/pool.ts | New fixed-size worker pool to distribute file tasks and collect results. |
| src/core/indexing/parallel.ts | Adds worker-pool path with fallback to existing single-threaded implementation. |
| src/core/indexing/config.ts | Adds worker-thread enablement config knobs and defaults. |
| src/core/indexerIncremental.ts | Adds optional worker-pool processing + parallel per-language LanceDB writes. |
| src/core/indexer.ts | Switches per-language LanceDB writes to Promise.all. |
| src/core/AGENTS.md | Updates core module overview/structure (removes DSR, adds repoMap). |
| src/commands/AGENTS.md | Updates command docs (removes dsr command references). |
| src/cli/AGENTS.md | Updates CLI architecture docs (removes dsr schemas/handlers, adds repo-map). |
| README.md | Updates product positioning/architecture diagrams away from DSR and toward repo-map/graph/vector. |
| AGENTS.md | Updates repository overview/structure text away from DSR references. |
| .gitignore | Adds .cursor/ to ignored paths. |
src/core/indexing/worker.ts (Outdated)
```ts
const parsed = parser.parseContent(filePath, content);
const symbols: SymbolInfo[] = parsed.symbols;
const fileRefs: AstReference[] = parsed.refs;
```
Worker parsing uses parser.parseContent(...) directly, so any parse failure results in the file being skipped (worker returns error → pool resolves null). This diverges from the single-threaded path which applies parseFailureFallback (text-only/line-chunk/etc). Pass the relevant error-handling config in WorkerRequest and implement the same fallback logic in the worker so results are consistent regardless of useWorkerThreads.
Suggested change:

```diff
-const parsed = parser.parseContent(filePath, content);
-const symbols: SymbolInfo[] = parsed.symbols;
-const fileRefs: AstReference[] = parsed.refs;
+let symbols: SymbolInfo[] = [];
+let fileRefs: AstReference[] = [];
+try {
+  const parsed = parser.parseContent(filePath, content);
+  symbols = parsed.symbols ?? [];
+  fileRefs = parsed.refs ?? [];
+} catch (err) {
+  // On parse failure, fall back to an empty symbol/ref set so the worker
+  // still produces a result instead of signalling a hard error. This
+  // mirrors the single-threaded behaviour where parse failures degrade
+  // gracefully rather than skipping the file entirely.
+  symbols = [];
+  fileRefs = [];
+}
```
src/core/indexerIncremental.ts (Outdated)
```ts
content,
dim: this.dim,
quantizationBits: 8,
existingChunkHashes: [],
```
In the worker-thread path, files are processed with existingChunkHashes: [], so chunks that already exist in LanceDB can be re-inserted on every incremental run. The single-threaded path explicitly queries existing content_hash values and filters them out. Add an equivalent “filter existing hashes” step for the worker-thread path (either by querying LanceDB before t.chunks.add(...) or by providing existing hashes to workers).
Suggested change:

```diff
-existingChunkHashes: [],
+existingChunkHashes: Array.from(seenChunkHashes),
```
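One possible way to populate `seenChunkHashes` before dispatching work to the pool (sketch; `languagesTouchedByChangeSet` and `loadExistingChunkHashes` are hypothetical helpers standing in for the content_hash query the single-threaded path already performs against each per-language table):

```ts
// Sketch: gather content hashes already stored in LanceDB so workers can skip them.
// loadExistingChunkHashes(lang) is hypothetical — it would wrap the same
// content_hash lookup the single-threaded incremental path runs today.
const seenChunkHashes = new Set<string>();
for (const lang of languagesTouchedByChangeSet) {
  for (const hash of await loadExistingChunkHashes(lang)) {
    seenChunkHashes.add(hash);
  }
}
// Later, per file: existingChunkHashes: Array.from(seenChunkHashes)
```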
```ts
const tasks: Array<Promise<void>> = [];
for (const item of filesToIndex) {
  const task = (async () => {
    processed++;
    this.onProgress?.({ totalFiles: state.totalFiles, processedFiles: processed, currentFile: item.filePosix });

    const content = this.source === 'staged'
      ? await readStagedFile(this.repoRoot, item.filePosix)
      : await readWorktreeFile(this.scanRoot, item.filePosix);
    if (content == null) return;

    const result = await pool.processFile({
      filePath: item.filePosix,
      content,
      dim: this.dim,
      quantizationBits: 8,
      existingChunkHashes: [],
    });

    if (result) mergeResult(result);
  })();
  tasks.push(task);
}

await Promise.all(tasks);
```
processFilesWithPool starts an async task per file and reads file contents before awaiting pool.processFile(...), which can trigger unbounded parallel file reads (and large in-memory contents) for big change sets. Limit concurrency for the read+dispatch stage (e.g., cap at pool.size or a small multiple, or reuse the queue/active scheduling pattern used in the single-threaded implementation).
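A bounded-concurrency version of the read+dispatch loop might look like the sketch below. Names such as `filesToIndex`, `pool`, `readStagedFile`, and `mergeResult` are taken from the snippet above; `pool.size` follows the reviewer's "cap at pool.size" suggestion and is assumed to exist:

```ts
// Sketch: cap concurrent read+dispatch at the pool size instead of one task per file.
const concurrency = Math.max(1, pool.size);
let next = 0;

const runOne = async (): Promise<void> => {
  while (next < filesToIndex.length) {
    const item = filesToIndex[next++];
    const content = this.source === 'staged'
      ? await readStagedFile(this.repoRoot, item.filePosix)
      : await readWorktreeFile(this.scanRoot, item.filePosix);
    if (content == null) continue;

    const result = await pool.processFile({
      filePath: item.filePosix,
      content,
      dim: this.dim,
      quantizationBits: 8,
      existingChunkHashes: [],
    });
    if (result) mergeResult(result);
  }
};

// Only `concurrency` reads/dispatches are ever in flight at once.
await Promise.all(Array.from({ length: concurrency }, () => runOne()));
```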
```ts
// Phase B: Process files — use worker threads when enough files, else single-threaded
const WORKER_THREAD_MIN_FILES = 20;
const useWorkerThreads = filesToIndex.length >= WORKER_THREAD_MIN_FILES;
let pool: IndexingWorkerPool | null = null;

if (useWorkerThreads) {
  const poolSize = Math.max(1, Math.min(filesToIndex.length, (os.cpus()?.length ?? 2) - 1));
  pool = IndexingWorkerPool.create({ poolSize });
}
```
The incremental indexer uses a hard-coded WORKER_THREAD_MIN_FILES = 20, while the main indexer uses configurable workerThreadsMinFiles. This makes behavior inconsistent and prevents tuning via config. Consider threading an indexing config into IncrementalIndexOptions (or at least reusing the same default/constant) so both indexers follow the same enablement rules.
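One way to thread the shared config into the incremental indexer instead of the hard-coded constant (sketch; the `IncrementalIndexOptions` extension and the `this.options.indexing` access path are assumptions):

```ts
// Sketch: reuse the shared indexing config instead of a local WORKER_THREAD_MIN_FILES.
interface IncrementalIndexOptions {
  // ...existing options...
  indexing?: {
    useWorkerThreads: boolean;
    workerThreadsMinFiles: number;
  };
}

// Enablement check, mirroring runParallelIndexing:
const minFiles = this.options.indexing?.workerThreadsMinFiles ?? 50;
const useWorkerThreads =
  (this.options.indexing?.useWorkerThreads ?? true) &&
  filesToIndex.length >= minFiles;
```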
```ts
private handleWorkerError(worker: Worker, err: Error): void {
  // Reject all pending resolvers for this worker (there should be at most 1)
  // The worker might be dead — remove it and try to replace if pool isn't closing
  const idx = this.workers.indexOf(worker);
  if (idx !== -1) {
    this.workers.splice(idx, 1);
  }
  const idleIdx = this.idleWorkers.indexOf(worker);
  if (idleIdx !== -1) {
    this.idleWorkers.splice(idleIdx, 1);
  }

  // Reject any resolvers waiting on this worker's current task
  for (const [id, entry] of this.resolvers.entries()) {
    entry.reject(err);
    this.resolvers.delete(id);
  }
}
```
handleWorkerError currently rejects and deletes all entries in this.resolvers, even though only one worker errored. This can incorrectly fail unrelated in-flight tasks and leave the pool in an inconsistent state. Track which task id is assigned to which worker (e.g., a Map<Worker, id> or include worker reference in the resolver entry) and only reject that task; consider also respawning a replacement worker or failing queued work deterministically.
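A sketch of tracking which task each worker owns so that only that task fails; `removeWorker` and `spawnWorker` are hypothetical helpers, and the map name is illustrative:

```ts
// Sketch: remember which task id a worker is currently running.
private readonly activeTaskByWorker = new Map<Worker, number>();

private handleWorkerError(worker: Worker, err: Error): void {
  this.removeWorker(worker); // splice out of workers/idleWorkers as before (hypothetical helper)

  // Only fail the task that was actually assigned to the crashed worker.
  const taskId = this.activeTaskByWorker.get(worker);
  if (taskId !== undefined) {
    this.resolvers.get(taskId)?.reject(err);
    this.resolvers.delete(taskId);
    this.activeTaskByWorker.delete(worker);
  }

  // Optionally respawn a replacement so queued work keeps making progress.
  if (!this.closed) this.spawnWorker(); // hypothetical helper
}
```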
```ts
async close(): Promise<void> {
  if (this.closed) return;
  this.closed = true;
  // Reject any queued tasks
  for (const pending of this.pendingTasks) {
    pending.reject(new Error('Pool closed before task could be dispatched'));
  }
  this.pendingTasks = [];
  await Promise.all(this.workers.map((w) => w.terminate()));
  this.workers = [];
  this.idleWorkers = [];
}
```
close() terminates workers but does not reject/resolve any in-flight tasks stored in this.resolvers. If close() is called while tasks are running, the returned promises can hang indefinitely. Before/while terminating workers, reject all outstanding resolvers (and clear the map) so callers always see completion.
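A sketch of a close() that also settles in-flight work so callers never hang (same structure as the snippet above):

```ts
// Sketch: reject queued AND in-flight tasks before terminating workers.
async close(): Promise<void> {
  if (this.closed) return;
  this.closed = true;

  const closeError = new Error('Pool closed before task completed');
  for (const pending of this.pendingTasks) pending.reject(closeError);
  this.pendingTasks = [];

  // In-flight tasks: reject every outstanding resolver so processFile() callers settle.
  for (const [id, entry] of this.resolvers.entries()) {
    entry.reject(closeError);
    this.resolvers.delete(id);
  }

  await Promise.all(this.workers.map((w) => w.terminate()));
  this.workers = [];
  this.idleWorkers = [];
}
```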
```ts
const lang = inferIndexLang(filePosix);
const existingHashes = existingHashArrayByLang[lang] ?? [];

const result = await pool.processFile({
  filePath: filePosix,
  content,
  dim: options.dim,
  quantizationBits: options.indexing.hnswConfig.quantizationBits,
  existingChunkHashes: existingHashes,
});
```
In the worker-thread path, existingHashes (potentially very large) is sent to the worker for every file. Serializing/transferring large arrays per task can dominate runtime and memory for big repos. Consider removing existingChunkHashes from per-file messages and doing dedupe only on the main thread, or pass a per-language snapshot once via workerData/an init message (or a compact Bloom filter) and keep it in the worker.
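One possible shape for sending the per-language hash snapshot to each worker once, instead of per task. This is a sketch: the `'init'` message type, the `pool.workers` accessor (or an equivalent broadcast method on the pool), and the worker-side handling are assumptions, not the PR's actual protocol:

```ts
// Sketch: push existing hashes to each worker a single time, then omit them
// from every per-file request.
for (const worker of pool.workers) {
  worker.postMessage({
    type: 'init',
    existingChunkHashesByLang: existingHashArrayByLang,
  });
}

// Worker side (sketch): keep the snapshot in module state and consult it per file.
// let existingByLang: Record<string, string[]> = {};
// parentPort?.on('message', (msg) => {
//   if (msg.type === 'init') { existingByLang = msg.existingChunkHashesByLang; return; }
//   // ...handle the per-file WorkerRequest without existingChunkHashes...
// });
```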
```ts
export async function runParallelIndexing(options: ParallelIndexOptions): Promise<ParallelIndexResult> {
  const { indexing, files } = options;
  const useThreads =
    indexing.useWorkerThreads &&
    files.length >= indexing.workerThreadsMinFiles;

  if (useThreads) {
    const pool = IndexingWorkerPool.create({ poolSize: Math.max(1, indexing.workerCount) });
    if (pool) {
      try {
        return await runWithWorkerPool(options, pool);
      } finally {
        await pool.close();
      }
    }
    // Pool creation failed — fall through to single-threaded path
  }

  return runSingleThreaded(options);
}
```
Current tests for runParallelIndexing won’t exercise the worker-thread path because workerThreadsMinFiles defaults to 50 and the fixtures are smaller. Add/adjust a test to set workerThreadsMinFiles low (and useWorkerThreads: true) to validate worker-thread behavior (including parse-failure fallback and chunk deduplication).
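A rough sketch of such a test. The test runner (node:test here), the `makeParallelIndexOptions` fixture helper, and the exact option fields beyond those shown in the snippet above are assumptions:

```ts
// Sketch: force the worker-thread path by lowering the enablement threshold.
import { test } from 'node:test';
import assert from 'node:assert/strict';

test('runParallelIndexing takes the worker-thread path when threshold is lowered', async () => {
  const options = makeParallelIndexOptions({ // hypothetical fixture helper
    indexing: {
      useWorkerThreads: true,
      workerThreadsMinFiles: 1, // below the fixture's file count, so the pool path runs
      workerCount: 2,
    },
  });
  const result = await runParallelIndexing(options);
  // Real assertions should compare against the single-threaded output
  // (including parse-failure fallback and chunk deduplication behaviour).
  assert.ok(result);
});
```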
src/core/indexing/worker.ts (Outdated)
```ts
 * Main → Worker : WorkerRequest (file path + content + config)
 * Worker → Main : WorkerResponse (parsed symbols, refs, chunks, AST data)
 */
import { parentPort, workerData } from 'worker_threads';
```
Unused import workerData.
Suggested change:

```diff
-import { parentPort, workerData } from 'worker_threads';
+import { parentPort } from 'worker_threads';
```
- Remove unused workerData import from worker.ts
- Add parse failure fallback logic in worker.ts for consistency with single-threaded path
- Fix close() to reject in-flight resolvers in pool.ts
- Fix handleWorkerError to only reject affected task in pool.ts
- Use configurable workerThreadsMinFiles instead of hardcoded value in indexerIncremental.ts
- Add missing existingChunkHashes query in indexerIncremental.ts worker path
- Add concurrency limit for read+dispatch in indexerIncremental.ts processFilesWithPool
- Optimize existingHashes transfer in parallel.ts by removing per-task transfer
- Track worker-task mapping via workerTaskIds Map in pool.ts
Semantic Review Report
🔍 CodaGraph Semantic Review — 📊 Overall assessment
🎯 Overall opinion: The multi-threaded indexing implemented in this PR is architecturally sound overall, but pool.ts contains an obvious code-duplication bug, and the Promise.all() error-handling design reduces the system's fault tolerance. Recommendation: fix the duplication, switch to Promise.allSettled(), and add error logging before merging.
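A sketch of the Promise.allSettled() pattern the review suggests for the per-language writes; `languages` and `writeLanguageTable(lang)` are hypothetical names standing in for the PR's actual per-table write step:

```ts
// Sketch: a failed write for one language surfaces as a logged error
// instead of rejecting the whole batch.
const results = await Promise.allSettled(
  languages.map((lang) => writeLanguageTable(lang)),
);
for (const [i, r] of results.entries()) {
  if (r.status === 'rejected') {
    console.error(`LanceDB write failed for ${languages[i]}:`, r.reason);
  }
}
```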
🔍 CodaGraph Semantic Review — 📊 Overall assessment
🎯 Overall opinion: The multi-threaded parallel indexing in this PR goes in the right direction, but pool.ts contains a serious code-duplication bug, and the parallel writes in indexer.ts carry a race-condition risk. Recommendation: fix both issues and resubmit for review.
🔍 CodaGraph Semantic Review — 📊 Overall assessment
🎯 Overall opinion: The PR introduces worker_threads-based parallel indexing, but there are serious code-quality problems: duplicated code in pool.ts, a syntax error in worker.ts, and hard-coded parameters in parallel.ts that undermine the logic. Recommendation: fix all blocking issues before merging.
```ts
quantizationBits: options.indexing.hnswConfig.quantizationBits,
existingChunkHashes: [],
});
```
Review completed by CodaGraph AI Agent.
Summary
- Config: useWorkerThreads, workerThreadsMinFiles (default 50)

Made with Cursor