Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 46 additions & 7 deletions mcp_cloud/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import CallToolResult, Tool, TextContent
from pydantic import BaseModel
from pydantic import BaseModel, field_validator

from mcp_cloud.dotenv_utils import load_planexe_dotenv
_dotenv_loaded, _dotenv_paths = load_planexe_dotenv(Path(__file__).parent)
Expand Down Expand Up @@ -155,32 +155,68 @@ class TaskCreateRequest(BaseModel):
speed_vs_detail: Optional[SpeedVsDetailInput] = None
user_api_key: Optional[str] = None


def normalize_task_id_input(task_id: str) -> tuple[str, bool]:
    """Validate and normalize a task_id, reporting whether it is a canonical UUID.

    Returns a (normalized_value, is_uuid) pair. UUID inputs are rewritten to
    their canonical lowercase form; anything else is kept verbatim so legacy
    MCP rows can still be addressed by their compatibility identifiers.

    Raises ValueError for non-string, empty, or whitespace-only input.
    """
    if not isinstance(task_id, str):
        raise ValueError("task_id must be a string")
    candidate = task_id.strip()
    if not candidate:
        raise ValueError("task_id is required")
    try:
        canonical = uuid.UUID(candidate)
    except ValueError:
        # Not a UUID: keep the value verbatim as a legacy compatibility id.
        return candidate, False
    return str(canonical), True


def _normalize_task_id_field(task_id: str) -> str:
    """Pydantic field hook: return the normalized task_id, dropping the UUID flag."""
    normalized_value, _is_uuid = normalize_task_id_input(task_id)
    return normalized_value


class TaskStatusRequest(BaseModel):
    """Payload for task status queries."""

    # Accepts either a UUID (any case) or a legacy compatibility identifier;
    # normalized on validation.
    task_id: str

    @field_validator("task_id")
    @classmethod
    def _normalize_task_id(cls, value: str) -> str:
        # Delegate to the shared normalization helper used by all task-id fields.
        return _normalize_task_id_field(value)


class TaskStopRequest(BaseModel):
    """Payload for requests that stop a running task."""

    # Accepts either a UUID (any case) or a legacy compatibility identifier;
    # normalized on validation.
    task_id: str

    @field_validator("task_id")
    @classmethod
    def _normalize_task_id(cls, value: str) -> str:
        # Delegate to the shared normalization helper used by all task-id fields.
        return _normalize_task_id_field(value)


class TaskFileInfoRequest(BaseModel):
    """Payload for requests that inspect files produced by a task."""

    # Accepts either a UUID (any case) or a legacy compatibility identifier;
    # normalized on validation.
    task_id: str
    # Optional artifact selector; None means the default artifact.
    artifact: Optional[str] = None

    @field_validator("task_id")
    @classmethod
    def _normalize_task_id(cls, value: str) -> str:
        # Delegate to the shared normalization helper used by all task-id fields.
        return _normalize_task_id_field(value)

# Helper functions
def find_task_by_task_id(task_id: str) -> Optional[TaskItem]:
"""Find TaskItem by MCP task_id (UUID), with legacy fallback."""
task = get_task_by_id(task_id)
"""Find TaskItem by MCP task_id with UUID-first lookup and legacy fallback."""
try:
normalized_task_id, is_uuid = normalize_task_id_input(task_id)
except ValueError:
return None

task = get_task_by_id(normalized_task_id) if is_uuid else None
if task is not None:
return task

def _query_legacy() -> Optional[TaskItem]:
query = db.session.query(TaskItem)
if db.engine.dialect.name == "postgresql":
tasks = query.filter(
cast(TaskItem.parameters, JSONB).contains({"_mcp_task_id": task_id})
cast(TaskItem.parameters, JSONB).contains({"_mcp_task_id": normalized_task_id})
).all()
else:
tasks = query.filter(
TaskItem.parameters.contains({"_mcp_task_id": task_id})
TaskItem.parameters.contains({"_mcp_task_id": normalized_task_id})
).all()
if tasks:
return tasks[0]
Expand All @@ -192,16 +228,19 @@ def _query_legacy() -> Optional[TaskItem]:
with app.app_context():
legacy_task = _query_legacy()
if legacy_task is not None:
logger.debug("Resolved legacy MCP task id %s to task %s", task_id, legacy_task.id)
logger.debug("Resolved legacy MCP task id %s to task %s", normalized_task_id, legacy_task.id)
return legacy_task

def get_task_by_id(task_id: str) -> Optional[TaskItem]:
"""Fetch a TaskItem by its UUID string."""
def _query() -> Optional[TaskItem]:
try:
task_uuid = uuid.UUID(task_id)
normalized_task_id, is_uuid = normalize_task_id_input(task_id)
except ValueError:
return None
if not is_uuid:
return None
task_uuid = uuid.UUID(normalized_task_id)
return db.session.get(TaskItem, task_uuid)

if has_app_context():
Expand Down
46 changes: 46 additions & 0 deletions mcp_cloud/tests/test_task_id_resolution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import unittest
import uuid
from unittest.mock import MagicMock, patch

from mcp_cloud.app import find_task_by_task_id, normalize_task_id_input


class TestTaskIdResolution(unittest.TestCase):
    """Unit tests for task-id normalization and UUID-first task lookup."""

    def test_normalize_task_id_input_canonicalizes_uuid(self):
        # An uppercase UUID must be recognized and canonicalized to lowercase.
        raw = str(uuid.uuid4()).upper()
        normalized, is_uuid = normalize_task_id_input(raw)
        self.assertTrue(is_uuid)
        self.assertEqual(normalized, raw.lower())

    def test_normalize_task_id_input_preserves_legacy_identifier(self):
        # Legacy timestamped identifiers pass through unchanged and are
        # flagged as non-UUID values.
        legacy = "PlanExe_19841231_195936"
        normalized, is_uuid = normalize_task_id_input(legacy)
        self.assertFalse(is_uuid)
        self.assertEqual(normalized, legacy)

    def test_find_task_by_task_id_uses_uuid_lookup_first(self):
        # A UUID input (in any case) goes through the direct lookup and the
        # lookup receives the canonical lowercase form.
        task_id = str(uuid.uuid4())
        found = object()
        with patch("mcp_cloud.app.get_task_by_id", return_value=found) as mock_get:
            result = find_task_by_task_id(task_id.upper())
        self.assertIs(result, found)
        mock_get.assert_called_once_with(task_id)

    def test_find_task_by_task_id_falls_back_to_legacy_query(self):
        # A non-UUID identifier skips the direct lookup entirely and is
        # resolved through the legacy query over the task parameters.
        legacy = "PlanExe_19841231_195936"
        legacy_task = object()

        # Shape the mocked query so filter(...).all() yields one legacy row.
        mock_query = MagicMock()
        mock_query.filter.return_value.all.return_value = [legacy_task]

        with patch("mcp_cloud.app.get_task_by_id") as mock_get, patch(
            "mcp_cloud.app.has_app_context", return_value=True
        ), patch("mcp_cloud.app.db.session.query", return_value=mock_query):
            result = find_task_by_task_id(legacy)

        self.assertIs(result, legacy_task)
        mock_get.assert_not_called()


if __name__ == "__main__":
unittest.main()
31 changes: 30 additions & 1 deletion mcp_local/planexe_mcp_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import os
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, Optional
Expand All @@ -21,7 +22,7 @@
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import CallToolResult, TextContent, Tool
from pydantic import BaseModel
from pydantic import BaseModel, field_validator

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
Expand All @@ -41,18 +42,46 @@ class TaskCreateRequest(BaseModel):
speed_vs_detail: Optional[SpeedVsDetailInput] = None


def normalize_task_id_input(task_id: str) -> tuple[str, bool]:
    """Normalize a task_id and report whether it parses as a UUID.

    UUIDs come back in canonical lowercase form with is_uuid=True; any other
    non-empty string is returned untouched with is_uuid=False so legacy MCP
    identifiers keep working.

    Raises ValueError when the input is not a string or is blank.
    """
    if not isinstance(task_id, str):
        raise ValueError("task_id must be a string")
    trimmed = task_id.strip()
    if not trimmed:
        raise ValueError("task_id is required")
    try:
        parsed = uuid.UUID(trimmed)
    except ValueError:
        # Non-UUID input is treated as a legacy compatibility identifier.
        return trimmed, False
    return str(parsed), True


def _normalize_task_id_field(task_id: str) -> str:
    """Pydantic field hook: normalize the task_id and discard the UUID flag."""
    value, _ignored_is_uuid = normalize_task_id_input(task_id)
    return value


class TaskStatusRequest(BaseModel):
    """Payload for task status queries."""

    # Accepts either a UUID (any case) or a legacy compatibility identifier;
    # normalized on validation.
    task_id: str

    @field_validator("task_id")
    @classmethod
    def _normalize_task_id(cls, value: str) -> str:
        # Delegate to the shared normalization helper used by all task-id fields.
        return _normalize_task_id_field(value)


class TaskStopRequest(BaseModel):
    """Payload for requests that stop a running task."""

    # Accepts either a UUID (any case) or a legacy compatibility identifier;
    # normalized on validation.
    task_id: str

    @field_validator("task_id")
    @classmethod
    def _normalize_task_id(cls, value: str) -> str:
        # Delegate to the shared normalization helper used by all task-id fields.
        return _normalize_task_id_field(value)


class TaskDownloadRequest(BaseModel):
    """Payload for requests that download a task artifact."""

    # Accepts either a UUID (any case) or a legacy compatibility identifier;
    # normalized on validation.
    task_id: str
    # Which artifact to download; defaults to the final report.
    artifact: str = "report"

    @field_validator("task_id")
    @classmethod
    def _normalize_task_id(cls, value: str) -> str:
        # Delegate to the shared normalization helper used by all task-id fields.
        return _normalize_task_id_field(value)


def _get_env(name: str, default: Optional[str] = None) -> Optional[str]:
value = os.environ.get(name)
Expand Down
24 changes: 21 additions & 3 deletions open_dir_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import subprocess
import sys
import re
import uuid
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
Expand Down Expand Up @@ -59,12 +60,29 @@ def _command_for_platform(target: Path) -> list[str]:
raise ValueError(f"Unsupported platform: {sys.platform}")


def _is_uuid_name(name: str) -> bool:
try:
parsed = uuid.UUID(name)
except ValueError:
return False
return str(parsed) == name


def _is_legacy_planexe_run_name(name: str) -> bool:
# Compatibility shim: legacy single-user runs used PlanExe_YYYYMMDD_HHMMSS.
return bool(re.fullmatch(r"^PlanExe_\d{8}_\d{6}$", name))


def _is_allowed_run_name(name: str) -> bool:
    """Accept canonical UUID run names, plus legacy timestamped names for compatibility."""
    if _is_uuid_name(name):
        return True
    return _is_legacy_planexe_run_name(name)


@app.post("/open", response_model=OpenPathResponse)
def open_path(request: OpenPathRequest):
raw_path = request.path
# Only allow PlanExe-style run directory names to avoid arbitrary path access.
if not re.fullmatch(r"^PlanExe_\d+_\d+$", Path(raw_path).name):
raise HTTPException(status_code=400, detail="Invalid path format.")
# UUID-first, while retaining legacy support for existing timestamp-prefixed run dirs.
if not _is_allowed_run_name(Path(raw_path).name):
raise HTTPException(status_code=400, detail="Invalid path format. Expected UUID task id or legacy PlanExe_YYYYMMDD_HHMMSS name.")
target = Path(raw_path).expanduser().resolve()

if not _is_allowed(target):
Expand Down
2 changes: 1 addition & 1 deletion worker_plan/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ consumers.
- Preserve the public API contract in `worker_plan/app.py`:
- Keep request/response shapes and endpoint paths backward compatible.
- Avoid renaming response fields like `run_id`, `run_dir`, `display_run_dir`.
- Maintain the run directory conventions (`PlanExe_...`) and environment-driven
- Maintain UUID run directory conventions (with legacy compatibility where explicitly required) and environment-driven
paths (`PLANEXE_RUN_DIR`, `PLANEXE_HOST_RUN_DIR`, `PLANEXE_CONFIG_PATH`).
- When changing pipeline behavior, keep the subprocess invocation in
`start_pipeline_subprocess` consistent with `worker_plan_internal`.
Expand Down
2 changes: 1 addition & 1 deletion worker_plan/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ If you must stay on Python 3.14, expect source builds and potential failures; ex
| `PLANEXE_PURGE_ENABLED` | `false` | Enable the background scheduler that purges old run directories. |
| `PLANEXE_PURGE_MAX_AGE_HOURS` | `1` | Maximum age (hours) of runs to delete when purging (scheduler and manual default). |
| `PLANEXE_PURGE_INTERVAL_SECONDS` | `3600` | How often the purge scheduler runs when enabled. |
| `PLANEXE_PURGE_RUN_PREFIX` | `PlanExe_` | Only purge runs whose IDs start with this prefix. |
| `PLANEXE_PURGE_RUN_PREFIX` | *(empty)* | Optional compatibility filter. UUID-named runs are always considered; when set, legacy prefixed non-UUID run directories that match this prefix are also considered. |
| `PLANEXE_LOG_LEVEL` | `INFO` | Sets the console log level for the worker API and the pipeline process. Accepted values are the standard logging levels (e.g., `DEBUG`, `INFO`, `WARNING`, `ERROR`). |

`PLANEXE_LOG_LEVEL` affects both the FastAPI worker and the spawned pipeline logs written to stdout. File logs in `run/<id>/log.txt` always include DEBUG and above.
47 changes: 36 additions & 11 deletions worker_plan/worker_plan_internal/utils/purge_old_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def _is_uuid_name(name: str) -> bool:


def _looks_like_plan_run_dir(dirname: str, path: str) -> bool:
"""A run directory must be UUID-named and contain required marker files."""
"""A UUID-style run directory with required marker files."""
if not _is_uuid_name(dirname):
return False
if not os.path.isdir(path):
Expand All @@ -35,9 +35,31 @@ def _looks_like_plan_run_dir(dirname: str, path: str) -> bool:
return has_start_time and has_plan


def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "myrun_") -> None:
def _looks_like_legacy_prefixed_run_dir(dirname: str, path: str, prefix: str) -> bool:
    """Identify legacy (non-UUID) run directories admitted by an explicit prefix filter.

    Only directories whose name starts with the configured prefix, is not a
    UUID, and which contain both run marker files qualify as purge candidates.
    """
    # Without an explicit prefix there is no legacy compatibility mode at all.
    if not prefix:
        return False
    if not dirname.startswith(prefix) or _is_uuid_name(dirname):
        return False
    if not os.path.isdir(path):
        return False
    try:
        entries = os.listdir(path)
    except OSError:
        # Unreadable directory: err on the side of not treating it as a run.
        return False
    found_start_time = any(entry.endswith(_START_TIME_SUFFIX) for entry in entries)
    found_plan = any(entry.endswith(_PLAN_SUFFIX) for entry in entries)
    return found_start_time and found_plan


def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "") -> None:
"""
Deletes files and directories in the specified run_dir older than max_age_hours and matching the specified prefix.
Delete UUID-named run directories older than max_age_hours.

Compatibility shim: when prefix is explicitly provided, legacy prefixed non-UUID run
directory names are also considered purge candidates.
"""
if not os.path.isabs(run_dir):
raise ValueError(f"run_dir must be an absolute path: {run_dir}")
Expand All @@ -51,22 +73,25 @@ def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "myru
cutoff = now - datetime.timedelta(hours=max_age_hours)

count_deleted = 0
count_skip_without_prefix = 0
count_skip_prefix_filter = 0
count_skip_recent = 0
count_skip_non_run_shape = 0
count_error = 0
for item in os.listdir(run_dir):
if not item.startswith(prefix):
count_skip_without_prefix += 1
continue # Skip files and directories that don't match the prefix
if prefix and not item.startswith(prefix) and not _is_uuid_name(item):
count_skip_prefix_filter += 1
continue

item_path = os.path.join(run_dir, item)
is_dir = os.path.isdir(item_path)
if not is_dir:
# Never delete files from run root. Users may place arbitrary files there.
count_skip_non_run_shape += 1
continue
if not _looks_like_plan_run_dir(item, item_path):

is_uuid_run = _looks_like_plan_run_dir(item, item_path)
is_legacy_prefixed_run = _looks_like_legacy_prefixed_run_dir(item, item_path, prefix)
if not (is_uuid_run or is_legacy_prefixed_run):
count_skip_non_run_shape += 1
continue

Expand All @@ -89,10 +114,10 @@ def purge_old_runs(run_dir: str, max_age_hours: float = 1.0, prefix: str = "myru
logger.error(f"Error processing {item} in {run_dir}: {e}")
count_error += 1
logger.info(
"Purge complete: %s deleted, %s skipped (recent), %s skipped (no prefix), %s skipped (not run artifacts), %s errors",
"Purge complete: %s deleted, %s skipped (recent), %s skipped (prefix filter), %s skipped (not run artifacts), %s errors",
count_deleted,
count_skip_recent,
count_skip_without_prefix,
count_skip_prefix_filter,
count_skip_non_run_shape,
count_error,
)
Expand All @@ -102,7 +127,7 @@ def start_purge_scheduler(
run_dir: str,
purge_interval_seconds: float = 3600,
max_age_hours: float = 1.0,
prefix: str = "myrun_",
prefix: str = "",
) -> None:
"""
Start the purge scheduler in a background thread.
Expand Down
Loading