diff --git a/mcp_server.py b/mcp_server.py index a0c2429..f21eb4a 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -76,6 +76,59 @@ def _validate_source_type(source_type: str | None) -> dict | None: return None +def _pid_state(pid: int) -> str | None: + """Return Linux process state code (e.g. 'R', 'S', 'Z') when available.""" + try: + with open(f"/proc/{pid}/status") as f: + for line in f: + if line.startswith("State:"): + parts = line.split() + if len(parts) >= 2: + return parts[1] + break + except OSError: + return None + return None + + +def _resolve_indexer_pid(pid_file: Path) -> tuple[bool, int | None]: + """Return whether the pid file points to a live, non-zombie process. + + Stale, corrupt, or zombie pid files are cleaned up so callers can safely + start a fresh indexer run. + """ + if not pid_file.exists(): + return False, None + + try: + pid = int(pid_file.read_text().strip()) + except (OSError, ValueError): + pid_file.unlink(missing_ok=True) + return False, None + + try: + os.kill(pid, 0) + except OSError: + pid_file.unlink(missing_ok=True) + return False, None + + # Crash-aborted children can remain as zombies; os.kill(pid, 0) still + # succeeds for them, so treat zombie state as dead and clean up the pid. 
+ try: + waited_pid, _ = os.waitpid(pid, os.WNOHANG) + if waited_pid == pid: + pid_file.unlink(missing_ok=True) + return False, None + except ChildProcessError: + pass + + if _pid_state(pid) == "Z": + pid_file.unlink(missing_ok=True) + return False, None + + return True, pid + + def _hit_to_dict(h: SearchHit, include_text: bool = False) -> dict: """Convert a SearchHit to a response dict.""" d: dict = { @@ -652,15 +705,7 @@ def _file_status_impl() -> dict: # Check if an indexer subprocess is currently running index_root = Path(config["index_root"]) pid_file = index_root / "indexer.pid" - indexer_running = False - indexer_pid = None - if pid_file.exists(): - try: - indexer_pid = int(pid_file.read_text().strip()) - os.kill(indexer_pid, 0) # Signal 0 = existence check - indexer_running = True - except (OSError, ValueError): - indexer_pid = None + indexer_running, indexer_pid = _resolve_indexer_pid(pid_file) result: dict = { "doc_count": len(doc_ids), @@ -788,19 +833,14 @@ def _file_index_update_impl(config_path: str = "config.yaml") -> dict: pid_file = index_root / "indexer.pid" # Check if an indexer is already running - if pid_file.exists(): - try: - old_pid = int(pid_file.read_text().strip()) - os.kill(old_pid, 0) # Signal 0 = existence check; raises OSError if dead - logger.info("file_index_update: indexer already running (pid %d)", old_pid) - return { - "status": "already_running", - "pid": old_pid, - "message": "An indexer is already running. Use file_status to check progress.", - } - except (OSError, ValueError): - # Process is dead or PID file is corrupt — clean up and proceed - pid_file.unlink(missing_ok=True) + indexer_running, old_pid = _resolve_indexer_pid(pid_file) + if indexer_running and old_pid is not None: + logger.info("file_index_update: indexer already running (pid %d)", old_pid) + return { + "status": "already_running", + "pid": old_pid, + "message": "An indexer is already running. 
def _spawn_zombie_child() -> int:
    """Create a zombie child for the current process and return its PID.

    The child exits immediately. The parent then blocks until the child has
    terminated but does not collect its exit status, so the child is still
    unreaped when this function returns. Callers are responsible for reaping
    it (e.g. with ``os.waitpid``).

    Returns:
        PID of the terminated, not-yet-reaped child.
    """
    pid = os.fork()
    if pid == 0:
        # Child: exit at once, skipping the parent's cleanup handlers.
        os._exit(0)

    # WEXITED waits for termination; WNOWAIT leaves the child waitable, so it
    # stays a zombie. This replaces polling /proc with sleeps and a deadline.
    os.waitid(os.P_PID, pid, os.WEXITED | os.WNOWAIT)
    return pid


def _patch_index_config(monkeypatch, mcp_server, tmp_path):
    """Make ``mcp_server.load_config`` return a config rooted at *tmp_path*.

    Args:
        monkeypatch: Object providing ``setattr(target, name, value)``.
        mcp_server: Module (or object) whose ``load_config`` is replaced.
        tmp_path: Directory used as the ``index_root`` value.
    """
    monkeypatch.setattr(
        mcp_server,
        "load_config",
        lambda _path: {"index_root": str(tmp_path)},
    )
" @@ -30,16 +66,7 @@ def test_index_update_returns_started(tmp_path, monkeypatch): assert "pid" in result, "Must return the subprocess PID for tracking" assert isinstance(result["pid"], int), "PID must be an integer" assert result["pid"] > 0, "PID must be positive" - - # Clean up: wait briefly for the subprocess to exit (it will fail fast - # in the test environment since config.yaml doesn't exist — that's fine) - import subprocess - try: - proc = subprocess.Popen.__new__(subprocess.Popen) - # Just check via os.waitpid with WNOHANG; ignore errors - pass - except Exception: - pass + assert result["pid"] == 424242 def test_index_update_rejects_concurrent_runs(tmp_path, monkeypatch): @@ -50,6 +77,7 @@ def test_index_update_rejects_concurrent_runs(tmp_path, monkeypatch): import mcp_server monkeypatch.setenv("INDEX_ROOT", str(tmp_path)) + _patch_index_config(monkeypatch, mcp_server, tmp_path) # Launch a long-lived dummy subprocess and write its PID file manually, # simulating an in-progress indexer run. 
def test_index_update_clears_zombie_pid_file(tmp_path, monkeypatch):
    """A PID file naming a zombie process must not prevent a new indexer run."""
    import mcp_server

    monkeypatch.setenv("INDEX_ROOT", str(tmp_path))
    _patch_index_config(monkeypatch, mcp_server, tmp_path)

    # Record a defunct child's PID as if it were the running indexer.
    defunct_pid = _spawn_zombie_child()
    stale_pid_path = tmp_path / "indexer.pid"
    stale_pid_path.write_text(str(defunct_pid))

    class FakeIndexerProc:
        pid = 424242

    try:
        # Stub out process creation so no real indexer is launched.
        with patch("subprocess.Popen", return_value=FakeIndexerProc()):
            outcome = mcp_server._file_index_update_impl("config_test.yaml")

        assert outcome.get("status") == "started", (
            f"Expected zombie PID file to be cleared, got {outcome!r}"
        )
        assert outcome.get("pid") == 424242
        assert not stale_pid_path.exists(), "Zombie PID file should be removed before restart"
    finally:
        # Reap the child if the code under test has not already done so.
        try:
            os.waitpid(defunct_pid, os.WNOHANG)
        except ChildProcessError:
            pass
def test_file_status_ignores_zombie_indexer_pid(tmp_path):
    """Zombie indexer PID should be treated as not running and cleaned up."""
    zombie_pid = os.fork()
    if zombie_pid == 0:
        # Child: exit at once, skipping the parent's cleanup handlers.
        os._exit(0)

    old_cache = mcp_server._cache
    try:
        # Block until the child has terminated without collecting its status
        # (WNOWAIT), so it is an unreaped zombie from here on. Unlike polling
        # /proc, this cannot fall through with the child in an unknown state.
        os.waitid(os.P_PID, zombie_pid, os.WEXITED | os.WNOWAIT)

        pid_file = tmp_path / "indexer.pid"
        pid_file.write_text(str(zombie_pid))

        mock_store = MagicMock()
        mock_store.list_doc_ids.return_value = ["a.md"]
        mock_store.count_chunks.return_value = 1
        mock_store._metadata_subfields.return_value = {"doc_id"}
        mock_store.fts_available.return_value = True

        mcp_server._cache = (
            mock_store,
            MagicMock(),
            {
                "index_root": str(tmp_path),
                "embeddings": {"provider": "openrouter"},
                "search": {"reranker": {"enabled": False}},
            },
        )

        result = mcp_server._file_status_impl()

        assert result["indexer_running"] is False
        assert "indexer_pid" not in result
        assert not pid_file.exists(), "Zombie PID file should be removed during status check"
    finally:
        mcp_server._cache = old_cache
        # Reap the child if the code under test has not already done so.
        try:
            os.waitpid(zombie_pid, os.WNOHANG)
        except ChildProcessError:
            pass