Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,59 @@ def _validate_source_type(source_type: str | None) -> dict | None:
return None


def _pid_state(pid: int) -> str | None:
"""Return Linux process state code (e.g. 'R', 'S', 'Z') when available."""
try:
with open(f"/proc/{pid}/status") as f:
for line in f:
if line.startswith("State:"):
parts = line.split()
if len(parts) >= 2:
return parts[1]
break
except OSError:
return None
return None


def _resolve_indexer_pid(pid_file: Path) -> tuple[bool, int | None]:
"""Return whether the pid file points to a live, non-zombie process.

Stale, corrupt, or zombie pid files are cleaned up so callers can safely
start a fresh indexer run.
"""
if not pid_file.exists():
return False, None

try:
pid = int(pid_file.read_text().strip())
except (OSError, ValueError):
pid_file.unlink(missing_ok=True)
return False, None

try:
os.kill(pid, 0)
except OSError:
pid_file.unlink(missing_ok=True)
return False, None

# Crash-aborted children can remain as zombies; os.kill(pid, 0) still
# succeeds for them, so treat zombie state as dead and clean up the pid.
try:
waited_pid, _ = os.waitpid(pid, os.WNOHANG)
if waited_pid == pid:
pid_file.unlink(missing_ok=True)
return False, None
except ChildProcessError:
pass

if _pid_state(pid) == "Z":
pid_file.unlink(missing_ok=True)
return False, None

return True, pid


def _hit_to_dict(h: SearchHit, include_text: bool = False) -> dict:
"""Convert a SearchHit to a response dict."""
d: dict = {
Expand Down Expand Up @@ -652,15 +705,7 @@ def _file_status_impl() -> dict:
# Check if an indexer subprocess is currently running
index_root = Path(config["index_root"])
pid_file = index_root / "indexer.pid"
indexer_running = False
indexer_pid = None
if pid_file.exists():
try:
indexer_pid = int(pid_file.read_text().strip())
os.kill(indexer_pid, 0) # Signal 0 = existence check
indexer_running = True
except (OSError, ValueError):
indexer_pid = None
indexer_running, indexer_pid = _resolve_indexer_pid(pid_file)

result: dict = {
"doc_count": len(doc_ids),
Expand Down Expand Up @@ -788,19 +833,14 @@ def _file_index_update_impl(config_path: str = "config.yaml") -> dict:
pid_file = index_root / "indexer.pid"

# Check if an indexer is already running
if pid_file.exists():
try:
old_pid = int(pid_file.read_text().strip())
os.kill(old_pid, 0) # Signal 0 = existence check; raises OSError if dead
logger.info("file_index_update: indexer already running (pid %d)", old_pid)
return {
"status": "already_running",
"pid": old_pid,
"message": "An indexer is already running. Use file_status to check progress.",
}
except (OSError, ValueError):
# Process is dead or PID file is corrupt — clean up and proceed
pid_file.unlink(missing_ok=True)
indexer_running, old_pid = _resolve_indexer_pid(pid_file)
if indexer_running and old_pid is not None:
logger.info("file_index_update: indexer already running (pid %d)", old_pid)
return {
"status": "already_running",
"pid": old_pid,
"message": "An indexer is already running. Use file_status to check progress.",
}

# Ensure index_root exists so the subprocess can write its PID file
index_root.mkdir(parents=True, exist_ok=True)
Expand Down
89 changes: 76 additions & 13 deletions tests/test_index_nonblocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,43 @@
can continue serving search/status requests while indexing runs.
"""

from pathlib import Path
import os
import time
from unittest.mock import patch

import pytest


def _spawn_zombie_child() -> int:
"""Create a zombie child for the current process and return its PID."""
pid = os.fork()
if pid == 0:
os._exit(0)

deadline = time.time() + 5
status_path = Path(f"/proc/{pid}/status")
while time.time() < deadline:
try:
status = status_path.read_text()
except FileNotFoundError:
break
if "State:\tZ" in status:
return pid
time.sleep(0.01)

os.waitpid(pid, 0)
raise AssertionError("Failed to create zombie child for test")


def _patch_index_config(monkeypatch, mcp_server, tmp_path):
monkeypatch.setattr(
mcp_server,
"load_config",
lambda _path: {"index_root": str(tmp_path)},
)


def test_index_update_returns_started(tmp_path, monkeypatch):
"""_file_index_update_impl must return immediately with status='started'.

Expand All @@ -20,8 +51,13 @@ def test_index_update_returns_started(tmp_path, monkeypatch):
import mcp_server

monkeypatch.setenv("INDEX_ROOT", str(tmp_path))
_patch_index_config(monkeypatch, mcp_server, tmp_path)

class DummyProc:
pid = 424242

result = mcp_server._file_index_update_impl("config.yaml")
with patch("subprocess.Popen", return_value=DummyProc()):
result = mcp_server._file_index_update_impl("config.yaml")

assert result.get("status") == "started", (
f"Expected status='started' (non-blocking), got {result.get('status')!r}. "
Expand All @@ -30,16 +66,7 @@ def test_index_update_returns_started(tmp_path, monkeypatch):
assert "pid" in result, "Must return the subprocess PID for tracking"
assert isinstance(result["pid"], int), "PID must be an integer"
assert result["pid"] > 0, "PID must be positive"

# Clean up: wait briefly for the subprocess to exit (it will fail fast
# in the test environment since config.yaml doesn't exist — that's fine)
import subprocess
try:
proc = subprocess.Popen.__new__(subprocess.Popen)
# Just check via os.waitpid with WNOHANG; ignore errors
pass
except Exception:
pass
assert result["pid"] == 424242


def test_index_update_rejects_concurrent_runs(tmp_path, monkeypatch):
Expand All @@ -50,6 +77,7 @@ def test_index_update_rejects_concurrent_runs(tmp_path, monkeypatch):
import mcp_server

monkeypatch.setenv("INDEX_ROOT", str(tmp_path))
_patch_index_config(monkeypatch, mcp_server, tmp_path)

# Launch a long-lived dummy subprocess and write its PID file manually,
# simulating an in-progress indexer run.
Expand Down Expand Up @@ -78,6 +106,7 @@ def test_index_update_clears_stale_pid_file(tmp_path, monkeypatch):
import mcp_server

monkeypatch.setenv("INDEX_ROOT", str(tmp_path))
_patch_index_config(monkeypatch, mcp_server, tmp_path)

# Write a PID that cannot possibly be alive (PID 1 is init/systemd, which
# we can't signal to test "dead", so use a PID that we know exited by
Expand All @@ -90,13 +119,47 @@ def test_index_update_clears_stale_pid_file(tmp_path, monkeypatch):
pid_file = tmp_path / "indexer.pid"
pid_file.write_text(str(dead_pid))

result = mcp_server._file_index_update_impl("config.yaml")
class DummyProc:
pid = 424243

with patch("subprocess.Popen", return_value=DummyProc()):
result = mcp_server._file_index_update_impl("config.yaml")

# Should have cleaned up the stale PID and started a new indexer
assert result.get("status") == "started", (
f"Expected 'started' after clearing stale PID file, got {result!r}"
)
assert "pid" in result
assert result.get("pid") == 424243


def test_index_update_clears_zombie_pid_file(tmp_path, monkeypatch):
"""Zombie PID file must not block a fresh indexer start."""
import mcp_server

monkeypatch.setenv("INDEX_ROOT", str(tmp_path))
_patch_index_config(monkeypatch, mcp_server, tmp_path)

zombie_pid = _spawn_zombie_child()
pid_file = tmp_path / "indexer.pid"
pid_file.write_text(str(zombie_pid))

class DummyProc:
pid = 424242

try:
with patch("subprocess.Popen", return_value=DummyProc()):
result = mcp_server._file_index_update_impl("config_test.yaml")

assert result.get("status") == "started", (
f"Expected zombie PID file to be cleared, got {result!r}"
)
assert result.get("pid") == 424242
assert not pid_file.exists(), "Zombie PID file should be removed before restart"
finally:
try:
os.waitpid(zombie_pid, os.WNOHANG)
except ChildProcessError:
pass


def test_pid_file_is_cleaned_up_by_subprocess(tmp_path, monkeypatch):
Expand Down
56 changes: 56 additions & 0 deletions tests/test_mcp_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

No external services needed. Uses mocks and direct function calls."""

import os
import time
from pathlib import Path
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -944,3 +946,57 @@ def test_file_status_health_reranker_disabled():
assert health["last_index_failed_count"] == 0
finally:
mcp_server._cache = old_cache


def test_file_status_ignores_zombie_indexer_pid(tmp_path):
"""Zombie indexer PID should be treated as not running and cleaned up."""
zombie_pid = os.fork()
if zombie_pid == 0:
os._exit(0)

pid_file = tmp_path / "indexer.pid"
pid_file.write_text(str(zombie_pid))

deadline = time.time() + 5
status_path = Path(f"/proc/{zombie_pid}/status")
while time.time() < deadline:
try:
status = status_path.read_text()
except FileNotFoundError:
break
if "State:\tZ" in status:
break
time.sleep(0.01)
else:
os.waitpid(zombie_pid, 0)
raise AssertionError("Failed to create zombie child for test")

old_cache = mcp_server._cache
try:
mock_store = MagicMock()
mock_store.list_doc_ids.return_value = ["a.md"]
mock_store.count_chunks.return_value = 1
mock_store._metadata_subfields.return_value = {"doc_id"}
mock_store.fts_available.return_value = True

mcp_server._cache = (
mock_store,
MagicMock(),
{
"index_root": str(tmp_path),
"embeddings": {"provider": "openrouter"},
"search": {"reranker": {"enabled": False}},
},
)

result = mcp_server._file_status_impl()

assert result["indexer_running"] is False
assert "indexer_pid" not in result
assert not pid_file.exists(), "Zombie PID file should be removed during status check"
finally:
mcp_server._cache = old_cache
try:
os.waitpid(zombie_pid, os.WNOHANG)
except ChildProcessError:
pass