From afff28953b33d7339c88fb5fbe8407cfa3193be1 Mon Sep 17 00:00:00 2001 From: Damian Barabonkov Date: Fri, 17 Apr 2026 14:17:33 -0400 Subject: [PATCH 1/7] feat: Enhance image listing with detailed status and types --- .../prime/src/prime_cli/commands/images.py | 453 ++++++++++--- packages/prime/tests/test_images_list.py | 623 ++++++++++++++++++ 2 files changed, 998 insertions(+), 78 deletions(-) create mode 100644 packages/prime/tests/test_images_list.py diff --git a/packages/prime/src/prime_cli/commands/images.py b/packages/prime/src/prime_cli/commands/images.py index d4b698f07..1b1055c23 100644 --- a/packages/prime/src/prime_cli/commands/images.py +++ b/packages/prime/src/prime_cli/commands/images.py @@ -3,9 +3,10 @@ import json import tarfile import tempfile -from datetime import datetime +from dataclasses import dataclass +from datetime import datetime, timezone from pathlib import Path -from typing import Optional +from typing import Any, Iterable, Optional import click import httpx @@ -28,6 +29,322 @@ "artifactType, ownerType, sizeBytes?, createdAt, pushedAt?}", ) +# --------------------------------------------------------------------------- +# Helpers for rendering `prime images list` +# --------------------------------------------------------------------------- + +# Raw artifact row as returned by ``GET /v1/images``. The backend schema is +# documented in ``LIST_IMAGES_JSON_HELP`` above; we keep the dict shape loose +# here because the server may add new optional fields over time. +ImageRow = dict[str, Any] + + +@dataclass +class ArtifactPartition: + """Per-artifact-type view of a grouped image. + + All three fields may be ``None``. ``failed_only`` is populated exclusively + when there is no completed artifact for that type — i.e. stale failures + that still belong in the Status column. 
+ """ + + completed: Optional[ImageRow] = None + active: Optional[ImageRow] = None + failed_only: Optional[ImageRow] = None + + def is_empty(self) -> bool: + """True when the bucket has no completed, active, or failed row.""" + return self.completed is None and self.active is None and self.failed_only is None + + +# Mapping of artifact type (e.g. ``CONTAINER_IMAGE``) to its partition bucket. +PartitionMap = dict[str, ArtifactPartition] + +_ACTIVE_STATUSES: set[str] = {"PENDING", "UPLOADING", "BUILDING"} +_FAILED_STATUSES: set[str] = {"FAILED", "CANCELLED"} + + +def _parse_ts(value: Any) -> Optional[datetime]: + """Parse an ISO8601 timestamp (possibly ``Z`` suffixed) as a tz-aware UTC datetime. + + Naive timestamps (the backend emits ``createdAt`` as naive UTC, e.g. from + ``datetime.utcnow()``) are treated as UTC so comparisons across the dataset + are consistent. Returns ``None`` on failure. + """ + if not value: + return None + try: + dt = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + except Exception: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + + +def _latest(rows: list[ImageRow], *keys: str) -> Optional[ImageRow]: + """Return the row whose first non-null parsed timestamp (across ``keys``) is newest.""" + best: Optional[ImageRow] = None + best_ts: Optional[datetime] = None + for row in rows: + ts: Optional[datetime] = None + for k in keys: + ts = _parse_ts(row.get(k)) + if ts is not None: + break + if ts is None: + continue + if best_ts is None or ts > best_ts: + best = row + best_ts = ts + if best is None and rows: + best = rows[0] + return best + + +def _partition_group(artifacts: list[ImageRow]) -> PartitionMap: + """Partition a group of artifact rows (all for the same ``imageName:imageTag``) by + artifact type into a ``completed`` / ``active`` / ``failed_only`` bucket. 
+ + ``failed_only`` is populated only when no completed artifact exists for that type; + once a user has a working image, stale failed rows are intentionally suppressed so + they don't pollute the Status column. + """ + by_type: dict[str, list[ImageRow]] = {} + for art in artifacts: + t = art.get("artifactType", "CONTAINER_IMAGE") + by_type.setdefault(t, []).append(art) + + result: PartitionMap = {} + for art_type, rows in by_type.items(): + completed = [r for r in rows if r.get("status") == "COMPLETED"] + active = [r for r in rows if r.get("status") in _ACTIVE_STATUSES] + failed = [r for r in rows if r.get("status") in _FAILED_STATUSES] + + result[art_type] = ArtifactPartition( + completed=( + _latest(completed, "pushedAt", "completedAt", "createdAt") if completed else None + ), + active=_latest(active, "createdAt") if active else None, + failed_only=( + _latest(failed, "completedAt", "createdAt") if failed and not completed else None + ), + ) + return result + + +_TYPE_LABELS: tuple[tuple[str, str], ...] = ( + ("CONTAINER_IMAGE", "[cyan]Container[/cyan]"), + ("VM_SANDBOX", "[magenta]VM[/magenta]"), +) + + +def _ordered_present_types(partition: PartitionMap) -> list[tuple[str, str]]: + """Return artifact types present in ``partition`` in display order. + + Container first, then VM, then any future types in sorted order. Types + whose per-artifact partition bucket is completely empty (no completed, + no active, no failed_only) are skipped so we don't render dead slots. 
+ """ + ordered: list[tuple[str, str]] = [] + for art_type, label in _TYPE_LABELS: + part = partition.get(art_type) + if part is not None and not part.is_empty(): + ordered.append((art_type, label)) + for art_type in sorted(partition): + if art_type in {"CONTAINER_IMAGE", "VM_SANDBOX"}: + continue + if not partition[art_type].is_empty(): + ordered.append((art_type, f"[white]{art_type.replace('_', ' ').title()}[/white]")) + return ordered + + +def _render_type_column(partition: PartitionMap) -> str: + """Build the Type cell: ``Container / VM`` with color, only for types present.""" + parts = [label for _art_type, label in _ordered_present_types(partition)] + return " / ".join(parts) if parts else "[dim]—[/dim]" + + +def _render_status_slot(part: Optional[ArtifactPartition]) -> str: + """Return the status word for a single artifact slot (no label prefix). + + When a completed artifact coexists with an active build row, the slot is + rendered as ``(rebuilding)`` to communicate that an update is in flight + while the previous build is still the usable version. + """ + if part is None: + return "[dim]—[/dim]" + + if part.completed and part.active: + return "[yellow italic](rebuilding)[/yellow italic]" + if part.completed: + return "[green]Ready[/green]" + if part.active: + status = part.active.get("status", "UNKNOWN") + if status == "BUILDING": + return "[yellow]Building[/yellow]" + if status == "UPLOADING": + return "[yellow]Uploading[/yellow]" + if status == "PENDING": + return "[blue]Pending[/blue]" + return f"[dim]{status.title()}[/dim]" + if part.failed_only: + status = part.failed_only.get("status", "FAILED") + if status == "CANCELLED": + return "[dim]Cancelled[/dim]" + return "[red]Failed[/red]" + return "[dim]—[/dim]" + + +def _render_status_column(partition: PartitionMap) -> str: + """Build the Status cell as positional slots aligned with the Type column. 
+ + Example: if Type is ``Container / VM``, Status for ``rehl:latest`` with a + container that's Ready and a VM that's Failed becomes ``Ready / Failed``. + When an artifact has an active rebuild on top of a completed image, its + slot is ``(rebuilding)``. + """ + ordered = _ordered_present_types(partition) + if not ordered: + return "[dim]—[/dim]" + return " / ".join(_render_status_slot(partition.get(art_type)) for art_type, _ in ordered) + + +def _render_image_reference(img: ImageRow, *, is_team_listing: bool) -> str: + """Render the user-facing image reference. + + The owner prefix (``{userId}/`` or ``team-{teamId}/``) is **always** + preserved so the full string is a valid reference that can be pasted + directly into downstream commands (e.g. ``prime sandbox create``), + which require the fully-qualified ``/:`` form. + + Visual truncation (if any) is applied separately by + :func:`_truncate_ref_left` so that when the terminal is too narrow the + *owner prefix* is what gets clipped, not the image ``name:tag``. + """ + del is_team_listing + return ( + img.get("displayRef") + or img.get("fullImagePath") + or f"{img.get('imageName', 'unknown')}:{img.get('imageTag', 'latest')}" + ) + + +def _truncate_ref_left(ref: str, max_width: Optional[int]) -> str: + """Left-ellipsize ``ref`` so the tail (``name:tag``) stays visible. + + If ``ref`` fits within ``max_width`` it's returned unchanged. Otherwise, + the head of the string (the owner prefix) is dropped and replaced with a + single ``…`` character. This matches the requested display style: + + cmkrcib4x00004kjyxq48…/nvidia-basic-dev:latest → …/nvidia-basic-dev:latest + + ``max_width`` must be at least 2; smaller values are clamped. 
+ """ + if max_width is None or max_width <= 0: + return ref + if len(ref) <= max_width: + return ref + max_width = max(max_width, 2) + return "…" + ref[-(max_width - 1) :] + + +def _image_ref_column_width(console_width: int, is_team_listing: bool) -> int: + """Compute a budget for the Image Reference column based on terminal width. + + The remaining columns have roughly fixed widths (worst-case labels): + + Type ~14 chars ("Container / VM") + Status ~26 chars ("(rebuilding) / (rebuilding)") + Size ~10 chars + Created ~17 chars ("YYYY-MM-DD HH:MM ") + Owner ~10 chars (team listings only) + borders + padding ~ 3 chars per column + + We subtract these from the terminal width and clamp to a reasonable range + so the column is neither absurdly wide on large terminals nor unusable on + tiny ones. + """ + reserved = 14 + 26 + 10 + 17 + (10 if is_team_listing else 0) + num_cols = 5 + (1 if is_team_listing else 0) + reserved += 3 * num_cols + budget = console_width - reserved + return max(30, min(80, budget)) + + +def _completed_size_mb(artifacts: Iterable[ImageRow]) -> str: + """Sum sizes of COMPLETED artifacts only and format as a MB string.""" + total = sum((a.get("sizeBytes") or 0) for a in artifacts if a.get("status") == "COMPLETED") + if total <= 0: + return "[dim]—[/dim]" + return f"{total / 1024 / 1024:.1f} MB" + + +def _display_created(partition: PartitionMap, artifacts: list[ImageRow]) -> str: + """Pick the most meaningful timestamp for the Created column. + + Preference order: + 1. Newest ``pushedAt`` across completed artifacts (stable "last published" time). + 2. Newest ``createdAt`` across active build rows (for first-time builds). + 3. Newest ``createdAt`` across all rows (fallback for failed-only groups). 
+ """ + completed_rows: list[ImageRow] = [ + p.completed for p in partition.values() if p.completed is not None + ] + if completed_rows: + chosen = _latest(completed_rows, "pushedAt", "completedAt", "createdAt") + if chosen is not None: + ts = _parse_ts( + chosen.get("pushedAt") or chosen.get("completedAt") or chosen.get("createdAt") + ) + if ts: + return ts.strftime("%Y-%m-%d %H:%M") + + active_rows: list[ImageRow] = [p.active for p in partition.values() if p.active is not None] + if active_rows: + chosen = _latest(active_rows, "createdAt") + if chosen is not None: + ts = _parse_ts(chosen.get("createdAt")) + if ts: + return ts.strftime("%Y-%m-%d %H:%M") + + chosen = _latest(artifacts, "createdAt") + if chosen is not None: + ts = _parse_ts(chosen.get("createdAt")) + if ts: + return ts.strftime("%Y-%m-%d %H:%M") + return "" + + +def _group_sort_key(partition: PartitionMap, artifacts: list[ImageRow]) -> datetime: + """Key function to sort groups newest-first by their display timestamp.""" + completed_rows: list[ImageRow] = [ + p.completed for p in partition.values() if p.completed is not None + ] + if completed_rows: + chosen = _latest(completed_rows, "pushedAt", "completedAt", "createdAt") + if chosen is not None: + ts = _parse_ts( + chosen.get("pushedAt") or chosen.get("completedAt") or chosen.get("createdAt") + ) + if ts: + return ts + + active_rows: list[ImageRow] = [p.active for p in partition.values() if p.active is not None] + if active_rows: + chosen = _latest(active_rows, "createdAt") + if chosen is not None: + ts = _parse_ts(chosen.get("createdAt")) + if ts: + return ts + + chosen = _latest(artifacts, "createdAt") + if chosen is not None: + ts = _parse_ts(chosen.get("createdAt")) + if ts: + return ts + return datetime.min.replace(tzinfo=timezone.utc) + @app.command("push") def push_image( @@ -249,12 +566,12 @@ def list_images( client = APIClient() # Build query params - params = {} + params: dict[str, str] = {} if config.team_id: params["teamId"] = 
config.team_id response = client.request("GET", "/images", params=params if params else None) - images = response.get("data", []) + images: list[ImageRow] = response.get("data", []) if not images: console.print("[yellow]No images or builds found.[/yellow]") @@ -266,94 +583,74 @@ def list_images( return # Table output - if config.team_id: + is_team_listing: bool = bool(config.team_id) + title: str + if is_team_listing: title = f"Team Docker Images (team: {config.team_id})" else: title = "Personal Docker Images" - # Group images by (owner, imageName, imageTag) to deduplicate rows. - grouped: dict[str, list] = {} + grouped: dict[str, list[ImageRow]] = {} for img in images: owner_scope = img.get("teamId") or img.get("ownerType", "personal") key = f"{owner_scope}/{img.get('imageName', '')}:{img.get('imageTag', 'latest')}" grouped.setdefault(key, []).append(img) - table = Table(title=title) - table.add_column("Image Reference", style="cyan") - table.add_column("Type", justify="center") - table.add_column("Owner", justify="center") - table.add_column("Status", justify="center") - table.add_column("Size", justify="right") - table.add_column("Created", style="dim") + ref_max_width: int = _image_ref_column_width( + console.size.width, is_team_listing=is_team_listing + ) + table = Table(title=title) + table.add_column( + "Image Reference", + style="cyan", + no_wrap=True, + overflow="crop", + min_width=ref_max_width, + max_width=ref_max_width, + ) + table.add_column("Type", justify="center", no_wrap=True) + if is_team_listing: + table.add_column("Owner", justify="center") + table.add_column("Status", justify="center", no_wrap=True, min_width=27) + table.add_column("Size", justify="right", no_wrap=True) + table.add_column("Created", style="dim", no_wrap=True, min_width=16) + + sortable: list[tuple[datetime, list[ImageRow], PartitionMap]] = [] for _key, artifacts in grouped.items(): - # Build type display by combining unique artifact types - artifact_types = 
{art.get("artifactType", "CONTAINER_IMAGE") for art in artifacts} - type_parts = [] - for at in sorted(artifact_types): - if at == "VM_SANDBOX": - type_parts.append("[magenta]VM[/magenta]") - else: - type_parts.append("[cyan]Container[/cyan]") - type_display = " / ".join(type_parts) - - # Use the first artifact for shared fields, prefer container image - img = next( - ( - a - for a in artifacts - if a.get("artifactType", "CONTAINER_IMAGE") == "CONTAINER_IMAGE" - ), - artifacts[0], + partition = _partition_group(artifacts) + sortable.append((_group_sort_key(partition, artifacts), artifacts, partition)) + sortable.sort(key=lambda item: item[0], reverse=True) + + for _ts, artifacts, partition in sortable: + container_part = partition.get("CONTAINER_IMAGE") or ArtifactPartition() + preferred: ImageRow = ( + container_part.completed + or container_part.active + or container_part.failed_only + or next(iter(artifacts), {}) ) - # Status — show combined if they differ - statuses = {a.get("status", "UNKNOWN") for a in artifacts} - status_displays = [] - for s in sorted(statuses): - if s == "COMPLETED": - status_displays.append("[green]Ready[/green]") - elif s == "BUILDING": - status_displays.append("[yellow]Building[/yellow]") - elif s == "PENDING": - status_displays.append("[blue]Pending[/blue]") - elif s == "FAILED": - status_displays.append("[red]Failed[/red]") - elif s == "CANCELLED": - status_displays.append("[dim]Cancelled[/dim]") - else: - status_displays.append(f"[dim]{s}[/dim]") - status_display = " / ".join(status_displays) - - # Owner type - owner_type = img.get("ownerType", "personal") - if owner_type == "team": - owner_display = "[blue]Team[/blue]" - else: - owner_display = "[dim]Personal[/dim]" - - # Size — sum across artifacts - total_bytes = sum(a.get("sizeBytes") or 0 for a in artifacts) - size_mb = f"{total_bytes / 1024 / 1024:.1f} MB" if total_bytes else "" - - # Date - use pushedAt for completed images, createdAt for builds - try: - if 
img.get("pushedAt"): - date_dt = datetime.fromisoformat(img["pushedAt"].replace("Z", "+00:00")) - else: - date_dt = datetime.fromisoformat(img["createdAt"].replace("Z", "+00:00")) - date_str = date_dt.strftime("%Y-%m-%d %H:%M") - except Exception: - date_str = img.get("pushedAt") or img.get("createdAt", "") - - # Image reference - prefer displayRef for user-friendly format - image_ref = ( - img.get("displayRef") - or img.get("fullImagePath") - or f"{img.get('imageName', 'unknown')}:{img.get('imageTag', 'latest')}" + image_ref: str = _truncate_ref_left( + _render_image_reference(preferred, is_team_listing=is_team_listing), + ref_max_width, ) - - table.add_row(image_ref, type_display, owner_display, status_display, size_mb, date_str) + type_display: str = _render_type_column(partition) + status_display: str = _render_status_column(partition) + size_mb: str = _completed_size_mb(artifacts) + date_str: str = _display_created(partition, artifacts) + + row: list[str] = [image_ref, type_display] + if is_team_listing: + owner_type = preferred.get( + "ownerType", "team" if preferred.get("teamId") else "personal" + ) + owner_display: str = ( + "[blue]Team[/blue]" if owner_type == "team" else "[dim]Personal[/dim]" + ) + row.append(owner_display) + row.extend([status_display, size_mb, date_str]) + table.add_row(*row) console.print() console.print(table) diff --git a/packages/prime/tests/test_images_list.py b/packages/prime/tests/test_images_list.py new file mode 100644 index 000000000..6e5bfe0fb --- /dev/null +++ b/packages/prime/tests/test_images_list.py @@ -0,0 +1,623 @@ +"""Tests for `prime images list` rendering helpers and end-to-end output.""" + +from __future__ import annotations + +from typing import Any + +from prime_cli.commands import images as images_cmd +from prime_cli.commands.images import ( + ArtifactPartition, + ImageRow, + _completed_size_mb, + _display_created, + _image_ref_column_width, + _partition_group, + _render_image_reference, + _render_status_column, 
+ _render_status_slot, + _render_type_column, + _truncate_ref_left, +) +from prime_cli.main import app +from typer.testing import CliRunner + +runner: CliRunner = CliRunner() + +TEST_ENV: dict[str, str] = { + "COLUMNS": "200", + "LINES": "50", + "NO_COLOR": "1", + "PRIME_DISABLE_VERSION_CHECK": "1", +} + + +USER_ID: str = "cmkrcib4x00004kjyxq48nltd" +TEAM_ID: str = "team-abc123" + + +# --------------------------------------------------------------------------- +# Fixture builders +# --------------------------------------------------------------------------- + + +def _art( + artifact_type: str, + status: str, + *, + image_name: str = "nvidia-basic-dev", + image_tag: str = "latest", + pushed_at: str | None = None, + created_at: str = "2026-04-01T10:00:00", + size_bytes: int | None = None, + team_id: str | None = None, + owner_type: str | None = None, + display_ref: str | None = None, +) -> ImageRow: + ref: str = ( + display_ref + if display_ref is not None + else (f"{('team-' + team_id) if team_id else USER_ID}/{image_name}:{image_tag}") + ) + row: dict[str, Any] = { + "id": f"id-{artifact_type}-{status}-{created_at}", + "artifactType": artifact_type, + "imageName": image_name, + "imageTag": image_tag, + "status": status, + "fullImagePath": f"{USER_ID}/{image_name}:{image_tag}", + "errorMessage": None, + "sizeBytes": size_bytes, + "createdAt": created_at, + "startedAt": None, + "completedAt": None, + "pushedAt": pushed_at, + "teamId": team_id, + "ownerType": owner_type or ("team" if team_id else "personal"), + "displayRef": ref, + } + return row + + +# --------------------------------------------------------------------------- +# _partition_group +# --------------------------------------------------------------------------- + + +def test_partition_healthy_image_has_completed_only(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + ] + part = _partition_group(arts) 
+ assert part["CONTAINER_IMAGE"].completed is not None + assert part["CONTAINER_IMAGE"].active is None + assert part["CONTAINER_IMAGE"].failed_only is None + assert part["VM_SANDBOX"].completed is not None + + +def test_partition_hides_failed_rows_when_completed_exists(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T21:00:00"), + _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T20:55:00"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T21:00:00"), + ] + part = _partition_group(arts) + assert part["CONTAINER_IMAGE"].completed is not None + assert part["CONTAINER_IMAGE"].failed_only is None + assert part["VM_SANDBOX"].failed_only is None + + +def test_partition_surfaces_failure_when_no_completed(): + arts = [ + _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T21:00:00"), + _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T21:00:00"), + ] + part = _partition_group(arts) + assert part["CONTAINER_IMAGE"].completed is None + assert part["CONTAINER_IMAGE"].failed_only is not None + assert part["CONTAINER_IMAGE"].failed_only["status"] == "FAILED" + + +def test_partition_marks_active_rebuild_alongside_completed(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-16T10:00:00"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), + ] + part = _partition_group(arts) + assert part["CONTAINER_IMAGE"].completed is not None + assert part["CONTAINER_IMAGE"].active is not None + assert part["CONTAINER_IMAGE"].active["status"] == "BUILDING" + + +# --------------------------------------------------------------------------- +# _render_status_slot +# --------------------------------------------------------------------------- 
+ + +def test_render_status_slot_ready(): + part = ArtifactPartition(completed={"status": "COMPLETED"}) + assert _render_status_slot(part) == "[green]Ready[/green]" + + +def test_render_status_slot_rebuilding_replaces_status(): + part = ArtifactPartition( + completed={"status": "COMPLETED"}, + active={"status": "BUILDING"}, + ) + out = _render_status_slot(part) + assert "rebuilding" in out + assert "Ready" not in out + + +def test_render_status_slot_first_time_build(): + part = ArtifactPartition(active={"status": "BUILDING"}) + assert _render_status_slot(part) == "[yellow]Building[/yellow]" + + +def test_render_status_slot_uploading(): + part = ArtifactPartition(active={"status": "UPLOADING"}) + assert _render_status_slot(part) == "[yellow]Uploading[/yellow]" + + +def test_render_status_slot_pending(): + part = ArtifactPartition(active={"status": "PENDING"}) + assert _render_status_slot(part) == "[blue]Pending[/blue]" + + +def test_render_status_slot_failed_only(): + part = ArtifactPartition(failed_only={"status": "FAILED"}) + assert _render_status_slot(part) == "[red]Failed[/red]" + + +def test_render_status_slot_cancelled(): + part = ArtifactPartition(failed_only={"status": "CANCELLED"}) + assert _render_status_slot(part) == "[dim]Cancelled[/dim]" + + +def test_render_status_slot_none_part_returns_dash(): + assert _render_status_slot(None) == "[dim]—[/dim]" + + +def test_render_status_slot_empty_part_returns_dash(): + assert _render_status_slot(ArtifactPartition()) == "[dim]—[/dim]" + + +# --------------------------------------------------------------------------- +# _render_type_column +# --------------------------------------------------------------------------- + + +def test_render_type_column_container_and_vm(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + ] + text = _render_type_column(_partition_group(arts)) + assert "Container" in text and "VM" in 
text + assert " / " in text + assert text.index("Container") < text.index("VM") + + +def test_render_type_column_container_only_legacy(): + arts = [_art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07")] + text = _render_type_column(_partition_group(arts)) + assert "Container" in text + assert "VM" not in text + assert " / " not in text + + +# --------------------------------------------------------------------------- +# _render_status_column (positional slots) +# --------------------------------------------------------------------------- + + +def test_render_status_column_healthy_slots(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + ] + text = _render_status_column(_partition_group(arts)) + assert text == "[green]Ready[/green] / [green]Ready[/green]" + + +def test_render_status_column_partial_failure_aligned(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T22:00:00"), + ] + text = _render_status_column(_partition_group(arts)) + assert text == "[green]Ready[/green] / [red]Failed[/red]" + + +def test_render_status_column_container_only_legacy(): + arts = [_art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07")] + text = _render_status_column(_partition_group(arts)) + assert text == "[green]Ready[/green]" + assert " / " not in text + + +def test_render_status_column_rebuild_replaces_status(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-16T10:00:00"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), + ] + text = _render_status_column(_partition_group(arts)) + assert text.count("rebuilding") == 2 + assert "Ready" not in text + + +def 
test_render_status_column_rebuild_vm_only(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), + ] + text = _render_status_column(_partition_group(arts)) + assert text.startswith("[green]Ready[/green]") + assert "rebuilding" in text + assert text.count("rebuilding") == 1 + + +def test_render_status_column_stale_failures_hidden_by_completed(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + ] + for i in range(5): + arts.append(_art("CONTAINER_IMAGE", "FAILED", created_at=f"2026-04-16T20:{i:02d}:00")) + arts.append(_art("VM_SANDBOX", "FAILED", created_at=f"2026-04-16T20:{i:02d}:00")) + text = _render_status_column(_partition_group(arts)) + assert "Failed" not in text + assert text.count("Ready") == 2 + + +# --------------------------------------------------------------------------- +# _render_image_reference +# --------------------------------------------------------------------------- + + +def test_render_image_reference_always_shows_user_prefix_for_personal(): + img = _art("CONTAINER_IMAGE", "COMPLETED", image_name="myapp", image_tag="v1") + assert _render_image_reference(img, is_team_listing=False) == f"{USER_ID}/myapp:v1" + + +def test_render_image_reference_keeps_team_prefix_for_team_listing(): + img = _art( + "CONTAINER_IMAGE", + "COMPLETED", + team_id=TEAM_ID, + image_name="myapp", + image_tag="v1", + ) + assert _render_image_reference(img, is_team_listing=True) == f"team-{TEAM_ID}/myapp:v1" + + +def test_render_image_reference_falls_back_without_display_ref(): + img = {"imageName": "legacy", "imageTag": "v2"} + assert _render_image_reference(img, is_team_listing=False) == "legacy:v2" + + +# --------------------------------------------------------------------------- +# _truncate_ref_left 
+# --------------------------------------------------------------------------- + + +def test_truncate_ref_left_returns_unchanged_when_within_width(): + assert _truncate_ref_left("short/name:tag", 40) == "short/name:tag" + + +def test_truncate_ref_left_clips_head_and_preserves_name_tag(): + ref = f"{USER_ID}/nvidia-basic-dev:latest" + out = _truncate_ref_left(ref, 30) + assert out.endswith("nvidia-basic-dev:latest") + assert out.startswith("…") + assert len(out) == 30 + + +def test_truncate_ref_left_handles_zero_or_negative_budget(): + ref = f"{USER_ID}/x:y" + assert _truncate_ref_left(ref, 0) == ref + assert _truncate_ref_left(ref, -5) == ref + + +def test_truncate_ref_left_clamps_tiny_budget_to_two(): + out = _truncate_ref_left("abcdefghij", 1) + assert out == "…j" + + +# --------------------------------------------------------------------------- +# _image_ref_column_width +# --------------------------------------------------------------------------- + + +def test_image_ref_column_width_wide_terminal_capped_at_80(): + assert _image_ref_column_width(300, is_team_listing=False) == 80 + + +def test_image_ref_column_width_narrow_terminal_clamped_to_floor(): + assert _image_ref_column_width(60, is_team_listing=False) == 30 + + +def test_image_ref_column_width_scales_with_available_space(): + w_personal = _image_ref_column_width(140, is_team_listing=False) + w_team = _image_ref_column_width(140, is_team_listing=True) + assert w_team < w_personal + assert 20 <= w_team <= 80 + assert 20 <= w_personal <= 80 + + +# --------------------------------------------------------------------------- +# _completed_size_mb +# --------------------------------------------------------------------------- + + +def test_completed_size_sums_only_completed_artifacts(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", size_bytes=1024 * 1024 * 100), + _art("VM_SANDBOX", "COMPLETED", size_bytes=1024 * 1024 * 200), + _art("CONTAINER_IMAGE", "BUILDING"), + _art("VM_SANDBOX", "FAILED"), + ] + 
assert _completed_size_mb(arts) == "300.0 MB" + + +def test_completed_size_returns_dash_when_no_completed(): + arts = [ + _art("CONTAINER_IMAGE", "BUILDING"), + _art("VM_SANDBOX", "PENDING"), + ] + assert _completed_size_mb(arts) == "[dim]—[/dim]" + + +# --------------------------------------------------------------------------- +# _display_created +# --------------------------------------------------------------------------- + + +def test_display_created_uses_latest_pushed_at_when_completed_exists(): + arts = [ + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-17T09:00:00"), + ] + part = _partition_group(arts) + assert _display_created(part, arts) == "2026-04-16 22:24" + + +def test_display_created_falls_back_to_active_when_no_completed(): + arts = [ + _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-17T09:00:00"), + _art("VM_SANDBOX", "PENDING", created_at="2026-04-17T08:00:00"), + ] + part = _partition_group(arts) + assert _display_created(part, arts) == "2026-04-17 09:00" + + +def test_display_created_falls_back_to_any_row_when_only_failed(): + arts = [ + _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T12:00:00"), + _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T13:00:00"), + ] + part = _partition_group(arts) + assert _display_created(part, arts) == "2026-04-16 13:00" + + +# --------------------------------------------------------------------------- +# End-to-end: run the CLI with a mocked API client +# --------------------------------------------------------------------------- + + +def _build_dummy_client(payload: list[ImageRow]) -> type: + class DummyAPIClient: + def request( + self, + method: str, + path: str, + json: Any = None, + params: dict[str, Any] | None = None, + ) -> dict[str, Any]: + assert method == "GET" + assert path == "/images" + return {"data": payload} + + return 
DummyAPIClient + + +class _StubConfig: + def __init__(self, team_id: str | None = None) -> None: + self.team_id: str | None = team_id + + +def _patch_config(monkeypatch: Any, team_id: str | None = None) -> None: + """Replace the module-level ``config`` with a stub.""" + monkeypatch.setattr(images_cmd, "config", _StubConfig(team_id=team_id)) + + +def test_list_cli_shows_type_and_positional_status(monkeypatch): + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=None) + + payload = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-16T22:24:07", + size_bytes=100 * 1024 * 1024, + ), + _art( + "VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07", size_bytes=200 * 1024 * 1024 + ), + _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T20:00:00"), + _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T20:00:00"), + ] + + monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + assert result.exit_code == 0, result.output + + assert "Container / VM" in result.output + assert "Ready / Ready" in result.output + assert "Container:" not in result.output + assert "VM:" not in result.output + assert "Failed" not in result.output + assert f"{USER_ID}/nvidia-basic-dev:latest" in result.output + assert "300.0 MB" in result.output + + +def test_list_cli_shows_rebuilding_marker(monkeypatch): + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=None) + + payload = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-15T20:57:52", + size_bytes=50 * 1024 * 1024, + ), + _art( + "VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52", size_bytes=50 * 1024 * 1024 + ), + _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-16T10:00:00"), + _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), + ] + + 
monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + assert result.exit_code == 0, result.output + assert "rebuilding" in result.output + assert "2026-04-15 20:57" in result.output + + +def test_list_cli_team_listing_keeps_owner_column_and_prefix(monkeypatch): + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=TEAM_ID) + + payload = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-16T22:24:07", + size_bytes=10 * 1024 * 1024, + team_id=TEAM_ID, + image_name="teamapp", + ), + _art( + "VM_SANDBOX", + "COMPLETED", + pushed_at="2026-04-16T22:24:07", + size_bytes=20 * 1024 * 1024, + team_id=TEAM_ID, + image_name="teamapp", + ), + ] + + monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + assert result.exit_code == 0, result.output + assert "Owner" in result.output + assert f"team-{TEAM_ID}/teamapp:latest" in result.output + + +def test_list_cli_container_only_legacy_image(monkeypatch): + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=None) + + payload = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-16T22:24:07", + size_bytes=10 * 1024 * 1024, + image_name="legacy", + ), + ] + + monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + assert result.exit_code == 0, result.output + assert "Container" in result.output + assert "Ready" in result.output + assert "VM" not in result.output + + +def test_list_cli_first_time_build_shows_building(monkeypatch): + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=None) + + payload = [ + 
_art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-17T09:00:00"), + _art("VM_SANDBOX", "BUILDING", created_at="2026-04-17T09:00:00"), + ] + + monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + assert result.exit_code == 0, result.output + assert "Container / VM" in result.output + assert "Building / Building" in result.output + assert "Container:" not in result.output + assert "—" in result.output + + +def test_list_cli_truncates_owner_prefix_on_narrow_terminal(monkeypatch): + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=None) + + payload = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-16T22:24:07", + size_bytes=1024 * 1024, + image_name="nvidia-basic-dev", + ), + ] + + monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + narrow_env = dict(TEST_ENV, COLUMNS="90") + result = runner.invoke(app, ["images", "list"], env=narrow_env) + assert result.exit_code == 0, result.output + assert "nvidia-basic-dev:latest" in result.output + assert "…" in result.output + assert f"{USER_ID}/nvidia-basic-dev" not in result.output + + +def test_list_cli_newest_group_first(monkeypatch): + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=None) + + older = _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-03-24T20:11:46", + image_name="old-image", + size_bytes=1024 * 1024, + ) + newer = _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-16T22:24:07", + image_name="new-image", + size_bytes=1024 * 1024, + ) + payload = [older, newer] + + monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + assert result.exit_code == 0, result.output + new_idx = 
result.output.find("new-image") + old_idx = result.output.find("old-image") + assert 0 <= new_idx < old_idx From 563617f7489cdc66cd676474af1253dababa6758 Mon Sep 17 00:00:00 2001 From: Damian Barabonkov Date: Fri, 17 Apr 2026 14:43:45 -0400 Subject: [PATCH 2/7] refactor: Streamline artifact row handling and status display --- .../prime/src/prime_cli/commands/images.py | 272 ++++++------ packages/prime/tests/test_images_list.py | 415 ++++++++++++++---- 2 files changed, 450 insertions(+), 237 deletions(-) diff --git a/packages/prime/src/prime_cli/commands/images.py b/packages/prime/src/prime_cli/commands/images.py index 1b1055c23..ba8e59c79 100644 --- a/packages/prime/src/prime_cli/commands/images.py +++ b/packages/prime/src/prime_cli/commands/images.py @@ -6,7 +6,7 @@ from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import Any, Iterable, Optional +from typing import Any, Optional import click import httpx @@ -43,25 +43,28 @@ class ArtifactPartition: """Per-artifact-type view of a grouped image. - All three fields may be ``None``. ``failed_only`` is populated exclusively - when there is no completed artifact for that type — i.e. stale failures - that still belong in the Status column. + Holds the single most recently updated row for this artifact type. The + status of that row is what gets rendered — we intentionally ignore older + rows (including stale ``BUILDING`` / ``PENDING`` entries the backend may + have orphaned) so the display reflects the current truth rather than a + composite derived from history. 
""" - completed: Optional[ImageRow] = None - active: Optional[ImageRow] = None - failed_only: Optional[ImageRow] = None + latest: Optional[ImageRow] = None def is_empty(self) -> bool: - """True when the bucket has no completed, active, or failed row.""" - return self.completed is None and self.active is None and self.failed_only is None + """True when no row exists for this artifact type.""" + return self.latest is None # Mapping of artifact type (e.g. ``CONTAINER_IMAGE``) to its partition bucket. PartitionMap = dict[str, ArtifactPartition] -_ACTIVE_STATUSES: set[str] = {"PENDING", "UPLOADING", "BUILDING"} -_FAILED_STATUSES: set[str] = {"FAILED", "CANCELLED"} +# Timestamp priority used when picking the single latest row per artifact +# type. ``pushedAt`` wins for completed uploads; ``completedAt`` covers +# failed/cancelled terminal states; ``startedAt`` and ``createdAt`` are +# ultimate fallbacks for rows that never finished. +_LATEST_ROW_KEYS: tuple[str, ...] = ("pushedAt", "completedAt", "startedAt", "createdAt") def _parse_ts(value: Any) -> Optional[datetime]: @@ -82,8 +85,24 @@ def _parse_ts(value: Any) -> Optional[datetime]: return dt -def _latest(rows: list[ImageRow], *keys: str) -> Optional[ImageRow]: - """Return the row whose first non-null parsed timestamp (across ``keys``) is newest.""" +def _latest(rows: list[ImageRow], *keys: str) -> Optional[tuple[ImageRow, Optional[datetime]]]: + """Return the row whose first parseable timestamp (across ``keys``) is newest. + + Each row is evaluated by walking ``keys`` in order and taking the first + value that ``_parse_ts`` accepts. This means an unparseable-but-truthy + value (e.g. a malformed date string) is skipped rather than short-circuiting + selection — and the *same* parsed timestamp is returned alongside the row + so callers don't re-read a potentially different field. 
+ + The returned ``datetime`` is ``None`` only in the fallback case where no + row had any parseable timestamp; we still return the first row so Size / + Reference fields can be derived, but callers should treat "no timestamp" + as a signal to fall through to a lower-priority tier. + + Returns ``None`` if ``rows`` is empty. + """ + if not rows: + return None best: Optional[ImageRow] = None best_ts: Optional[datetime] = None for row in rows: @@ -97,39 +116,48 @@ def _latest(rows: list[ImageRow], *keys: str) -> Optional[ImageRow]: if best_ts is None or ts > best_ts: best = row best_ts = ts - if best is None and rows: - best = rows[0] - return best + if best is None: + return rows[0], None + return best, best_ts -def _partition_group(artifacts: list[ImageRow]) -> PartitionMap: - """Partition a group of artifact rows (all for the same ``imageName:imageTag``) by - artifact type into a ``completed`` / ``active`` / ``failed_only`` bucket. +def _coerce_artifact_type(value: Any) -> str: + """Normalise an ``artifactType`` value into a usable string key. + + Defensive against a malformed backend payload (``null``, missing, or a + non-string value) so that downstream ``sorted(partition)`` and label + rendering never blow up on mixed key types. + """ + if isinstance(value, str) and value: + return value + return "CONTAINER_IMAGE" - ``failed_only`` is populated only when no completed artifact exists for that type; - once a user has a working image, stale failed rows are intentionally suppressed so - they don't pollute the Status column. + +def _pick_row(rows: list[ImageRow], *keys: str) -> Optional[ImageRow]: + """Return just the row component from ``_latest`` (drops the timestamp).""" + result = _latest(rows, *keys) + return result[0] if result is not None else None + + +def _partition_group(artifacts: list[ImageRow]) -> PartitionMap: + """Group artifact rows by type, keeping only the most recent row per type. 
+
+    For a single ``imageName:imageTag`` the backend returns one row per
+    (build, artifact type) plus any completed artifacts from the user images
+    table. We pick the single newest row per artifact type (ordered by
+    ``pushedAt → completedAt → startedAt → createdAt``) and render its raw
+    ``status`` verbatim. Older rows — including stuck ``BUILDING`` zombies
+    and stale failures — are simply not considered.
     """
     by_type: dict[str, list[ImageRow]] = {}
     for art in artifacts:
-        t = art.get("artifactType", "CONTAINER_IMAGE")
+        t = _coerce_artifact_type(art.get("artifactType"))
         by_type.setdefault(t, []).append(art)
 
     result: PartitionMap = {}
     for art_type, rows in by_type.items():
-        completed = [r for r in rows if r.get("status") == "COMPLETED"]
-        active = [r for r in rows if r.get("status") in _ACTIVE_STATUSES]
-        failed = [r for r in rows if r.get("status") in _FAILED_STATUSES]
-
-        result[art_type] = ArtifactPartition(
-            completed=(
-                _latest(completed, "pushedAt", "completedAt", "createdAt") if completed else None
-            ),
-            active=_latest(active, "createdAt") if active else None,
-            failed_only=(
-                _latest(failed, "completedAt", "createdAt") if failed and not completed else None
-            ),
-        )
+        latest_row = _pick_row(rows, *_LATEST_ROW_KEYS)
+        result[art_type] = ArtifactPartition(latest=latest_row)
 
     return result
 
@@ -144,7 +172,8 @@ def _ordered_present_types(partition: PartitionMap) -> list[tuple[str, str]]:
 
     Container first, then VM, then any future types in sorted order.
     Types whose per-artifact partition bucket is completely empty (no completed,
-    no active, no failed_only) are skipped so we don't render dead slots.
+    no active, no failed_only — i.e. no ``latest`` row at all) are
+    skipped so we don't render dead slots.
""" ordered: list[tuple[str, str]] = [] for art_type, label in _TYPE_LABELS: @@ -155,7 +184,8 @@ def _ordered_present_types(partition: PartitionMap) -> list[tuple[str, str]]: if art_type in {"CONTAINER_IMAGE", "VM_SANDBOX"}: continue if not partition[art_type].is_empty(): - ordered.append((art_type, f"[white]{art_type.replace('_', ' ').title()}[/white]")) + label = f"[white]{str(art_type).replace('_', ' ').title()}[/white]" + ordered.append((art_type, label)) return ordered @@ -165,35 +195,26 @@ def _render_type_column(partition: PartitionMap) -> str: return " / ".join(parts) if parts else "[dim]—[/dim]" +_STATUS_LABELS: dict[str, str] = { + "COMPLETED": "[green]Ready[/green]", + "BUILDING": "[yellow]Building[/yellow]", + "UPLOADING": "[yellow]Uploading[/yellow]", + "PENDING": "[blue]Pending[/blue]", + "FAILED": "[red]Failed[/red]", + "CANCELLED": "[dim]Cancelled[/dim]", +} + + def _render_status_slot(part: Optional[ArtifactPartition]) -> str: - """Return the status word for a single artifact slot (no label prefix). + """Render the raw status of the latest row for this artifact type. - When a completed artifact coexists with an active build row, the slot is - rendered as ``(rebuilding)`` to communicate that an update is in flight - while the previous build is still the usable version. + Unknown statuses (e.g. a future backend addition) are rendered as a + dim title-cased label rather than being dropped. 
""" - if part is None: + if part is None or part.latest is None: return "[dim]—[/dim]" - - if part.completed and part.active: - return "[yellow italic](rebuilding)[/yellow italic]" - if part.completed: - return "[green]Ready[/green]" - if part.active: - status = part.active.get("status", "UNKNOWN") - if status == "BUILDING": - return "[yellow]Building[/yellow]" - if status == "UPLOADING": - return "[yellow]Uploading[/yellow]" - if status == "PENDING": - return "[blue]Pending[/blue]" - return f"[dim]{status.title()}[/dim]" - if part.failed_only: - status = part.failed_only.get("status", "FAILED") - if status == "CANCELLED": - return "[dim]Cancelled[/dim]" - return "[red]Failed[/red]" - return "[dim]—[/dim]" + status = str(part.latest.get("status") or "UNKNOWN") + return _STATUS_LABELS.get(status, f"[dim]{status.title()}[/dim]") def _render_status_column(partition: PartitionMap) -> str: @@ -255,7 +276,7 @@ def _image_ref_column_width(console_width: int, is_team_listing: bool) -> int: The remaining columns have roughly fixed widths (worst-case labels): Type ~14 chars ("Container / VM") - Status ~26 chars ("(rebuilding) / (rebuilding)") + Status ~20 chars ("Uploading / Uploading") Size ~10 chars Created ~17 chars ("YYYY-MM-DD HH:MM ") Owner ~10 chars (team listings only) @@ -265,85 +286,54 @@ def _image_ref_column_width(console_width: int, is_team_listing: bool) -> int: so the column is neither absurdly wide on large terminals nor unusable on tiny ones. 
""" - reserved = 14 + 26 + 10 + 17 + (10 if is_team_listing else 0) + reserved = 14 + 20 + 10 + 17 + (10 if is_team_listing else 0) num_cols = 5 + (1 if is_team_listing else 0) reserved += 3 * num_cols budget = console_width - reserved return max(30, min(80, budget)) -def _completed_size_mb(artifacts: Iterable[ImageRow]) -> str: - """Sum sizes of COMPLETED artifacts only and format as a MB string.""" - total = sum((a.get("sizeBytes") or 0) for a in artifacts if a.get("status") == "COMPLETED") +def _completed_size_mb(partition: PartitionMap) -> str: + """Sum sizes of the latest COMPLETED rows per artifact type. + + Only completed artifacts carry a meaningful ``sizeBytes``; in-flight and + failed rows contribute nothing, so summing across the latest-per-type + gives the current on-disk footprint of the image. + """ + total = 0 + for part in partition.values(): + row = part.latest + if row is None or row.get("status") != "COMPLETED": + continue + total += row.get("sizeBytes") or 0 if total <= 0: return "[dim]—[/dim]" return f"{total / 1024 / 1024:.1f} MB" -def _display_created(partition: PartitionMap, artifacts: list[ImageRow]) -> str: - """Pick the most meaningful timestamp for the Created column. - - Preference order: - 1. Newest ``pushedAt`` across completed artifacts (stable "last published" time). - 2. Newest ``createdAt`` across active build rows (for first-time builds). - 3. Newest ``createdAt`` across all rows (fallback for failed-only groups). 
- """ - completed_rows: list[ImageRow] = [ - p.completed for p in partition.values() if p.completed is not None - ] - if completed_rows: - chosen = _latest(completed_rows, "pushedAt", "completedAt", "createdAt") - if chosen is not None: - ts = _parse_ts( - chosen.get("pushedAt") or chosen.get("completedAt") or chosen.get("createdAt") - ) - if ts: - return ts.strftime("%Y-%m-%d %H:%M") - - active_rows: list[ImageRow] = [p.active for p in partition.values() if p.active is not None] - if active_rows: - chosen = _latest(active_rows, "createdAt") - if chosen is not None: - ts = _parse_ts(chosen.get("createdAt")) - if ts: - return ts.strftime("%Y-%m-%d %H:%M") - - chosen = _latest(artifacts, "createdAt") - if chosen is not None: - ts = _parse_ts(chosen.get("createdAt")) - if ts: - return ts.strftime("%Y-%m-%d %H:%M") - return "" - - -def _group_sort_key(partition: PartitionMap, artifacts: list[ImageRow]) -> datetime: - """Key function to sort groups newest-first by their display timestamp.""" - completed_rows: list[ImageRow] = [ - p.completed for p in partition.values() if p.completed is not None +def _pick_display_datetime(partition: PartitionMap) -> Optional[datetime]: + """Return the newest timestamp across the latest row of each artifact type.""" + latest_rows: list[ImageRow] = [ + part.latest for part in partition.values() if part.latest is not None ] - if completed_rows: - chosen = _latest(completed_rows, "pushedAt", "completedAt", "createdAt") - if chosen is not None: - ts = _parse_ts( - chosen.get("pushedAt") or chosen.get("completedAt") or chosen.get("createdAt") - ) - if ts: - return ts + if not latest_rows: + return None + result = _latest(latest_rows, *_LATEST_ROW_KEYS) + if result is None: + return None + return result[1] - active_rows: list[ImageRow] = [p.active for p in partition.values() if p.active is not None] - if active_rows: - chosen = _latest(active_rows, "createdAt") - if chosen is not None: - ts = _parse_ts(chosen.get("createdAt")) - if ts: - 
return ts - chosen = _latest(artifacts, "createdAt") - if chosen is not None: - ts = _parse_ts(chosen.get("createdAt")) - if ts: - return ts - return datetime.min.replace(tzinfo=timezone.utc) +def _display_created(partition: PartitionMap) -> str: + """Format the Created column value for a grouped image row.""" + ts = _pick_display_datetime(partition) + return ts.strftime("%Y-%m-%d %H:%M") if ts is not None else "" + + +def _group_sort_key(partition: PartitionMap) -> datetime: + """Key function to sort groups newest-first by their display timestamp.""" + ts = _pick_display_datetime(partition) + return ts if ts is not None else datetime.min.replace(tzinfo=timezone.utc) @app.command("push") @@ -612,24 +602,26 @@ def list_images( table.add_column("Type", justify="center", no_wrap=True) if is_team_listing: table.add_column("Owner", justify="center") - table.add_column("Status", justify="center", no_wrap=True, min_width=27) + # Worst-case label is ``Cancelled / Cancelled`` (21 chars); pin to 21 + # so Rich never wraps the status text across two lines. + table.add_column("Status", justify="center", no_wrap=True, min_width=21) table.add_column("Size", justify="right", no_wrap=True) table.add_column("Created", style="dim", no_wrap=True, min_width=16) sortable: list[tuple[datetime, list[ImageRow], PartitionMap]] = [] for _key, artifacts in grouped.items(): partition = _partition_group(artifacts) - sortable.append((_group_sort_key(partition, artifacts), artifacts, partition)) + sortable.append((_group_sort_key(partition), artifacts, partition)) sortable.sort(key=lambda item: item[0], reverse=True) for _ts, artifacts, partition in sortable: - container_part = partition.get("CONTAINER_IMAGE") or ArtifactPartition() - preferred: ImageRow = ( - container_part.completed - or container_part.active - or container_part.failed_only - or next(iter(artifacts), {}) - ) + # Pick a representative row for the Image Reference / Owner + # columns. 
Prefer the latest container artifact row so the + # reference always resolves to something the user can copy-paste, + # then fall back to the latest VM row, then to any raw artifact. + container_latest = (partition.get("CONTAINER_IMAGE") or ArtifactPartition()).latest + vm_latest = (partition.get("VM_SANDBOX") or ArtifactPartition()).latest + preferred: ImageRow = container_latest or vm_latest or next(iter(artifacts), {}) image_ref: str = _truncate_ref_left( _render_image_reference(preferred, is_team_listing=is_team_listing), @@ -637,8 +629,8 @@ def list_images( ) type_display: str = _render_type_column(partition) status_display: str = _render_status_column(partition) - size_mb: str = _completed_size_mb(artifacts) - date_str: str = _display_created(partition, artifacts) + size_mb: str = _completed_size_mb(partition) + date_str: str = _display_created(partition) row: list[str] = [image_ref, type_display] if is_team_listing: diff --git a/packages/prime/tests/test_images_list.py b/packages/prime/tests/test_images_list.py index 6e5bfe0fb..2f5125da5 100644 --- a/packages/prime/tests/test_images_list.py +++ b/packages/prime/tests/test_images_list.py @@ -10,6 +10,7 @@ ImageRow, _completed_size_mb, _display_created, + _group_sort_key, _image_ref_column_width, _partition_group, _render_image_reference, @@ -47,6 +48,8 @@ def _art( image_name: str = "nvidia-basic-dev", image_tag: str = "latest", pushed_at: str | None = None, + completed_at: str | None = None, + started_at: str | None = None, created_at: str = "2026-04-01T10:00:00", size_bytes: int | None = None, team_id: str | None = None, @@ -68,8 +71,8 @@ def _art( "errorMessage": None, "sizeBytes": size_bytes, "createdAt": created_at, - "startedAt": None, - "completedAt": None, + "startedAt": started_at, + "completedAt": completed_at, "pushedAt": pushed_at, "teamId": team_id, "ownerType": owner_type or ("team" if team_id else "personal"), @@ -79,102 +82,159 @@ def _art( # 
--------------------------------------------------------------------------- -# _partition_group +# _partition_group — only the newest row per artifact type survives # --------------------------------------------------------------------------- -def test_partition_healthy_image_has_completed_only(): +def test_partition_keeps_newest_completed_per_type(): arts = [ _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), ] part = _partition_group(arts) - assert part["CONTAINER_IMAGE"].completed is not None - assert part["CONTAINER_IMAGE"].active is None - assert part["CONTAINER_IMAGE"].failed_only is None - assert part["VM_SANDBOX"].completed is not None + assert part["CONTAINER_IMAGE"].latest is not None + assert part["CONTAINER_IMAGE"].latest["status"] == "COMPLETED" + assert part["VM_SANDBOX"].latest is not None + assert part["VM_SANDBOX"].latest["status"] == "COMPLETED" -def test_partition_hides_failed_rows_when_completed_exists(): +def test_partition_newer_completed_beats_older_failed(): arts = [ + _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T21:00:00"), _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T21:00:00"), - _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T20:55:00"), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T21:00:00"), + _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T20:55:00"), ] part = _partition_group(arts) - assert part["CONTAINER_IMAGE"].completed is not None - assert part["CONTAINER_IMAGE"].failed_only is None - assert part["VM_SANDBOX"].failed_only is None + assert part["CONTAINER_IMAGE"].latest["status"] == "COMPLETED" -def test_partition_surfaces_failure_when_no_completed(): +def test_partition_fresh_completed_wins_over_stale_building_zombie(): + # The real-world case observed 
in production: a stale BUILDING row from + # days ago (never reaped) alongside a fresh COMPLETED push. The fresh + # COMPLETED row has a later pushedAt, so it wins — no misleading + # "(rebuilding)" signal. arts = [ - _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T21:00:00"), - _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T21:00:00"), + _art( + "VM_SANDBOX", + "BUILDING", + started_at="2026-04-09 20:52:01", + created_at="2026-04-09T20:52:00", + ), + _art( + "VM_SANDBOX", + "COMPLETED", + pushed_at="2026-04-17T18:21:28", + completed_at="2026-04-17T18:21:28", + created_at="2026-04-17T18:19:01", + ), ] part = _partition_group(arts) - assert part["CONTAINER_IMAGE"].completed is None - assert part["CONTAINER_IMAGE"].failed_only is not None - assert part["CONTAINER_IMAGE"].failed_only["status"] == "FAILED" + assert part["VM_SANDBOX"].latest["status"] == "COMPLETED" -def test_partition_marks_active_rebuild_alongside_completed(): +def test_partition_active_build_wins_over_older_completed(): + # If a genuinely-new build started *after* the last completed push, it's + # the truth and should be reflected in the status. 
arts = [ _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-16T10:00:00"), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), + _art( + "CONTAINER_IMAGE", + "BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + ] + part = _partition_group(arts) + assert part["CONTAINER_IMAGE"].latest["status"] == "BUILDING" + + +def test_partition_surfaces_failure_when_only_failed_rows_exist(): + arts = [ + _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T21:00:00"), + _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T21:00:00"), ] part = _partition_group(arts) - assert part["CONTAINER_IMAGE"].completed is not None - assert part["CONTAINER_IMAGE"].active is not None - assert part["CONTAINER_IMAGE"].active["status"] == "BUILDING" + assert part["CONTAINER_IMAGE"].latest["status"] == "FAILED" + assert part["VM_SANDBOX"].latest["status"] == "FAILED" + + +def test_partition_keeps_unknown_status_row_as_latest(): + arts = [_art("CONTAINER_IMAGE", "QUEUED", created_at="2026-04-17T12:00:00")] + part = _partition_group(arts) + assert part["CONTAINER_IMAGE"].latest is not None + assert part["CONTAINER_IMAGE"].latest["status"] == "QUEUED" + + +def test_partition_coerces_null_artifact_type_to_container_image(): + arts = [ + { + "id": "x", + "artifactType": None, + "imageName": "myimage", + "imageTag": "latest", + "status": "COMPLETED", + "pushedAt": "2026-04-17T12:00:00", + "createdAt": "2026-04-17T12:00:00", + "displayRef": f"{USER_ID}/myimage:latest", + } + ] + part = _partition_group(arts) + assert "CONTAINER_IMAGE" in part + assert None not in part + assert part["CONTAINER_IMAGE"].latest is not None + + +def test_partition_coerces_non_string_artifact_type(): + arts = [ + { + "id": "x", + "artifactType": 42, # defensive: server could send a number + "imageName": "myimage", + 
"imageTag": "latest", + "status": "COMPLETED", + "pushedAt": "2026-04-17T12:00:00", + "createdAt": "2026-04-17T12:00:00", + "displayRef": f"{USER_ID}/myimage:latest", + } + ] + part = _partition_group(arts) + assert "CONTAINER_IMAGE" in part + # ``sorted(partition)`` must not raise on mixed key types. + _render_status_column(part) # --------------------------------------------------------------------------- -# _render_status_slot +# _render_status_slot — renders the raw status of the latest row # --------------------------------------------------------------------------- def test_render_status_slot_ready(): - part = ArtifactPartition(completed={"status": "COMPLETED"}) + part = ArtifactPartition(latest={"status": "COMPLETED"}) assert _render_status_slot(part) == "[green]Ready[/green]" -def test_render_status_slot_rebuilding_replaces_status(): - part = ArtifactPartition( - completed={"status": "COMPLETED"}, - active={"status": "BUILDING"}, - ) - out = _render_status_slot(part) - assert "rebuilding" in out - assert "Ready" not in out - - -def test_render_status_slot_first_time_build(): - part = ArtifactPartition(active={"status": "BUILDING"}) +def test_render_status_slot_building(): + part = ArtifactPartition(latest={"status": "BUILDING"}) assert _render_status_slot(part) == "[yellow]Building[/yellow]" def test_render_status_slot_uploading(): - part = ArtifactPartition(active={"status": "UPLOADING"}) + part = ArtifactPartition(latest={"status": "UPLOADING"}) assert _render_status_slot(part) == "[yellow]Uploading[/yellow]" def test_render_status_slot_pending(): - part = ArtifactPartition(active={"status": "PENDING"}) + part = ArtifactPartition(latest={"status": "PENDING"}) assert _render_status_slot(part) == "[blue]Pending[/blue]" -def test_render_status_slot_failed_only(): - part = ArtifactPartition(failed_only={"status": "FAILED"}) +def test_render_status_slot_failed(): + part = ArtifactPartition(latest={"status": "FAILED"}) assert _render_status_slot(part) == 
"[red]Failed[/red]" def test_render_status_slot_cancelled(): - part = ArtifactPartition(failed_only={"status": "CANCELLED"}) + part = ArtifactPartition(latest={"status": "CANCELLED"}) assert _render_status_slot(part) == "[dim]Cancelled[/dim]" @@ -186,6 +246,13 @@ def test_render_status_slot_empty_part_returns_dash(): assert _render_status_slot(ArtifactPartition()) == "[dim]—[/dim]" +def test_render_status_slot_unknown_status_renders_dim_title_case(): + # A status outside the recognised enum set (e.g. the backend adds + # ``QUEUED`` later) is surfaced as a dim title-cased label. + part = ArtifactPartition(latest={"status": "QUEUED"}) + assert _render_status_slot(part) == "[dim]Queued[/dim]" + + # --------------------------------------------------------------------------- # _render_type_column # --------------------------------------------------------------------------- @@ -227,7 +294,7 @@ def test_render_status_column_healthy_slots(): def test_render_status_column_partial_failure_aligned(): arts = [ _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T22:00:00"), + _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T22:00:00"), ] text = _render_status_column(_partition_group(arts)) assert text == "[green]Ready[/green] / [red]Failed[/red]" @@ -240,41 +307,62 @@ def test_render_status_column_container_only_legacy(): assert " / " not in text -def test_render_status_column_rebuild_replaces_status(): +def test_render_status_column_stale_zombie_hidden_by_fresh_completed(): + # Mirrors the real nvidia-basic-dev:latest payload: a week-old stuck + # BUILDING VM row + a fresh successful build. The fresh COMPLETED row has + # a newer pushedAt, so only "Ready" is shown — no false "rebuilding". 
arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-16T10:00:00"), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), + _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-17T18:21:28"), + _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-17T18:21:28"), + # Old stuck BUILDING row that should be ignored because it's older. + _art( + "VM_SANDBOX", + "BUILDING", + started_at="2026-04-09T20:52:01", + created_at="2026-04-09T20:52:00", + ), + # Recent failed attempts before the successful build. + _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T21:00:00"), + _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T21:00:00"), ] text = _render_status_column(_partition_group(arts)) - assert text.count("rebuilding") == 2 - assert "Ready" not in text + assert text == "[green]Ready[/green] / [green]Ready[/green]" + assert "rebuilding" not in text + assert "Failed" not in text + assert "Building" not in text -def test_render_status_column_rebuild_vm_only(): +def test_render_status_column_active_build_on_top_of_older_completed(): + # Container has been rebuilt recently (BUILDING started *after* the last + # completed push); status should reflect the new build rather than the + # stale Ready. 
arts = [ _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), + _art( + "CONTAINER_IMAGE", + "BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), + _art( + "VM_SANDBOX", + "BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), ] text = _render_status_column(_partition_group(arts)) - assert text.startswith("[green]Ready[/green]") - assert "rebuilding" in text - assert text.count("rebuilding") == 1 + assert text == "[yellow]Building[/yellow] / [yellow]Building[/yellow]" -def test_render_status_column_stale_failures_hidden_by_completed(): +def test_render_status_column_surfaces_unknown_status_alongside_known(): arts = [ _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), + _art("VM_SANDBOX", "QUEUED", created_at="2026-04-16T22:00:00"), ] - for i in range(5): - arts.append(_art("CONTAINER_IMAGE", "FAILED", created_at=f"2026-04-16T20:{i:02d}:00")) - arts.append(_art("VM_SANDBOX", "FAILED", created_at=f"2026-04-16T20:{i:02d}:00")) text = _render_status_column(_partition_group(arts)) - assert "Failed" not in text - assert text.count("Ready") == 2 + assert text == "[green]Ready[/green] / [dim]Queued[/dim]" # --------------------------------------------------------------------------- @@ -357,22 +445,51 @@ def test_image_ref_column_width_scales_with_available_space(): # --------------------------------------------------------------------------- -def test_completed_size_sums_only_completed_artifacts(): +def test_completed_size_sums_only_latest_completed_rows(): arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", size_bytes=1024 * 1024 * 100), - _art("VM_SANDBOX", "COMPLETED", size_bytes=1024 * 1024 * 200), - _art("CONTAINER_IMAGE", "BUILDING"), - _art("VM_SANDBOX", "FAILED"), 
+ _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-16T22:00:00", + size_bytes=1024 * 1024 * 100, + ), + _art( + "VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:00:00", size_bytes=1024 * 1024 * 200 + ), + ] + part = _partition_group(arts) + assert _completed_size_mb(part) == "300.0 MB" + + +def test_completed_size_ignores_older_completed_if_newer_is_not_completed(): + # Size reflects the *current* picture: if the latest CONTAINER_IMAGE row + # is BUILDING (in flight), its size is unknown, so its contribution is 0 + # even though an older COMPLETED row has a size. + arts = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-15T20:00:00", + size_bytes=1024 * 1024 * 100, + ), + _art( + "CONTAINER_IMAGE", + "BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), ] - assert _completed_size_mb(arts) == "300.0 MB" + part = _partition_group(arts) + assert _completed_size_mb(part) == "[dim]—[/dim]" def test_completed_size_returns_dash_when_no_completed(): arts = [ - _art("CONTAINER_IMAGE", "BUILDING"), - _art("VM_SANDBOX", "PENDING"), + _art("CONTAINER_IMAGE", "BUILDING", started_at="2026-04-17T09:00:00"), + _art("VM_SANDBOX", "PENDING", created_at="2026-04-17T08:00:00"), ] - assert _completed_size_mb(arts) == "[dim]—[/dim]" + part = _partition_group(arts) + assert _completed_size_mb(part) == "[dim]—[/dim]" # --------------------------------------------------------------------------- @@ -384,28 +501,64 @@ def test_display_created_uses_latest_pushed_at_when_completed_exists(): arts = [ _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-17T09:00:00"), ] part = _partition_group(arts) - assert _display_created(part, arts) == "2026-04-16 22:24" + assert _display_created(part) == "2026-04-16 22:24" -def test_display_created_falls_back_to_active_when_no_completed(): +def 
test_display_created_falls_back_to_started_at_for_active_builds(): arts = [ - _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-17T09:00:00"), + _art( + "CONTAINER_IMAGE", + "BUILDING", + started_at="2026-04-17T09:00:00", + created_at="2026-04-17T08:59:00", + ), _art("VM_SANDBOX", "PENDING", created_at="2026-04-17T08:00:00"), ] part = _partition_group(arts) - assert _display_created(part, arts) == "2026-04-17 09:00" + assert _display_created(part) == "2026-04-17 09:00" + + +def test_display_created_falls_back_to_completed_at_for_failures(): + arts = [ + _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T12:00:00"), + _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T13:00:00"), + ] + part = _partition_group(arts) + assert _display_created(part) == "2026-04-16 13:00" + + +def test_display_created_skips_truthy_but_unparseable_pushed_at(): + # Regression for the original bugbot finding: if pushedAt is a malformed + # but truthy string, ``_latest`` skips it and chooses the next parseable + # key. The tuple-returning _latest reuses *its* parsed timestamp so the + # display never silently empties. 
+ arts = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="definitely-not-a-date", + completed_at="2026-04-16T22:24:07", + ) + ] + part = _partition_group(arts) + assert _display_created(part) == "2026-04-16 22:24" -def test_display_created_falls_back_to_any_row_when_only_failed(): +def test_group_sort_key_matches_display_created(): arts = [ - _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T12:00:00"), - _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T13:00:00"), + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="definitely-not-a-date", + completed_at="2026-04-16T22:24:07", + ) ] part = _partition_group(arts) - assert _display_created(part, arts) == "2026-04-16 13:00" + display = _display_created(part) + sort_key = _group_sort_key(part) + assert sort_key.strftime("%Y-%m-%d %H:%M") == display # --------------------------------------------------------------------------- @@ -451,10 +604,13 @@ def test_list_cli_shows_type_and_positional_status(monkeypatch): size_bytes=100 * 1024 * 1024, ), _art( - "VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07", size_bytes=200 * 1024 * 1024 + "VM_SANDBOX", + "COMPLETED", + pushed_at="2026-04-16T22:24:07", + size_bytes=200 * 1024 * 1024, ), - _art("CONTAINER_IMAGE", "FAILED", created_at="2026-04-16T20:00:00"), - _art("VM_SANDBOX", "FAILED", created_at="2026-04-16T20:00:00"), + _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T20:00:00"), + _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T20:00:00"), ] monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) @@ -466,12 +622,52 @@ def test_list_cli_shows_type_and_positional_status(monkeypatch): assert "Ready / Ready" in result.output assert "Container:" not in result.output assert "VM:" not in result.output + # Stale failures are older than the completed rows → hidden. 
assert "Failed" not in result.output assert f"{USER_ID}/nvidia-basic-dev:latest" in result.output assert "300.0 MB" in result.output -def test_list_cli_shows_rebuilding_marker(monkeypatch): +def test_list_cli_ignores_stale_building_zombie_when_completed_is_newer(monkeypatch): + # Production-observed case: a week-old BUILDING VM row that was never + # reaped sits alongside today's successful build. The CLI must use the + # newer COMPLETED row, not the zombie. + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + _patch_config(monkeypatch, team_id=None) + + payload = [ + _art( + "CONTAINER_IMAGE", + "COMPLETED", + pushed_at="2026-04-17T18:21:28", + size_bytes=992_290_762, + ), + _art( + "VM_SANDBOX", + "COMPLETED", + pushed_at="2026-04-17T18:21:28", + size_bytes=2_585_571_832, + ), + _art( + "VM_SANDBOX", + "BUILDING", + started_at="2026-04-09T20:52:01", + created_at="2026-04-09T20:52:00", + ), + ] + + monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) + + result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + assert result.exit_code == 0, result.output + assert "Ready / Ready" in result.output + assert "rebuilding" not in result.output + assert "Building" not in result.output + + +def test_list_cli_shows_building_when_build_is_newer_than_last_completed(monkeypatch): + # Real rebuild in progress: BUILDING row has a later startedAt than the + # previous successful build's pushedAt, so Building is the current truth. 
monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) _patch_config(monkeypatch, team_id=None) @@ -483,18 +679,33 @@ def test_list_cli_shows_rebuilding_marker(monkeypatch): size_bytes=50 * 1024 * 1024, ), _art( - "VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52", size_bytes=50 * 1024 * 1024 + "VM_SANDBOX", + "COMPLETED", + pushed_at="2026-04-15T20:57:52", + size_bytes=50 * 1024 * 1024, + ), + _art( + "CONTAINER_IMAGE", + "BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + _art( + "VM_SANDBOX", + "BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", ), - _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-16T10:00:00"), - _art("VM_SANDBOX", "BUILDING", created_at="2026-04-16T10:00:00"), ] monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) result = runner.invoke(app, ["images", "list"], env=TEST_ENV) assert result.exit_code == 0, result.output - assert "rebuilding" in result.output - assert "2026-04-15 20:57" in result.output + assert "Building / Building" in result.output + # Created reflects the newer startedAt of the active build rather than + # the older pushedAt of the previous release. 
+ assert "2026-04-17 10:00" in result.output def test_list_cli_team_listing_keeps_owner_column_and_prefix(monkeypatch): @@ -556,8 +767,18 @@ def test_list_cli_first_time_build_shows_building(monkeypatch): _patch_config(monkeypatch, team_id=None) payload = [ - _art("CONTAINER_IMAGE", "BUILDING", created_at="2026-04-17T09:00:00"), - _art("VM_SANDBOX", "BUILDING", created_at="2026-04-17T09:00:00"), + _art( + "CONTAINER_IMAGE", + "BUILDING", + started_at="2026-04-17T09:00:05", + created_at="2026-04-17T09:00:00", + ), + _art( + "VM_SANDBOX", + "BUILDING", + started_at="2026-04-17T09:00:05", + created_at="2026-04-17T09:00:00", + ), ] monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) From c31cc2a43c58b5239160697c7d9eb40647bff421 Mon Sep 17 00:00:00 2001 From: Damian Barabonkov Date: Sun, 19 Apr 2026 20:55:17 -0400 Subject: [PATCH 3/7] test: Refactor tests to use helper functions for image rows --- packages/prime/tests/test_images_list.py | 906 +++++++++-------------- 1 file changed, 357 insertions(+), 549 deletions(-) diff --git a/packages/prime/tests/test_images_list.py b/packages/prime/tests/test_images_list.py index 2f5125da5..7a6a8e78f 100644 --- a/packages/prime/tests/test_images_list.py +++ b/packages/prime/tests/test_images_list.py @@ -2,8 +2,9 @@ from __future__ import annotations -from typing import Any +from typing import Any, Callable +import pytest from prime_cli.commands import images as images_cmd from prime_cli.commands.images import ( ArtifactPartition, @@ -22,7 +23,8 @@ from prime_cli.main import app from typer.testing import CliRunner -runner: CliRunner = CliRunner() +USER_ID = "cmkrcib4x00004kjyxq48nltd" +TEAM_ID = "team-abc123" TEST_ENV: dict[str, str] = { "COLUMNS": "200", @@ -32,53 +34,49 @@ } -USER_ID: str = "cmkrcib4x00004kjyxq48nltd" -TEAM_ID: str = "team-abc123" - - # --------------------------------------------------------------------------- -# Fixture builders +# Fixtures / helpers # 
--------------------------------------------------------------------------- def _art( - artifact_type: str, - status: str, + artifact_type: str = "CONTAINER_IMAGE", + status: str = "COMPLETED", *, - image_name: str = "nvidia-basic-dev", - image_tag: str = "latest", + image: str = "nvidia-basic-dev:latest", pushed_at: str | None = None, completed_at: str | None = None, started_at: str | None = None, created_at: str = "2026-04-01T10:00:00", size_bytes: int | None = None, team_id: str | None = None, - owner_type: str | None = None, - display_ref: str | None = None, ) -> ImageRow: - ref: str = ( - display_ref - if display_ref is not None - else (f"{('team-' + team_id) if team_id else USER_ID}/{image_name}:{image_tag}") - ) - row: dict[str, Any] = { - "id": f"id-{artifact_type}-{status}-{created_at}", + """Build a minimal image row carrying only the fields the CLI reads.""" + name, _, tag = image.partition(":") + tag = tag or "latest" + scope = f"team-{team_id}" if team_id else USER_ID + return { "artifactType": artifact_type, - "imageName": image_name, - "imageTag": image_tag, + "imageName": name, + "imageTag": tag, "status": status, - "fullImagePath": f"{USER_ID}/{image_name}:{image_tag}", - "errorMessage": None, "sizeBytes": size_bytes, "createdAt": created_at, "startedAt": started_at, "completedAt": completed_at, "pushedAt": pushed_at, "teamId": team_id, - "ownerType": owner_type or ("team" if team_id else "personal"), - "displayRef": ref, + "ownerType": "team" if team_id else "personal", + "displayRef": f"{scope}/{name}:{tag}", } - return row + + +def _container(**kw: Any) -> ImageRow: + return _art("CONTAINER_IMAGE", **kw) + + +def _vm(**kw: Any) -> ImageRow: + return _art("VM_SANDBOX", **kw) # --------------------------------------------------------------------------- @@ -87,118 +85,80 @@ def _art( def test_partition_keeps_newest_completed_per_type(): - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", 
"COMPLETED", pushed_at="2026-04-16T22:24:07"), - ] - part = _partition_group(arts) - assert part["CONTAINER_IMAGE"].latest is not None + part = _partition_group( + [ + _container(pushed_at="2026-04-16T22:24:07"), + _vm(pushed_at="2026-04-16T22:24:07"), + ] + ) assert part["CONTAINER_IMAGE"].latest["status"] == "COMPLETED" - assert part["VM_SANDBOX"].latest is not None assert part["VM_SANDBOX"].latest["status"] == "COMPLETED" def test_partition_newer_completed_beats_older_failed(): - arts = [ - _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T21:00:00"), - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T20:55:00"), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container(status="FAILED", completed_at="2026-04-16T21:00:00"), + _container(pushed_at="2026-04-16T22:24:07"), + _container(status="FAILED", completed_at="2026-04-16T20:55:00"), + ] + ) assert part["CONTAINER_IMAGE"].latest["status"] == "COMPLETED" def test_partition_fresh_completed_wins_over_stale_building_zombie(): - # The real-world case observed in production: a stale BUILDING row from - # days ago (never reaped) alongside a fresh COMPLETED push. The fresh - # COMPLETED row has a later pushedAt, so it wins — no misleading - # "(rebuilding)" signal. - arts = [ - _art( - "VM_SANDBOX", - "BUILDING", - started_at="2026-04-09 20:52:01", - created_at="2026-04-09T20:52:00", - ), - _art( - "VM_SANDBOX", - "COMPLETED", - pushed_at="2026-04-17T18:21:28", - completed_at="2026-04-17T18:21:28", - created_at="2026-04-17T18:19:01", - ), - ] - part = _partition_group(arts) + # Real-world case: a stale BUILDING row from 8 days ago (never reaped) + # alongside a fresh COMPLETED push. The fresher COMPLETED row wins. 
+ part = _partition_group( + [ + _vm( + status="BUILDING", + started_at="2026-04-09T20:52:01", + created_at="2026-04-09T20:52:00", + ), + _vm(pushed_at="2026-04-17T18:21:28", completed_at="2026-04-17T18:21:28"), + ] + ) assert part["VM_SANDBOX"].latest["status"] == "COMPLETED" def test_partition_active_build_wins_over_older_completed(): - # If a genuinely-new build started *after* the last completed push, it's - # the truth and should be reflected in the status. - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art( - "CONTAINER_IMAGE", - "BUILDING", - started_at="2026-04-17T10:00:05", - created_at="2026-04-17T10:00:00", - ), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container(pushed_at="2026-04-15T20:57:52"), + _container( + status="BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + ] + ) assert part["CONTAINER_IMAGE"].latest["status"] == "BUILDING" def test_partition_surfaces_failure_when_only_failed_rows_exist(): - arts = [ - _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T21:00:00"), - _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T21:00:00"), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container(status="FAILED", completed_at="2026-04-16T21:00:00"), + _vm(status="FAILED", completed_at="2026-04-16T21:00:00"), + ] + ) assert part["CONTAINER_IMAGE"].latest["status"] == "FAILED" assert part["VM_SANDBOX"].latest["status"] == "FAILED" def test_partition_keeps_unknown_status_row_as_latest(): - arts = [_art("CONTAINER_IMAGE", "QUEUED", created_at="2026-04-17T12:00:00")] - part = _partition_group(arts) - assert part["CONTAINER_IMAGE"].latest is not None + part = _partition_group([_container(status="QUEUED", created_at="2026-04-17T12:00:00")]) assert part["CONTAINER_IMAGE"].latest["status"] == "QUEUED" -def test_partition_coerces_null_artifact_type_to_container_image(): - arts = [ - { - "id": "x", - "artifactType": None, - 
"imageName": "myimage", - "imageTag": "latest", - "status": "COMPLETED", - "pushedAt": "2026-04-17T12:00:00", - "createdAt": "2026-04-17T12:00:00", - "displayRef": f"{USER_ID}/myimage:latest", - } - ] - part = _partition_group(arts) - assert "CONTAINER_IMAGE" in part - assert None not in part - assert part["CONTAINER_IMAGE"].latest is not None - - -def test_partition_coerces_non_string_artifact_type(): - arts = [ - { - "id": "x", - "artifactType": 42, # defensive: server could send a number - "imageName": "myimage", - "imageTag": "latest", - "status": "COMPLETED", - "pushedAt": "2026-04-17T12:00:00", - "createdAt": "2026-04-17T12:00:00", - "displayRef": f"{USER_ID}/myimage:latest", - } - ] - part = _partition_group(arts) +@pytest.mark.parametrize("bad_type", [None, 42, ""]) +def test_partition_coerces_non_string_artifact_type(bad_type): + row = _container(pushed_at="2026-04-17T12:00:00") + row["artifactType"] = bad_type + part = _partition_group([row]) assert "CONTAINER_IMAGE" in part + assert bad_type not in part # ``sorted(partition)`` must not raise on mixed key types. 
_render_status_column(part) @@ -208,49 +168,26 @@ def test_partition_coerces_non_string_artifact_type(): # --------------------------------------------------------------------------- -def test_render_status_slot_ready(): - part = ArtifactPartition(latest={"status": "COMPLETED"}) - assert _render_status_slot(part) == "[green]Ready[/green]" - - -def test_render_status_slot_building(): - part = ArtifactPartition(latest={"status": "BUILDING"}) - assert _render_status_slot(part) == "[yellow]Building[/yellow]" - - -def test_render_status_slot_uploading(): - part = ArtifactPartition(latest={"status": "UPLOADING"}) - assert _render_status_slot(part) == "[yellow]Uploading[/yellow]" - - -def test_render_status_slot_pending(): - part = ArtifactPartition(latest={"status": "PENDING"}) - assert _render_status_slot(part) == "[blue]Pending[/blue]" - - -def test_render_status_slot_failed(): - part = ArtifactPartition(latest={"status": "FAILED"}) - assert _render_status_slot(part) == "[red]Failed[/red]" - - -def test_render_status_slot_cancelled(): - part = ArtifactPartition(latest={"status": "CANCELLED"}) - assert _render_status_slot(part) == "[dim]Cancelled[/dim]" - - -def test_render_status_slot_none_part_returns_dash(): - assert _render_status_slot(None) == "[dim]—[/dim]" - - -def test_render_status_slot_empty_part_returns_dash(): - assert _render_status_slot(ArtifactPartition()) == "[dim]—[/dim]" +@pytest.mark.parametrize( + "status,expected", + [ + ("COMPLETED", "[green]Ready[/green]"), + ("BUILDING", "[yellow]Building[/yellow]"), + ("UPLOADING", "[yellow]Uploading[/yellow]"), + ("PENDING", "[blue]Pending[/blue]"), + ("FAILED", "[red]Failed[/red]"), + ("CANCELLED", "[dim]Cancelled[/dim]"), + # Unknown / future backend statuses fall back to dim title-case. 
+ ("QUEUED", "[dim]Queued[/dim]"), + ], +) +def test_render_status_slot_known_and_unknown(status, expected): + assert _render_status_slot(ArtifactPartition(latest={"status": status})) == expected -def test_render_status_slot_unknown_status_renders_dim_title_case(): - # A status outside the recognised enum set (e.g. the backend adds - # ``QUEUED`` later) is surfaced as a dim title-cased label. - part = ArtifactPartition(latest={"status": "QUEUED"}) - assert _render_status_slot(part) == "[dim]Queued[/dim]" +@pytest.mark.parametrize("empty", [None, ArtifactPartition()]) +def test_render_status_slot_empty_returns_dash(empty): + assert _render_status_slot(empty) == "[dim]—[/dim]" # --------------------------------------------------------------------------- @@ -259,19 +196,17 @@ def test_render_status_slot_unknown_status_renders_dim_title_case(): def test_render_type_column_container_and_vm(): - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - ] - text = _render_type_column(_partition_group(arts)) + text = _render_type_column( + _partition_group( + [_container(pushed_at="2026-04-16T22:24:07"), _vm(pushed_at="2026-04-16T22:24:07")] + ) + ) assert "Container" in text and "VM" in text - assert " / " in text assert text.index("Container") < text.index("VM") def test_render_type_column_container_only_legacy(): - arts = [_art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07")] - text = _render_type_column(_partition_group(arts)) + text = _render_type_column(_partition_group([_container(pushed_at="2026-04-16T22:24:07")])) assert "Container" in text assert "VM" not in text assert " / " not in text @@ -283,85 +218,85 @@ def test_render_type_column_container_only_legacy(): def test_render_status_column_healthy_slots(): - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", "COMPLETED", 
pushed_at="2026-04-16T22:24:07"), - ] - text = _render_status_column(_partition_group(arts)) + text = _render_status_column( + _partition_group( + [_container(pushed_at="2026-04-16T22:24:07"), _vm(pushed_at="2026-04-16T22:24:07")] + ) + ) assert text == "[green]Ready[/green] / [green]Ready[/green]" def test_render_status_column_partial_failure_aligned(): - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T22:00:00"), - ] - text = _render_status_column(_partition_group(arts)) + text = _render_status_column( + _partition_group( + [ + _container(pushed_at="2026-04-16T22:24:07"), + _vm(status="FAILED", completed_at="2026-04-16T22:00:00"), + ] + ) + ) assert text == "[green]Ready[/green] / [red]Failed[/red]" def test_render_status_column_container_only_legacy(): - arts = [_art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07")] - text = _render_status_column(_partition_group(arts)) + text = _render_status_column(_partition_group([_container(pushed_at="2026-04-16T22:24:07")])) assert text == "[green]Ready[/green]" assert " / " not in text def test_render_status_column_stale_zombie_hidden_by_fresh_completed(): # Mirrors the real nvidia-basic-dev:latest payload: a week-old stuck - # BUILDING VM row + a fresh successful build. The fresh COMPLETED row has - # a newer pushedAt, so only "Ready" is shown — no false "rebuilding". - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-17T18:21:28"), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-17T18:21:28"), - # Old stuck BUILDING row that should be ignored because it's older. - _art( - "VM_SANDBOX", - "BUILDING", - started_at="2026-04-09T20:52:01", - created_at="2026-04-09T20:52:00", - ), - # Recent failed attempts before the successful build. 
- _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T21:00:00"), - _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T21:00:00"), - ] - text = _render_status_column(_partition_group(arts)) + # BUILDING VM row + a fresh successful build + recent failures. Only the + # newest row per type is rendered, so only "Ready / Ready" is shown. + text = _render_status_column( + _partition_group( + [ + _container(pushed_at="2026-04-17T18:21:28"), + _vm(pushed_at="2026-04-17T18:21:28"), + _vm( + status="BUILDING", + started_at="2026-04-09T20:52:01", + created_at="2026-04-09T20:52:00", + ), + _container(status="FAILED", completed_at="2026-04-16T21:00:00"), + _vm(status="FAILED", completed_at="2026-04-16T21:00:00"), + ] + ) + ) assert text == "[green]Ready[/green] / [green]Ready[/green]" - assert "rebuilding" not in text - assert "Failed" not in text - assert "Building" not in text def test_render_status_column_active_build_on_top_of_older_completed(): - # Container has been rebuilt recently (BUILDING started *after* the last - # completed push); status should reflect the new build rather than the - # stale Ready. 
- arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art( - "CONTAINER_IMAGE", - "BUILDING", - started_at="2026-04-17T10:00:05", - created_at="2026-04-17T10:00:00", - ), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art( - "VM_SANDBOX", - "BUILDING", - started_at="2026-04-17T10:00:05", - created_at="2026-04-17T10:00:00", - ), - ] - text = _render_status_column(_partition_group(arts)) + text = _render_status_column( + _partition_group( + [ + _container(pushed_at="2026-04-15T20:57:52"), + _container( + status="BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + _vm(pushed_at="2026-04-15T20:57:52"), + _vm( + status="BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + ] + ) + ) assert text == "[yellow]Building[/yellow] / [yellow]Building[/yellow]" def test_render_status_column_surfaces_unknown_status_alongside_known(): - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - _art("VM_SANDBOX", "QUEUED", created_at="2026-04-16T22:00:00"), - ] - text = _render_status_column(_partition_group(arts)) + text = _render_status_column( + _partition_group( + [ + _container(pushed_at="2026-04-16T22:24:07"), + _vm(status="QUEUED", created_at="2026-04-16T22:00:00"), + ] + ) + ) assert text == "[green]Ready[/green] / [dim]Queued[/dim]" @@ -371,24 +306,24 @@ def test_render_status_column_surfaces_unknown_status_alongside_known(): def test_render_image_reference_always_shows_user_prefix_for_personal(): - img = _art("CONTAINER_IMAGE", "COMPLETED", image_name="myapp", image_tag="v1") - assert _render_image_reference(img, is_team_listing=False) == f"{USER_ID}/myapp:v1" + assert ( + _render_image_reference(_container(image="myapp:v1"), is_team_listing=False) + == f"{USER_ID}/myapp:v1" + ) def test_render_image_reference_keeps_team_prefix_for_team_listing(): - img = _art( - "CONTAINER_IMAGE", - "COMPLETED", - team_id=TEAM_ID, - 
image_name="myapp", - image_tag="v1", + assert ( + _render_image_reference(_container(image="myapp:v1", team_id=TEAM_ID), is_team_listing=True) + == f"team-{TEAM_ID}/myapp:v1" ) - assert _render_image_reference(img, is_team_listing=True) == f"team-{TEAM_ID}/myapp:v1" def test_render_image_reference_falls_back_without_display_ref(): - img = {"imageName": "legacy", "imageTag": "v2"} - assert _render_image_reference(img, is_team_listing=False) == "legacy:v2" + assert ( + _render_image_reference({"imageName": "legacy", "imageTag": "v2"}, is_team_listing=False) + == "legacy:v2" + ) # --------------------------------------------------------------------------- @@ -408,15 +343,14 @@ def test_truncate_ref_left_clips_head_and_preserves_name_tag(): assert len(out) == 30 -def test_truncate_ref_left_handles_zero_or_negative_budget(): +@pytest.mark.parametrize("budget", [0, -5]) +def test_truncate_ref_left_handles_zero_or_negative_budget(budget): ref = f"{USER_ID}/x:y" - assert _truncate_ref_left(ref, 0) == ref - assert _truncate_ref_left(ref, -5) == ref + assert _truncate_ref_left(ref, budget) == ref def test_truncate_ref_left_clamps_tiny_budget_to_two(): - out = _truncate_ref_left("abcdefghij", 1) - assert out == "…j" + assert _truncate_ref_left("abcdefghij", 1) == "…j" # --------------------------------------------------------------------------- @@ -432,12 +366,10 @@ def test_image_ref_column_width_narrow_terminal_clamped_to_floor(): assert _image_ref_column_width(60, is_team_listing=False) == 30 -def test_image_ref_column_width_scales_with_available_space(): +def test_image_ref_column_width_team_shrinks_ref_budget(): w_personal = _image_ref_column_width(140, is_team_listing=False) w_team = _image_ref_column_width(140, is_team_listing=True) - assert w_team < w_personal - assert 20 <= w_team <= 80 - assert 20 <= w_personal <= 80 + assert 20 <= w_team < w_personal <= 80 # --------------------------------------------------------------------------- @@ -446,119 +378,98 @@ def 
test_image_ref_column_width_scales_with_available_space(): def test_completed_size_sums_only_latest_completed_rows(): - arts = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-16T22:00:00", - size_bytes=1024 * 1024 * 100, - ), - _art( - "VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:00:00", size_bytes=1024 * 1024 * 200 - ), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container(pushed_at="2026-04-16T22:00:00", size_bytes=100 * 1024 * 1024), + _vm(pushed_at="2026-04-16T22:00:00", size_bytes=200 * 1024 * 1024), + ] + ) assert _completed_size_mb(part) == "300.0 MB" def test_completed_size_ignores_older_completed_if_newer_is_not_completed(): - # Size reflects the *current* picture: if the latest CONTAINER_IMAGE row - # is BUILDING (in flight), its size is unknown, so its contribution is 0 - # even though an older COMPLETED row has a size. - arts = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-15T20:00:00", - size_bytes=1024 * 1024 * 100, - ), - _art( - "CONTAINER_IMAGE", - "BUILDING", - started_at="2026-04-17T10:00:05", - created_at="2026-04-17T10:00:00", - ), - ] - part = _partition_group(arts) + # If the latest row is BUILDING, that type contributes 0 — even if an + # older COMPLETED row has a size. Size reflects the *current* picture. 
+ part = _partition_group( + [ + _container(pushed_at="2026-04-15T20:00:00", size_bytes=100 * 1024 * 1024), + _container( + status="BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + ] + ) assert _completed_size_mb(part) == "[dim]—[/dim]" def test_completed_size_returns_dash_when_no_completed(): - arts = [ - _art("CONTAINER_IMAGE", "BUILDING", started_at="2026-04-17T09:00:00"), - _art("VM_SANDBOX", "PENDING", created_at="2026-04-17T08:00:00"), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container(status="BUILDING", started_at="2026-04-17T09:00:00"), + _vm(status="PENDING", created_at="2026-04-17T08:00:00"), + ] + ) assert _completed_size_mb(part) == "[dim]—[/dim]" # --------------------------------------------------------------------------- -# _display_created +# _display_created / _group_sort_key # --------------------------------------------------------------------------- def test_display_created_uses_latest_pushed_at_when_completed_exists(): - arts = [ - _art("CONTAINER_IMAGE", "COMPLETED", pushed_at="2026-04-15T20:57:52"), - _art("VM_SANDBOX", "COMPLETED", pushed_at="2026-04-16T22:24:07"), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container(pushed_at="2026-04-15T20:57:52"), + _vm(pushed_at="2026-04-16T22:24:07"), + ] + ) assert _display_created(part) == "2026-04-16 22:24" def test_display_created_falls_back_to_started_at_for_active_builds(): - arts = [ - _art( - "CONTAINER_IMAGE", - "BUILDING", - started_at="2026-04-17T09:00:00", - created_at="2026-04-17T08:59:00", - ), - _art("VM_SANDBOX", "PENDING", created_at="2026-04-17T08:00:00"), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container( + status="BUILDING", + started_at="2026-04-17T09:00:00", + created_at="2026-04-17T08:59:00", + ), + _vm(status="PENDING", created_at="2026-04-17T08:00:00"), + ] + ) assert _display_created(part) == "2026-04-17 09:00" def 
test_display_created_falls_back_to_completed_at_for_failures(): - arts = [ - _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T12:00:00"), - _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T13:00:00"), - ] - part = _partition_group(arts) + part = _partition_group( + [ + _container(status="FAILED", completed_at="2026-04-16T12:00:00"), + _vm(status="FAILED", completed_at="2026-04-16T13:00:00"), + ] + ) assert _display_created(part) == "2026-04-16 13:00" def test_display_created_skips_truthy_but_unparseable_pushed_at(): - # Regression for the original bugbot finding: if pushedAt is a malformed - # but truthy string, ``_latest`` skips it and chooses the next parseable - # key. The tuple-returning _latest reuses *its* parsed timestamp so the - # display never silently empties. - arts = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="definitely-not-a-date", - completed_at="2026-04-16T22:24:07", - ) - ] - part = _partition_group(arts) + # Regression for the bugbot finding: a truthy-but-malformed pushedAt must + # not drop the Created column; _latest reuses its own parsed timestamp. 
+ part = _partition_group( + [ + _container(pushed_at="definitely-not-a-date", completed_at="2026-04-16T22:24:07"), + ] + ) assert _display_created(part) == "2026-04-16 22:24" def test_group_sort_key_matches_display_created(): - arts = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="definitely-not-a-date", - completed_at="2026-04-16T22:24:07", - ) - ] - part = _partition_group(arts) - display = _display_created(part) - sort_key = _group_sort_key(part) - assert sort_key.strftime("%Y-%m-%d %H:%M") == display + part = _partition_group( + [ + _container(pushed_at="definitely-not-a-date", completed_at="2026-04-16T22:24:07"), + ] + ) + assert _group_sort_key(part).strftime("%Y-%m-%d %H:%M") == _display_created(part) # --------------------------------------------------------------------------- @@ -566,278 +477,175 @@ def test_group_sort_key_matches_display_created(): # --------------------------------------------------------------------------- -def _build_dummy_client(payload: list[ImageRow]) -> type: - class DummyAPIClient: - def request( - self, - method: str, - path: str, - json: Any = None, - params: dict[str, Any] | None = None, - ) -> dict[str, Any]: - assert method == "GET" - assert path == "/images" - return {"data": payload} - - return DummyAPIClient - - class _StubConfig: def __init__(self, team_id: str | None = None) -> None: - self.team_id: str | None = team_id - - -def _patch_config(monkeypatch: Any, team_id: str | None = None) -> None: - """Replace the module-level ``config`` with a stub.""" - monkeypatch.setattr(images_cmd, "config", _StubConfig(team_id=team_id)) - - -def test_list_cli_shows_type_and_positional_status(monkeypatch): - monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=None) - - payload = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-16T22:24:07", - size_bytes=100 * 1024 * 1024, - ), - _art( - "VM_SANDBOX", - "COMPLETED", - 
pushed_at="2026-04-16T22:24:07", - size_bytes=200 * 1024 * 1024, - ), - _art("CONTAINER_IMAGE", "FAILED", completed_at="2026-04-16T20:00:00"), - _art("VM_SANDBOX", "FAILED", completed_at="2026-04-16T20:00:00"), - ] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - result = runner.invoke(app, ["images", "list"], env=TEST_ENV) + self.team_id = team_id + + +@pytest.fixture +def run_images_list(monkeypatch) -> Callable[..., Any]: + """Return a callable that invokes ``prime images list`` against a + mocked API, with the module-level ``config`` stubbed and the update + check disabled.""" + runner = CliRunner() + + def _run( + payload: list[ImageRow], + *, + team_id: str | None = None, + env: dict[str, str] | None = None, + ): + class DummyAPIClient: + def request(self, method, path, json=None, params=None): + assert method == "GET" + assert path == "/images" + return {"data": payload} + + monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) + monkeypatch.setattr(images_cmd, "config", _StubConfig(team_id=team_id)) + monkeypatch.setattr("prime_cli.commands.images.APIClient", DummyAPIClient) + return runner.invoke(app, ["images", "list"], env=env or TEST_ENV) + + return _run + + +def test_list_cli_shows_type_and_positional_status(run_images_list): + result = run_images_list( + [ + _container(pushed_at="2026-04-16T22:24:07", size_bytes=100 * 1024 * 1024), + _vm(pushed_at="2026-04-16T22:24:07", size_bytes=200 * 1024 * 1024), + _container(status="FAILED", completed_at="2026-04-16T20:00:00"), + _vm(status="FAILED", completed_at="2026-04-16T20:00:00"), + ] + ) assert result.exit_code == 0, result.output - assert "Container / VM" in result.output assert "Ready / Ready" in result.output - assert "Container:" not in result.output - assert "VM:" not in result.output - # Stale failures are older than the completed rows → hidden. 
- assert "Failed" not in result.output + assert "Failed" not in result.output # stale, hidden by newer COMPLETED assert f"{USER_ID}/nvidia-basic-dev:latest" in result.output assert "300.0 MB" in result.output -def test_list_cli_ignores_stale_building_zombie_when_completed_is_newer(monkeypatch): - # Production-observed case: a week-old BUILDING VM row that was never - # reaped sits alongside today's successful build. The CLI must use the - # newer COMPLETED row, not the zombie. - monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=None) - - payload = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-17T18:21:28", - size_bytes=992_290_762, - ), - _art( - "VM_SANDBOX", - "COMPLETED", - pushed_at="2026-04-17T18:21:28", - size_bytes=2_585_571_832, - ), - _art( - "VM_SANDBOX", - "BUILDING", - started_at="2026-04-09T20:52:01", - created_at="2026-04-09T20:52:00", - ), - ] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - result = runner.invoke(app, ["images", "list"], env=TEST_ENV) +def test_list_cli_ignores_stale_building_zombie_when_completed_is_newer(run_images_list): + result = run_images_list( + [ + _container(pushed_at="2026-04-17T18:21:28", size_bytes=992_290_762), + _vm(pushed_at="2026-04-17T18:21:28", size_bytes=2_585_571_832), + # The week-old stuck BUILDING row (as seen in production DB). + _vm( + status="BUILDING", + started_at="2026-04-09T20:52:01", + created_at="2026-04-09T20:52:00", + ), + ] + ) assert result.exit_code == 0, result.output assert "Ready / Ready" in result.output - assert "rebuilding" not in result.output assert "Building" not in result.output -def test_list_cli_shows_building_when_build_is_newer_than_last_completed(monkeypatch): - # Real rebuild in progress: BUILDING row has a later startedAt than the - # previous successful build's pushedAt, so Building is the current truth. 
- monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=None) - - payload = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-15T20:57:52", - size_bytes=50 * 1024 * 1024, - ), - _art( - "VM_SANDBOX", - "COMPLETED", - pushed_at="2026-04-15T20:57:52", - size_bytes=50 * 1024 * 1024, - ), - _art( - "CONTAINER_IMAGE", - "BUILDING", - started_at="2026-04-17T10:00:05", - created_at="2026-04-17T10:00:00", - ), - _art( - "VM_SANDBOX", - "BUILDING", - started_at="2026-04-17T10:00:05", - created_at="2026-04-17T10:00:00", - ), - ] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - result = runner.invoke(app, ["images", "list"], env=TEST_ENV) +def test_list_cli_shows_building_when_build_is_newer_than_last_completed(run_images_list): + result = run_images_list( + [ + _container(pushed_at="2026-04-15T20:57:52", size_bytes=50 * 1024 * 1024), + _vm(pushed_at="2026-04-15T20:57:52", size_bytes=50 * 1024 * 1024), + _container( + status="BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + _vm( + status="BUILDING", + started_at="2026-04-17T10:00:05", + created_at="2026-04-17T10:00:00", + ), + ] + ) assert result.exit_code == 0, result.output assert "Building / Building" in result.output - # Created reflects the newer startedAt of the active build rather than - # the older pushedAt of the previous release. 
assert "2026-04-17 10:00" in result.output -def test_list_cli_team_listing_keeps_owner_column_and_prefix(monkeypatch): - monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=TEAM_ID) - - payload = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-16T22:24:07", - size_bytes=10 * 1024 * 1024, - team_id=TEAM_ID, - image_name="teamapp", - ), - _art( - "VM_SANDBOX", - "COMPLETED", - pushed_at="2026-04-16T22:24:07", - size_bytes=20 * 1024 * 1024, - team_id=TEAM_ID, - image_name="teamapp", - ), - ] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - result = runner.invoke(app, ["images", "list"], env=TEST_ENV) +def test_list_cli_team_listing_keeps_owner_column_and_prefix(run_images_list): + result = run_images_list( + [ + _container( + image="teamapp:latest", + team_id=TEAM_ID, + pushed_at="2026-04-16T22:24:07", + size_bytes=10 * 1024 * 1024, + ), + _vm( + image="teamapp:latest", + team_id=TEAM_ID, + pushed_at="2026-04-16T22:24:07", + size_bytes=20 * 1024 * 1024, + ), + ], + team_id=TEAM_ID, + ) assert result.exit_code == 0, result.output assert "Owner" in result.output assert f"team-{TEAM_ID}/teamapp:latest" in result.output -def test_list_cli_container_only_legacy_image(monkeypatch): - monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=None) - - payload = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-16T22:24:07", - size_bytes=10 * 1024 * 1024, - image_name="legacy", - ), - ] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - result = runner.invoke(app, ["images", "list"], env=TEST_ENV) +def test_list_cli_container_only_legacy_image(run_images_list): + result = run_images_list( + [ + _container( + image="legacy:latest", pushed_at="2026-04-16T22:24:07", size_bytes=10 * 1024 * 1024 + ), + ] + ) assert result.exit_code == 
0, result.output assert "Container" in result.output assert "Ready" in result.output assert "VM" not in result.output -def test_list_cli_first_time_build_shows_building(monkeypatch): - monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=None) - - payload = [ - _art( - "CONTAINER_IMAGE", - "BUILDING", - started_at="2026-04-17T09:00:05", - created_at="2026-04-17T09:00:00", - ), - _art( - "VM_SANDBOX", - "BUILDING", - started_at="2026-04-17T09:00:05", - created_at="2026-04-17T09:00:00", - ), - ] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - result = runner.invoke(app, ["images", "list"], env=TEST_ENV) +def test_list_cli_first_time_build_shows_building(run_images_list): + result = run_images_list( + [ + _container( + status="BUILDING", + started_at="2026-04-17T09:00:05", + created_at="2026-04-17T09:00:00", + ), + _vm( + status="BUILDING", + started_at="2026-04-17T09:00:05", + created_at="2026-04-17T09:00:00", + ), + ] + ) assert result.exit_code == 0, result.output assert "Container / VM" in result.output assert "Building / Building" in result.output - assert "Container:" not in result.output assert "—" in result.output -def test_list_cli_truncates_owner_prefix_on_narrow_terminal(monkeypatch): - monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=None) - - payload = [ - _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-16T22:24:07", - size_bytes=1024 * 1024, - image_name="nvidia-basic-dev", - ), - ] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - narrow_env = dict(TEST_ENV, COLUMNS="90") - result = runner.invoke(app, ["images", "list"], env=narrow_env) +def test_list_cli_truncates_owner_prefix_on_narrow_terminal(run_images_list): + result = run_images_list( + [_container(pushed_at="2026-04-16T22:24:07", size_bytes=1024 * 1024)], + 
env=dict(TEST_ENV, COLUMNS="90"), + ) assert result.exit_code == 0, result.output assert "nvidia-basic-dev:latest" in result.output assert "…" in result.output assert f"{USER_ID}/nvidia-basic-dev" not in result.output -def test_list_cli_newest_group_first(monkeypatch): - monkeypatch.setattr("prime_cli.main.check_for_update", lambda: (False, None)) - _patch_config(monkeypatch, team_id=None) - - older = _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-03-24T20:11:46", - image_name="old-image", - size_bytes=1024 * 1024, - ) - newer = _art( - "CONTAINER_IMAGE", - "COMPLETED", - pushed_at="2026-04-16T22:24:07", - image_name="new-image", - size_bytes=1024 * 1024, +def test_list_cli_newest_group_first(run_images_list): + result = run_images_list( + [ + _container( + image="old-image:latest", pushed_at="2026-03-24T20:11:46", size_bytes=1024 * 1024 + ), + _container( + image="new-image:latest", pushed_at="2026-04-16T22:24:07", size_bytes=1024 * 1024 + ), + ] ) - payload = [older, newer] - - monkeypatch.setattr("prime_cli.commands.images.APIClient", _build_dummy_client(payload)) - - result = runner.invoke(app, ["images", "list"], env=TEST_ENV) assert result.exit_code == 0, result.output new_idx = result.output.find("new-image") old_idx = result.output.find("old-image") From 43bd41a5ec9e5ac8662d729b20a9a29752d05715 Mon Sep 17 00:00:00 2001 From: Damian Barabonkov Date: Sun, 19 Apr 2026 22:16:51 -0400 Subject: [PATCH 4/7] fix: mirror CLI VM sandbox guards in SDK --- .../src/prime_sandboxes/sandbox.py | 48 +++ .../prime-sandboxes/tests/test_vm_guards.py | 377 ++++++++++++++++++ 2 files changed, 425 insertions(+) create mode 100644 packages/prime-sandboxes/tests/test_vm_guards.py diff --git a/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py b/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py index 19591aeab..b90367fde 100644 --- a/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py +++ b/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py @@ 
-541,6 +541,25 @@ def clear_auth_cache(self) -> None: """Clear all cached auth tokens""" self._auth_cache.clear() + def is_vm(self, sandbox_id: str) -> bool: + """Return True if the sandbox is VM-backed. + + Uses the internal auth cache when available and falls back to a + ``GET /sandbox/`` lookup on a cold cache. The result is cached + alongside the auth token so subsequent calls are essentially free. + """ + return self._auth_cache.is_vm(sandbox_id) + + def _guard_vm_unsupported(self, sandbox_id: str, feature_name: str) -> None: + """Raise APIError if the operation is not supported on VM sandboxes. + + Mirrors the CLI behavior of short-circuiting operations the backend + does not currently support for VM-backed sandboxes, so callers fail + fast with a clear message instead of an opaque gateway error. + """ + if self._auth_cache.is_vm(sandbox_id): + raise APIError(f"{feature_name} is not yet supported for VM sandboxes.") + def create(self, request: CreateSandboxRequest) -> Sandbox: """Create a new sandbox""" # Auto-populate team_id from config if not specified @@ -1229,6 +1248,7 @@ def expose( protocol: str = "HTTP", ) -> ExposedPort: """Expose a port from a sandbox.""" + self._guard_vm_unsupported(sandbox_id, "Port exposure") request = ExposePortRequest(port=port, name=name, protocol=protocol) response = self.client.request( "POST", @@ -1239,10 +1259,12 @@ def expose( def unexpose(self, sandbox_id: str, exposure_id: str) -> None: """Unexpose a port from a sandbox.""" + self._guard_vm_unsupported(sandbox_id, "Port unexpose") self.client.request("DELETE", f"/sandbox/{sandbox_id}/expose/{exposure_id}") def list_exposed_ports(self, sandbox_id: str) -> ListExposedPortsResponse: """List all exposed ports for a sandbox""" + self._guard_vm_unsupported(sandbox_id, "Port listing") response = self.client.request("GET", f"/sandbox/{sandbox_id}/expose") return ListExposedPortsResponse.model_validate(response) @@ -1257,6 +1279,7 @@ def create_ssh_session( ttl_seconds: 
Optional[int] = None, ) -> SSHSession: """Create an SSH session""" + self._guard_vm_unsupported(sandbox_id, "SSH") payload: Dict[str, Any] = {} if ttl_seconds is not None: payload["ttl_seconds"] = ttl_seconds @@ -1269,6 +1292,7 @@ def create_ssh_session( def close_ssh_session(self, sandbox_id: str, session_id: str) -> None: """Close an SSH session and remove its exposure""" + self._guard_vm_unsupported(sandbox_id, "SSH") self.client.request("DELETE", f"/sandbox/{sandbox_id}/ssh-session/{session_id}") @@ -1394,6 +1418,25 @@ async def clear_auth_cache(self) -> None: """Clear all cached auth tokens.""" await self._auth_cache.clear() + async def is_vm(self, sandbox_id: str) -> bool: + """Return True if the sandbox is VM-backed. + + Uses the internal auth cache when available and falls back to a + ``GET /sandbox/`` lookup on a cold cache. The result is cached + alongside the auth token so subsequent calls are essentially free. + """ + return await self._auth_cache.is_vm(sandbox_id) + + async def _guard_vm_unsupported(self, sandbox_id: str, feature_name: str) -> None: + """Raise APIError if the operation is not supported on VM sandboxes. + + Mirrors the CLI behavior of short-circuiting operations the backend + does not currently support for VM-backed sandboxes, so callers fail + fast with a clear message instead of an opaque gateway error. 
+ """ + if await self._auth_cache.is_vm(sandbox_id): + raise APIError(f"{feature_name} is not yet supported for VM sandboxes.") + async def create(self, request: CreateSandboxRequest) -> Sandbox: """Create a new sandbox""" if request.team_id is None: @@ -2110,6 +2153,7 @@ async def expose( protocol: str = "HTTP", ) -> ExposedPort: """Expose a port from a sandbox.""" + await self._guard_vm_unsupported(sandbox_id, "Port exposure") request = ExposePortRequest(port=port, name=name, protocol=protocol) response = await self.client.request( "POST", @@ -2120,10 +2164,12 @@ async def expose( async def unexpose(self, sandbox_id: str, exposure_id: str) -> None: """Unexpose a port from a sandbox.""" + await self._guard_vm_unsupported(sandbox_id, "Port unexpose") await self.client.request("DELETE", f"/sandbox/{sandbox_id}/expose/{exposure_id}") async def list_exposed_ports(self, sandbox_id: str) -> ListExposedPortsResponse: """List all exposed ports for a sandbox""" + await self._guard_vm_unsupported(sandbox_id, "Port listing") response = await self.client.request("GET", f"/sandbox/{sandbox_id}/expose") return ListExposedPortsResponse.model_validate(response) @@ -2138,6 +2184,7 @@ async def create_ssh_session( ttl_seconds: Optional[int] = None, ) -> SSHSession: """Create an SSH session""" + await self._guard_vm_unsupported(sandbox_id, "SSH") payload: Dict[str, Any] = {} if ttl_seconds is not None: payload["ttl_seconds"] = ttl_seconds @@ -2150,6 +2197,7 @@ async def create_ssh_session( async def close_ssh_session(self, sandbox_id: str, session_id: str) -> None: """Close an SSH session and remove its exposure""" + await self._guard_vm_unsupported(sandbox_id, "SSH") await self.client.request("DELETE", f"/sandbox/{sandbox_id}/ssh-session/{session_id}") diff --git a/packages/prime-sandboxes/tests/test_vm_guards.py b/packages/prime-sandboxes/tests/test_vm_guards.py new file mode 100644 index 000000000..8bb751e63 --- /dev/null +++ b/packages/prime-sandboxes/tests/test_vm_guards.py @@ 
-0,0 +1,377 @@ +"""Tests for VM-unsupported operation guards in SandboxClient / AsyncSandboxClient. + +Mirrors the CLI's ``_guard_vm_unsupported`` behavior: the SDK should fail +fast with a clear ``APIError`` when a caller invokes an operation that is +not supported for VM-backed sandboxes, without making any HTTP calls. +""" + +from datetime import datetime, timedelta, timezone +from typing import Any, cast + +import pytest + +from prime_sandboxes import APIError +from prime_sandboxes.core.client import APIClient +from prime_sandboxes.sandbox import AsyncSandboxClient, SandboxAuthCache, SandboxClient + + +def _auth_payload(): + return { + "gateway_url": "https://gateway.example.com", + "user_ns": "ns", + "job_id": "job", + "token": "tok", + "expires_at": (datetime.now(timezone.utc) + timedelta(minutes=30)).isoformat(), + } + + +class _FakeCache: + def __init__(self, is_vm: bool): + self._is_vm = is_vm + + def get_or_refresh(self, _sandbox_id: str): + return _auth_payload() + + def is_vm(self, _sandbox_id: str) -> bool: + return self._is_vm + + +class _AsyncFakeCache: + def __init__(self, is_vm: bool): + self._is_vm = is_vm + + async def get_or_refresh(self, _sandbox_id: str): + return _auth_payload() + + async def is_vm(self, _sandbox_id: str) -> bool: + return self._is_vm + + +class _RecordingAPIClient: + """Minimal stand-in for APIClient.request that records calls.""" + + def __init__(self, response: Any = None): + self.calls = [] + self._response = response if response is not None else {} + + def request(self, method: str, path: str, **kwargs: Any) -> Any: + self.calls.append((method, path, kwargs)) + return self._response + + +class _AsyncRecordingAPIClient: + def __init__(self, response: Any = None): + self.calls = [] + self._response = response if response is not None else {} + + async def request(self, method: str, path: str, **kwargs: Any) -> Any: + self.calls.append((method, path, kwargs)) + return self._response + + async def aclose(self) -> None: + 
return None + + +def _make_sync_client(is_vm: bool) -> tuple[SandboxClient, _RecordingAPIClient]: + client = SandboxClient(APIClient(api_key="test-key")) + recording = _RecordingAPIClient() + # Swap in recording http client + fake cache + cast(Any, client).client = recording + cast(Any, client)._auth_cache = _FakeCache(is_vm=is_vm) + return client, recording + + +def _make_async_client(is_vm: bool) -> tuple[AsyncSandboxClient, _AsyncRecordingAPIClient]: + client = AsyncSandboxClient(api_key="test-key") + recording = _AsyncRecordingAPIClient() + cast(Any, client).client = recording + cast(Any, client)._auth_cache = _AsyncFakeCache(is_vm=is_vm) + return client, recording + + +# --------------------------------------------------------------------------- +# Sync guard tests +# --------------------------------------------------------------------------- + + +def test_sync_expose_blocked_for_vm(): + client, recording = _make_sync_client(is_vm=True) + with pytest.raises(APIError) as exc_info: + client.expose("sbx-vm", 8000) + assert "Port exposure" in str(exc_info.value) + assert "VM sandboxes" in str(exc_info.value) + assert recording.calls == [] + + +def test_sync_unexpose_blocked_for_vm(): + client, recording = _make_sync_client(is_vm=True) + with pytest.raises(APIError) as exc_info: + client.unexpose("sbx-vm", "exp-1") + assert "Port unexpose" in str(exc_info.value) + assert recording.calls == [] + + +def test_sync_list_exposed_ports_blocked_for_vm(): + client, recording = _make_sync_client(is_vm=True) + with pytest.raises(APIError) as exc_info: + client.list_exposed_ports("sbx-vm") + assert "Port listing" in str(exc_info.value) + assert recording.calls == [] + + +def test_sync_create_ssh_session_blocked_for_vm(): + client, recording = _make_sync_client(is_vm=True) + with pytest.raises(APIError) as exc_info: + client.create_ssh_session("sbx-vm") + assert "SSH" in str(exc_info.value) + assert recording.calls == [] + + +def test_sync_close_ssh_session_blocked_for_vm(): + 
client, recording = _make_sync_client(is_vm=True) + with pytest.raises(APIError) as exc_info: + client.close_ssh_session("sbx-vm", "sess-1") + assert "SSH" in str(exc_info.value) + assert recording.calls == [] + + +# --------------------------------------------------------------------------- +# Sync: container path still reaches the HTTP client +# --------------------------------------------------------------------------- + + +def test_sync_expose_allowed_for_container(): + client, recording = _make_sync_client(is_vm=False) + cast(Any, recording)._response = { + "exposure_id": "exp-1", + "sandbox_id": "sbx-c", + "port": 8000, + "name": None, + "url": "https://u", + "tls_socket": "tls", + } + client.expose("sbx-c", 8000) + assert any(method == "POST" and "/expose" in path for method, path, _ in recording.calls) + + +def test_sync_unexpose_allowed_for_container(): + client, recording = _make_sync_client(is_vm=False) + client.unexpose("sbx-c", "exp-1") + assert any( + method == "DELETE" and path.endswith("/expose/exp-1") for method, path, _ in recording.calls + ) + + +def test_sync_list_exposed_ports_allowed_for_container(): + client, recording = _make_sync_client(is_vm=False) + cast(Any, recording)._response = {"exposures": []} + client.list_exposed_ports("sbx-c") + assert any(method == "GET" and path.endswith("/expose") for method, path, _ in recording.calls) + + +def test_sync_create_ssh_session_allowed_for_container(): + client, recording = _make_sync_client(is_vm=False) + cast(Any, recording)._response = { + "session_id": "s", + "exposure_id": "e", + "sandbox_id": "sbx-c", + "host": "h", + "port": 22, + "external_endpoint": "h:22", + "expires_at": datetime.now(timezone.utc).isoformat(), + "ttl_seconds": 300, + "gateway_url": "https://g", + "user_ns": "ns", + "job_id": "job", + "token": "tok", + } + client.create_ssh_session("sbx-c") + assert any( + method == "POST" and path.endswith("/ssh-session") for method, path, _ in recording.calls + ) + + +def 
test_sync_close_ssh_session_allowed_for_container(): + client, recording = _make_sync_client(is_vm=False) + client.close_ssh_session("sbx-c", "sess-1") + assert any( + method == "DELETE" and path.endswith("/ssh-session/sess-1") + for method, path, _ in recording.calls + ) + + +def test_sync_list_all_exposed_ports_not_guarded(): + """list_all_exposed_ports has no sandbox_id and is not VM-guarded (matches CLI).""" + client = SandboxClient(APIClient(api_key="test-key")) + recording = _RecordingAPIClient(response={"exposures": []}) + cast(Any, client).client = recording + # Cache raises if queried: proves the call doesn't touch the guard + + class _BoomCache: + def is_vm(self, _sandbox_id: str) -> bool: + raise AssertionError("is_vm should not be called for list_all_exposed_ports") + + cast(Any, client)._auth_cache = _BoomCache() + client.list_all_exposed_ports() + assert any( + method == "GET" and path == "/sandbox/expose/all" for method, path, _ in recording.calls + ) + + +# --------------------------------------------------------------------------- +# Public is_vm helper +# --------------------------------------------------------------------------- + + +def test_sync_is_vm_delegates_to_cache_true(): + client, _ = _make_sync_client(is_vm=True) + assert client.is_vm("sbx-vm") is True + + +def test_sync_is_vm_delegates_to_cache_false(): + client, _ = _make_sync_client(is_vm=False) + assert client.is_vm("sbx-c") is False + + +def test_sync_is_vm_hits_backend_on_cold_cache(tmp_path): + """SandboxAuthCache.is_vm falls back to GET /sandbox/ when cached flag is missing.""" + + class _FakeAPIClient: + def __init__(self): + self.calls = 0 + + def request(self, method: str, path: str, **_kwargs): + if method == "GET" and path == "/sandbox/sbx-1": + self.calls += 1 + return { + "id": "sbx-1", + "name": "vm-box", + "dockerImage": "img", + "startCommand": None, + "cpuCores": 1.0, + "memoryGB": 2.0, + "diskSizeGB": 10.0, + "diskMountPath": "/sandbox-workspace", + "gpuCount": 0, + 
"gpuType": None, + "vm": True, + "networkAccess": True, + "status": "RUNNING", + "timeoutMinutes": 60, + "environmentVars": None, + "secrets": None, + "advancedConfigs": None, + "labels": [], + "createdAt": datetime.now(timezone.utc).isoformat(), + "updatedAt": datetime.now(timezone.utc).isoformat(), + "startedAt": None, + "terminatedAt": None, + "exitCode": None, + "errorType": None, + "errorMessage": None, + "userId": "user", + "teamId": "team", + "kubernetesJobId": None, + "registryCredentialsId": None, + } + raise AssertionError(f"Unexpected request: {method} {path}") + + cache = SandboxAuthCache(tmp_path / "auth_cache.json", _FakeAPIClient()) + cache.set("sbx-1", _auth_payload()) + + assert cache.is_vm("sbx-1") is True + # Second call uses the cached flag: backend hit count stays at 1. + assert cache.is_vm("sbx-1") is True + assert cache.client.calls == 1 + + +# --------------------------------------------------------------------------- +# Async guard tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_async_expose_blocked_for_vm(): + client, recording = _make_async_client(is_vm=True) + try: + with pytest.raises(APIError) as exc_info: + await client.expose("sbx-vm", 8000) + assert "Port exposure" in str(exc_info.value) + assert recording.calls == [] + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_unexpose_blocked_for_vm(): + client, recording = _make_async_client(is_vm=True) + try: + with pytest.raises(APIError) as exc_info: + await client.unexpose("sbx-vm", "exp-1") + assert "Port unexpose" in str(exc_info.value) + assert recording.calls == [] + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_list_exposed_ports_blocked_for_vm(): + client, recording = _make_async_client(is_vm=True) + try: + with pytest.raises(APIError) as exc_info: + await client.list_exposed_ports("sbx-vm") + assert "Port listing" in 
str(exc_info.value) + assert recording.calls == [] + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_create_ssh_session_blocked_for_vm(): + client, recording = _make_async_client(is_vm=True) + try: + with pytest.raises(APIError) as exc_info: + await client.create_ssh_session("sbx-vm") + assert "SSH" in str(exc_info.value) + assert recording.calls == [] + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_close_ssh_session_blocked_for_vm(): + client, recording = _make_async_client(is_vm=True) + try: + with pytest.raises(APIError) as exc_info: + await client.close_ssh_session("sbx-vm", "sess-1") + assert "SSH" in str(exc_info.value) + assert recording.calls == [] + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_expose_allowed_for_container(): + client, recording = _make_async_client(is_vm=False) + cast(Any, recording)._response = { + "exposure_id": "exp-1", + "sandbox_id": "sbx-c", + "port": 8000, + "name": None, + "url": "https://u", + "tls_socket": "tls", + } + try: + await client.expose("sbx-c", 8000) + assert any(method == "POST" and "/expose" in path for method, path, _ in recording.calls) + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_async_is_vm_public_helper(): + client, _ = _make_async_client(is_vm=True) + try: + assert (await client.is_vm("sbx-vm")) is True + finally: + await client.aclose() From 6a7121424b9f5286d746ab264f285e75247b19a3 Mon Sep 17 00:00:00 2001 From: Damian Barabonkov Date: Thu, 23 Apr 2026 14:20:01 -0400 Subject: [PATCH 5/7] feat: Add public key support for VM SSH sessions --- .../src/prime_sandboxes/sandbox.py | 26 ++++--- .../prime-sandboxes/tests/test_vm_guards.py | 71 +++++++++++++------ .../prime/src/prime_cli/commands/sandbox.py | 65 +++++++++++------ 3 files changed, 114 insertions(+), 48 deletions(-) diff --git a/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py 
b/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py index 0d46d5ad3..571b3d04d 100644 --- a/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py +++ b/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py @@ -1294,12 +1294,18 @@ def create_ssh_session( self, sandbox_id: str, ttl_seconds: Optional[int] = None, + public_key: Optional[str] = None, ) -> SSHSession: - """Create an SSH session""" - self._guard_vm_unsupported(sandbox_id, "SSH") + """Create an SSH session. + + Pass ``public_key`` for VM sandboxes; container sandboxes authorize + keys separately after session creation. + """ payload: Dict[str, Any] = {} if ttl_seconds is not None: payload["ttl_seconds"] = ttl_seconds + if public_key is not None: + payload["public_key"] = public_key response = self.client.request( "POST", f"/sandbox/{sandbox_id}/ssh-session", @@ -1308,8 +1314,7 @@ def create_ssh_session( return SSHSession.model_validate(response) def close_ssh_session(self, sandbox_id: str, session_id: str) -> None: - """Close an SSH session and remove its exposure""" - self._guard_vm_unsupported(sandbox_id, "SSH") + """Close an SSH session.""" self.client.request("DELETE", f"/sandbox/{sandbox_id}/ssh-session/{session_id}") @@ -2199,12 +2204,18 @@ async def create_ssh_session( self, sandbox_id: str, ttl_seconds: Optional[int] = None, + public_key: Optional[str] = None, ) -> SSHSession: - """Create an SSH session""" - await self._guard_vm_unsupported(sandbox_id, "SSH") + """Create an SSH session. + + Pass ``public_key`` for VM sandboxes; container sandboxes authorize + keys separately after session creation. 
+ """ payload: Dict[str, Any] = {} if ttl_seconds is not None: payload["ttl_seconds"] = ttl_seconds + if public_key is not None: + payload["public_key"] = public_key response = await self.client.request( "POST", f"/sandbox/{sandbox_id}/ssh-session", @@ -2213,8 +2224,7 @@ async def create_ssh_session( return SSHSession.model_validate(response) async def close_ssh_session(self, sandbox_id: str, session_id: str) -> None: - """Close an SSH session and remove its exposure""" - await self._guard_vm_unsupported(sandbox_id, "SSH") + """Close an SSH session.""" await self.client.request("DELETE", f"/sandbox/{sandbox_id}/ssh-session/{session_id}") diff --git a/packages/prime-sandboxes/tests/test_vm_guards.py b/packages/prime-sandboxes/tests/test_vm_guards.py index 8bb751e63..d183468bf 100644 --- a/packages/prime-sandboxes/tests/test_vm_guards.py +++ b/packages/prime-sandboxes/tests/test_vm_guards.py @@ -119,20 +119,35 @@ def test_sync_list_exposed_ports_blocked_for_vm(): assert recording.calls == [] -def test_sync_create_ssh_session_blocked_for_vm(): +def test_sync_create_ssh_session_allowed_for_vm(): client, recording = _make_sync_client(is_vm=True) - with pytest.raises(APIError) as exc_info: - client.create_ssh_session("sbx-vm") - assert "SSH" in str(exc_info.value) - assert recording.calls == [] + cast(Any, recording)._response = { + "session_id": "s", + "exposure_id": "s", + "sandbox_id": "sbx-vm", + "host": "h", + "port": 2222, + "external_endpoint": "h:2222", + "expires_at": datetime.now(timezone.utc).isoformat(), + "ttl_seconds": 300, + "gateway_url": "", + "user_ns": "", + "job_id": "sbx-vm", + "token": "", + } + client.create_ssh_session("sbx-vm", public_key="ssh-ed25519 AAAA...") + assert any( + method == "POST" and path.endswith("/ssh-session") for method, path, _ in recording.calls + ) -def test_sync_close_ssh_session_blocked_for_vm(): +def test_sync_close_ssh_session_allowed_for_vm(): client, recording = _make_sync_client(is_vm=True) - with 
pytest.raises(APIError) as exc_info: - client.close_ssh_session("sbx-vm", "sess-1") - assert "SSH" in str(exc_info.value) - assert recording.calls == [] + client.close_ssh_session("sbx-vm", "sess-1") + assert any( + method == "DELETE" and path.endswith("/ssh-session/sess-1") + for method, path, _ in recording.calls + ) # --------------------------------------------------------------------------- @@ -327,25 +342,41 @@ async def test_async_list_exposed_ports_blocked_for_vm(): @pytest.mark.asyncio -async def test_async_create_ssh_session_blocked_for_vm(): +async def test_async_create_ssh_session_allowed_for_vm(): client, recording = _make_async_client(is_vm=True) + cast(Any, recording)._response = { + "session_id": "s", + "exposure_id": "s", + "sandbox_id": "sbx-vm", + "host": "h", + "port": 2222, + "external_endpoint": "h:2222", + "expires_at": datetime.now(timezone.utc).isoformat(), + "ttl_seconds": 300, + "gateway_url": "", + "user_ns": "", + "job_id": "sbx-vm", + "token": "", + } try: - with pytest.raises(APIError) as exc_info: - await client.create_ssh_session("sbx-vm") - assert "SSH" in str(exc_info.value) - assert recording.calls == [] + await client.create_ssh_session("sbx-vm", public_key="ssh-ed25519 AAAA...") + assert any( + method == "POST" and path.endswith("/ssh-session") + for method, path, _ in recording.calls + ) finally: await client.aclose() @pytest.mark.asyncio -async def test_async_close_ssh_session_blocked_for_vm(): +async def test_async_close_ssh_session_allowed_for_vm(): client, recording = _make_async_client(is_vm=True) try: - with pytest.raises(APIError) as exc_info: - await client.close_ssh_session("sbx-vm", "sess-1") - assert "SSH" in str(exc_info.value) - assert recording.calls == [] + await client.close_ssh_session("sbx-vm", "sess-1") + assert any( + method == "DELETE" and path.endswith("/ssh-session/sess-1") + for method, path, _ in recording.calls + ) finally: await client.aclose() diff --git 
a/packages/prime/src/prime_cli/commands/sandbox.py b/packages/prime/src/prime_cli/commands/sandbox.py index 8d43ccc82..178ad1250 100644 --- a/packages/prime/src/prime_cli/commands/sandbox.py +++ b/packages/prime/src/prime_cli/commands/sandbox.py @@ -5,6 +5,7 @@ import shutil import string import subprocess +import sys import tempfile import time from typing import Any, Dict, List, Optional @@ -1354,7 +1355,7 @@ def cleanup() -> None: with console.status("[bold blue]Checking sandbox status...", spinner="dots"): sandbox = sandbox_client.get(sandbox_id) - _guard_vm_unsupported(sandbox, "SSH") + is_vm_sandbox = bool(getattr(sandbox, "vm", False)) if sandbox.status != "RUNNING": console.print(f"[red]Error:[/red] Sandbox is not running (status: {sandbox.status})") @@ -1374,29 +1375,33 @@ def cleanup() -> None: with open(f"{key_path}.pub", "r") as f: public_key = f.read().strip() - # Create SSH session + # VM sandboxes take the public key here; containers authorize it below. console.print("[bold blue]Creating SSH session...[/bold blue]") with console.status("[bold blue]Setting up SSH session...", spinner="dots"): - session = sandbox_client.create_ssh_session(sandbox_id) + if is_vm_sandbox: + session = sandbox_client.create_ssh_session(sandbox_id, public_key=public_key) + else: + session = sandbox_client.create_ssh_session(sandbox_id) session_id = session.session_id - # Authorize the key - authorize_url = ( - f"{session.gateway_url.rstrip('/')}/{session.user_ns}/{session.job_id}/authorize" - ) - headers = {"Authorization": f"Bearer {session.token}"} - payload = { - "session_id": session.session_id, - "public_key": public_key, - "ttl_seconds": session.ttl_seconds, - } - try: - with httpx.Client(timeout=30) as client: - client.post(authorize_url, json=payload, headers=headers).raise_for_status() - except Exception as e: - console.print(f"[red]Error:[/red] Failed to authorize SSH key: {e}") - cleanup() - raise typer.Exit(1) + if not is_vm_sandbox: + # Containers authorize the 
key through the sidecar endpoint. + authorize_url = ( + f"{session.gateway_url.rstrip('/')}/{session.user_ns}/{session.job_id}/authorize" + ) + headers = {"Authorization": f"Bearer {session.token}"} + payload = { + "session_id": session.session_id, + "public_key": public_key, + "ttl_seconds": session.ttl_seconds, + } + try: + with httpx.Client(timeout=30) as client: + client.post(authorize_url, json=payload, headers=headers).raise_for_status() + except Exception as e: + console.print(f"[red]Error:[/red] Failed to authorize SSH key: {e}") + cleanup() + raise typer.Exit(1) ssh_host = session.host ssh_port = session.port @@ -1421,6 +1426,26 @@ def cleanup() -> None: ssh_cmd.extend(["-o", "UserKnownHostsFile=/dev/null"]) ssh_cmd.extend(["-o", "LogLevel=ERROR"]) + # VM SSH connections need a session prefix before the handshake, so + # route them through a small ProxyCommand that writes it first. + if is_vm_sandbox: + prefix = f"PRIME-SSH-SESSION {session.session_id}\n" + python_exec = sys.executable or "python3" + # Use `read1` for responsive stdin forwarding and flush each + # chunk received from the remote side. 
+ proxy_script = ( + "import socket, sys, threading;" + f"s=socket.create_connection(({ssh_host!r},{int(ssh_port)}));" + f"s.sendall({prefix!r}.encode());" + "t=threading.Thread(target=lambda:[" + "(sys.stdout.buffer.write(b), sys.stdout.buffer.flush()) " + "for b in iter(lambda:s.recv(4096),b'')]);" + "t.daemon=True;t.start();" + "[s.sendall(b) for b in iter(lambda:sys.stdin.buffer.read1(4096),b'')]" + ) + proxy_cmd = f"{python_exec} -c {shlex.quote(proxy_script)}" + ssh_cmd.extend(["-o", f"ProxyCommand={proxy_cmd}"]) + # Add identity file if specified if key_path: ssh_cmd.extend(["-i", key_path]) From f5a00903ebccb40c5e3eeb4d5bca8dd8a5bc7941 Mon Sep 17 00:00:00 2001 From: Damian Barabonkov Date: Thu, 23 Apr 2026 16:02:43 -0400 Subject: [PATCH 6/7] fix: Improve SSH connection proxy handling in sandbox.py --- .../prime/src/prime_cli/commands/sandbox.py | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/packages/prime/src/prime_cli/commands/sandbox.py b/packages/prime/src/prime_cli/commands/sandbox.py index e5251bf7e..28b122da7 100644 --- a/packages/prime/src/prime_cli/commands/sandbox.py +++ b/packages/prime/src/prime_cli/commands/sandbox.py @@ -1445,18 +1445,41 @@ def cleanup() -> None: prefix = f"PRIME-SSH-SESSION {session.session_id}\n" python_exec = sys.executable or "python3" # Use `read1` for responsive stdin forwarding and flush each - # chunk received from the remote side. + # chunk received from the remote side. Relay loops are plain + # while-loops (not list comprehensions) so forwarded bytes are + # not accumulated in memory for the lifetime of the session. 
proxy_script = ( - "import socket, sys, threading;" - f"s=socket.create_connection(({ssh_host!r},{int(ssh_port)}));" - f"s.sendall({prefix!r}.encode());" - "t=threading.Thread(target=lambda:[" - "(sys.stdout.buffer.write(b), sys.stdout.buffer.flush()) " - "for b in iter(lambda:s.recv(4096),b'')]);" - "t.daemon=True;t.start();" - "[s.sendall(b) for b in iter(lambda:sys.stdin.buffer.read1(4096),b'')]" + "import socket, sys, threading\n" + f"s = socket.create_connection(({ssh_host!r}, {int(ssh_port)}))\n" + f"s.sendall({prefix!r}.encode())\n" + "def _reader():\n" + " try:\n" + " while True:\n" + " b = s.recv(4096)\n" + " if not b:\n" + " break\n" + " sys.stdout.buffer.write(b)\n" + " sys.stdout.buffer.flush()\n" + " except OSError:\n" + " pass\n" + "t = threading.Thread(target=_reader, daemon=True)\n" + "t.start()\n" + "try:\n" + " while True:\n" + " b = sys.stdin.buffer.read1(4096)\n" + " if not b:\n" + " break\n" + " s.sendall(b)\n" + "except OSError:\n" + " pass\n" + "finally:\n" + " try:\n" + " s.shutdown(socket.SHUT_WR)\n" + " except OSError:\n" + " pass\n" + " s.close()\n" ) - proxy_cmd = f"{python_exec} -c {shlex.quote(proxy_script)}" + proxy_cmd = f"{shlex.quote(python_exec)} -c {shlex.quote(proxy_script)}" ssh_cmd.extend(["-o", f"ProxyCommand={proxy_cmd}"]) # Add identity file if specified From df75d03e2c259d2717ae19bebeed4e1bc84ca76f Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 23 Apr 2026 20:44:51 +0000 Subject: [PATCH 7/7] test: cover VM SSH ProxyCommand quoting and streaming Co-authored-by: Damian Barabonkov --- packages/prime/tests/test_sandbox_cli.py | 83 ++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/packages/prime/tests/test_sandbox_cli.py b/packages/prime/tests/test_sandbox_cli.py index 11b94845e..762155983 100644 --- a/packages/prime/tests/test_sandbox_cli.py +++ b/packages/prime/tests/test_sandbox_cli.py @@ -1,3 +1,6 @@ +import shlex +import subprocess +from pathlib import Path from types import SimpleNamespace 
from typing import Any @@ -347,3 +350,83 @@ def mock_bulk_delete(self: Any, **kwargs: Any) -> Any: assert bulk_kwargs["user_id"] is None assert "Processed 1 sandbox(es)" in output + + +def test_sandbox_ssh_vm_proxy_command_quotes_python_and_streams_without_list_comprehensions( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PRIME_API_KEY", "dummy") + monkeypatch.setenv("PRIME_DISABLE_VERSION_CHECK", "1") + + temp_dir = tmp_path / "prime-ssh" + temp_dir.mkdir() + pub_key_path = temp_dir / "id_ed25519.pub" + pub_key_path.write_text("ssh-ed25519 AAAA-test-key user@test\n") + + captured: dict[str, Any] = {"ssh_cmd": None, "public_key": None} + + class FakeSandboxClient: + def __init__(self, _base_client: Any) -> None: + pass + + def get(self, _sandbox_id: str) -> Any: + return SimpleNamespace(status="RUNNING", vm=True) + + def create_ssh_session( + self, _sandbox_id: str, ttl_seconds: int | None = None, public_key: str | None = None + ) -> Any: + captured["public_key"] = public_key + assert ttl_seconds is None + return SimpleNamespace( + session_id="sess-123", + host="vm-host.example", + port=2222, + gateway_url="https://gateway.example", + user_ns="ns", + job_id="job", + token="token", + ttl_seconds=300, + ) + + def close_ssh_session(self, _sandbox_id: str, _session_id: str) -> None: + return None + + fake_python_exec = "/tmp/my python/bin/python3" + + monkeypatch.setattr("prime_cli.commands.sandbox.APIClient", lambda: object()) + monkeypatch.setattr("prime_cli.commands.sandbox.SandboxClient", FakeSandboxClient) + monkeypatch.setattr("prime_cli.commands.sandbox.tempfile.mkdtemp", lambda prefix: str(temp_dir)) + monkeypatch.setattr("prime_cli.commands.sandbox.time.sleep", lambda _seconds: None) + monkeypatch.setattr("prime_cli.commands.sandbox.sys.executable", fake_python_exec) + monkeypatch.setattr("prime_cli.commands.sandbox.shutil.which", lambda _name: "/usr/bin/fake") + + def _mock_subprocess_run(cmd: list[str], **kwargs: Any) -> 
subprocess.CompletedProcess[str]: + if cmd and cmd[0] == "ssh-keygen": + return subprocess.CompletedProcess(cmd, 0) + if cmd and cmd[0] == "ssh": + captured["ssh_cmd"] = cmd + return subprocess.CompletedProcess(cmd, 0) + raise AssertionError(f"Unexpected subprocess call: {cmd!r}, kwargs={kwargs!r}") + + monkeypatch.setattr("prime_cli.commands.sandbox.subprocess.run", _mock_subprocess_run) + + result = runner.invoke(app, ["sandbox", "ssh", "sbx-vm-1"]) + + output = strip_ansi(result.output) + assert result.exit_code == 0, f"Failed: {output}" + assert captured["public_key"] == "ssh-ed25519 AAAA-test-key user@test" + + ssh_cmd = captured["ssh_cmd"] + assert isinstance(ssh_cmd, list) + proxy_opt = next((arg for arg in ssh_cmd if arg.startswith("ProxyCommand=")), None) + assert proxy_opt is not None + + proxy_cmd = proxy_opt[len("ProxyCommand=") :] + assert proxy_cmd.startswith(f"{shlex.quote(fake_python_exec)} -c ") + parsed_proxy = shlex.split(proxy_cmd) + assert parsed_proxy[0] == fake_python_exec + assert parsed_proxy[1] == "-c" + proxy_script = parsed_proxy[2] + + assert "while True:" in proxy_script + assert "for b in iter(" not in proxy_script