diff --git a/mcp_cloud/app.py b/mcp_cloud/app.py index 2f64b878..24078152 100644 --- a/mcp_cloud/app.py +++ b/mcp_cloud/app.py @@ -28,7 +28,7 @@ from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import CallToolResult, Tool, TextContent -from pydantic import BaseModel +from pydantic import BaseModel, field_validator from mcp_cloud.dotenv_utils import load_planexe_dotenv _dotenv_loaded, _dotenv_paths = load_planexe_dotenv(Path(__file__).parent) @@ -155,20 +155,56 @@ class TaskCreateRequest(BaseModel): speed_vs_detail: Optional[SpeedVsDetailInput] = None user_api_key: Optional[str] = None + +def normalize_task_id_input(task_id: str) -> tuple[str, bool]: + """Normalize task_id input and classify whether it's a canonical UUID. + + Returns (normalized_value, is_uuid). UUIDs are canonicalized to lowercase. + Non-UUID values are preserved as compatibility identifiers for legacy MCP rows. + """ + if not isinstance(task_id, str): + raise ValueError("task_id must be a string") + value = task_id.strip() + if not value: + raise ValueError("task_id is required") + try: + return str(uuid.UUID(value)), True + except ValueError: + return value, False + + +def _normalize_task_id_field(task_id: str) -> str: + normalized, _ = normalize_task_id_input(task_id) + return normalized + + class TaskStatusRequest(BaseModel): task_id: str + _normalize_task_id = field_validator("task_id")(_normalize_task_id_field) + + class TaskStopRequest(BaseModel): task_id: str + _normalize_task_id = field_validator("task_id")(_normalize_task_id_field) + + class TaskFileInfoRequest(BaseModel): task_id: str artifact: Optional[str] = None + _normalize_task_id = field_validator("task_id")(_normalize_task_id_field) + # Helper functions def find_task_by_task_id(task_id: str) -> Optional[TaskItem]: - """Find TaskItem by MCP task_id (UUID), with legacy fallback.""" - task = get_task_by_id(task_id) + """Find TaskItem by MCP task_id with UUID-first lookup and legacy fallback.""" + 
try: + normalized_task_id, is_uuid = normalize_task_id_input(task_id) + except ValueError: + return None + + task = get_task_by_id(normalized_task_id) if is_uuid else None if task is not None: return task @@ -176,11 +212,11 @@ def _query_legacy() -> Optional[TaskItem]: query = db.session.query(TaskItem) if db.engine.dialect.name == "postgresql": tasks = query.filter( - cast(TaskItem.parameters, JSONB).contains({"_mcp_task_id": task_id}) + cast(TaskItem.parameters, JSONB).contains({"_mcp_task_id": normalized_task_id}) ).all() else: tasks = query.filter( - TaskItem.parameters.contains({"_mcp_task_id": task_id}) + TaskItem.parameters.contains({"_mcp_task_id": normalized_task_id}) ).all() if tasks: return tasks[0] @@ -192,16 +228,19 @@ def _query_legacy() -> Optional[TaskItem]: with app.app_context(): legacy_task = _query_legacy() if legacy_task is not None: - logger.debug("Resolved legacy MCP task id %s to task %s", task_id, legacy_task.id) + logger.debug("Resolved legacy MCP task id %s to task %s", normalized_task_id, legacy_task.id) return legacy_task def get_task_by_id(task_id: str) -> Optional[TaskItem]: """Fetch a TaskItem by its UUID string.""" def _query() -> Optional[TaskItem]: try: - task_uuid = uuid.UUID(task_id) + normalized_task_id, is_uuid = normalize_task_id_input(task_id) except ValueError: return None + if not is_uuid: + return None + task_uuid = uuid.UUID(normalized_task_id) return db.session.get(TaskItem, task_uuid) if has_app_context(): diff --git a/mcp_cloud/tests/test_task_id_resolution.py b/mcp_cloud/tests/test_task_id_resolution.py new file mode 100644 index 00000000..93b56309 --- /dev/null +++ b/mcp_cloud/tests/test_task_id_resolution.py @@ -0,0 +1,46 @@ +import unittest +import uuid +from unittest.mock import MagicMock, patch + +from mcp_cloud.app import find_task_by_task_id, normalize_task_id_input + + +class TestTaskIdResolution(unittest.TestCase): + def test_normalize_task_id_input_canonicalizes_uuid(self): + raw = str(uuid.uuid4()).upper() 
+ normalized, is_uuid = normalize_task_id_input(raw) + self.assertTrue(is_uuid) + self.assertEqual(normalized, raw.lower()) + + def test_normalize_task_id_input_preserves_legacy_identifier(self): + legacy = "PlanExe_19841231_195936" + normalized, is_uuid = normalize_task_id_input(legacy) + self.assertFalse(is_uuid) + self.assertEqual(normalized, legacy) + + def test_find_task_by_task_id_uses_uuid_lookup_first(self): + task_id = str(uuid.uuid4()) + found = object() + with patch("mcp_cloud.app.get_task_by_id", return_value=found) as mock_get: + result = find_task_by_task_id(task_id.upper()) + self.assertIs(result, found) + mock_get.assert_called_once_with(task_id) + + def test_find_task_by_task_id_falls_back_to_legacy_query(self): + legacy = "PlanExe_19841231_195936" + legacy_task = object() + + mock_query = MagicMock() + mock_query.filter.return_value.all.return_value = [legacy_task] + + with patch("mcp_cloud.app.get_task_by_id") as mock_get, patch( + "mcp_cloud.app.has_app_context", return_value=True + ), patch("mcp_cloud.app.db.session.query", return_value=mock_query): + result = find_task_by_task_id(legacy) + + self.assertIs(result, legacy_task) + mock_get.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/mcp_local/planexe_mcp_local.py b/mcp_local/planexe_mcp_local.py index 07d9dce1..46cddf22 100644 --- a/mcp_local/planexe_mcp_local.py +++ b/mcp_local/planexe_mcp_local.py @@ -11,6 +11,7 @@ import logging import os import time +import uuid from dataclasses import dataclass from pathlib import Path from typing import Any, Literal, Optional @@ -21,7 +22,7 @@ from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import CallToolResult, TextContent, Tool -from pydantic import BaseModel +from pydantic import BaseModel, field_validator logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) @@ -41,18 +42,46 @@ class 
TaskCreateRequest(BaseModel): speed_vs_detail: Optional[SpeedVsDetailInput] = None +def normalize_task_id_input(task_id: str) -> tuple[str, bool]: + """Normalize task_id input and classify whether it's a canonical UUID. + + Returns (normalized_value, is_uuid). UUIDs are canonicalized to lowercase. + Non-UUID values are preserved as compatibility identifiers for legacy MCP rows. + """ + if not isinstance(task_id, str): + raise ValueError("task_id must be a string") + value = task_id.strip() + if not value: + raise ValueError("task_id is required") + try: + return str(uuid.UUID(value)), True + except ValueError: + return value, False + + +def _normalize_task_id_field(task_id: str) -> str: + normalized, _ = normalize_task_id_input(task_id) + return normalized + + class TaskStatusRequest(BaseModel): task_id: str + _normalize_task_id = field_validator("task_id")(_normalize_task_id_field) + class TaskStopRequest(BaseModel): task_id: str + _normalize_task_id = field_validator("task_id")(_normalize_task_id_field) + class TaskDownloadRequest(BaseModel): task_id: str artifact: str = "report" + _normalize_task_id = field_validator("task_id")(_normalize_task_id_field) + def _get_env(name: str, default: Optional[str] = None) -> Optional[str]: value = os.environ.get(name) diff --git a/open_dir_server/app.py b/open_dir_server/app.py index 779bf7b5..b555b20d 100644 --- a/open_dir_server/app.py +++ b/open_dir_server/app.py @@ -7,6 +7,7 @@ import subprocess import sys import re +import uuid from pathlib import Path from fastapi import FastAPI, HTTPException from pydantic import BaseModel @@ -59,12 +60,29 @@ def _command_for_platform(target: Path) -> list[str]: raise ValueError(f"Unsupported platform: {sys.platform}") +def _is_uuid_name(name: str) -> bool: + try: + parsed = uuid.UUID(name) + except ValueError: + return False + return str(parsed) == name + + +def _is_legacy_planexe_run_name(name: str) -> bool: + # Compatibility shim: legacy single-user runs used 
PlanExe_YYYYMMDD_HHMMSS. + return bool(re.fullmatch(r"^PlanExe_\d{8}_\d{6}$", name)) + + +def _is_allowed_run_name(name: str) -> bool: + return _is_uuid_name(name) or _is_legacy_planexe_run_name(name) + + @app.post("/open", response_model=OpenPathResponse) def open_path(request: OpenPathRequest): raw_path = request.path - # Only allow PlanExe-style run directory names to avoid arbitrary path access. - if not re.fullmatch(r"^PlanExe_\d+_\d+$", Path(raw_path).name): - raise HTTPException(status_code=400, detail="Invalid path format.") + # UUID-first, while retaining legacy support for existing timestamp-prefixed run dirs. + if not _is_allowed_run_name(Path(raw_path).name): + raise HTTPException(status_code=400, detail="Invalid path format. Expected UUID task id or legacy PlanExe_YYYYMMDD_HHMMSS name.") target = Path(raw_path).expanduser().resolve() if not _is_allowed(target): diff --git a/worker_plan/AGENTS.md b/worker_plan/AGENTS.md index a659eb4b..34077863 100644 --- a/worker_plan/AGENTS.md +++ b/worker_plan/AGENTS.md @@ -8,7 +8,7 @@ consumers. - Preserve the public API contract in `worker_plan/app.py`: - Keep request/response shapes and endpoint paths backward compatible. - Avoid renaming response fields like `run_id`, `run_dir`, `display_run_dir`. -- Maintain the run directory conventions (`PlanExe_...`) and environment-driven +- Maintain UUID run directory conventions (with legacy compatibility where explicitly required) and environment-driven paths (`PLANEXE_RUN_DIR`, `PLANEXE_HOST_RUN_DIR`, `PLANEXE_CONFIG_PATH`). - When changing pipeline behavior, keep the subprocess invocation in `start_pipeline_subprocess` consistent with `worker_plan_internal`. 
diff --git a/worker_plan/README.md b/worker_plan/README.md index 7e14b878..95c1f098 100644 --- a/worker_plan/README.md +++ b/worker_plan/README.md @@ -43,7 +43,7 @@ If you must stay on Python 3.14, expect source builds and potential failures; ex | `PLANEXE_PURGE_ENABLED` | `false` | Enable the background scheduler that purges old run directories. | | `PLANEXE_PURGE_MAX_AGE_HOURS` | `1` | Maximum age (hours) of runs to delete when purging (scheduler and manual default). | | `PLANEXE_PURGE_INTERVAL_SECONDS` | `3600` | How often the purge scheduler runs when enabled. | -| `PLANEXE_PURGE_RUN_PREFIX` | `PlanExe_` | Only purge runs whose IDs start with this prefix. | +| `PLANEXE_PURGE_RUN_PREFIX` | *(empty)* | Optional compatibility filter. UUID-named runs are always considered; when set, legacy prefixed non-UUID run directories that match this prefix are also considered. | | `PLANEXE_LOG_LEVEL` | `INFO` | Sets the console log level for the worker API and the pipeline process. Accepted values are the standard logging levels (e.g., `DEBUG`, `INFO`, `WARNING`, `ERROR`). | `PLANEXE_LOG_LEVEL` affects both the FastAPI worker and the spawned pipeline logs written to stdout. File logs in `run/<run_id>/log.txt` always include DEBUG and above. 
diff --git a/worker_plan/worker_plan_internal/utils/purge_old_runs.py b/worker_plan/worker_plan_internal/utils/purge_old_runs.py index 3c08400e..fccb1596 100644 --- a/worker_plan/worker_plan_internal/utils/purge_old_runs.py +++ b/worker_plan/worker_plan_internal/utils/purge_old_runs.py @@ -21,7 +21,7 @@ def _is_uuid_name(name: str) -> bool: def _looks_like_plan_run_dir(dirname: str, path: str) -> bool: - """A run directory must be UUID-named and contain required marker files.""" + """A UUID-style run directory with required marker files.""" if not _is_uuid_name(dirname): return False if not os.path.isdir(path): @@ -35,9 +35,31 @@ def _looks_like_plan_run_dir(dirname: str, path: str) -> bool: return has_start_time and has_plan -def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "myrun_") -> None: +def _looks_like_legacy_prefixed_run_dir(dirname: str, path: str, prefix: str) -> bool: + """Compatibility mode for legacy non-UUID run directory names when prefix filtering is explicitly used.""" + if not prefix: + return False + if not dirname.startswith(prefix): + return False + if _is_uuid_name(dirname): + return False + if not os.path.isdir(path): + return False + try: + filenames = os.listdir(path) + except OSError: + return False + has_start_time = any(name.endswith(_START_TIME_SUFFIX) for name in filenames) + has_plan = any(name.endswith(_PLAN_SUFFIX) for name in filenames) + return has_start_time and has_plan + + +def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "") -> None: """ - Deletes files and directories in the specified run_dir older than max_age_hours and matching the specified prefix. + Delete UUID-named run directories older than max_age_hours. + + Compatibility shim: when prefix is explicitly provided, legacy prefixed non-UUID run + directory names are also considered purge candidates. 
""" if not os.path.isabs(run_dir): raise ValueError(f"run_dir must be an absolute path: {run_dir}") @@ -51,14 +73,14 @@ def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "myru cutoff = now - datetime.timedelta(hours=max_age_hours) count_deleted = 0 - count_skip_without_prefix = 0 + count_skip_prefix_filter = 0 count_skip_recent = 0 count_skip_non_run_shape = 0 count_error = 0 for item in os.listdir(run_dir): - if not item.startswith(prefix): - count_skip_without_prefix += 1 - continue # Skip files and directories that don't match the prefix + if prefix and not item.startswith(prefix) and not _is_uuid_name(item): + count_skip_prefix_filter += 1 + continue item_path = os.path.join(run_dir, item) is_dir = os.path.isdir(item_path) @@ -66,7 +88,10 @@ def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "myru # Never delete files from run root. Users may place arbitrary files there. count_skip_non_run_shape += 1 continue - if not _looks_like_plan_run_dir(item, item_path): + + is_uuid_run = _looks_like_plan_run_dir(item, item_path) + is_legacy_prefixed_run = _looks_like_legacy_prefixed_run_dir(item, item_path, prefix) + if not (is_uuid_run or is_legacy_prefixed_run): count_skip_non_run_shape += 1 continue @@ -89,10 +114,10 @@ def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "myru logger.error(f"Error processing {item} in {run_dir}: {e}") count_error += 1 logger.info( - "Purge complete: %s deleted, %s skipped (recent), %s skipped (no prefix), %s skipped (not run artifacts), %s errors", + "Purge complete: %s deleted, %s skipped (recent), %s skipped (prefix filter), %s skipped (not run artifacts), %s errors", count_deleted, count_skip_recent, - count_skip_without_prefix, + count_skip_prefix_filter, count_skip_non_run_shape, count_error, ) @@ -102,7 +127,7 @@ def start_purge_scheduler( run_dir: str, purge_interval_seconds: float = 3600, max_age_hours: float = 1.0, - prefix: str = "myrun_", + prefix: str 
= "", ) -> None: """ Start the purge scheduler in a background thread. diff --git a/worker_plan/worker_plan_internal/utils/tests/test_purge_old_runs.py b/worker_plan/worker_plan_internal/utils/tests/test_purge_old_runs.py index 3c535221..42b0ab5f 100644 --- a/worker_plan/worker_plan_internal/utils/tests/test_purge_old_runs.py +++ b/worker_plan/worker_plan_internal/utils/tests/test_purge_old_runs.py @@ -4,7 +4,7 @@ import unittest import uuid -from worker_plan_internal.utils.purge_old_runs import purge_old_runs +from worker_plan.worker_plan_internal.utils.purge_old_runs import purge_old_runs class TestPurgeOldRuns(unittest.TestCase): @@ -24,7 +24,10 @@ def setUp(self): self._create_run_dir(self.uuid_recent_valid, hours_old=0.1, with_start=True, with_plan=True) self._create_run_dir(self.uuid_old_missing_start, hours_old=2.0, with_start=False, with_plan=True) self._create_run_dir(self.uuid_old_missing_plan, hours_old=2.0, with_start=True, with_plan=False) + self.legacy_old_prefixed = "PlanExe_19841231_195936" + self._create_run_dir("not-a-uuid", hours_old=2.0, with_start=True, with_plan=True) + self._create_run_dir(self.legacy_old_prefixed, hours_old=2.0, with_start=True, with_plan=True) self._create_file(self.uuid_old_zip, hours_old=2.0) self._create_file("not-a-uuid.zip", hours_old=2.0) self._create_file("random.txt", hours_old=2.0) @@ -62,15 +65,18 @@ def test_purge_uuid_run_dirs_with_required_files_only(self): self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, self.uuid_old_missing_start))) self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, self.uuid_old_missing_plan))) self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, "not-a-uuid"))) + self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, self.legacy_old_prefixed))) self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, self.uuid_old_zip))) self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, "not-a-uuid.zip"))) 
self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, "random.txt"))) - def test_purge_respects_prefix_filter(self): - prefixed_uuid = "keepme-" + str(uuid.uuid4()) - self._create_run_dir(prefixed_uuid, hours_old=2.0, with_start=True, with_plan=True) - purge_old_runs(self.test_run_dir, max_age_hours=1.0, prefix="keepme-") - self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, prefixed_uuid))) + def test_purge_can_target_legacy_prefixed_runs_when_prefix_is_explicit(self): + purge_old_runs(self.test_run_dir, max_age_hours=1.0, prefix="PlanExe_") + + self.assertFalse(os.path.exists(os.path.join(self.test_run_dir, self.legacy_old_prefixed))) + self.assertFalse(os.path.exists(os.path.join(self.test_run_dir, self.uuid_old_valid))) + self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, self.uuid_recent_valid))) + self.assertTrue(os.path.exists(os.path.join(self.test_run_dir, "not-a-uuid"))) if __name__ == "__main__":