Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion src/context_engine/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""CLI entry point for code-context-engine."""
import asyncio
import json
import os
import socket
import sys
from pathlib import Path
Expand Down Expand Up @@ -2786,6 +2787,16 @@ def phase_fn(msg: str) -> None:
async def _run_serve(config) -> None:
"""Start MCP server with live file watcher."""
import logging
import signal
# Force single-process embedding inside `cce serve` unless the user
# explicitly overrode it. The reindex worker triggered by file changes
# otherwise spawns a fastembed forkserver pool (~4 workers × ~1.6 GB on
# Linux) that orphans on abnormal exit and leaks RSS across `cce index`
# invocations (issue #66). Single-process embed is plenty for one-file
# watcher reindexes; bulk `cce index` run from a separate shell still
# gets the multiprocess path.
os.environ.setdefault("CCE_EMBED_PARALLEL", "0")

from context_engine.storage.local_backend import LocalBackend
from context_engine.indexer.embedder import Embedder
from context_engine.retrieval.retriever import HybridRetriever
Expand Down Expand Up @@ -2903,9 +2914,56 @@ async def _reindex_worker():
file=sys.stderr,
)

# Install signal handlers so SIGINT (Ctrl-C), SIGTERM, and SIGHUP all
# route through the same orderly shutdown path. Previously only SIGTERM
# cancelled the MCP task — SIGINT was swallowed by stdio reads, leaving
# `cce serve` unkillable except via SIGKILL, which orphans the embed
# workers (#66).
serve_loop = asyncio.get_running_loop()
mcp_task = asyncio.create_task(mcp.run_stdio())

def _request_shutdown(signame: str) -> None:
    # Shared handler for SIGINT/SIGTERM/SIGHUP: cancel the MCP task so
    # every signal funnels through the same orderly-shutdown path. The
    # done() guard keeps a late signal from logging and re-cancelling
    # after the task has already finished.
    if not mcp_task.done():
        _log.info("Received %s, shutting down...", signame)
        mcp_task.cancel()

# Build the candidate list with getattr so we don't reference
# `signal.SIGHUP` at the tuple-construction site — SIGHUP is
# undefined on Windows and that AttributeError would fire *before*
# the try/except below could swallow it, crashing `cce serve` on
# Windows entirely (Copilot review on #69).
installed_signals: list[int] = []
candidate_sigs = [
s for s in (
getattr(signal, "SIGINT", None),
getattr(signal, "SIGTERM", None),
getattr(signal, "SIGHUP", None),
) if s is not None
]
for _sig in candidate_sigs:
try:
serve_loop.add_signal_handler(
_sig, _request_shutdown, _sig.name,
)
installed_signals.append(_sig)
Comment thread
rajkumarsakthivel marked this conversation as resolved.
except (NotImplementedError, RuntimeError):
# Windows's ProactorEventLoop refuses add_signal_handler;
# asyncio also raises NotImplementedError outside the main
# thread. SIGTERM still arrives via the default Python
# handler in those environments.
pass

try:
await mcp.run_stdio()
try:
await mcp_task
except asyncio.CancelledError:
pass
finally:
for _sig in installed_signals:
try:
serve_loop.remove_signal_handler(_sig)
except (NotImplementedError, RuntimeError):
pass
if watcher:
watcher.stop()
if worker_task:
Expand Down
32 changes: 25 additions & 7 deletions src/context_engine/indexer/embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,41 @@
# hits 100%). On Windows, ONNX Runtime worker processes crash with
# ACCESS_VIOLATION (0xC0000005) due to DLL handle inheritance issues.
# Default to None on darwin and win32; allow override via CCE_EMBED_PARALLEL.
#
# Override grammar (case-insensitive):
#   "0" | "none" | "off" | "false" | "no" → None (single-process)
#   "N" (positive integer)               → min(N, cpu_count) — the cap was
#                                          added for #66 so a user on a
#                                          fast 12-CPU box can't set
#                                          CCE_EMBED_PARALLEL=64 and OOM
#                                          the machine
#   anything else                        → fall through to platform default
Comment thread
rajkumarsakthivel marked this conversation as resolved.
#
# Evaluated lazily (not at import) so a caller — notably `cce serve` — can
# set CCE_EMBED_PARALLEL=0 before any Embedder is constructed and have it
# take effect for that process.
_DISABLED_TOKENS = {"0", "none", "off", "false", "no"}


def _resolve_parallel() -> int | None:
override = os.environ.get("CCE_EMBED_PARALLEL")
override = os.environ.get("CCE_EMBED_PARALLEL", "").strip().lower()
if override:
if override in _DISABLED_TOKENS:
return None
try:
return max(1, int(override))
n = int(override)
except ValueError:
pass
n = None
if n is not None:
if n <= 0:
return None
return min(n, os.cpu_count() or n)
if sys.platform == "darwin":
return None
if sys.platform == "win32":
return None
return min(os.cpu_count() or 2, 4)


_PARALLEL: int | None = _resolve_parallel()


class Embedder:
def __init__(
self,
Expand Down Expand Up @@ -141,7 +159,7 @@ def _embed_all(
for i, emb in enumerate(self._model.embed(
texts,
batch_size=batch_size,
parallel=_PARALLEL,
parallel=_resolve_parallel(),
)):
chunks[i].embedding = emb.tolist()
if progress_fn and ((i + 1) % batch_size == 0 or i + 1 == total):
Expand Down
39 changes: 35 additions & 4 deletions src/context_engine/indexer/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ def _should_ignore(self, path: str) -> bool:
return True
return False

def on_any_event(self, event):
if event.is_directory:
return
path = event.src_path
def _enqueue(self, path: str) -> None:
if self._should_ignore(path):
return
with self._lock:
Expand All @@ -54,6 +51,40 @@ def on_any_event(self, event):
self._timer = threading.Timer(self._debounce_s, self._flush)
self._timer.start()

# Only the four content-changing event types trigger a reindex. Earlier
# versions used `on_any_event`, which also fires for `opened` and
# `closed_no_write` — those are emitted hundreds of times whenever a
# sibling `cce index` reads files to hash, causing the serve process to
# spawn a forkserver pool that orphaned ~5 GB on each invocation
# (issue #66). Read-only filesystem activity is now ignored.
def on_modified(self, event):
    """Queue a reindex when a file's content changes; directory noise is dropped."""
    if not event.is_directory:
        self._enqueue(event.src_path)

def on_created(self, event):
    """Queue a reindex for a newly created file; directory events are ignored."""
    if not event.is_directory:
        self._enqueue(event.src_path)

def on_deleted(self, event):
    """Queue a reindex for a deleted file; directory events are ignored."""
    if not event.is_directory:
        self._enqueue(event.src_path)

def on_moved(self, event):
    """Queue a reindex for both ends of a move/rename.

    A single watchdog move event carries src_path and dest_path. Both
    paths are enqueued; _enqueue → _should_ignore drops anything that
    resolves outside the watch dir (or under .cce / an ignore pattern),
    which keeps the watch-dir filter in one place for every event type.
    """
    if event.is_directory:
        return
    source = event.src_path
    self._enqueue(source)
    destination = getattr(event, "dest_path", None)
    if destination and destination != source:
        self._enqueue(destination)
Comment thread
rajkumarsakthivel marked this conversation as resolved.

def _flush(self):
with self._lock:
paths = list(self._pending.keys())
Expand Down
34 changes: 25 additions & 9 deletions src/context_engine/memory/hook_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,38 @@ async def start_hook_server(
app["project_name"] = project_name
add_routes(app)

# Authoritative port file lives in the project's storage_base.
port_file = Path(storage_base) / "serve.port"
# Stable rendezvous file at the *default* storage location. The hook
# shell script always looks here (`${HOME}/.cce/projects/<name>/serve.port`)
# because it has no way to read the user's config.yaml. When storage_path
# is customised, this is the only way capture stays wired up.
default_rendezvous = (
Path.home() / ".cce" / "projects" / project_name / "serve.port"
)
app["_port_files"] = [port_file, default_rendezvous]

async def _close_db(app):
try:
app["memory_db"].close()
except Exception:
log.exception("memory_db close failed")

async def _unlink_port_files(app):
# Covers graceful shutdown paths only. SIGKILL bypasses
# `app.on_cleanup` entirely, so the residual-port-file class of
# bugs (#66) still needs the corresponding socket-liveness probe
# in the hook shell script. What this handler does cleanly cover
# is the orderly SIGINT/SIGTERM/Ctrl-D path so the next session
# doesn't inherit a stale serve.port from a normal exit.
for f in app.get("_port_files", []):
try:
f.unlink(missing_ok=True)
except OSError as exc:
log.warning("serve.port cleanup failed for %s: %s", f, exc)

app.on_cleanup.append(_close_db)
app.on_cleanup.append(_unlink_port_files)

runner = web.AppRunner(app)
await runner.setup()
Expand All @@ -59,18 +84,9 @@ async def _close_db(app):
site = web.TCPSite(runner, host="127.0.0.1", port=port)
await site.start()

# Authoritative port file lives in the project's storage_base.
port_file = Path(storage_base) / "serve.port"
port_file.parent.mkdir(parents=True, exist_ok=True)
port_file.write_text(str(port))

# Stable rendezvous file at the *default* storage location. The hook
# shell script always looks here (`${HOME}/.cce/projects/<name>/serve.port`)
# because it has no way to read the user's config.yaml. When storage_path
# is customised, this is the only way capture stays wired up.
default_rendezvous = (
Path.home() / ".cce" / "projects" / project_name / "serve.port"
)
try:
if default_rendezvous.resolve() != port_file.resolve():
default_rendezvous.parent.mkdir(parents=True, exist_ok=True)
Expand Down
53 changes: 53 additions & 0 deletions tests/indexer/test_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_resolve_parallel_linux_uses_cpu_count(monkeypatch):
def test_resolve_parallel_env_override_wins(monkeypatch):
    """An explicit integer override beats the darwin single-process default."""
    monkeypatch.setattr("sys.platform", "darwin")
    monkeypatch.setattr("os.cpu_count", lambda: 12)
    monkeypatch.setenv("CCE_EMBED_PARALLEL", "2")
    assert _resolve_parallel() == 2


Expand All @@ -64,3 +65,55 @@ def test_resolve_parallel_invalid_env_falls_through(monkeypatch):
monkeypatch.setattr("sys.platform", "linux")
monkeypatch.setattr("os.cpu_count", lambda: 2)
assert _resolve_parallel() == 2


# ─── Issue #66 regression coverage ──────────────────────────────────────


def test_resolve_parallel_zero_disables(monkeypatch):
    """CCE_EMBED_PARALLEL=0 must yield None (single-process), never 1.

    The old behaviour `max(1, int(v))` floored 0 up to 1, but parallel=1
    still takes the multiprocessing path and orphans workers on shutdown
    (#66). Zero now means single-process.
    """
    monkeypatch.setattr("sys.platform", "linux")
    monkeypatch.setenv("CCE_EMBED_PARALLEL", "0")
    assert _resolve_parallel() is None


@pytest.mark.parametrize("token", ["none", "off", "false", "no", "NONE", "Off"])
def test_resolve_parallel_string_tokens_disable(monkeypatch, token):
    """Each word-style disable token, in any case, maps to single-process."""
    monkeypatch.setattr("sys.platform", "linux")
    monkeypatch.setenv("CCE_EMBED_PARALLEL", token)
    assert _resolve_parallel() is None


def test_resolve_parallel_caps_at_cpu_count(monkeypatch):
    """CCE_EMBED_PARALLEL=64 on a 12-CPU host must clamp to 12 workers."""
    monkeypatch.setattr("sys.platform", "linux")
    monkeypatch.setattr("os.cpu_count", lambda: 12)
    monkeypatch.setenv("CCE_EMBED_PARALLEL", "64")
    assert _resolve_parallel() == 12


def test_resolve_parallel_negative_disables(monkeypatch):
    """Negative overrides behave like 0: single-process, not floored to 1."""
    monkeypatch.setattr("sys.platform", "linux")
    monkeypatch.setenv("CCE_EMBED_PARALLEL", "-1")
    assert _resolve_parallel() is None


def test_resolve_parallel_is_lazy(monkeypatch):
    """Resolution happens on each call, not once at import.

    `cce serve` sets CCE_EMBED_PARALLEL=0 inside its own function body and
    relies on every subsequent embed call observing the new value.
    """
    monkeypatch.setattr("sys.platform", "linux")
    monkeypatch.setattr("os.cpu_count", lambda: 8)
    monkeypatch.delenv("CCE_EMBED_PARALLEL", raising=False)
    assert _resolve_parallel() == 4
    monkeypatch.setenv("CCE_EMBED_PARALLEL", "0")
    assert _resolve_parallel() is None
Loading
Loading