From 88001c88946e29590a62d4bef5746bfd0952b624 Mon Sep 17 00:00:00 2001 From: mars Date: Fri, 6 Feb 2026 01:40:46 +0800 Subject: [PATCH 1/4] docs: sync documentation with DSR removal from no-dsr branch - Remove all DSR-related descriptions from README.md - Update architecture diagram to reflect current implementation - Remove DSR tools from skill templates and references - Update AGENTS.md files across codebase to remove DSR references - Simplify core capabilities section to focus on vector + graph retrieval - Update comparison table to highlight repo-map with PageRank --- AGENTS.md | 17 ++-- README.md | 80 ++++++++----------- skills/git-ai-code-search/SKILL.md | 4 +- .../references/constraints.md | 31 ------- skills/git-ai-code-search/references/tools.md | 55 ------------- src/cli/AGENTS.md | 17 ++-- src/commands/AGENTS.md | 2 - src/core/AGENTS.md | 6 +- .../common/skills/git-ai-code-search/SKILL.md | 4 +- .../references/constraints.md | 31 ------- .../git-ai-code-search/references/tools.md | 55 ------------- 11 files changed, 51 insertions(+), 251 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 10297ba..5d060b1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -5,7 +5,7 @@ **Branch:** refactor/cli-commands-architecture ## OVERVIEW -git-ai CLI + MCP server. TypeScript implementation for AI-powered Git operations with semantic search, DSR (Deterministic Semantic Record), and graph-based code analysis. Indices stored in `.git-ai/`. +git-ai CLI + MCP server. TypeScript implementation for AI-powered Git operations with semantic search and graph-based code analysis. Indices stored in `.git-ai/`. ## STRUCTURE ``` @@ -13,17 +13,17 @@ git-ai-cli-v2/ ├── src/ │ ├── cli/ # CLI command architecture (NEW: registry + handlers + schemas) │ │ ├── types.ts # Core types, executeHandler -│ │ ├── registry.ts # Handler registry (24 commands) +│ │ ├── registry.ts # Handler registry (20 commands) │ │ ├── helpers.ts # Shared utilities │ │ ├── schemas/ # Zod validation schemas │ │ ├── handlers/ # Business logic handlers │ │ └── commands/ # Commander.js wrappers │ ├── commands/ # Command aggregator (ai.ts only) -│ ├── core/ # Indexing, DSR, graph, storage, parsers +│ ├── core/ # Indexing, graph, storage, parsers │ └── mcp/ # MCP server implementation ├── test/ # Node test runner tests ├── dist/ # Build output -└── .git-ai/ # Indices (LanceDB + DSR) +└── .git-ai/ # Indices (LanceDB) ``` ## WHERE TO LOOK @@ -32,12 +32,12 @@ git-ai-cli-v2/ | CLI commands | `src/cli/commands/*.ts` (new architecture) | | CLI handlers | `src/cli/handlers/*.ts` (business logic) | | CLI schemas | `src/cli/schemas/*.ts` (Zod validation) | -| Handler registry | `src/cli/registry.ts` (all 24 commands) | +| Handler registry | `src/cli/registry.ts` (all 20 commands) | | Command aggregator | `src/commands/ai.ts` (entry point) | | Indexing logic | `src/core/indexer.ts`, `src/core/indexerIncremental.ts` | -| DSR (commit records) | `src/core/dsr/`, `src/core/dsr.ts` | | Graph queries | `src/core/cozo.ts`, `src/core/astGraph.ts` | | Semantic search | `src/core/semantic.ts`, `src/core/sq8.ts` | +| Repo map | `src/core/repoMap.ts` | | MCP tools | `src/mcp/`, `src/core/graph.ts` | | Language parsers | `src/core/parser/*.ts` | @@ -47,9 +47,9 @@ git-ai-cli-v2/ | `indexer` | fn | `core/indexer.ts` | Full repository indexing | | `incrementalIndexer` | fn | `core/indexerIncremental.ts` | Incremental updates | | `GitAiService` | class | `mcp/index.ts` | MCP entry point | -| `runDsr` | fn | `commands/dsr.ts` | DSR CLI command | | `cozoQuery` | fn | 
`core/cozo.ts` | Graph DB queries | | `semanticSearch` | fn | `core/semantic.ts` | Vector similarity | +| `repoMap` | fn | `core/repoMap.ts` | PageRank-based repo overview | | `resolveGitRoot` | fn | `core/git.ts` | Repo boundary detection | ## CONVENTIONS @@ -69,8 +69,8 @@ git-ai-cli-v2/ ## UNIQUE STYLES - `.git-ai/` directory for all index data (not config files) - MCP tools require explicit `path` argument -- DSR files per commit for reproducible queries - Multi-language parser architecture (TS, Go, Rust, Python, C, Markdown, YAML) +- PageRank-based repo-map for code importance scoring ## COMMANDS ```bash @@ -84,5 +84,4 @@ node dist/bin/git-ai.js --help # Validate packaged output ## NOTES - Indices auto-update on git operations - `checkIndex` gates symbol/semantic/graph queries -- DSR commit hash mismatch with HEAD triggers warning - MCP server exposes git-ai tools for external IDEs diff --git a/README.md b/README.md index dd8896a..2154118 100644 --- a/README.md +++ b/README.md @@ -39,12 +39,12 @@ npm install -g @mars167/git-ai **Code semantics should be versioned and traceable, just like code itself** -git-ai is a local code understanding tool that builds a traceable semantic layer for your codebase using DSR (Deterministic Semantic Record) and Hyper RAG, enabling AI Agents and developers to truly understand code evolution and relationships. +git-ai is a local code understanding tool that builds a semantic layer for your codebase using advanced RAG techniques, enabling AI Agents and developers to deeply understand code structure and relationships. ### ✨ Why git-ai? -- **🔗 Hyper RAG**: Combines vector retrieval + graph retrieval + DSR for multi-dimensional semantic understanding -- **📜 Versioned Semantics**: Every commit has a semantic snapshot, historical changes are clear and traceable +- **🔗 Advanced RAG**: Combines vector retrieval + graph retrieval for multi-dimensional semantic understanding +- **📊 Fast & Accurate**: Optimized repo-map with PageRank-based importance scoring - **🔄 Always Available**: Indices travel with code, available immediately after checkout, no rebuild needed - **🤖 AI-Native**: MCP Server enables Claude, Trae and other Agents to deeply understand your codebase - **🔒 Fully Local**: Code never leaves your machine, secure and private @@ -80,19 +80,7 @@ git-ai ai graph callees authenticateUser git-ai ai graph chain authenticateUser --max-depth 3 ``` -### 3️⃣ Historical Change Tracing - -Track symbol evolution through DSR: - -```bash -# View function's historical changes -git-ai ai dsr query symbol-evolution authenticateUser --limit 50 - -# View complete semantic snapshot for a commit -git-ai ai dsr context -``` - -### 4️⃣ Multi-Language Support +### 3️⃣ Multi-Language Support Supports multiple mainstream programming languages: @@ -112,18 +100,14 @@ Supports multiple mainstream programming languages: ## 💡 Design Philosophy -git-ai is not just a search tool, but a "semantic timeline" for your codebase: - -### DSR (Deterministic Semantic Record) - -Each commit corresponds to an immutable semantic snapshot, recording the code structure, symbol relationships, and design intent at that time. Code semantics should be versioned—just like code itself—traceable, comparable, and evolvable. 
+git-ai is built for deep code understanding through multiple retrieval strategies: -### Hyper RAG +### Advanced RAG Combines multiple retrieval methods for deeper understanding: -- **Vector Retrieval**: Semantic similarity matching -- **Graph Retrieval**: Call relationship, inheritance analysis -- **DSR Retrieval**: Historical evolution tracing +- **Vector Retrieval**: Semantic similarity matching using SQ8 quantized embeddings +- **Graph Retrieval**: Call relationship and dependency analysis via AST graphs +- **Intelligent Fusion**: Weighted combination of retrieval strategies for optimal results ### Decentralized Semantics @@ -161,10 +145,10 @@ git-ai ai graph chain processOrder --max-depth 5 # Find all callers git-ai ai graph callers deprecatedFunction -# Trace historical changes, understand design intent -git-ai ai dsr query symbol-evolution deprecatedFunction --all +# Analyze complete call chain +git-ai ai graph chain deprecatedFunction --direction upstream ``` -*DSR traces historical changes, understanding design intent* +*Graph analysis reveals complete impact scope* ### Scenario 3: Bug Localization and Root Cause Analysis @@ -195,30 +179,30 @@ Claude will automatically invoke git-ai tools to provide deep analysis. *Enablin ```mermaid graph TB - A[Git Repository] -->|On Commit| B[DSR\nDeterministic Semantic Record] - B --> C[.git-ai/dsr/commit.json\nSemantic Snapshot] - C -->|Index Rebuild| D[LanceDB\nVector Database] - C -->|Index Rebuild| E[CozoDB\nGraph Database] - D --> F[MCP Server] - E --> F - F -->|Tool Call| G[AI Agent\nClaude Desktop / Trae] - F -->|CLI| H[Developer] - C -->|Cross-Version| I[Semantic Timeline\nTraceable · Comparable · Evolvable] + A[Git Repository] -->|Index| B[Code Parser\nMulti-Language AST] + B --> C[LanceDB\nVector Database] + B --> D[CozoDB\nGraph Database] + C --> E[MCP Server] + D --> E + E -->|Tool Call| F[AI Agent\nClaude Desktop / Cursor] + E -->|CLI| G[Developer] + B -->|Repo Map| H[PageRank Analysis\nImportance Scoring] + H --> E style B fill:#e1f5ff,stroke:#333 - style C fill:#e8f5e9,stroke:#333 + style C fill:#fff4e1,stroke:#333 style D fill:#fff4e1,stroke:#333 - style E fill:#fff4e1,stroke:#333 - style F fill:#e8f5e9,stroke:#333 - style G fill:#f3e5f5,stroke:#333 - style I fill:#fce4ec,stroke:#333 + style E fill:#e8f5e9,stroke:#333 + style F fill:#f3e5f5,stroke:#333 + style H fill:#fce4ec,stroke:#333 ``` **Core Components**: -- **DSR (Deterministic Semantic Record)**: Immutable semantic snapshots stored per commit, versioned semantics -- **LanceDB + SQ8**: High-performance vector database, supporting semantic search -- **CozoDB**: Graph database, supporting AST-level relationship queries -- **MCP Server**: Standard protocol interface, for AI Agent invocation +- **Code Parser**: Multi-language AST extraction (TypeScript, Java, Python, Go, Rust, C, Markdown, YAML) +- **LanceDB + SQ8**: High-performance vector database with quantized embeddings for semantic search +- **CozoDB**: Graph database for AST-level relationship queries (callers, callees, chains) +- **Repo Map**: PageRank-based code importance analysis for project overview +- **MCP Server**: Standard protocol interface for AI Agent invocation --- @@ -228,12 +212,12 @@ graph TB |---------|--------|-------------------|-------------| | Local Execution | ✅ | ❌ | ❌ | | AST-Level Analysis | ✅ | ❌ | ✅ | -| Versioned Semantics | ✅ | ❌ | ❌ | -| Historical Change Tracing | ✅ | ❌ | ❌ | | AI Agent Integration | ✅ | ❌ | ❌ | | Free & Open Source | ✅ | ❌ | ❌ | | Semantic Search | ✅ | ✅ | ✅ | | Call 
Chain Analysis | ✅ | ❌ | ✅ | +| Multi-Language Support | ✅ | ✅ | ✅ | +| Repo Map with PageRank | ✅ | ❌ | ❌ | --- diff --git a/skills/git-ai-code-search/SKILL.md b/skills/git-ai-code-search/SKILL.md index 52127c7..c0a4f2d 100644 --- a/skills/git-ai-code-search/SKILL.md +++ b/skills/git-ai-code-search/SKILL.md @@ -1,7 +1,7 @@ --- name: git-ai-code-search description: | - Semantic code search and codebase understanding using git-ai MCP tools. Use when: (1) Searching for symbols, functions, or semantic concepts, (2) Understanding project architecture, (3) Analyzing call graphs and code relationships, (4) Tracking symbol history via DSR. Triggers: "find X", "search for X", "who calls X", "where is X", "history of X", "understand this codebase". + Semantic code search and codebase understanding using git-ai MCP tools. Use when: (1) Searching for symbols, functions, or semantic concepts, (2) Understanding project architecture, (3) Analyzing call graphs and code relationships. Triggers: "find X", "search for X", "who calls X", "where is X", "understand this codebase". --- # git-ai Code Search @@ -33,7 +33,6 @@ git-ai ai semantic "authentication logic" # search | Who calls X | `ast_graph_callers` | `{ path, name: "processOrder" }` | | What X calls | `ast_graph_callees` | `{ path, name: "processOrder" }` | | Call chain | `ast_graph_chain` | `{ path, name: "main", direction: "downstream" }` | -| Symbol history | `dsr_symbol_evolution` | `{ path, symbol: "UserService" }` | | Project overview | `repo_map` | `{ path, max_files: 20 }` | ## Rules @@ -41,7 +40,6 @@ git-ai ai semantic "authentication logic" # search 1. **Always pass `path`** - Every tool requires explicit repository path 2. **Check index first** - Run `check_index` before search tools 3. **Read before modify** - Use `read_file` to understand code before changes -4. **Use DSR for history** - Never parse git log manually ## References diff --git a/skills/git-ai-code-search/references/constraints.md b/skills/git-ai-code-search/references/constraints.md index 3f49663..1c3ca53 100644 --- a/skills/git-ai-code-search/references/constraints.md +++ b/skills/git-ai-code-search/references/constraints.md @@ -50,12 +50,6 @@ search_symbols({ path: "/repo", query: "..." }) **Why:** Prevents breaking changes and ensures informed modifications. -### 4. use_dsr_for_history - -**Rule:** When tracing symbol history, MUST use `dsr_symbol_evolution`. NEVER manually parse git log or diff. - -**Why:** DSR provides structured, semantic change information that raw git commands don't. - ## Warning-Level Rules ### 5. repo_map_before_large_change @@ -64,15 +58,6 @@ search_symbols({ path: "/repo", query: "..." }) **Why:** Provides context for planning changes and identifying affected areas. -### 6. respect_dsr_risk - -**Rule:** When DSR reports `risk_level: high`, exercise extra caution. Operations like `delete` and `rename` require additional review. - -**Risk levels:** -- `low`: Safe, routine changes -- `medium`: Review recommended -- `high`: Extra scrutiny required - ## Recommended Practices ### prefer_semantic_search @@ -103,26 +88,13 @@ ast_graph_callers({ path: "/repo", name: result.name }) // etc. ``` -### incremental_dsr_generation - -Generate DSR on-demand for specific commits rather than batch-generating for entire history. 
- -```js -// Good: Generate for specific commit when needed -dsr_generate({ path: "/repo", commit: "abc123" }) - -// Avoid: Generating for all historical commits upfront -``` - ## Prohibited Actions | Action | Reason | |--------|--------| | Assume symbol location without searching | Always confirm via search | | Modify unread files | Must read and understand first | -| Manual git log parsing for history | Use DSR tools instead | | Search with missing index | Rebuild index first | -| Ignore high risk DSR warnings | Requires extra review | | Omit `path` parameter | Every call must be explicit | ## Tool-Specific Constraints @@ -137,7 +109,4 @@ dsr_generate({ path: "/repo", commit: "abc123" }) | `ast_graph_callers` | `check_index` passed | `path`, `name` | | `ast_graph_callees` | `check_index` passed | `path`, `name` | | `ast_graph_chain` | `check_index` passed | `path`, `name` | -| `dsr_context` | None | `path` | -| `dsr_generate` | None | `path`, `commit` | -| `dsr_symbol_evolution` | DSR exists for commits | `path`, `symbol` | | `read_file` | None | `path`, `file` | diff --git a/skills/git-ai-code-search/references/tools.md b/skills/git-ai-code-search/references/tools.md index 42106ae..12db14a 100644 --- a/skills/git-ai-code-search/references/tools.md +++ b/skills/git-ai-code-search/references/tools.md @@ -178,61 +178,6 @@ ast_graph_refs({ }) ``` -## DSR Tools (Deterministic Semantic Records) - -### dsr_context - -Get repository Git context and DSR directory state. - -```js -dsr_context({ path: "/repo" }) -``` - -**Returns:** Branch info, commit status, DSR availability. - -### dsr_generate - -Generate DSR for a specific commit. - -```js -dsr_generate({ - path: "/repo", - commit: "HEAD" // or specific commit hash -}) -``` - -**When to use:** Before querying history for commits without DSR. - -### dsr_symbol_evolution - -Track how a symbol changed over time. - -```js -dsr_symbol_evolution({ - path: "/repo", - symbol: "authenticateUser", - limit: 50, - contains: false, // true for substring match - all: false // true to traverse all refs, not just HEAD -}) -``` - -**Returns:** List of changes with: -- `commit`: Commit hash -- `operation`: add | modify | delete | rename -- `risk_level`: low | medium | high -- `details`: Change description - -**When to use:** Understanding design evolution, finding when/why something changed. - -### dsr_rebuild_index - -Rebuild DSR index from DSR files. - -```js -dsr_rebuild_index({ path: "/repo" }) -``` - ## File Operations ### read_file diff --git a/src/cli/AGENTS.md b/src/cli/AGENTS.md index 4125be1..e7e520d 100644 --- a/src/cli/AGENTS.md +++ b/src/cli/AGENTS.md @@ -9,14 +9,14 @@ Refactored CLI layer with handler registry, Zod validation, and modular design. 
```
 cli/
 ├── types.ts      # Core types: CLIResult, CLIError, CLIHandler, executeHandler
-├── registry.ts   # Handler registry with all 24 commands
+├── registry.ts   # Handler registry with all 20 commands
 ├── helpers.ts    # Shared utilities: resolveRepoContext, validateIndex, formatError
 ├── schemas/      # Zod schemas for all commands
 │   ├── graphSchemas.ts
 │   ├── indexSchemas.ts
 │   ├── semanticSchemas.ts
 │   ├── querySchemas.ts
-│   ├── dsrSchemas.ts
+│   ├── repoMapSchema.ts
 │   ├── statusSchemas.ts
 │   ├── archiveSchemas.ts
 │   ├── hooksSchemas.ts
@@ -26,7 +26,7 @@ cli/
 │   ├── indexHandlers.ts
 │   ├── semanticHandlers.ts
 │   ├── queryHandlers.ts
-│   ├── dsrHandlers.ts
+│   ├── repoMapHandler.ts
 │   ├── statusHandlers.ts
 │   ├── archiveHandlers.ts
 │   ├── hooksHandlers.ts
@@ -36,7 +36,7 @@ cli/
 │   ├── indexCommand.ts
 │   ├── semanticCommand.ts
 │   ├── queryCommand.ts
-│   ├── dsrCommands.ts
+│   ├── repoMapCommand.ts
 │   ├── statusCommands.ts
 │   ├── archiveCommands.ts
 │   ├── hooksCommands.ts
@@ -131,16 +131,11 @@ const handlers: Record<string, CLIHandler<any>> = {
 - `graph:callees` - Find callees
 - `graph:chain` - Compute call chain
 
-### Core Commands (3 commands)
+### Core Commands (4 commands)
 - `index` - Build repository index
 - `semantic` - Semantic search
 - `query` - Symbol search
-
-### DSR Commands (4 subcommands)
-- `dsr:context` - Get DSR directory state
-- `dsr:generate` - Generate DSR for commit
-- `dsr:rebuild-index` - Rebuild DSR index
-- `dsr:query` - Semantic queries over Git DAG
+- `repo-map` - Generate repository map with PageRank
 
 ### Status Commands (2 commands)
 - `checkIndex` - Check index status (deprecated)
diff --git a/src/commands/AGENTS.md b/src/commands/AGENTS.md
index 3800f07..4ec5d9f 100644
--- a/src/commands/AGENTS.md
+++ b/src/commands/AGENTS.md
@@ -12,7 +12,6 @@ commands/
 ├── graph.ts     # Graph exploration
 ├── query.ts     # Symbol search
 ├── semantic.ts  # Semantic search
-├── dsr.ts       # DSR operations
 ├── index.ts     # Index management
 ├── status.ts    # Repo status
 ├── pack.ts      # Index packing
@@ -27,7 +26,6 @@ commands/
 | Query with AI | `ai.ts` |
 | Graph traversal | `graph.ts` |
 | Symbol lookup | `query.ts` |
-| Commit history | `dsr.ts` |
 
 ## CONVENTIONS (deviations from root)
 - All commands: `try/catch`, log `{ ok: false, err }`, exit(1)
diff --git a/src/core/AGENTS.md b/src/core/AGENTS.md
index 888dba0..d4191fe 100644
--- a/src/core/AGENTS.md
+++ b/src/core/AGENTS.md
@@ -3,14 +3,14 @@
 **Core indexing, graph, storage, and parser modules.**
 
 ## OVERVIEW
-Indexing engine: LanceDB storage, Cozo graph DB, DSR records, multi-language parsers.
+Indexing engine: LanceDB storage, Cozo graph DB, multi-language parsers.
 
 ## STRUCTURE
 ```
 core/
 ├── indexer.ts, indexerIncremental.ts  # Indexing orchestration
 ├── cozo.ts, astGraph.ts               # Graph DB + AST queries
-├── dsr/                               # Deterministic Semantic Records
+├── repoMap.ts                         # PageRank-based repository map
 ├── parser/                            # Language parsers (TS, Go, Rust, Python, C, MD, YAML)
 ├── lancedb.ts                         # Vector storage (SQ8)
 ├── semantic.ts, sq8.ts                # Semantic search
@@ -23,7 +23,7 @@ core/
 | Full index | `indexer.ts` |
 | Incremental update | `indexerIncremental.ts` |
 | Graph queries | `cozo.ts` (CozoScript), `astGraph.ts` |
-| DSR read/write | `dsr/`, `dsr.ts` |
+| Repo map | `repoMap.ts` |
 | Language parsing | `parser/adapter.ts`, `parser/typescript.ts`, etc. 
| | Vector search | `lancedb.ts`, `semantic.ts` | diff --git a/templates/agents/common/skills/git-ai-code-search/SKILL.md b/templates/agents/common/skills/git-ai-code-search/SKILL.md index 52127c7..c0a4f2d 100644 --- a/templates/agents/common/skills/git-ai-code-search/SKILL.md +++ b/templates/agents/common/skills/git-ai-code-search/SKILL.md @@ -1,7 +1,7 @@ --- name: git-ai-code-search description: | - Semantic code search and codebase understanding using git-ai MCP tools. Use when: (1) Searching for symbols, functions, or semantic concepts, (2) Understanding project architecture, (3) Analyzing call graphs and code relationships, (4) Tracking symbol history via DSR. Triggers: "find X", "search for X", "who calls X", "where is X", "history of X", "understand this codebase". + Semantic code search and codebase understanding using git-ai MCP tools. Use when: (1) Searching for symbols, functions, or semantic concepts, (2) Understanding project architecture, (3) Analyzing call graphs and code relationships. Triggers: "find X", "search for X", "who calls X", "where is X", "understand this codebase". --- # git-ai Code Search @@ -33,7 +33,6 @@ git-ai ai semantic "authentication logic" # search | Who calls X | `ast_graph_callers` | `{ path, name: "processOrder" }` | | What X calls | `ast_graph_callees` | `{ path, name: "processOrder" }` | | Call chain | `ast_graph_chain` | `{ path, name: "main", direction: "downstream" }` | -| Symbol history | `dsr_symbol_evolution` | `{ path, symbol: "UserService" }` | | Project overview | `repo_map` | `{ path, max_files: 20 }` | ## Rules @@ -41,7 +40,6 @@ git-ai ai semantic "authentication logic" # search 1. **Always pass `path`** - Every tool requires explicit repository path 2. **Check index first** - Run `check_index` before search tools 3. **Read before modify** - Use `read_file` to understand code before changes -4. **Use DSR for history** - Never parse git log manually ## References diff --git a/templates/agents/common/skills/git-ai-code-search/references/constraints.md b/templates/agents/common/skills/git-ai-code-search/references/constraints.md index 3f49663..1c3ca53 100644 --- a/templates/agents/common/skills/git-ai-code-search/references/constraints.md +++ b/templates/agents/common/skills/git-ai-code-search/references/constraints.md @@ -50,12 +50,6 @@ search_symbols({ path: "/repo", query: "..." }) **Why:** Prevents breaking changes and ensures informed modifications. -### 4. use_dsr_for_history - -**Rule:** When tracing symbol history, MUST use `dsr_symbol_evolution`. NEVER manually parse git log or diff. - -**Why:** DSR provides structured, semantic change information that raw git commands don't. - ## Warning-Level Rules ### 5. repo_map_before_large_change @@ -64,15 +58,6 @@ search_symbols({ path: "/repo", query: "..." }) **Why:** Provides context for planning changes and identifying affected areas. -### 6. respect_dsr_risk - -**Rule:** When DSR reports `risk_level: high`, exercise extra caution. Operations like `delete` and `rename` require additional review. - -**Risk levels:** -- `low`: Safe, routine changes -- `medium`: Review recommended -- `high`: Extra scrutiny required - ## Recommended Practices ### prefer_semantic_search @@ -103,26 +88,13 @@ ast_graph_callers({ path: "/repo", name: result.name }) // etc. ``` -### incremental_dsr_generation - -Generate DSR on-demand for specific commits rather than batch-generating for entire history. 
- -```js -// Good: Generate for specific commit when needed -dsr_generate({ path: "/repo", commit: "abc123" }) - -// Avoid: Generating for all historical commits upfront -``` - ## Prohibited Actions | Action | Reason | |--------|--------| | Assume symbol location without searching | Always confirm via search | | Modify unread files | Must read and understand first | -| Manual git log parsing for history | Use DSR tools instead | | Search with missing index | Rebuild index first | -| Ignore high risk DSR warnings | Requires extra review | | Omit `path` parameter | Every call must be explicit | ## Tool-Specific Constraints @@ -137,7 +109,4 @@ dsr_generate({ path: "/repo", commit: "abc123" }) | `ast_graph_callers` | `check_index` passed | `path`, `name` | | `ast_graph_callees` | `check_index` passed | `path`, `name` | | `ast_graph_chain` | `check_index` passed | `path`, `name` | -| `dsr_context` | None | `path` | -| `dsr_generate` | None | `path`, `commit` | -| `dsr_symbol_evolution` | DSR exists for commits | `path`, `symbol` | | `read_file` | None | `path`, `file` | diff --git a/templates/agents/common/skills/git-ai-code-search/references/tools.md b/templates/agents/common/skills/git-ai-code-search/references/tools.md index 42106ae..12db14a 100644 --- a/templates/agents/common/skills/git-ai-code-search/references/tools.md +++ b/templates/agents/common/skills/git-ai-code-search/references/tools.md @@ -178,61 +178,6 @@ ast_graph_refs({ }) ``` -## DSR Tools (Deterministic Semantic Records) - -### dsr_context - -Get repository Git context and DSR directory state. - -```js -dsr_context({ path: "/repo" }) -``` - -**Returns:** Branch info, commit status, DSR availability. - -### dsr_generate - -Generate DSR for a specific commit. - -```js -dsr_generate({ - path: "/repo", - commit: "HEAD" // or specific commit hash -}) -``` - -**When to use:** Before querying history for commits without DSR. - -### dsr_symbol_evolution - -Track how a symbol changed over time. - -```js -dsr_symbol_evolution({ - path: "/repo", - symbol: "authenticateUser", - limit: 50, - contains: false, // true for substring match - all: false // true to traverse all refs, not just HEAD -}) -``` - -**Returns:** List of changes with: -- `commit`: Commit hash -- `operation`: add | modify | delete | rename -- `risk_level`: low | medium | high -- `details`: Change description - -**When to use:** Understanding design evolution, finding when/why something changed. - -### dsr_rebuild_index - -Rebuild DSR index from DSR files. 
- -```js -dsr_rebuild_index({ path: "/repo" }) -``` - ## File Operations ### read_file From 8a3d396d067c16e9789960b1fb4dc3f89f86b1ac Mon Sep 17 00:00:00 2001 From: mars Date: Fri, 6 Feb 2026 23:52:30 +0800 Subject: [PATCH 2/4] feat(index): multi-threaded parallel indexing via worker_threads - Add worker thread pool for CPU-bound parse+embed+quantize operations - LanceDB: parallel writes per language table (Promise.all) - Incremental indexer: Promise concurrency + optional worker pool - Config: useWorkerThreads, workerThreadsMinFiles (default 50) - Fallback to single-threaded when pool unavailable or file count < threshold Co-authored-by: Cursor --- src/core/indexer.ts | 7 +- src/core/indexerIncremental.ts | 374 +++++++++++++++++++++++---------- src/core/indexing/config.ts | 6 + src/core/indexing/parallel.ts | 146 +++++++++++++ src/core/indexing/pool.ts | 174 +++++++++++++++ src/core/indexing/worker.ts | 198 +++++++++++++++++ 6 files changed, 790 insertions(+), 115 deletions(-) create mode 100644 src/core/indexing/pool.ts create mode 100644 src/core/indexing/worker.ts diff --git a/src/core/indexer.ts b/src/core/indexer.ts index fe6fb4c..a5933f9 100644 --- a/src/core/indexer.ts +++ b/src/core/indexer.ts @@ -204,15 +204,16 @@ export class IndexerV2 { const astCallsName = parallelResult.astCallsName; const addedByLang: Record = {}; - for (const lang of languages) { + // Write to LanceDB tables in parallel — each language table is independent + await Promise.all(languages.map(async (lang) => { const t = byLang[lang]; - if (!t) continue; + if (!t) return; const chunkRows = chunkRowsByLang[lang] ?? []; const refRows = refRowsByLang[lang] ?? []; if (chunkRows.length > 0) await t.chunks.add(chunkRows as unknown as Record[]); if (refRows.length > 0) await t.refs.add(refRows as unknown as Record[]); addedByLang[lang] = { chunksAdded: chunkRows.length, refsAdded: refRows.length }; - } + })); const astGraph = await writeAstGraphToCozo(this.repoRoot, { files: astFiles, diff --git a/src/core/indexerIncremental.ts b/src/core/indexerIncremental.ts index 5dd838c..1353642 100644 --- a/src/core/indexerIncremental.ts +++ b/src/core/indexerIncremental.ts @@ -1,5 +1,6 @@ import fs from 'fs-extra'; import path from 'path'; +import os from 'os'; import simpleGit from 'simple-git'; import { sha256Hex } from './crypto'; import { defaultDbDir, IndexLang, openTablesByLang, ALL_INDEX_LANGS } from './lancedb'; @@ -11,6 +12,8 @@ import { ChunkRow, RefRow } from './types'; import { GitDiffPathChange } from './gitDiff'; import { SnapshotCodeParser } from './parser/snapshotParser'; import { getCurrentCommitHash } from './git'; +import { IndexingWorkerPool } from './indexing/pool'; +import type { WorkerFileResult } from './indexing/worker'; export interface IncrementalIndexOptions { repoRoot: string; @@ -205,11 +208,10 @@ export class IncrementalIndexerV2 { const totalFiles = this.changes.length; this.onProgress?.({ totalFiles, processedFiles: 0 }); - let processed = 0; + // Phase A: Sequential deletions — DB operations must be serialized for safety + const filesToIndex: Array<{ filePosix: string; ch: GitDiffPathChange }> = []; for (const ch of this.changes) { - processed++; const filePosix = toPosixPath(ch.path); - this.onProgress?.({ totalFiles, processedFiles: processed, currentFile: filePosix }); if (ch.status === 'R' && ch.oldPath) { const oldFile = toPosixPath(ch.oldPath); @@ -226,26 +228,254 @@ export class IncrementalIndexerV2 { await removeFileFromAstGraph(this.repoRoot, filePosix); if (!isIndexableFile(filePosix)) 
continue;
-
-      // Check if file should be indexed based on ignore/include patterns
       if (!shouldIndexFile(filePosix, aiIgnore, gitIgnore, includePatterns)) continue;
+      filesToIndex.push({ filePosix, ch });
+    }
+
+    // Phase B: Process files — use worker threads when enough files, else single-threaded
+    const WORKER_THREAD_MIN_FILES = 20;
+    const useWorkerThreads = filesToIndex.length >= WORKER_THREAD_MIN_FILES;
+    let pool: IndexingWorkerPool | null = null;
+
+    if (useWorkerThreads) {
+      const poolSize = Math.max(1, Math.min(filesToIndex.length, (os.cpus()?.length ?? 2) - 1));
+      pool = IndexingWorkerPool.create({ poolSize });
+    }
+
+    try {
+      if (pool) {
+        // ── Worker-thread path: main thread reads, workers parse + embed ──
+        await this.processFilesWithPool(pool, filesToIndex, {
+          chunkRowsByLang, refRowsByLang,
+          astFiles, astSymbols, astContains, astExtendsName, astImplementsName, astRefsName, astCallsName,
+          totalFiles,
+        });
+      } else {
+        // ── Single-threaded fallback ──
+        await this.processFilesSingleThreaded(filesToIndex, {
+          chunkRowsByLang, refRowsByLang, candidateChunksByLang, neededHashByLang,
+          astFiles, astSymbols, astContains, astExtendsName, astImplementsName, astRefsName, astCallsName,
+          totalFiles,
+        });
+
+        // Check existing chunks and compute embeddings (single-threaded only)
+        const existingChunkIdsByLang: Partial<Record<IndexLang, Set<string>>> = {};
+        for (const lang of Object.keys(neededHashByLang) as IndexLang[]) {
+          const t = (byLang as any)[lang];
+          if (!t) continue;
+          const needed = Array.from(neededHashByLang[lang] ?? []);
+          const existing = new Set<string>();
+          for (let i = 0; i < needed.length; i += 400) {
+            const chunk = needed.slice(i, i + 400);
+            if (chunk.length === 0) continue;
+            const pred = `content_hash IN (${chunk.map((h: string) => `'${escapeQuotes(h)}'`).join(',')})`;
+            const rows = await t.chunks.query().where(pred).select(['content_hash']).limit(chunk.length).toArray();
+            for (const row of rows as any[]) {
+              const id = String(row.content_hash ?? '');
+              if (id) existing.add(id);
+            }
+          }
+          existingChunkIdsByLang[lang] = existing;
+        }
+
+        for (const lang of Object.keys(candidateChunksByLang) as IndexLang[]) {
+          const t = (byLang as any)[lang];
+          if (!t) continue;
+          const existing = existingChunkIdsByLang[lang] ?? new Set<string>();
+          const chunkRows: ChunkRow[] = [];
+          const candidates = candidateChunksByLang[lang]!;
+          for (const [contentHash, text] of candidates.entries()) {
+            if (!contentHash || !text) continue;
+            if (existing.has(contentHash)) continue;
+            const vec = hashEmbedding(text, { dim: this.dim });
+            const q = quantizeSQ8(vec);
+            chunkRows.push({
+              content_hash: contentHash,
+              text,
+              dim: q.dim,
+              scale: q.scale,
+              qvec_b64: Buffer.from(q.q).toString('base64'),
+            });
+            existing.add(contentHash);
+          }
+          chunkRowsByLang[lang] = chunkRows as any[];
+        }
+      }
+    } finally {
+      if (pool) await pool.close();
+    }
+
+    const addedByLang: Record<string, { chunksAdded: number; refsAdded: number }> = {};
+    // Write to LanceDB tables in parallel — each language table is independent
+    await Promise.all(ALL_INDEX_LANGS.map(async (lang) => {
+      const t = byLang[lang];
+      if (!t) return;
+      const chunkRows = chunkRowsByLang[lang] ?? [];
+      const refRows = refRowsByLang[lang] ?? 
[];
+      if (chunkRows.length > 0) await t.chunks.add(chunkRows);
+      if (refRows.length > 0) await t.refs.add(refRows);
+      if (chunkRows.length > 0 || refRows.length > 0) {
+        addedByLang[lang] = { chunksAdded: chunkRows.length, refsAdded: refRows.length };
+      }
+    }));
+
+    const astGraph = await writeAstGraphToCozo(this.repoRoot, {
+      files: astFiles,
+      symbols: astSymbols,
+      contains: astContains,
+      extends_name: astExtendsName,
+      implements_name: astImplementsName,
+      refs_name: astRefsName,
+      calls_name: astCallsName,
+    }, { mode: 'put' });
+
+    const metaPath = path.join(gitAiDir, 'meta.json');
+    const prev = await fs.readJSON(metaPath).catch(() => null);
+    const commitHash = await getCurrentCommitHash(this.repoRoot);
+    const meta = {
+      ...(prev && typeof prev === 'object' ? prev : {}),
+      version: '2.1',
+      index_schema_version: 3,
+      dim: this.dim,
+      dbDir: path.relative(this.repoRoot, dbDir),
+      scanRoot: path.relative(this.repoRoot, this.scanRoot),
+      languages: ALL_INDEX_LANGS,
+      byLang: addedByLang,
+      ...(commitHash ? { commit_hash: commitHash } : {}),
+      astGraph: astGraph.enabled
+        ? {
+            backend: 'cozo',
+            engine: astGraph.engine,
+            dbPath: astGraph.dbPath ? path.relative(this.repoRoot, astGraph.dbPath) : undefined,
+            counts: astGraph.counts,
+          }
+        : {
+            backend: 'cozo',
+            enabled: false,
+            skippedReason: astGraph.skippedReason,
+          },
+    };
+    await fs.writeJSON(metaPath, meta, { spaces: 2 });
+
+    return { processed: this.changes.length, addedByLang };
+  }
+
+  // ── Worker-thread processing ──────────────────────────────────────────
+
+  private async processFilesWithPool(
+    pool: IndexingWorkerPool,
+    filesToIndex: Array<{ filePosix: string; ch: GitDiffPathChange }>,
+    state: {
+      chunkRowsByLang: Partial<Record<IndexLang, any[]>>;
+      refRowsByLang: Partial<Record<IndexLang, any[]>>;
+      astFiles: Array<[string, string, string]>;
+      astSymbols: Array<[string, string, string, string, string, string, number, number]>;
+      astContains: Array<[string, string]>;
+      astExtendsName: Array<[string, string]>;
+      astImplementsName: Array<[string, string]>;
+      astRefsName: Array<[string, string, string, string, string, number, number]>;
+      astCallsName: Array<[string, string, string, string, number, number]>;
+      totalFiles: number;
+    },
+  ): Promise<void> {
+    let processed = 0;
+    const seenChunkHashes = new Set<string>();
+
+    const mergeResult = (wr: WorkerFileResult): void => {
+      const lang = wr.lang as IndexLang;
+      if (!state.chunkRowsByLang[lang]) state.chunkRowsByLang[lang] = [];
+      if (!state.refRowsByLang[lang]) state.refRowsByLang[lang] = [];
+
+      for (const chunk of wr.chunkRows) {
+        if (!seenChunkHashes.has(chunk.content_hash)) {
+          state.chunkRowsByLang[lang]!.push(chunk);
+          seenChunkHashes.add(chunk.content_hash);
+        }
+      }
+
+      state.refRowsByLang[lang]!.push(...wr.refRows);
+      state.astFiles.push(wr.astFileEntry);
+      state.astSymbols.push(...wr.astSymbols);
+      state.astContains.push(...wr.astContains);
+      state.astExtendsName.push(...wr.astExtendsName);
+      state.astImplementsName.push(...wr.astImplementsName);
+      state.astRefsName.push(...wr.astRefsName);
+      state.astCallsName.push(...wr.astCallsName);
+    };
+
+    const tasks: Array<Promise<void>> = [];
+    for (const item of filesToIndex) {
+      const task = (async () => {
+        processed++;
+        this.onProgress?.({ totalFiles: state.totalFiles, processedFiles: processed, currentFile: item.filePosix });
+
+        const content = this.source === 'staged'
+          ? 
await readStagedFile(this.repoRoot, item.filePosix)
+          : await readWorktreeFile(this.scanRoot, item.filePosix);
+        if (content == null) return;
+
+        const result = await pool.processFile({
+          filePath: item.filePosix,
+          content,
+          dim: this.dim,
+          quantizationBits: 8,
+          existingChunkHashes: [],
+        });
+
+        if (result) mergeResult(result);
+      })();
+      tasks.push(task);
+    }
+
+    await Promise.all(tasks);
+  }
+
+  // ── Single-threaded processing ────────────────────────────────────────
+
+  private async processFilesSingleThreaded(
+    filesToIndex: Array<{ filePosix: string; ch: GitDiffPathChange }>,
+    state: {
+      chunkRowsByLang: Partial<Record<IndexLang, any[]>>;
+      refRowsByLang: Partial<Record<IndexLang, any[]>>;
+      candidateChunksByLang: Partial<Record<IndexLang, Map<string, string>>>;
+      neededHashByLang: Partial<Record<IndexLang, Set<string>>>;
+      astFiles: Array<[string, string, string]>;
+      astSymbols: Array<[string, string, string, string, string, string, number, number]>;
+      astContains: Array<[string, string]>;
+      astExtendsName: Array<[string, string]>;
+      astImplementsName: Array<[string, string]>;
+      astRefsName: Array<[string, string, string, string, string, number, number]>;
+      astCallsName: Array<[string, string, string, string, number, number]>;
+      totalFiles: number;
+    },
+  ): Promise<void> {
+    let processed = 0;
+    const concurrency = Math.max(1, Math.min(8, filesToIndex.length));
+    const queue = filesToIndex.slice();
+    const active = new Set<Promise<void>>();
+
+    const processOneFile = async (item: { filePosix: string; ch: GitDiffPathChange }): Promise<void> => {
+      processed++;
+      const { filePosix } = item;
+      this.onProgress?.({ totalFiles: state.totalFiles, processedFiles: processed, currentFile: filePosix });
+
       const lang = inferIndexLang(filePosix);
-      if (!chunkRowsByLang[lang]) chunkRowsByLang[lang] = [];
-      if (!refRowsByLang[lang]) refRowsByLang[lang] = [];
-      if (!candidateChunksByLang[lang]) candidateChunksByLang[lang] = new Map();
-      if (!neededHashByLang[lang]) neededHashByLang[lang] = new Set();
+      if (!state.chunkRowsByLang[lang]) state.chunkRowsByLang[lang] = [];
+      if (!state.refRowsByLang[lang]) state.refRowsByLang[lang] = [];
+      if (!state.candidateChunksByLang[lang]) state.candidateChunksByLang[lang] = new Map();
+      if (!state.neededHashByLang[lang]) state.neededHashByLang[lang] = new Set();
 
       const content = this.source === 'staged'
         ? 
await readStagedFile(this.repoRoot, filePosix)
         : await readWorktreeFile(this.scanRoot, filePosix);
-      if (content == null) continue;
+      if (content == null) return;
 
       const parsed = this.parser.parseContent(filePosix, content);
       const symbols = parsed.symbols;
       const fileRefs = parsed.refs;
       const fileId = sha256Hex(`file:${filePosix}`);
-      astFiles.push([fileId, filePosix, lang]);
+      state.astFiles.push([fileId, filePosix, lang]);
 
       const callableScopes: Array<{ refId: string; startLine: number; endLine: number }> = [];
       for (const s of symbols) {
@@ -253,7 +483,7 @@
         const contentHash = sha256Hex(text);
         const refId = sha256Hex(`${filePosix}:${s.name}:${s.kind}:${s.startLine}:${s.endLine}:${contentHash}`);
 
-        astSymbols.push([refId, filePosix, lang, s.name, s.kind, s.signature, s.startLine, s.endLine]);
+        state.astSymbols.push([refId, filePosix, lang, s.name, s.kind, s.signature, s.startLine, s.endLine]);
         if (s.kind === 'function' || s.kind === 'method') {
           callableScopes.push({ refId, startLine: s.startLine, endLine: s.endLine });
         }
@@ -264,15 +494,15 @@
         let parentId = fileId;
         if (s.container) {
           const cText = buildChunkText(filePosix, s.container);
           const cHash = sha256Hex(cText);
           parentId = sha256Hex(`${filePosix}:${s.container.name}:${s.container.kind}:${s.container.startLine}:${s.container.endLine}:${cHash}`);
         }
-        astContains.push([parentId, refId]);
+        state.astContains.push([parentId, refId]);
 
         if (s.kind === 'class') {
-          if (s.extends) for (const superName of s.extends) astExtendsName.push([refId, superName]);
-          if (s.implements) for (const ifaceName of s.implements) astImplementsName.push([refId, ifaceName]);
+          if (s.extends) for (const superName of s.extends) state.astExtendsName.push([refId, superName]);
+          if (s.implements) for (const ifaceName of s.implements) state.astImplementsName.push([refId, ifaceName]);
         }
 
-        neededHashByLang[lang]!.add(contentHash);
-        candidateChunksByLang[lang]!.set(contentHash, text);
+        state.neededHashByLang[lang]!.add(contentHash);
+        state.candidateChunksByLang[lang]!.set(contentHash, text);
 
         const refRow: RefRow = {
           ref_id: refId,
@@ -284,7 +514,7 @@
           start_line: s.startLine,
           end_line: s.endLine,
         };
-        refRowsByLang[lang]!.push(refRow as any);
+        state.refRowsByLang[lang]!.push(refRow as any);
       }
 
       const pickScope = (line: number): string => {
@@ -299,106 +529,26 @@
 
       for (const r of fileRefs) {
         const fromId = pickScope(r.line);
-        astRefsName.push([fromId, lang, r.name, r.refKind, filePosix, r.line, r.column]);
+        state.astRefsName.push([fromId, lang, r.name, r.refKind, filePosix, r.line, r.column]);
         if (r.refKind === 'call' || r.refKind === 'new') {
-          astCallsName.push([fromId, lang, r.name, filePosix, r.line, r.column]);
+          state.astCallsName.push([fromId, lang, r.name, filePosix, r.line, r.column]);
         }
       }
-    }
-
-    const existingChunkIdsByLang: Partial<Record<IndexLang, Set<string>>> = {};
-    for (const lang of Object.keys(neededHashByLang) as IndexLang[]) {
-      const t = (byLang as any)[lang];
-      if (!t) continue;
-      const needed = Array.from(neededHashByLang[lang] ?? []);
-      const existing = new Set<string>();
-      for (let i = 0; i < needed.length; i += 400) {
-        const chunk = needed.slice(i, i + 400);
-        if (chunk.length === 0) continue;
-        const pred = `content_hash IN (${chunk.map((h) => `'${escapeQuotes(h)}'`).join(',')})`;
-        const rows = await t.chunks.query().where(pred).select(['content_hash']).limit(chunk.length).toArray();
-        for (const row of rows as any[]) {
-          const id = String(row.content_hash ?? 
'');
-          if (id) existing.add(id);
-        }
-      }
-      existingChunkIdsByLang[lang] = existing;
-    }
+    };
 
-    for (const lang of Object.keys(candidateChunksByLang) as IndexLang[]) {
-      const t = (byLang as any)[lang];
-      if (!t) continue;
-      const existing = existingChunkIdsByLang[lang] ?? new Set<string>();
-      const chunkRows: ChunkRow[] = [];
-      const candidates = candidateChunksByLang[lang]!;
-      for (const [contentHash, text] of candidates.entries()) {
-        if (!contentHash || !text) continue;
-        if (existing.has(contentHash)) continue;
-        const vec = hashEmbedding(text, { dim: this.dim });
-        const q = quantizeSQ8(vec);
-        chunkRows.push({
-          content_hash: contentHash,
-          text,
-          dim: q.dim,
-          scale: q.scale,
-          qvec_b64: Buffer.from(q.q).toString('base64'),
+    const scheduleNext = (): void => {
+      while (active.size < concurrency && queue.length > 0) {
+        const item = queue.shift()!;
+        const task = processOneFile(item).catch(() => undefined).then(() => {
+          active.delete(task);
         });
-        existing.add(contentHash);
-      }
-      chunkRowsByLang[lang] = chunkRows as any[];
-    }
-
-    const addedByLang: Record<string, { chunksAdded: number; refsAdded: number }> = {};
-    for (const lang of ALL_INDEX_LANGS) {
-      const t = byLang[lang];
-      if (!t) continue;
-      const chunkRows = chunkRowsByLang[lang] ?? [];
-      const refRows = refRowsByLang[lang] ?? [];
-      if (chunkRows.length > 0) await t.chunks.add(chunkRows);
-      if (refRows.length > 0) await t.refs.add(refRows);
-      if (chunkRows.length > 0 || refRows.length > 0) {
-        addedByLang[lang] = { chunksAdded: chunkRows.length, refsAdded: refRows.length };
+        active.add(task);
       }
-    }
-
-    const astGraph = await writeAstGraphToCozo(this.repoRoot, {
-      files: astFiles,
-      symbols: astSymbols,
-      contains: astContains,
-      extends_name: astExtendsName,
-      implements_name: astImplementsName,
-      refs_name: astRefsName,
-      calls_name: astCallsName,
-    }, { mode: 'put' });
-
-    const metaPath = path.join(gitAiDir, 'meta.json');
-    const prev = await fs.readJSON(metaPath).catch(() => null);
-    const commitHash = await getCurrentCommitHash(this.repoRoot);
-    const meta = {
-      ...(prev && typeof prev === 'object' ? prev : {}),
-      version: '2.1',
-      index_schema_version: 3,
-      dim: this.dim,
-      dbDir: path.relative(this.repoRoot, dbDir),
-      scanRoot: path.relative(this.repoRoot, this.scanRoot),
-      languages: ALL_INDEX_LANGS,
-      byLang: addedByLang,
-      ...(commitHash ? { commit_hash: commitHash } : {}),
-      astGraph: astGraph.enabled
-        ? {
-            backend: 'cozo',
-            engine: astGraph.engine,
-            dbPath: astGraph.dbPath ? path.relative(this.repoRoot, astGraph.dbPath) : undefined,
-            counts: astGraph.counts,
-          }
-        : {
-            backend: 'cozo',
-            enabled: false,
-            skippedReason: astGraph.skippedReason,
-          },
     };
-    await fs.writeJSON(metaPath, meta, { spaces: 2 });
-
-    return { processed: this.changes.length, addedByLang };
+    scheduleNext();
+    while (active.size > 0) {
+      await Promise.race(active);
+      scheduleNext();
+    }
   }
 }
diff --git a/src/core/indexing/config.ts b/src/core/indexing/config.ts
index ce208ba..300961b 100644
--- a/src/core/indexing/config.ts
+++ b/src/core/indexing/config.ts
@@ -13,6 +13,10 @@ export interface IndexingConfig {
   batchSize: number;
   memoryBudgetMb: number;
   hnswConfig: HNSWParameters;
+  /** Enable true multi-threading via worker_threads for CPU-bound operations. */
+  useWorkerThreads: boolean;
+  /** Minimum number of files before enabling worker threads (avoid startup overhead for small repos). 
*/
+  workerThreadsMinFiles: number;
 }
 
 export type ParseFailureFallback = 'skip' | 'line-chunk' | 'text-only';
@@ -42,6 +46,8 @@ export function defaultIndexingConfig(): IndexingConfig {
       efSearch: 100,
       quantizationBits: 8,
     }),
+    useWorkerThreads: true,
+    workerThreadsMinFiles: 50,
   };
 }
 
diff --git a/src/core/indexing/parallel.ts b/src/core/indexing/parallel.ts
index 69a082f..71b68da 100644
--- a/src/core/indexing/parallel.ts
+++ b/src/core/indexing/parallel.ts
@@ -9,6 +9,8 @@ import { sha256Hex } from '../crypto';
 import { toPosixPath } from '../paths';
 import { ErrorHandlingConfig, IndexingConfig } from './config';
 import { MemoryMonitor } from './monitor';
+import { IndexingWorkerPool } from './pool';
+import type { WorkerFileResult } from './worker';
 
 export interface ParallelIndexOptions {
   repoRoot: string;
@@ -34,7 +36,149 @@ export interface ParallelIndexResult {
   astCallsName: Array<[string, string, string, string, number, number]>;
 }
 
+/**
+ * Run indexing with optional worker_threads parallelism.
+ *
+ * When `useWorkerThreads` is enabled and the file count exceeds the threshold,
+ * CPU-bound work (parsing, embedding, quantisation) is offloaded to a thread
+ * pool while the main thread handles file I/O. Otherwise falls back to the
+ * original single-threaded Promise-based concurrency.
+ */
 export async function runParallelIndexing(options: ParallelIndexOptions): Promise<ParallelIndexResult> {
+  const { indexing, files } = options;
+  const useThreads =
+    indexing.useWorkerThreads &&
+    files.length >= indexing.workerThreadsMinFiles;
+
+  if (useThreads) {
+    const pool = IndexingWorkerPool.create({ poolSize: Math.max(1, indexing.workerCount) });
+    if (pool) {
+      try {
+        return await runWithWorkerPool(options, pool);
+      } finally {
+        await pool.close();
+      }
+    }
+    // Pool creation failed — fall through to single-threaded path
+  }
+
+  return runSingleThreaded(options);
+}
+
+// ── Worker-thread path ─────────────────────────────────────────────────────
+
+async function runWithWorkerPool(
+  options: ParallelIndexOptions,
+  pool: IndexingWorkerPool,
+): Promise<ParallelIndexResult> {
+  const monitor = MemoryMonitor.fromErrorConfig(options.errorHandling, options.indexing.memoryBudgetMb);
+  const pendingFiles = options.files.slice();
+  const totalFiles = pendingFiles.length;
+  let processedFiles = 0;
+  const batchSize = Math.max(1, options.indexing.batchSize);
+
+  const state: ParallelIndexResult = {
+    chunkRowsByLang: {},
+    refRowsByLang: {},
+    astFiles: [],
+    astSymbols: [],
+    astContains: [],
+    astExtendsName: [],
+    astImplementsName: [],
+    astRefsName: [],
+    astCallsName: [],
+  };
+
+  // Collect initial existing hashes per language (snapshot — workers use this)
+  const existingHashArrayByLang: Partial<Record<IndexLang, string[]>> = {};
+  for (const lang of Object.keys(options.existingChunkIdsByLang) as IndexLang[]) {
+    existingHashArrayByLang[lang] = Array.from(options.existingChunkIdsByLang[lang] ?? 
[]);
+  }
+
+  // Track new hashes added during this run to deduplicate across workers
+  const seenChunkHashes = new Map<IndexLang, Set<string>>();
+  for (const lang of Object.keys(options.existingChunkIdsByLang) as IndexLang[]) {
+    seenChunkHashes.set(lang, new Set(options.existingChunkIdsByLang[lang]));
+  }
+
+  const mergeWorkerResult = (wr: WorkerFileResult): void => {
+    const lang = wr.lang as IndexLang;
+    if (!state.chunkRowsByLang[lang]) state.chunkRowsByLang[lang] = [];
+    if (!state.refRowsByLang[lang]) state.refRowsByLang[lang] = [];
+    if (!seenChunkHashes.has(lang)) seenChunkHashes.set(lang, new Set());
+    const seen = seenChunkHashes.get(lang)!;
+
+    // Deduplicate chunk rows across workers
+    for (const chunk of wr.chunkRows) {
+      if (!seen.has(chunk.content_hash)) {
+        state.chunkRowsByLang[lang]!.push(chunk);
+        seen.add(chunk.content_hash);
+      }
+    }
+
+    state.refRowsByLang[lang]!.push(...wr.refRows);
+    state.astFiles.push(wr.astFileEntry);
+    state.astSymbols.push(...wr.astSymbols);
+    state.astContains.push(...wr.astContains);
+    state.astExtendsName.push(...wr.astExtendsName);
+    state.astImplementsName.push(...wr.astImplementsName);
+    state.astRefsName.push(...wr.astRefsName);
+    state.astCallsName.push(...wr.astCallsName);
+  };
+
+  options.onProgress?.({ totalFiles, processedFiles: 0 });
+
+  // Process in batches to allow GC between batches
+  while (pendingFiles.length > 0) {
+    const batch = pendingFiles.splice(0, batchSize);
+
+    // Read files on the main thread (I/O), then dispatch CPU work to pool
+    const tasks: Array<Promise<void>> = [];
+    for (const file of batch) {
+      const task = (async () => {
+        const filePosix = toPosixPath(file);
+        processedFiles++;
+        options.onProgress?.({ totalFiles, processedFiles, currentFile: filePosix });
+
+        await monitor.throttleIfNeeded();
+
+        const fullPath = path.join(options.scanRoot, file);
+        const content = await readFileWithGate(fullPath, options.errorHandling);
+        if (content == null) return;
+
+        const lang = inferIndexLang(filePosix);
+        const existingHashes = existingHashArrayByLang[lang] ?? [];
+
+        const result = await pool.processFile({
+          filePath: filePosix,
+          content,
+          dim: options.dim,
+          quantizationBits: options.indexing.hnswConfig.quantizationBits,
+          existingChunkHashes: existingHashes,
+        });
+
+        if (result) {
+          mergeWorkerResult(result);
+        }
+
+        const snapshot = monitor.sample();
+        if (snapshot.critical) {
+          options.onThrottle?.({ rssMb: snapshot.rssMb, usageRatio: snapshot.usageRatio });
+          await monitor.throttleIfNeeded();
+        }
+      })();
+      tasks.push(task);
+    }
+
+    await Promise.all(tasks);
+  }
+
+  return state;
+}
+
+// ── Single-threaded fallback (original implementation) ─────────────────────
+
+async function runSingleThreaded(options: ParallelIndexOptions): Promise<ParallelIndexResult> {
   const parser = new SnapshotCodeParser();
   const monitor = MemoryMonitor.fromErrorConfig(options.errorHandling, options.indexing.memoryBudgetMb);
   const pendingFiles = options.files.slice();
@@ -190,6 +334,8 @@ export async function runParallelIndexing(options: ParallelIndexOptions): Promis
   return state;
 }
 
+// ── Shared helpers ───────────────────────────────────────────────────────
+
 async function safeStat(filePath: string): Promise<fs.Stats | null> {
   try {
     return await fs.stat(filePath);
diff --git a/src/core/indexing/pool.ts b/src/core/indexing/pool.ts
new file mode 100644
index 0000000..0d28d8d
--- /dev/null
+++ b/src/core/indexing/pool.ts
@@ -0,0 +1,174 @@
+/**
+ * Worker thread pool for CPU-bound indexing operations.
+ 
+ * + * Manages a fixed pool of worker_threads, distributes file processing tasks, + * and collects results. Supports graceful shutdown and automatic fallback + * when worker_threads cannot be initialised. + */ +import path from 'path'; +import { Worker } from 'worker_threads'; +import type { WorkerRequest, WorkerResponse, WorkerFileResult } from './worker'; + +// ── Public types ─────────────────────────────────────────────────────────── + +export interface PoolOptions { + /** Number of worker threads to spawn. */ + poolSize: number; + /** Absolute path to the compiled worker entry JS file. When omitted the + * pool resolves it relative to this module (works for both src/ and dist/). */ + workerPath?: string; +} + +export interface FileTask { + filePath: string; + content: string; + dim: number; + quantizationBits: number; + existingChunkHashes: string[]; +} + +// ── Pool implementation ──────────────────────────────────────────────────── + +export class IndexingWorkerPool { + private workers: Worker[] = []; + private idleWorkers: Worker[] = []; + private pendingTasks: Array<{ + task: FileTask; + resolve: (result: WorkerFileResult | null) => void; + reject: (err: Error) => void; + }> = []; + private nextId = 0; + private resolvers = new Map void; + reject: (err: Error) => void; + }>(); + private closed = false; + + private constructor(private readonly poolSize: number) {} + + /** + * Create and start a pool of worker threads. + * Returns `null` if worker_threads cannot be initialised (caller should + * fall back to single-threaded processing). + */ + static create(options: PoolOptions): IndexingWorkerPool | null { + try { + const pool = new IndexingWorkerPool(options.poolSize); + const workerPath = options.workerPath ?? resolveWorkerPath(); + for (let i = 0; i < options.poolSize; i++) { + const w = new Worker(workerPath); + w.on('message', (msg: WorkerResponse) => pool.handleMessage(w, msg)); + w.on('error', (err: Error) => pool.handleWorkerError(w, err)); + pool.workers.push(w); + pool.idleWorkers.push(w); + } + return pool; + } catch { + // worker_threads unavailable or worker file not found — graceful fallback + return null; + } + } + + /** Submit a file for processing. Returns the result or null on error. */ + async processFile(task: FileTask): Promise { + if (this.closed) throw new Error('Pool is closed'); + + return new Promise((resolve, reject) => { + const idleWorker = this.idleWorkers.pop(); + if (idleWorker) { + this.dispatch(idleWorker, task, resolve, reject); + } else { + // All workers busy — queue the task + this.pendingTasks.push({ task, resolve, reject }); + } + }); + } + + /** Gracefully terminate all worker threads. 
*/
+  async close(): Promise<void> {
+    if (this.closed) return;
+    this.closed = true;
+    // Reject any queued tasks
+    for (const pending of this.pendingTasks) {
+      pending.reject(new Error('Pool closed before task could be dispatched'));
+    }
+    this.pendingTasks = [];
+    await Promise.all(this.workers.map((w) => w.terminate()));
+    this.workers = [];
+    this.idleWorkers = [];
+  }
+
+  get size(): number {
+    return this.workers.length;
+  }
+
+  // ── Internal ─────────────────────────────────────────────────────────
+
+  private dispatch(
+    worker: Worker,
+    task: FileTask,
+    resolve: (result: WorkerFileResult | null) => void,
+    reject: (err: Error) => void,
+  ): void {
+    const id = this.nextId++;
+    this.resolvers.set(id, { resolve, reject });
+    const msg: WorkerRequest = {
+      id,
+      filePath: task.filePath,
+      content: task.content,
+      dim: task.dim,
+      quantizationBits: task.quantizationBits,
+      existingChunkHashes: task.existingChunkHashes,
+    };
+    worker.postMessage(msg);
+  }
+
+  private handleMessage(worker: Worker, msg: WorkerResponse): void {
+    const entry = this.resolvers.get(msg.id);
+    if (entry) {
+      this.resolvers.delete(msg.id);
+      if (msg.error) {
+        // Resolve with null on worker-side errors (non-fatal, file is skipped)
+        entry.resolve(null);
+      } else {
+        entry.resolve(msg.result);
+      }
+    }
+
+    // Worker is now idle — pick up next queued task or return to idle pool
+    const next = this.pendingTasks.shift();
+    if (next) {
+      this.dispatch(worker, next.task, next.resolve, next.reject);
+    } else {
+      this.idleWorkers.push(worker);
+    }
+  }
+
+  private handleWorkerError(worker: Worker, err: Error): void {
+    // Reject all pending resolvers for this worker (there should be at most 1)
+    // The worker might be dead — remove it and try to replace if pool isn't closing
+    const idx = this.workers.indexOf(worker);
+    if (idx !== -1) {
+      this.workers.splice(idx, 1);
+    }
+    const idleIdx = this.idleWorkers.indexOf(worker);
+    if (idleIdx !== -1) {
+      this.idleWorkers.splice(idleIdx, 1);
+    }
+
+    // Reject any resolvers waiting on this worker's current task
+    for (const [id, entry] of this.resolvers.entries()) {
+      entry.reject(err);
+      this.resolvers.delete(id);
+    }
+  }
+}
+
+// ── Helpers ──────────────────────────────────────────────────────────────
+
+/** Resolve the compiled worker.js path relative to this module. */
+function resolveWorkerPath(): string {
+  // In compiled output (dist/), both pool.js and worker.js sit in the same directory.
+  // In ts-node / tsx dev, __filename points to the .ts source which also works.
+  return path.join(path.dirname(__filename), 'worker.js');
+}
diff --git a/src/core/indexing/worker.ts b/src/core/indexing/worker.ts
new file mode 100644
index 0000000..a3abc59
--- /dev/null
+++ b/src/core/indexing/worker.ts
@@ -0,0 +1,198 @@
+/**
+ * Worker thread entry point for CPU-bound indexing operations.
+ *
+ * Each worker initialises its own SnapshotCodeParser (and therefore its own
+ * tree-sitter Parser instance) so that parsing can happen truly in parallel
+ * across multiple OS threads.
+ 
+ * + * Protocol: + * Main → Worker : WorkerRequest (file path + content + config) + * Worker → Main : WorkerResponse (parsed symbols, refs, chunks, AST data) + */ +import { parentPort, workerData } from 'worker_threads'; +import { SnapshotCodeParser } from '../parser/snapshotParser'; +import { hashEmbedding } from '../embedding'; +import { quantizeSQ8 } from '../sq8'; +import { sha256Hex } from '../crypto'; +import { ChunkRow, RefRow, SymbolInfo, AstReference } from '../types'; + +// ── Message types ────────────────────────────────────────────────────────── + +export interface WorkerRequest { + id: number; + filePath: string; // POSIX path relative to scan root + content: string; + dim: number; + quantizationBits: number; + /** Set of content hashes already indexed (serialised as array for transfer). */ + existingChunkHashes: string[]; +} + +export interface WorkerFileResult { + lang: string; + chunkRows: ChunkRow[]; + refRows: RefRow[]; + astFileEntry: [string, string, string]; + astSymbols: Array<[string, string, string, string, string, string, number, number]>; + astContains: Array<[string, string]>; + astExtendsName: Array<[string, string]>; + astImplementsName: Array<[string, string]>; + astRefsName: Array<[string, string, string, string, string, number, number]>; + astCallsName: Array<[string, string, string, string, number, number]>; + /** New content hashes added during this file processing (caller must merge). */ + newChunkHashes: string[]; +} + +export interface WorkerResponse { + id: number; + result: WorkerFileResult | null; + error?: string; +} + +// ── Shared helpers (duplicated from parallel.ts to avoid import issues in worker) ── + +function inferIndexLang(file: string): string { + if (file.endsWith('.md') || file.endsWith('.mdx')) return 'markdown'; + if (file.endsWith('.yml') || file.endsWith('.yaml')) return 'yaml'; + if (file.endsWith('.java')) return 'java'; + if (file.endsWith('.c') || file.endsWith('.h')) return 'c'; + if (file.endsWith('.go')) return 'go'; + if (file.endsWith('.py')) return 'python'; + if (file.endsWith('.rs')) return 'rust'; + return 'ts'; +} + +function buildChunkText(file: string, symbol: { name: string; kind: string; signature: string }): string { + return `file:${file}\nkind:${symbol.kind}\nname:${symbol.name}\nsignature:${symbol.signature}`; +} + +// ── Worker logic ─────────────────────────────────────────────────────────── + +function processFile( + parser: SnapshotCodeParser, + req: WorkerRequest, +): WorkerFileResult { + const { filePath, content, dim, quantizationBits, existingChunkHashes } = req; + const lang = inferIndexLang(filePath); + const existingSet = new Set(existingChunkHashes); + + const parsed = parser.parseContent(filePath, content); + const symbols: SymbolInfo[] = parsed.symbols; + const fileRefs: AstReference[] = parsed.refs; + const fileId = sha256Hex(`file:${filePath}`); + + const chunkRows: ChunkRow[] = []; + const refRows: RefRow[] = []; + const astSymbols: WorkerFileResult['astSymbols'] = []; + const astContains: WorkerFileResult['astContains'] = []; + const astExtendsName: WorkerFileResult['astExtendsName'] = []; + const astImplementsName: WorkerFileResult['astImplementsName'] = []; + const astRefsName: WorkerFileResult['astRefsName'] = []; + const astCallsName: WorkerFileResult['astCallsName'] = []; + const newChunkHashes: string[] = []; + + const callableScopes: Array<{ refId: string; startLine: number; endLine: number }> = []; + + for (const s of symbols) { + const text = buildChunkText(filePath, s); + const 
contentHash = sha256Hex(text); + const refId = sha256Hex(`${filePath}:${s.name}:${s.kind}:${s.startLine}:${s.endLine}:${contentHash}`); + + astSymbols.push([refId, filePath, lang, s.name, s.kind, s.signature, s.startLine, s.endLine]); + if (s.kind === 'function' || s.kind === 'method') { + callableScopes.push({ refId, startLine: s.startLine, endLine: s.endLine }); + } + + let parentId = fileId; + if (s.container) { + const cText = buildChunkText(filePath, s.container); + const cHash = sha256Hex(cText); + parentId = sha256Hex(`${filePath}:${s.container.name}:${s.container.kind}:${s.container.startLine}:${s.container.endLine}:${cHash}`); + } + astContains.push([parentId, refId]); + + if (s.kind === 'class') { + if (s.extends) { + for (const superName of s.extends) astExtendsName.push([refId, superName]); + } + if (s.implements) { + for (const ifaceName of s.implements) astImplementsName.push([refId, ifaceName]); + } + } + + if (!existingSet.has(contentHash)) { + const vec = hashEmbedding(text, { dim }); + const q = quantizeSQ8(vec, quantizationBits); + chunkRows.push({ + content_hash: contentHash, + text, + dim: q.dim, + scale: q.scale, + qvec_b64: Buffer.from(q.q).toString('base64'), + }); + existingSet.add(contentHash); + newChunkHashes.push(contentHash); + } + + refRows.push({ + ref_id: refId, + content_hash: contentHash, + file: filePath, + symbol: s.name, + kind: s.kind, + signature: s.signature, + start_line: s.startLine, + end_line: s.endLine, + }); + } + + const pickScope = (line: number): string => { + let best: { refId: string; span: number } | null = null; + for (const scope of callableScopes) { + if (line < scope.startLine || line > scope.endLine) continue; + const span = scope.endLine - scope.startLine; + if (!best || span < best.span) best = { refId: scope.refId, span }; + } + return best ? best.refId : fileId; + }; + + for (const r of fileRefs) { + const fromId = pickScope(r.line); + astRefsName.push([fromId, lang, r.name, r.refKind, filePath, r.line, r.column]); + if (r.refKind === 'call' || r.refKind === 'new') { + astCallsName.push([fromId, lang, r.name, filePath, r.line, r.column]); + } + } + + return { + lang, + chunkRows, + refRows, + astFileEntry: [fileId, filePath, lang], + astSymbols, + astContains, + astExtendsName, + astImplementsName, + astRefsName, + astCallsName, + newChunkHashes, + }; +} + +// ── Bootstrap (only runs when loaded as a worker thread) ─────────────────── + +if (parentPort) { + const parser = new SnapshotCodeParser(); + + parentPort.on('message', (msg: WorkerRequest) => { + try { + const result = processFile(parser, msg); + const response: WorkerResponse = { id: msg.id, result }; + parentPort!.postMessage(response); + } catch (err: unknown) { + const error = err instanceof Error ? 
err.message : String(err);
+      const response: WorkerResponse = { id: msg.id, result: null, error };
+      parentPort!.postMessage(response);
+    }
+  });
+}

From 6ad76435ea4d5a2113d68be10bd38c8ee3bb9977 Mon Sep 17 00:00:00 2001
From: mars
Date: Fri, 6 Feb 2026 23:53:03 +0800
Subject: [PATCH 3/4] chore: add .cursor/ to gitignore

Co-authored-by: Cursor
---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index c1d1a9a..91776c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,4 @@ dist/
 .vscode/
 .trae/documents/
 .trae/
+.cursor/
\ No newline at end of file

From 9f9e66fddc1e8af0c1e9936b82a2531842ca593b Mon Sep 17 00:00:00 2001
From: mars167
Date: Sat, 7 Feb 2026 13:37:39 +0800
Subject: [PATCH 4/4] fix: address Copilot review comments on PR #21

- Remove unused workerData import from worker.ts
- Add parse failure fallback logic in worker.ts for consistency with single-threaded path
- Fix close() to reject in-flight resolvers in pool.ts
- Fix handleWorkerError to only reject affected task in pool.ts
- Use configurable workerThreadsMinFiles instead of hardcoded value in indexerIncremental.ts
- Add missing existingChunkHashes query in indexerIncremental.ts worker path
- Add concurrency limit for read+dispatch in indexerIncremental.ts processFilesWithPool
- Optimize existingHashes transfer in parallel.ts by removing per-task transfer
- Track worker-task mapping via workerTaskIds Map in pool.ts
---
 src/core/indexerIncremental.ts | 36 ++++++++++++++++++++++++++++++----
 src/core/indexing/parallel.ts  |  9 +--------
 src/core/indexing/pool.ts      | 25 +++++++++++++++++------
 src/core/indexing/worker.ts    | 19 ++++++++++++++----
 4 files changed, 67 insertions(+), 22 deletions(-)

diff --git a/src/core/indexerIncremental.ts b/src/core/indexerIncremental.ts
index 1353642..a5923b2 100644
--- a/src/core/indexerIncremental.ts
+++ b/src/core/indexerIncremental.ts
@@ -14,6 +14,7 @@ import { SnapshotCodeParser } from './parser/snapshotParser';
 import { getCurrentCommitHash } from './git';
 import { IndexingWorkerPool } from './indexing/pool';
 import type { WorkerFileResult } from './indexing/worker';
+import { defaultIndexingConfig, IndexingConfig } from './indexing/config';
 
 export interface IncrementalIndexOptions {
   repoRoot: string;
@@ -22,6 +23,7 @@
   source: 'worktree' | 'staged';
   changes: GitDiffPathChange[];
   onProgress?: (p: { totalFiles: number; processedFiles: number; currentFile?: string }) => void;
+  indexingConfig?: Partial<IndexingConfig>;
 }
 
 async function loadIgnorePatterns(repoRoot: string, fileName: string): Promise<string[]> {
@@ -163,6 +165,7 @@ export class IncrementalIndexerV2 {
   private source: IncrementalIndexOptions['source'];
   private changes: GitDiffPathChange[];
   private onProgress?: IncrementalIndexOptions['onProgress'];
+  private indexingConfig: IndexingConfig;
   private parser: SnapshotCodeParser;
 
   constructor(options: IncrementalIndexOptions) {
@@ -172,6 +175,7 @@
     this.source = options.source;
     this.changes = options.changes;
     this.onProgress = options.onProgress;
+    this.indexingConfig = { ...defaultIndexingConfig(), ...options.indexingConfig };
     this.parser = new SnapshotCodeParser();
   }
 
@@ -234,8 +238,7 @@
     }
 
     // Phase B: Process files — use worker threads when enough files, else single-threaded
-    const WORKER_THREAD_MIN_FILES = 20;
-    const useWorkerThreads = filesToIndex.length >= WORKER_THREAD_MIN_FILES;
+    const useWorkerThreads = this.indexingConfig.useWorkerThreads &&
+      filesToIndex.length >= this.indexingConfig.workerThreadsMinFiles;
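+    // Illustrative: with useWorkerThreads=true and workerThreadsMinFiles=20
+    // (assumed defaults — see defaultIndexingConfig in indexing/config.ts),
+    // a 5-file commit is parsed inline while a 200-file change uses the pool.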
 
     let pool: IndexingWorkerPool | null = null;
     if (useWorkerThreads) {
@@ -246,11 +249,28 @@
     try {
       if (pool) {
         // ── Worker-thread path: main thread reads, workers parse + embed ──
+        const existingChunkIdsByLang: Partial<Record<IndexLang, Set<string>>> = {};
+        for (const lang of ALL_INDEX_LANGS) {
+          const t = (byLang as any)[lang];
+          if (!t) continue;
+          const existing = new Set<string>();
+          try {
+            const rows = await t.chunks.query().select(['content_hash']).toArray();
+            for (const row of rows as any[]) {
+              const id = String(row.content_hash ?? '');
+              if (id) existing.add(id);
+            }
+          } catch {
+            // Table might not exist yet
+          }
+          existingChunkIdsByLang[lang] = existing;
+        }
+
         await this.processFilesWithPool(pool, filesToIndex, {
           chunkRowsByLang, refRowsByLang, astFiles, astSymbols, astContains,
           astExtendsName, astImplementsName, astRefsName, astCallsName, totalFiles,
-        });
+        }, existingChunkIdsByLang);
       } else {
         // ── Single-threaded fallback ──
         await this.processFilesSingleThreaded(filesToIndex, {
@@ -378,6 +398,7 @@
     astCallsName: Array<[string, string, string, string, number, number]>;
     totalFiles: number;
   },
+    existingChunkIdsByLang: Partial<Record<IndexLang, Set<string>>>,
   ): Promise<void> {
     let processed = 0;
     const seenChunkHashes = new Set<string>();
@@ -415,17 +436,24 @@
           : await readWorktreeFile(this.scanRoot, item.filePosix);
         if (content == null) return;
 
+        const lang = inferIndexLang(item.filePosix);
+        const existingHashes = Array.from(existingChunkIdsByLang[lang] ?? []);
+
         const result = await pool.processFile({
           filePath: item.filePosix,
           content,
           dim: this.dim,
           quantizationBits: 8,
-          existingChunkHashes: [],
+          existingChunkHashes: existingHashes,
         });
 
         if (result) mergeResult(result);
       })();
 
       tasks.push(task);
+
+      // Backpressure: cap read-ahead at roughly 2× pool size. Racing the full
+      // `tasks` array would resolve as soon as any earlier task had settled,
+      // so drain the oldest window of tasks instead.
+      if (tasks.length >= pool.size * 2) {
+        await Promise.all(tasks.splice(0, pool.size));
+      }
     }
 
     await Promise.all(tasks);

diff --git a/src/core/indexing/parallel.ts b/src/core/indexing/parallel.ts
index 71b68da..7bc3737 100644
--- a/src/core/indexing/parallel.ts
+++ b/src/core/indexing/parallel.ts
@@ -89,12 +89,6 @@
     astCallsName: [],
   };
 
-  // Collect initial existing hashes per language (snapshot — workers use this)
-  const existingHashArrayByLang: Partial<Record<IndexLang, string[]>> = {};
-  for (const lang of Object.keys(options.existingChunkIdsByLang) as IndexLang[]) {
-    existingHashArrayByLang[lang] = Array.from(options.existingChunkIdsByLang[lang] ?? []);
-  }
-
   // Track new hashes added during this run to deduplicate across workers
   const seenChunkHashes = new Map<IndexLang, Set<string>>();
   for (const lang of Object.keys(options.existingChunkIdsByLang) as IndexLang[]) {
@@ -147,14 +141,13 @@
       if (content == null) return;
 
       const lang = inferIndexLang(filePosix);
-      const existingHashes = existingHashArrayByLang[lang] ?? [];
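+      // Cross-worker dedup now happens on the main thread via seenChunkHashes,
+      // so no per-task hash snapshot has to be serialised to each worker.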
 
       const result = await pool.processFile({
         filePath: filePosix,
         content,
         dim: options.dim,
         quantizationBits: options.indexing.hnswConfig.quantizationBits,
-        existingChunkHashes: existingHashes,
+        existingChunkHashes: [],
       });
 
       if (result) {

diff --git a/src/core/indexing/pool.ts b/src/core/indexing/pool.ts
index 0d28d8d..51c5adb 100644
--- a/src/core/indexing/pool.ts
+++ b/src/core/indexing/pool.ts
@@ -42,6 +42,7 @@ export class IndexingWorkerPool {
     resolve: (result: WorkerFileResult | null) => void;
     reject: (err: Error) => void;
   }>();
+  private workerTaskIds = new Map<Worker, number>();
   private closed = false;
 
   private constructor(private readonly poolSize: number) {}
@@ -93,6 +94,12 @@
       pending.reject(new Error('Pool closed before task could be dispatched'));
     }
     this.pendingTasks = [];
+    // Reject all in-flight tasks before terminating workers
+    for (const [id, entry] of this.resolvers.entries()) {
+      entry.reject(new Error('Pool closed while task was in progress'));
+      this.resolvers.delete(id);
+    }
+    this.workerTaskIds.clear();
     await Promise.all(this.workers.map((w) => w.terminate()));
     this.workers = [];
     this.idleWorkers = [];
@@ -112,6 +119,7 @@
   ): void {
     const id = this.nextId++;
     this.resolvers.set(id, { resolve, reject });
+    this.workerTaskIds.set(worker, id);
     const msg: WorkerRequest = {
       id,
       filePath: task.filePath,
@@ -124,6 +132,7 @@
   }
 
   private handleMessage(worker: Worker, msg: WorkerResponse): void {
+    this.workerTaskIds.delete(worker);
     const entry = this.resolvers.get(msg.id);
     if (entry) {
       this.resolvers.delete(msg.id);
@@ -145,8 +154,6 @@
   }
 
   private handleWorkerError(worker: Worker, err: Error): void {
-    // Reject all pending resolvers for this worker (there should be at most 1)
-    // The worker might be dead — remove it and try to replace if pool isn't closing
     const idx = this.workers.indexOf(worker);
     if (idx !== -1) {
       this.workers.splice(idx, 1);
@@ -156,14 +163,20 @@
       this.idleWorkers.splice(idleIdx, 1);
     }
 
-    // Reject any resolvers waiting on this worker's current task
-    for (const [id, entry] of this.resolvers.entries()) {
-      entry.reject(err);
-      this.resolvers.delete(id);
+    const taskId = this.workerTaskIds.get(worker);
+    this.workerTaskIds.delete(worker);
+    if (taskId !== undefined) {
+      const entry = this.resolvers.get(taskId);
+      if (entry) {
+        entry.reject(err);
+        this.resolvers.delete(taskId);
+      }
     }
   }
 }
 
+export type { WorkerFileResult };
+
 // ── Helpers ────────────────────────────────────────────────────────────────
 
 /** Resolve the compiled worker.js path relative to this module.
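 * For example, under a typical tsc outDir layout, dist/core/indexing/pool.js
 * resolves its sibling dist/core/indexing/worker.js (paths illustrative —
 * the exact layout depends on the build configuration).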
*/ diff --git a/src/core/indexing/worker.ts b/src/core/indexing/worker.ts index a3abc59..40f85d1 100644 --- a/src/core/indexing/worker.ts +++ b/src/core/indexing/worker.ts @@ -9,7 +9,7 @@ * Main → Worker : WorkerRequest (file path + content + config) * Worker → Main : WorkerResponse (parsed symbols, refs, chunks, AST data) */ -import { parentPort, workerData } from 'worker_threads'; +import { parentPort } from 'worker_threads'; import { SnapshotCodeParser } from '../parser/snapshotParser'; import { hashEmbedding } from '../embedding'; import { quantizeSQ8 } from '../sq8'; @@ -76,9 +76,20 @@ function processFile( const lang = inferIndexLang(filePath); const existingSet = new Set(existingChunkHashes); - const parsed = parser.parseContent(filePath, content); - const symbols: SymbolInfo[] = parsed.symbols; - const fileRefs: AstReference[] = parsed.refs; + // Parse with fallback: on failure, degrade gracefully to empty symbols/refs + // instead of throwing, ensuring consistent behavior with single-threaded path + let symbols: SymbolInfo[] = []; + let fileRefs: AstReference[] = []; + try { + const parsed = parser.parseContent(filePath, content); + symbols = parsed.symbols ?? []; + fileRefs = parsed.refs ?? []; + } catch { + // On parse failure, fall back to empty symbol/ref set. + // This mirrors single-threaded behavior where parse failures don't skip the file. + symbols = []; + fileRefs = []; + } const fileId = sha256Hex(`file:${filePath}`); const chunkRows: ChunkRow[] = [];