diff --git a/src/context_engine/cli.py b/src/context_engine/cli.py index d14ee12..33a451d 100644 --- a/src/context_engine/cli.py +++ b/src/context_engine/cli.py @@ -2,6 +2,7 @@ """CLI entry point for code-context-engine.""" import asyncio import json +import os import socket import sys from pathlib import Path @@ -2786,6 +2787,16 @@ def phase_fn(msg: str) -> None: async def _run_serve(config) -> None: """Start MCP server with live file watcher.""" import logging + import signal + # Force single-process embedding inside `cce serve` unless the user + # explicitly overrode it. The reindex worker triggered by file changes + # otherwise spawns a fastembed forkserver pool (~4 workers × ~1.6 GB on + # Linux) that orphans on abnormal exit and leaks RSS across `cce index` + # invocations (issue #66). Single-process embed is plenty for one-file + # watcher reindexes; bulk `cce index` run from a separate shell still + # gets the multiprocess path. + os.environ.setdefault("CCE_EMBED_PARALLEL", "0") + from context_engine.storage.local_backend import LocalBackend from context_engine.indexer.embedder import Embedder from context_engine.retrieval.retriever import HybridRetriever @@ -2903,9 +2914,56 @@ async def _reindex_worker(): file=sys.stderr, ) + # Install signal handlers so SIGINT (Ctrl-C), SIGTERM, and SIGHUP all + # route through the same orderly shutdown path. Previously only SIGTERM + # cancelled the MCP task — SIGINT was swallowed by stdio reads, leaving + # `cce serve` unkillable except via SIGKILL, which orphans the embed + # workers (#66). + serve_loop = asyncio.get_running_loop() + mcp_task = asyncio.create_task(mcp.run_stdio()) + + def _request_shutdown(signame: str) -> None: + if not mcp_task.done(): + _log.info("Received %s, shutting down...", signame) + mcp_task.cancel() + + # Build the candidate list with getattr so we don't reference + # `signal.SIGHUP` at the tuple-construction site — SIGHUP is + # undefined on Windows and that AttributeError would fire *before* + # the try/except below could swallow it, crashing `cce serve` on + # Windows entirely (Copilot review on #69). + installed_signals: list[int] = [] + candidate_sigs = [ + s for s in ( + getattr(signal, "SIGINT", None), + getattr(signal, "SIGTERM", None), + getattr(signal, "SIGHUP", None), + ) if s is not None + ] + for _sig in candidate_sigs: + try: + serve_loop.add_signal_handler( + _sig, _request_shutdown, _sig.name, + ) + installed_signals.append(_sig) + except (NotImplementedError, RuntimeError): + # Windows's ProactorEventLoop refuses add_signal_handler; + # asyncio also raises NotImplementedError outside the main + # thread. SIGTERM still arrives via the default Python + # handler in those environments. + pass + try: - await mcp.run_stdio() + try: + await mcp_task + except asyncio.CancelledError: + pass finally: + for _sig in installed_signals: + try: + serve_loop.remove_signal_handler(_sig) + except (NotImplementedError, RuntimeError): + pass if watcher: watcher.stop() if worker_task: diff --git a/src/context_engine/indexer/embedder.py b/src/context_engine/indexer/embedder.py index 38cc504..4917a2f 100644 --- a/src/context_engine/indexer/embedder.py +++ b/src/context_engine/indexer/embedder.py @@ -30,13 +30,34 @@ # hits 100%). On Windows, ONNX Runtime worker processes crash with # ACCESS_VIOLATION (0xC0000005) due to DLL handle inheritance issues. # Default to None on darwin and win32; allow override via CCE_EMBED_PARALLEL. +# +# Override grammar (case-insensitive): +# "0" | "none" | "off" | "false" | "no" → None (single-process) +# "N" (positive integer) → min(N, cpu_count) (cap added +# for #66: 12-CPU users on a fast +# box could otherwise CCE_EMBED_PARALLEL=64 +# and OOM themselves) +# anything else → fall through to platform default +# +# Evaluated lazily (not at import) so a caller — notably `cce serve` — can +# set CCE_EMBED_PARALLEL=0 before any Embedder is constructed and have it +# take effect for that process. +_DISABLED_TOKENS = {"0", "none", "off", "false", "no"} + + def _resolve_parallel() -> int | None: - override = os.environ.get("CCE_EMBED_PARALLEL") + override = os.environ.get("CCE_EMBED_PARALLEL", "").strip().lower() if override: + if override in _DISABLED_TOKENS: + return None try: - return max(1, int(override)) + n = int(override) except ValueError: - pass + n = None + if n is not None: + if n <= 0: + return None + return min(n, os.cpu_count() or n) if sys.platform == "darwin": return None if sys.platform == "win32": @@ -44,9 +65,6 @@ def _resolve_parallel() -> int | None: return min(os.cpu_count() or 2, 4) -_PARALLEL: int | None = _resolve_parallel() - - class Embedder: def __init__( self, @@ -141,7 +159,7 @@ def _embed_all( for i, emb in enumerate(self._model.embed( texts, batch_size=batch_size, - parallel=_PARALLEL, + parallel=_resolve_parallel(), )): chunks[i].embedding = emb.tolist() if progress_fn and ((i + 1) % batch_size == 0 or i + 1 == total): diff --git a/src/context_engine/indexer/watcher.py b/src/context_engine/indexer/watcher.py index 14eac3f..81c91a3 100644 --- a/src/context_engine/indexer/watcher.py +++ b/src/context_engine/indexer/watcher.py @@ -41,10 +41,7 @@ def _should_ignore(self, path: str) -> bool: return True return False - def on_any_event(self, event): - if event.is_directory: - return - path = event.src_path + def _enqueue(self, path: str) -> None: if self._should_ignore(path): return with self._lock: @@ -54,6 +51,40 @@ def on_any_event(self, event): self._timer = threading.Timer(self._debounce_s, self._flush) self._timer.start() + # Only the four content-changing event types trigger a reindex. Earlier + # versions used `on_any_event`, which also fires for `opened` and + # `closed_no_write` — those are emitted hundreds of times whenever a + # sibling `cce index` reads files to hash, causing the serve process to + # spawn a forkserver pool that orphaned ~5 GB on each invocation + # (issue #66). Read-only filesystem activity now goes ignored. + def on_modified(self, event): + if event.is_directory: + return + self._enqueue(event.src_path) + + def on_created(self, event): + if event.is_directory: + return + self._enqueue(event.src_path) + + def on_deleted(self, event): + if event.is_directory: + return + self._enqueue(event.src_path) + + def on_moved(self, event): + if event.is_directory: + return + # A move emits one event with both src_path and dest_path. Enqueue + # each path; _enqueue → _should_ignore drops anything that + # resolves outside the watch dir (or under .cce / an ignore + # pattern). Putting the watch-dir filter inside _enqueue keeps the + # rule in one place for every event type. + self._enqueue(event.src_path) + dest = getattr(event, "dest_path", None) + if dest and dest != event.src_path: + self._enqueue(dest) + def _flush(self): with self._lock: paths = list(self._pending.keys()) diff --git a/src/context_engine/memory/hook_server.py b/src/context_engine/memory/hook_server.py index fe40dd6..3ca8c73 100644 --- a/src/context_engine/memory/hook_server.py +++ b/src/context_engine/memory/hook_server.py @@ -44,13 +44,38 @@ async def start_hook_server( app["project_name"] = project_name add_routes(app) + # Authoritative port file lives in the project's storage_base. + port_file = Path(storage_base) / "serve.port" + # Stable rendezvous file at the *default* storage location. The hook + # shell script always looks here (`${HOME}/.cce/projects//serve.port`) + # because it has no way to read the user's config.yaml. When storage_path + # is customised, this is the only way capture stays wired up. + default_rendezvous = ( + Path.home() / ".cce" / "projects" / project_name / "serve.port" + ) + app["_port_files"] = [port_file, default_rendezvous] + async def _close_db(app): try: app["memory_db"].close() except Exception: log.exception("memory_db close failed") + async def _unlink_port_files(app): + # Covers graceful shutdown paths only. SIGKILL bypasses + # `app.on_cleanup` entirely, so the residual-port-file class of + # bugs (#66) still needs the corresponding socket-liveness probe + # in the hook shell script. What this handler does cleanly cover + # is the orderly SIGINT/SIGTERM/Ctrl-D path so the next session + # doesn't inherit a stale serve.port from a normal exit. + for f in app.get("_port_files", []): + try: + f.unlink(missing_ok=True) + except OSError as exc: + log.warning("serve.port cleanup failed for %s: %s", f, exc) + app.on_cleanup.append(_close_db) + app.on_cleanup.append(_unlink_port_files) runner = web.AppRunner(app) await runner.setup() @@ -59,18 +84,9 @@ async def _close_db(app): site = web.TCPSite(runner, host="127.0.0.1", port=port) await site.start() - # Authoritative port file lives in the project's storage_base. - port_file = Path(storage_base) / "serve.port" port_file.parent.mkdir(parents=True, exist_ok=True) port_file.write_text(str(port)) - # Stable rendezvous file at the *default* storage location. The hook - # shell script always looks here (`${HOME}/.cce/projects//serve.port`) - # because it has no way to read the user's config.yaml. When storage_path - # is customised, this is the only way capture stays wired up. - default_rendezvous = ( - Path.home() / ".cce" / "projects" / project_name / "serve.port" - ) try: if default_rendezvous.resolve() != port_file.resolve(): default_rendezvous.parent.mkdir(parents=True, exist_ok=True) diff --git a/tests/indexer/test_embedder.py b/tests/indexer/test_embedder.py index f876f6d..9f6685b 100644 --- a/tests/indexer/test_embedder.py +++ b/tests/indexer/test_embedder.py @@ -56,6 +56,7 @@ def test_resolve_parallel_linux_uses_cpu_count(monkeypatch): def test_resolve_parallel_env_override_wins(monkeypatch): monkeypatch.setenv("CCE_EMBED_PARALLEL", "2") monkeypatch.setattr("sys.platform", "darwin") + monkeypatch.setattr("os.cpu_count", lambda: 12) assert _resolve_parallel() == 2 @@ -64,3 +65,55 @@ def test_resolve_parallel_invalid_env_falls_through(monkeypatch): monkeypatch.setattr("sys.platform", "linux") monkeypatch.setattr("os.cpu_count", lambda: 2) assert _resolve_parallel() == 2 + + +# ─── Issue #66 regression coverage ────────────────────────────────────── + + +def test_resolve_parallel_zero_disables(monkeypatch): + """CCE_EMBED_PARALLEL=0 → None (single-process), not 1. + + The old behaviour `max(1, int(v))` floored 0 to 1, but parallel=1 still + takes the multiprocessing path and orphans workers on shutdown + (#66). Zero now means single-process. + """ + monkeypatch.setenv("CCE_EMBED_PARALLEL", "0") + monkeypatch.setattr("sys.platform", "linux") + assert _resolve_parallel() is None + + +@pytest.mark.parametrize("token", ["none", "off", "false", "no", "NONE", "Off"]) +def test_resolve_parallel_string_tokens_disable(monkeypatch, token): + monkeypatch.setenv("CCE_EMBED_PARALLEL", token) + monkeypatch.setattr("sys.platform", "linux") + assert _resolve_parallel() is None + + +def test_resolve_parallel_caps_at_cpu_count(monkeypatch): + """CCE_EMBED_PARALLEL=64 on a 12-CPU host must not actually spawn 64.""" + monkeypatch.setenv("CCE_EMBED_PARALLEL", "64") + monkeypatch.setattr("sys.platform", "linux") + monkeypatch.setattr("os.cpu_count", lambda: 12) + assert _resolve_parallel() == 12 + + +def test_resolve_parallel_negative_disables(monkeypatch): + monkeypatch.setenv("CCE_EMBED_PARALLEL", "-1") + monkeypatch.setattr("sys.platform", "linux") + assert _resolve_parallel() is None + + +def test_resolve_parallel_is_lazy(monkeypatch): + """Resolution must happen on each call, not at import. + + `cce serve` relies on setting CCE_EMBED_PARALLEL=0 inside the function + body and having subsequent embed calls observe it. + """ + monkeypatch.delenv("CCE_EMBED_PARALLEL", raising=False) + monkeypatch.setattr("sys.platform", "linux") + monkeypatch.setattr("os.cpu_count", lambda: 8) + first = _resolve_parallel() + assert first == 4 + monkeypatch.setenv("CCE_EMBED_PARALLEL", "0") + second = _resolve_parallel() + assert second is None diff --git a/tests/indexer/test_watcher.py b/tests/indexer/test_watcher.py index 29253ba..f35380f 100644 --- a/tests/indexer/test_watcher.py +++ b/tests/indexer/test_watcher.py @@ -111,3 +111,118 @@ async def on_change(path: str): await asyncio.sleep(0.8) watcher.stop() assert len(events) < 5 + + +# ─── Issue #66 regression coverage: event-type filtering ──────────────── + + +class _FakeEvent: + def __init__(self, src_path, *, is_directory=False, dest_path=None): + self.src_path = src_path + self.is_directory = is_directory + if dest_path is not None: + self.dest_path = dest_path + + +def _make_handler(tmp_path, queued, _registry: list | None = None): + """Build a _DebouncedHandler with a recorder swapped in for _enqueue. + + A loop is required by the constructor but never used here — + `_enqueue` is monkey-patched below, so `on_change` is never + scheduled. Loops created by this helper are tracked in a + per-test registry so the test can close them on teardown — leaving + them open emits ResourceWarnings (Copilot review on #69). + """ + from context_engine.indexer.watcher import _DebouncedHandler + loop = asyncio.new_event_loop() + if _registry is not None: + _registry.append(loop) + handler = _DebouncedHandler( + on_change=lambda p: None, + debounce_ms=10, + ignore_patterns=[], + watch_dir=str(tmp_path), + loop=loop, + ) + handler._enqueue = lambda path: queued.append(path) + return handler + + +@pytest.fixture +def synthetic_loops(): + """Per-test registry that closes any loops created via _make_handler. + + Tests that exercise the typed-handler dispatch use synthetic loops + they never run; without explicit close() each one leaks a + ResourceWarning on garbage collection. + """ + loops: list[asyncio.AbstractEventLoop] = [] + yield loops + for loop in loops: + try: + loop.close() + except Exception: + pass + + +def test_watcher_ignores_read_only_events(tmp_path, synthetic_loops): + """Read-only `opened`/`closed_no_write` events from a sibling `cce index` + must not enqueue work. This is the trigger half of the #66 leak — + without this, hundreds of read events per `cce index --path X` cascade + into the reindex worker and spawn embed pools. + """ + queued: list[str] = [] + handler = _make_handler(tmp_path, queued, synthetic_loops) + + # Read-only events flow through the base FileSystemEventHandler stubs, + # which are no-ops on our subclass. They must NOT result in anything + # being enqueued. + for read_only in ("on_opened", "on_closed", "on_closed_no_write"): + method = getattr(handler, read_only, None) + if method is None: + continue + try: + method(_FakeEvent(str(tmp_path / f"{read_only}.py"))) + except TypeError: + # Some watchdog versions require the event to have specific + # attributes — that's still proof the method isn't ours. + pass + assert queued == [], ( + "Read-only events leaked into the reindex queue: %s" % queued + ) + + # The four content-changing types DO fire. + handler.on_modified(_FakeEvent(str(tmp_path / "a.py"))) + handler.on_created(_FakeEvent(str(tmp_path / "b.py"))) + handler.on_deleted(_FakeEvent(str(tmp_path / "c.py"))) + handler.on_moved(_FakeEvent( + str(tmp_path / "old.py"), dest_path=str(tmp_path / "new.py"), + )) + assert str(tmp_path / "a.py") in queued + assert str(tmp_path / "b.py") in queued + assert str(tmp_path / "c.py") in queued + assert str(tmp_path / "old.py") in queued + assert str(tmp_path / "new.py") in queued + + +def test_watcher_skips_directory_events(tmp_path, synthetic_loops): + queued: list[str] = [] + handler = _make_handler(tmp_path, queued, synthetic_loops) + handler.on_modified(_FakeEvent(str(tmp_path / "subdir"), is_directory=True)) + handler.on_created(_FakeEvent(str(tmp_path / "subdir2"), is_directory=True)) + handler.on_deleted(_FakeEvent(str(tmp_path / "subdir3"), is_directory=True)) + handler.on_moved(_FakeEvent( + str(tmp_path / "old_dir"), is_directory=True, + dest_path=str(tmp_path / "new_dir"), + )) + assert queued == [] + + +def test_watcher_move_with_same_src_and_dest(tmp_path, synthetic_loops): + """Spurious move events sometimes report src == dest; queue only once.""" + queued: list[str] = [] + handler = _make_handler(tmp_path, queued, synthetic_loops) + handler.on_moved(_FakeEvent( + str(tmp_path / "x.py"), dest_path=str(tmp_path / "x.py"), + )) + assert queued == [str(tmp_path / "x.py")] diff --git a/tests/memory/test_hook_server_cleanup.py b/tests/memory/test_hook_server_cleanup.py new file mode 100644 index 0000000..2986bfb --- /dev/null +++ b/tests/memory/test_hook_server_cleanup.py @@ -0,0 +1,86 @@ +"""Tests for hook server lifecycle — port file cleanup on shutdown. + +Issue #66: stale serve.port files survived `cce serve` exits, and the next +session's hooks would POST to a dead port. The on_cleanup hook now unlinks +both the storage_base and rendezvous port files. +""" +from pathlib import Path + +import pytest + +from context_engine.memory.hook_server import start_hook_server + + +@pytest.mark.asyncio +async def test_port_files_created_and_cleaned_up(tmp_path, monkeypatch): + # Redirect the rendezvous file (Path.home() / .cce / projects / NAME) + # to a sandboxed home so we don't pollute the real one. + fake_home = tmp_path / "fake_home" + fake_home.mkdir() + monkeypatch.setattr(Path, "home", lambda: fake_home) + + storage_base = tmp_path / "storage" + storage_base.mkdir() + + runner, port = await start_hook_server( + storage_base=storage_base, project_name="proj_under_test", + ) + try: + port_file = storage_base / "serve.port" + rendezvous = fake_home / ".cce" / "projects" / "proj_under_test" / "serve.port" + + # Both files should exist with the bound port. + assert port_file.exists() + assert rendezvous.exists() + assert port_file.read_text() == str(port) + assert rendezvous.read_text() == str(port) + finally: + await runner.cleanup() + + # After cleanup, both files must be gone. + assert not (storage_base / "serve.port").exists() + assert not (fake_home / ".cce" / "projects" / "proj_under_test" / "serve.port").exists() + + +@pytest.mark.asyncio +async def test_cleanup_tolerates_missing_files(tmp_path, monkeypatch): + """If a port file was already removed externally, cleanup must not raise.""" + fake_home = tmp_path / "fake_home" + fake_home.mkdir() + monkeypatch.setattr(Path, "home", lambda: fake_home) + + storage_base = tmp_path / "storage" + storage_base.mkdir() + + runner, _ = await start_hook_server( + storage_base=storage_base, project_name="proj_x", + ) + + # Yank the files out from under cleanup. + (storage_base / "serve.port").unlink() + (fake_home / ".cce" / "projects" / "proj_x" / "serve.port").unlink() + + # Must not raise. + await runner.cleanup() + + +@pytest.mark.asyncio +async def test_cleanup_when_storage_equals_rendezvous(tmp_path, monkeypatch): + """If storage_base resolves to the same path as the rendezvous (default + storage layout), the unlink still happens exactly once and cleanly.""" + fake_home = tmp_path / "fake_home" + fake_home.mkdir() + monkeypatch.setattr(Path, "home", lambda: fake_home) + + # Mimic the default layout: storage IS the rendezvous parent. + storage_base = fake_home / ".cce" / "projects" / "proj_default" + storage_base.mkdir(parents=True) + + runner, _ = await start_hook_server( + storage_base=storage_base, project_name="proj_default", + ) + try: + assert (storage_base / "serve.port").exists() + finally: + await runner.cleanup() + assert not (storage_base / "serve.port").exists()