From 639f45f143313c21cbeee0f564efcec5f50c75fc Mon Sep 17 00:00:00 2001 From: Fazle Elahee Date: Tue, 12 May 2026 10:43:30 +0100 Subject: [PATCH 1/2] fix: stop forkserver workers from orphaning when serve+index race (#66) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five fixes for the WSL OOM AZagatti reported, all in one PR because they form a connected failure mode: a sibling `cce index` triggers read-only events that `cce serve` mis-routes into its reindex worker, which spawns a fastembed forkserver pool that orphans on shutdown. 1. watcher: switch _DebouncedHandler from on_any_event to typed on_modified/on_created/on_deleted/on_moved handlers. Read-only `opened`/`closed_no_write` events from the sibling indexer no longer queue reindex work. This is the actual trigger. 2. embedder._resolve_parallel: CCE_EMBED_PARALLEL=0 (and the strings "none"/"off"/"false"/"no", case-insensitive) now map to None (single-process), matching the darwin/win32 default. Previously max(1, int(v)) floored to 1, but parallel=1 still takes the multiprocessing path. Also caps positive overrides at cpu_count so `CCE_EMBED_PARALLEL=64` on a 12-CPU host can't over-spawn. 3. embedder: _resolve_parallel is now evaluated per embed call instead of at import. Required for `cce serve` to set the env var inside the function and have it take effect. 4. cli._run_serve: defaults CCE_EMBED_PARALLEL=0 via os.environ.setdefault so any reindex worker triggered inside `cce serve` runs single-process. User override (CCE_EMBED_PARALLEL=4 in .mcp.json) is respected. Bulk `cce index` from a separate shell still gets the multiprocess path. 5. cli._run_serve: SIGINT, SIGTERM, and SIGHUP all route through one asyncio signal handler that cancels the MCP task, so Ctrl-C now triggers the existing finally cleanup instead of being swallowed by stdio reads. Previously only SIGTERM worked, leaving SIGKILL as the common path — which is exactly what produced the orphan workers. 
6. hook_server: register an on_cleanup callback that unlinks both serve.port files (storage_base + ~/.cce/projects/<project>/serve.port). Stale port files used to survive across exits and silently break memory capture in the next session. Tests added: - Six new test_embedder.py cases: CCE_EMBED_PARALLEL=0, the string tokens, cpu_count cap, negative values, lazy re-evaluation - Three new test_watcher.py cases: read-only event suppression, directory event skip, idempotent moves with src==dest - New tests/memory/test_hook_server_cleanup.py: port files created and removed, tolerates missing files, default-layout edge case Suite: 891 passed, 1 skipped, 0 failed. Ruff clean. --- src/context_engine/cli.py | 46 +++++++++++- src/context_engine/indexer/embedder.py | 32 +++++++-- src/context_engine/indexer/watcher.py | 37 ++++++++-- src/context_engine/memory/hook_server.py | 33 ++++++--- tests/indexer/test_embedder.py | 53 ++++++++++++++ tests/indexer/test_watcher.py | 92 ++++++++++++++++++++++++ tests/memory/test_hook_server_cleanup.py | 86 ++++++++++++++++++++++ 7 files changed, 358 insertions(+), 21 deletions(-) create mode 100644 tests/memory/test_hook_server_cleanup.py diff --git a/src/context_engine/cli.py b/src/context_engine/cli.py index d14ee12..03ab2ab 100644 --- a/src/context_engine/cli.py +++ b/src/context_engine/cli.py @@ -2,6 +2,7 @@ """CLI entry point for code-context-engine.""" import asyncio import json +import os import socket import sys from pathlib import Path @@ -2786,6 +2787,16 @@ def phase_fn(msg: str) -> None: async def _run_serve(config) -> None: """Start MCP server with live file watcher.""" import logging + import signal + # Force single-process embedding inside `cce serve` unless the user + # explicitly overrode it. The reindex worker triggered by file changes + # otherwise spawns a fastembed forkserver pool (~4 workers × ~1.6 GB on + # Linux) that orphans on abnormal exit and leaks RSS across `cce index` + # invocations (issue #66). 
Single-process embed is plenty for one-file + # watcher reindexes; bulk `cce index` run from a separate shell still + # gets the multiprocess path. + os.environ.setdefault("CCE_EMBED_PARALLEL", "0") + from context_engine.storage.local_backend import LocalBackend from context_engine.indexer.embedder import Embedder from context_engine.retrieval.retriever import HybridRetriever @@ -2903,9 +2914,42 @@ async def _reindex_worker(): file=sys.stderr, ) + # Install signal handlers so SIGINT (Ctrl-C), SIGTERM, and SIGHUP all + # route through the same orderly shutdown path. Previously only SIGTERM + # cancelled the MCP task — SIGINT was swallowed by stdio reads, leaving + # `cce serve` unkillable except via SIGKILL, which orphans the embed + # workers (#66). + serve_loop = asyncio.get_running_loop() + mcp_task = asyncio.create_task(mcp.run_stdio()) + + def _request_shutdown(signame: str) -> None: + if not mcp_task.done(): + _log.info("Received %s, shutting down...", signame) + mcp_task.cancel() + + installed_signals: list[int] = [] + for _sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP): + try: + serve_loop.add_signal_handler( + _sig, _request_shutdown, _sig.name, + ) + installed_signals.append(_sig) + except (NotImplementedError, RuntimeError, AttributeError): + # Windows lacks add_signal_handler for SIGHUP, and asyncio + # raises NotImplementedError outside the main thread. + pass + try: - await mcp.run_stdio() + try: + await mcp_task + except asyncio.CancelledError: + pass finally: + for _sig in installed_signals: + try: + serve_loop.remove_signal_handler(_sig) + except (NotImplementedError, RuntimeError): + pass if watcher: watcher.stop() if worker_task: diff --git a/src/context_engine/indexer/embedder.py b/src/context_engine/indexer/embedder.py index 38cc504..9650942 100644 --- a/src/context_engine/indexer/embedder.py +++ b/src/context_engine/indexer/embedder.py @@ -30,13 +30,34 @@ # hits 100%). 
On Windows, ONNX Runtime worker processes crash with # ACCESS_VIOLATION (0xC0000005) due to DLL handle inheritance issues. # Default to None on darwin and win32; allow override via CCE_EMBED_PARALLEL. +# +# Override grammar (case-insensitive): +# "0" | "none" | "off" | "false" | "no" → None (single-process) +# "=N" → min(N, cpu_count) (cap added +# for #66: 12-CPU users on a fast +# box could otherwise CCE_EMBED_PARALLEL=64 +# and OOM themselves) +# anything else → fall through to platform default +# +# Evaluated lazily (not at import) so a caller — notably `cce serve` — can +# set CCE_EMBED_PARALLEL=0 before any Embedder is constructed and have it +# take effect for that process. +_DISABLED_TOKENS = {"0", "none", "off", "false", "no"} + + def _resolve_parallel() -> int | None: - override = os.environ.get("CCE_EMBED_PARALLEL") + override = os.environ.get("CCE_EMBED_PARALLEL", "").strip().lower() if override: + if override in _DISABLED_TOKENS: + return None try: - return max(1, int(override)) + n = int(override) except ValueError: - pass + n = None + if n is not None: + if n <= 0: + return None + return min(n, os.cpu_count() or n) if sys.platform == "darwin": return None if sys.platform == "win32": @@ -44,9 +65,6 @@ def _resolve_parallel() -> int | None: return min(os.cpu_count() or 2, 4) -_PARALLEL: int | None = _resolve_parallel() - - class Embedder: def __init__( self, @@ -141,7 +159,7 @@ def _embed_all( for i, emb in enumerate(self._model.embed( texts, batch_size=batch_size, - parallel=_PARALLEL, + parallel=_resolve_parallel(), )): chunks[i].embedding = emb.tolist() if progress_fn and ((i + 1) % batch_size == 0 or i + 1 == total): diff --git a/src/context_engine/indexer/watcher.py b/src/context_engine/indexer/watcher.py index 14eac3f..18ffff2 100644 --- a/src/context_engine/indexer/watcher.py +++ b/src/context_engine/indexer/watcher.py @@ -41,10 +41,7 @@ def _should_ignore(self, path: str) -> bool: return True return False - def on_any_event(self, event): - if 
event.is_directory: - return - path = event.src_path + def _enqueue(self, path: str) -> None: if self._should_ignore(path): return with self._lock: @@ -54,6 +51,38 @@ def on_any_event(self, event): self._timer = threading.Timer(self._debounce_s, self._flush) self._timer.start() + # Only the four content-changing event types trigger a reindex. Earlier + # versions used `on_any_event`, which also fires for `opened` and + # `closed_no_write` — those are emitted hundreds of times whenever a + # sibling `cce index` reads files to hash, causing the serve process to + # spawn a forkserver pool that orphaned ~5 GB on each invocation + # (issue #66). Read-only filesystem activity now goes ignored. + def on_modified(self, event): + if event.is_directory: + return + self._enqueue(event.src_path) + + def on_created(self, event): + if event.is_directory: + return + self._enqueue(event.src_path) + + def on_deleted(self, event): + if event.is_directory: + return + self._enqueue(event.src_path) + + def on_moved(self, event): + if event.is_directory: + return + # A move emits one event with both src_path and dest_path; queue + # whichever side is inside the watch dir (or both, if it's a rename + # within the project). + self._enqueue(event.src_path) + dest = getattr(event, "dest_path", None) + if dest and dest != event.src_path: + self._enqueue(dest) + def _flush(self): with self._lock: paths = list(self._pending.keys()) diff --git a/src/context_engine/memory/hook_server.py b/src/context_engine/memory/hook_server.py index fe40dd6..5636f21 100644 --- a/src/context_engine/memory/hook_server.py +++ b/src/context_engine/memory/hook_server.py @@ -44,13 +44,37 @@ async def start_hook_server( app["project_name"] = project_name add_routes(app) + # Authoritative port file lives in the project's storage_base. + port_file = Path(storage_base) / "serve.port" + # Stable rendezvous file at the *default* storage location. 
The hook + # shell script always looks here (`${HOME}/.cce/projects//serve.port`) + # because it has no way to read the user's config.yaml. When storage_path + # is customised, this is the only way capture stays wired up. + default_rendezvous = ( + Path.home() / ".cce" / "projects" / project_name / "serve.port" + ) + app["_port_files"] = [port_file, default_rendezvous] + async def _close_db(app): try: app["memory_db"].close() except Exception: log.exception("memory_db close failed") + async def _unlink_port_files(app): + # Without this, stale serve.port files survived `cce serve` exits + # (especially SIGKILL after the embed-worker leak in #66). The hook + # shell script would then POST to a dead port and capture would + # silently break for the next session. Best-effort unlink — if the + # file is already gone or unwritable, that's fine. + for f in app.get("_port_files", []): + try: + f.unlink(missing_ok=True) + except OSError as exc: + log.warning("serve.port cleanup failed for %s: %s", f, exc) + app.on_cleanup.append(_close_db) + app.on_cleanup.append(_unlink_port_files) runner = web.AppRunner(app) await runner.setup() @@ -59,18 +83,9 @@ async def _close_db(app): site = web.TCPSite(runner, host="127.0.0.1", port=port) await site.start() - # Authoritative port file lives in the project's storage_base. - port_file = Path(storage_base) / "serve.port" port_file.parent.mkdir(parents=True, exist_ok=True) port_file.write_text(str(port)) - # Stable rendezvous file at the *default* storage location. The hook - # shell script always looks here (`${HOME}/.cce/projects//serve.port`) - # because it has no way to read the user's config.yaml. When storage_path - # is customised, this is the only way capture stays wired up. 
- default_rendezvous = ( - Path.home() / ".cce" / "projects" / project_name / "serve.port" - ) try: if default_rendezvous.resolve() != port_file.resolve(): default_rendezvous.parent.mkdir(parents=True, exist_ok=True) diff --git a/tests/indexer/test_embedder.py b/tests/indexer/test_embedder.py index f876f6d..9f6685b 100644 --- a/tests/indexer/test_embedder.py +++ b/tests/indexer/test_embedder.py @@ -56,6 +56,7 @@ def test_resolve_parallel_linux_uses_cpu_count(monkeypatch): def test_resolve_parallel_env_override_wins(monkeypatch): monkeypatch.setenv("CCE_EMBED_PARALLEL", "2") monkeypatch.setattr("sys.platform", "darwin") + monkeypatch.setattr("os.cpu_count", lambda: 12) assert _resolve_parallel() == 2 @@ -64,3 +65,55 @@ def test_resolve_parallel_invalid_env_falls_through(monkeypatch): monkeypatch.setattr("sys.platform", "linux") monkeypatch.setattr("os.cpu_count", lambda: 2) assert _resolve_parallel() == 2 + + +# ─── Issue #66 regression coverage ────────────────────────────────────── + + +def test_resolve_parallel_zero_disables(monkeypatch): + """CCE_EMBED_PARALLEL=0 → None (single-process), not 1. + + The old behaviour `max(1, int(v))` floored 0 to 1, but parallel=1 still + takes the multiprocessing path and orphans workers on shutdown + (#66). Zero now means single-process. 
+ """ + monkeypatch.setenv("CCE_EMBED_PARALLEL", "0") + monkeypatch.setattr("sys.platform", "linux") + assert _resolve_parallel() is None + + +@pytest.mark.parametrize("token", ["none", "off", "false", "no", "NONE", "Off"]) +def test_resolve_parallel_string_tokens_disable(monkeypatch, token): + monkeypatch.setenv("CCE_EMBED_PARALLEL", token) + monkeypatch.setattr("sys.platform", "linux") + assert _resolve_parallel() is None + + +def test_resolve_parallel_caps_at_cpu_count(monkeypatch): + """CCE_EMBED_PARALLEL=64 on a 12-CPU host must not actually spawn 64.""" + monkeypatch.setenv("CCE_EMBED_PARALLEL", "64") + monkeypatch.setattr("sys.platform", "linux") + monkeypatch.setattr("os.cpu_count", lambda: 12) + assert _resolve_parallel() == 12 + + +def test_resolve_parallel_negative_disables(monkeypatch): + monkeypatch.setenv("CCE_EMBED_PARALLEL", "-1") + monkeypatch.setattr("sys.platform", "linux") + assert _resolve_parallel() is None + + +def test_resolve_parallel_is_lazy(monkeypatch): + """Resolution must happen on each call, not at import. + + `cce serve` relies on setting CCE_EMBED_PARALLEL=0 inside the function + body and having subsequent embed calls observe it. 
+ """ + monkeypatch.delenv("CCE_EMBED_PARALLEL", raising=False) + monkeypatch.setattr("sys.platform", "linux") + monkeypatch.setattr("os.cpu_count", lambda: 8) + first = _resolve_parallel() + assert first == 4 + monkeypatch.setenv("CCE_EMBED_PARALLEL", "0") + second = _resolve_parallel() + assert second is None diff --git a/tests/indexer/test_watcher.py b/tests/indexer/test_watcher.py index 29253ba..b0f6c53 100644 --- a/tests/indexer/test_watcher.py +++ b/tests/indexer/test_watcher.py @@ -111,3 +111,95 @@ async def on_change(path: str): await asyncio.sleep(0.8) watcher.stop() assert len(events) < 5 + + +# ─── Issue #66 regression coverage: event-type filtering ──────────────── + + +class _FakeEvent: + def __init__(self, src_path, *, is_directory=False, dest_path=None): + self.src_path = src_path + self.is_directory = is_directory + if dest_path is not None: + self.dest_path = dest_path + + +def _make_handler(tmp_path, queued): + from context_engine.indexer.watcher import _DebouncedHandler + # A loop is required by the handler constructor but never used here — + # _enqueue is monkey-patched below, so on_change is never scheduled. + loop = asyncio.new_event_loop() + handler = _DebouncedHandler( + on_change=lambda p: None, + debounce_ms=10, + ignore_patterns=[], + watch_dir=str(tmp_path), + loop=loop, + ) + # Swap in a recorder so we can assert what was enqueued without timing + # out on the real debounce timer. + handler._enqueue = lambda path: queued.append(path) + return handler + + +def test_watcher_ignores_read_only_events(tmp_path): + """Read-only `opened`/`closed_no_write` events from a sibling `cce index` + must not enqueue work. This is the trigger half of the #66 leak — + without this, hundreds of read events per `cce index --path X` cascade + into the reindex worker and spawn embed pools. 
+ """ + queued: list[str] = [] + handler = _make_handler(tmp_path, queued) + + # Read-only events flow through the base FileSystemEventHandler stubs, + # which are no-ops on our subclass. They must NOT result in anything + # being enqueued. + for read_only in ("on_opened", "on_closed", "on_closed_no_write"): + method = getattr(handler, read_only, None) + if method is None: + continue + try: + method(_FakeEvent(str(tmp_path / f"{read_only}.py"))) + except TypeError: + # Some watchdog versions require the event to have specific + # attributes — that's still proof the method isn't ours. + pass + assert queued == [], ( + "Read-only events leaked into the reindex queue: %s" % queued + ) + + # The four content-changing types DO fire. + handler.on_modified(_FakeEvent(str(tmp_path / "a.py"))) + handler.on_created(_FakeEvent(str(tmp_path / "b.py"))) + handler.on_deleted(_FakeEvent(str(tmp_path / "c.py"))) + handler.on_moved(_FakeEvent( + str(tmp_path / "old.py"), dest_path=str(tmp_path / "new.py"), + )) + assert str(tmp_path / "a.py") in queued + assert str(tmp_path / "b.py") in queued + assert str(tmp_path / "c.py") in queued + assert str(tmp_path / "old.py") in queued + assert str(tmp_path / "new.py") in queued + + +def test_watcher_skips_directory_events(tmp_path): + queued: list[str] = [] + handler = _make_handler(tmp_path, queued) + handler.on_modified(_FakeEvent(str(tmp_path / "subdir"), is_directory=True)) + handler.on_created(_FakeEvent(str(tmp_path / "subdir2"), is_directory=True)) + handler.on_deleted(_FakeEvent(str(tmp_path / "subdir3"), is_directory=True)) + handler.on_moved(_FakeEvent( + str(tmp_path / "old_dir"), is_directory=True, + dest_path=str(tmp_path / "new_dir"), + )) + assert queued == [] + + +def test_watcher_move_with_same_src_and_dest(tmp_path): + """Spurious move events sometimes report src == dest; queue only once.""" + queued: list[str] = [] + handler = _make_handler(tmp_path, queued) + handler.on_moved(_FakeEvent( + str(tmp_path / "x.py"), 
dest_path=str(tmp_path / "x.py"), + )) + assert queued == [str(tmp_path / "x.py")] diff --git a/tests/memory/test_hook_server_cleanup.py b/tests/memory/test_hook_server_cleanup.py new file mode 100644 index 0000000..2986bfb --- /dev/null +++ b/tests/memory/test_hook_server_cleanup.py @@ -0,0 +1,86 @@ +"""Tests for hook server lifecycle — port file cleanup on shutdown. + +Issue #66: stale serve.port files survived `cce serve` exits, and the next +session's hooks would POST to a dead port. The on_cleanup hook now unlinks +both the storage_base and rendezvous port files. +""" +from pathlib import Path + +import pytest + +from context_engine.memory.hook_server import start_hook_server + + +@pytest.mark.asyncio +async def test_port_files_created_and_cleaned_up(tmp_path, monkeypatch): + # Redirect the rendezvous file (Path.home() / .cce / projects / NAME) + # to a sandboxed home so we don't pollute the real one. + fake_home = tmp_path / "fake_home" + fake_home.mkdir() + monkeypatch.setattr(Path, "home", lambda: fake_home) + + storage_base = tmp_path / "storage" + storage_base.mkdir() + + runner, port = await start_hook_server( + storage_base=storage_base, project_name="proj_under_test", + ) + try: + port_file = storage_base / "serve.port" + rendezvous = fake_home / ".cce" / "projects" / "proj_under_test" / "serve.port" + + # Both files should exist with the bound port. + assert port_file.exists() + assert rendezvous.exists() + assert port_file.read_text() == str(port) + assert rendezvous.read_text() == str(port) + finally: + await runner.cleanup() + + # After cleanup, both files must be gone. 
+ assert not (storage_base / "serve.port").exists() + assert not (fake_home / ".cce" / "projects" / "proj_under_test" / "serve.port").exists() + + +@pytest.mark.asyncio +async def test_cleanup_tolerates_missing_files(tmp_path, monkeypatch): + """If a port file was already removed externally, cleanup must not raise.""" + fake_home = tmp_path / "fake_home" + fake_home.mkdir() + monkeypatch.setattr(Path, "home", lambda: fake_home) + + storage_base = tmp_path / "storage" + storage_base.mkdir() + + runner, _ = await start_hook_server( + storage_base=storage_base, project_name="proj_x", + ) + + # Yank the files out from under cleanup. + (storage_base / "serve.port").unlink() + (fake_home / ".cce" / "projects" / "proj_x" / "serve.port").unlink() + + # Must not raise. + await runner.cleanup() + + +@pytest.mark.asyncio +async def test_cleanup_when_storage_equals_rendezvous(tmp_path, monkeypatch): + """If storage_base resolves to the same path as the rendezvous (default + storage layout), the unlink still happens exactly once and cleanly.""" + fake_home = tmp_path / "fake_home" + fake_home.mkdir() + monkeypatch.setattr(Path, "home", lambda: fake_home) + + # Mimic the default layout: storage IS the rendezvous parent. + storage_base = fake_home / ".cce" / "projects" / "proj_default" + storage_base.mkdir(parents=True) + + runner, _ = await start_hook_server( + storage_base=storage_base, project_name="proj_default", + ) + try: + assert (storage_base / "serve.port").exists() + finally: + await runner.cleanup() + assert not (storage_base / "serve.port").exists() From 86b2de8fbc26ada0be205af02049bd8e4bc83da9 Mon Sep 17 00:00:00 2001 From: Fazle Elahee Date: Tue, 12 May 2026 13:58:29 +0100 Subject: [PATCH 2/2] fix: address Copilot review feedback on #69 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five review findings, all resolved: 1. 
Windows crash: `signal.SIGHUP` is not defined on Windows, so the literal tuple `(signal.SIGINT, signal.SIGTERM, signal.SIGHUP)` raised AttributeError *before* the try/except could swallow it, making `cce serve` unrunnable on Windows. Build the candidate list with `getattr(signal, 'SIGHUP', None)` and skip None entries. 2. Test event loop leak: `_make_handler` created an `asyncio.new_event_loop()` that was never closed, emitting ResourceWarnings on GC and risking flakes under -W error. Tracked the loops in a per-test `synthetic_loops` fixture that closes them on teardown. 3. hook_server cleanup comment used to claim it covered SIGKILL paths, but `app.on_cleanup` doesn't fire on SIGKILL. Reworded to describe what it actually covers (graceful exits) and pointed at the socket-liveness probe for the ungraceful case. 4. watcher.on_moved comment said the handler queued "whichever side is inside the watch dir" but the implementation didn't actually filter — _enqueue/_should_ignore does. Reworded the comment to match the real flow. 5. embedder._resolve_parallel docstring used a misleading `"=N"` placeholder. Replaced with `"N"` to match the actual parser. 891 passed, 1 skipped, 0 failed. Ruff clean. 
--- src/context_engine/cli.py | 22 +++++++++--- src/context_engine/indexer/embedder.py | 2 +- src/context_engine/indexer/watcher.py | 8 +++-- src/context_engine/memory/hook_server.py | 11 +++--- tests/indexer/test_watcher.py | 45 ++++++++++++++++++------ 5 files changed, 64 insertions(+), 24 deletions(-) diff --git a/src/context_engine/cli.py b/src/context_engine/cli.py index 03ab2ab..33a451d 100644 --- a/src/context_engine/cli.py +++ b/src/context_engine/cli.py @@ -2927,16 +2927,30 @@ def _request_shutdown(signame: str) -> None: _log.info("Received %s, shutting down...", signame) mcp_task.cancel() + # Build the candidate list with getattr so we don't reference + # `signal.SIGHUP` at the tuple-construction site — SIGHUP is + # undefined on Windows and that AttributeError would fire *before* + # the try/except below could swallow it, crashing `cce serve` on + # Windows entirely (Copilot review on #69). installed_signals: list[int] = [] - for _sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP): + candidate_sigs = [ + s for s in ( + getattr(signal, "SIGINT", None), + getattr(signal, "SIGTERM", None), + getattr(signal, "SIGHUP", None), + ) if s is not None + ] + for _sig in candidate_sigs: try: serve_loop.add_signal_handler( _sig, _request_shutdown, _sig.name, ) installed_signals.append(_sig) - except (NotImplementedError, RuntimeError, AttributeError): - # Windows lacks add_signal_handler for SIGHUP, and asyncio - # raises NotImplementedError outside the main thread. + except (NotImplementedError, RuntimeError): + # Windows's ProactorEventLoop refuses add_signal_handler; + # asyncio also raises NotImplementedError outside the main + # thread. SIGTERM still arrives via the default Python + # handler in those environments. 
pass try: diff --git a/src/context_engine/indexer/embedder.py b/src/context_engine/indexer/embedder.py index 9650942..4917a2f 100644 --- a/src/context_engine/indexer/embedder.py +++ b/src/context_engine/indexer/embedder.py @@ -33,7 +33,7 @@ # # Override grammar (case-insensitive): # "0" | "none" | "off" | "false" | "no" → None (single-process) -# "=N" → min(N, cpu_count) (cap added +# "N" (positive integer) → min(N, cpu_count) (cap added # for #66: 12-CPU users on a fast # box could otherwise CCE_EMBED_PARALLEL=64 # and OOM themselves) diff --git a/src/context_engine/indexer/watcher.py b/src/context_engine/indexer/watcher.py index 18ffff2..81c91a3 100644 --- a/src/context_engine/indexer/watcher.py +++ b/src/context_engine/indexer/watcher.py @@ -75,9 +75,11 @@ def on_deleted(self, event): def on_moved(self, event): if event.is_directory: return - # A move emits one event with both src_path and dest_path; queue - # whichever side is inside the watch dir (or both, if it's a rename - # within the project). + # A move emits one event with both src_path and dest_path. Enqueue + # each path; _enqueue → _should_ignore drops anything that + # resolves outside the watch dir (or under .cce / an ignore + # pattern). Putting the watch-dir filter inside _enqueue keeps the + # rule in one place for every event type. self._enqueue(event.src_path) dest = getattr(event, "dest_path", None) if dest and dest != event.src_path: diff --git a/src/context_engine/memory/hook_server.py b/src/context_engine/memory/hook_server.py index 5636f21..3ca8c73 100644 --- a/src/context_engine/memory/hook_server.py +++ b/src/context_engine/memory/hook_server.py @@ -62,11 +62,12 @@ async def _close_db(app): log.exception("memory_db close failed") async def _unlink_port_files(app): - # Without this, stale serve.port files survived `cce serve` exits - # (especially SIGKILL after the embed-worker leak in #66). 
The hook - # shell script would then POST to a dead port and capture would - # silently break for the next session. Best-effort unlink — if the - # file is already gone or unwritable, that's fine. + # Covers graceful shutdown paths only. SIGKILL bypasses + # `app.on_cleanup` entirely, so the residual-port-file class of + # bugs (#66) still needs the corresponding socket-liveness probe + # in the hook shell script. What this handler does cleanly cover + # is the orderly SIGINT/SIGTERM/Ctrl-D path so the next session + # doesn't inherit a stale serve.port from a normal exit. for f in app.get("_port_files", []): try: f.unlink(missing_ok=True) diff --git a/tests/indexer/test_watcher.py b/tests/indexer/test_watcher.py index b0f6c53..f35380f 100644 --- a/tests/indexer/test_watcher.py +++ b/tests/indexer/test_watcher.py @@ -124,11 +124,19 @@ def __init__(self, src_path, *, is_directory=False, dest_path=None): self.dest_path = dest_path -def _make_handler(tmp_path, queued): +def _make_handler(tmp_path, queued, _registry: list | None = None): + """Build a _DebouncedHandler with a recorder swapped in for _enqueue. + + A loop is required by the constructor but never used here — + `_enqueue` is monkey-patched below, so `on_change` is never + scheduled. Loops created by this helper are tracked in a + per-test registry so the test can close them on teardown — leaving + them open emits ResourceWarnings (Copilot review on #69). + """ from context_engine.indexer.watcher import _DebouncedHandler - # A loop is required by the handler constructor but never used here — - # _enqueue is monkey-patched below, so on_change is never scheduled. 
loop = asyncio.new_event_loop() + if _registry is not None: + _registry.append(loop) handler = _DebouncedHandler( on_change=lambda p: None, debounce_ms=10, @@ -136,20 +144,35 @@ def _make_handler(tmp_path, queued): watch_dir=str(tmp_path), loop=loop, ) - # Swap in a recorder so we can assert what was enqueued without timing - # out on the real debounce timer. handler._enqueue = lambda path: queued.append(path) return handler -def test_watcher_ignores_read_only_events(tmp_path): +@pytest.fixture +def synthetic_loops(): + """Per-test registry that closes any loops created via _make_handler. + + Tests that exercise the typed-handler dispatch use synthetic loops + they never run; without explicit close() each one leaks a + ResourceWarning on garbage collection. + """ + loops: list[asyncio.AbstractEventLoop] = [] + yield loops + for loop in loops: + try: + loop.close() + except Exception: + pass + + +def test_watcher_ignores_read_only_events(tmp_path, synthetic_loops): """Read-only `opened`/`closed_no_write` events from a sibling `cce index` must not enqueue work. This is the trigger half of the #66 leak — without this, hundreds of read events per `cce index --path X` cascade into the reindex worker and spawn embed pools. """ queued: list[str] = [] - handler = _make_handler(tmp_path, queued) + handler = _make_handler(tmp_path, queued, synthetic_loops) # Read-only events flow through the base FileSystemEventHandler stubs, # which are no-ops on our subclass. 
They must NOT result in anything @@ -182,9 +205,9 @@ def test_watcher_ignores_read_only_events(tmp_path): assert str(tmp_path / "new.py") in queued -def test_watcher_skips_directory_events(tmp_path): +def test_watcher_skips_directory_events(tmp_path, synthetic_loops): queued: list[str] = [] - handler = _make_handler(tmp_path, queued) + handler = _make_handler(tmp_path, queued, synthetic_loops) handler.on_modified(_FakeEvent(str(tmp_path / "subdir"), is_directory=True)) handler.on_created(_FakeEvent(str(tmp_path / "subdir2"), is_directory=True)) handler.on_deleted(_FakeEvent(str(tmp_path / "subdir3"), is_directory=True)) @@ -195,10 +218,10 @@ def test_watcher_skips_directory_events(tmp_path): assert queued == [] -def test_watcher_move_with_same_src_and_dest(tmp_path): +def test_watcher_move_with_same_src_and_dest(tmp_path, synthetic_loops): """Spurious move events sometimes report src == dest; queue only once.""" queued: list[str] = [] - handler = _make_handler(tmp_path, queued) + handler = _make_handler(tmp_path, queued, synthetic_loops) handler.on_moved(_FakeEvent( str(tmp_path / "x.py"), dest_path=str(tmp_path / "x.py"), ))