From 6126c244ce01c47fc6fbad602c4485d10740a557 Mon Sep 17 00:00:00 2001 From: Brian Krabach Date: Mon, 9 Mar 2026 06:30:45 -0700 Subject: [PATCH] feat: add progress visibility to amplifier update command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace silent 'Updating...' with animated Rich progress spinner showing dynamic status updates during long update operations (up to 120 seconds). Fixes three root causes of silent hangs: 1. execute_updates() lacked progress_callback parameter - callback infrastructure existed in lower-level functions but wasn't wired from the top. Now accepts and passes through progress callbacks. 2. execute_self_update() ran 'uv tool install --force' with capture_output=True, silencing output. Changed to subprocess.Popen with line-by-line stderr streaming so uv's progress shows (Resolved, Prepared, Installed). 3. Bundle updates had zero per-item progress. Now shows 'Loading bundle X...' and 'Refreshing bundle X...' for each bundle. Spinner now displays dynamic messages: - 'Updating provider-anthropic...' - 'Clearing cache for tool-web...' - 'Downloading hook-shell...' - 'Updating Amplifier (uv tool install)...' - 'Updating Amplifier: Resolved 15 packages in 2.31s' - 'Loading bundle core...' - 'Refreshing bundle core...' - 'Checking module dependencies...' Follows the same pattern introduced for bundle preparation in PR #136. Code Review Fixes (zen-architect, code-quality-reviewer, security-guardian): - CRITICAL: Changed stdout=PIPE to stdout=DEVNULL to prevent classic Popen deadlock (unread stdout pipe fills, blocks uv) - CRITICAL: Implemented threaded stderr draining with 120s timeout covering entire operation (previously timeout only covered process.wait() after stderr was already drained) - IMPORTANT: Added process.wait() after process.kill() in TimeoutExpired handler to reap zombie processes - IMPORTANT: Added process cleanup (kill+wait) in generic Exception handler to prevent orphan processes - Broadened ANSI escape regex from SGR-only to full CSI range - Moved 'import re' from function-level to module-level imports - Added 'import threading' for threaded stderr drain - Renamed spinner variable from 'status' to 'spinner' to eliminate name collision with BundleStatus loop variables - Fixed phase labels in _format_update_progress() to use 'name' parameter instead of hardcoding 'Amplifier' - Added docstring documenting the dual-use progress_callback contract 🤖 Generated with Amplifier Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com> --- amplifier_app_cli/commands/update.py | 111 +++++++++++++++------ amplifier_app_cli/utils/update_executor.py | 97 +++++++++++++++--- 2 files changed, 165 insertions(+), 43 deletions(-) diff --git a/amplifier_app_cli/commands/update.py b/amplifier_app_cli/commands/update.py index ec01f79e..62a49fe4 100644 --- a/amplifier_app_cli/commands/update.py +++ b/amplifier_app_cli/commands/update.py @@ -840,6 +840,41 @@ def _show_verbose_report( print_legend() +def _format_update_progress(name: str, phase: str) -> str: + """Format an update progress callback into a human-readable label for the spinner. + + Maps update phases to user-friendly descriptions, following the same pattern + as _format_progress() in runtime/config.py for bundle preparation. + + Args: + name: Item being updated (module name, "amplifier", bundle name). + phase: Progress phase (e.g., "updating", "clearing", "downloading"). + + Returns: + Human-readable progress label. + """ + labels = { + "updating": f"Updating {name}...", + "clearing": f"Clearing cache for {name}...", + "downloading": f"Downloading {name}...", + "done": f"Updated {name}", + "failed": f"Failed to update {name}", + "installing": f"Installing {name} (uv tool install)...", + "checking_deps": f"Checking {name} dependencies...", + "updating_bundle": f"Loading bundle {name}...", + "refreshing_bundle": f"Refreshing bundle {name}...", + } + label = labels.get(phase) + if label: + return label + # For raw uv output lines streamed during self-update, show them directly. + # execute_self_update passes cleaned stderr lines as the phase value + # (see its docstring for the dual-use contract). + if name == "amplifier" and phase: + return f"Updating Amplifier: {phase}" + return f"{phase}: {name}" + + @click.command() @click.option("--check-only", is_flag=True, help="Check for updates without installing") @click.option("--yes", "-y", is_flag=True, help="Skip confirmations") @@ -989,44 +1024,60 @@ async def _check_sources(): console.print("[dim]Update cancelled[/dim]") return - # Execute updates with progress + # Execute updates with progress spinner (same pattern as bundle preparation) console.print() - console.print("Updating...") - result = asyncio.run( - execute_updates( - report, umbrella_info=umbrella_info if has_umbrella_updates else None - ) + spinner = console.status( + "[dim]Updating...[/dim]", + spinner="dots", ) + spinner.start() - # Execute bundle updates - bundle_updated: list[str] = [] - bundle_failed: list[str] = [] - bundle_errors: dict[str, str] = {} + def _on_update_progress(name: str, phase: str) -> None: + label = _format_update_progress(name, phase) + spinner.update(f"[dim]{label}[/dim]") - if has_bundle_updates: - from amplifier_foundation import update_bundle + try: + result = asyncio.run( + execute_updates( + report, + umbrella_info=umbrella_info if has_umbrella_updates else None, + progress_callback=_on_update_progress, + ) + ) - registry = create_bundle_registry() - bundles_to_update = [ - name for name, status in bundle_results.items() if status.has_updates - ] + # Execute bundle updates + bundle_updated: list[str] = [] + bundle_failed: list[str] = [] + bundle_errors: dict[str, str] = {} + + if has_bundle_updates: + from amplifier_foundation import update_bundle - for bundle_name in bundles_to_update: - try: - loaded = asyncio.run(registry.load(bundle_name)) - if isinstance(loaded, dict): - bundle_errors[bundle_name] = "Expected single bundle, got dict" + registry = create_bundle_registry() + bundles_to_update = [ + name for name, status in bundle_results.items() if status.has_updates + ] + + for bundle_name in bundles_to_update: + try: + _on_update_progress(bundle_name, "updating_bundle") + loaded = asyncio.run(registry.load(bundle_name)) + if isinstance(loaded, dict): + bundle_errors[bundle_name] = "Expected single bundle, got dict" + bundle_failed.append(bundle_name) + continue + bundle_obj = loaded + + # Refresh bundle sources + _on_update_progress(bundle_name, "refreshing_bundle") + asyncio.run(update_bundle(bundle_obj)) + bundle_updated.append(bundle_name) + except Exception as exc: + bundle_errors[bundle_name] = str(exc) bundle_failed.append(bundle_name) - continue - bundle_obj = loaded - - # Refresh bundle sources - asyncio.run(update_bundle(bundle_obj)) - bundle_updated.append(bundle_name) - except Exception as exc: - bundle_errors[bundle_name] = str(exc) - bundle_failed.append(bundle_name) + finally: + spinner.stop() # Show results console.print() diff --git a/amplifier_app_cli/utils/update_executor.py b/amplifier_app_cli/utils/update_executor.py index 045c179e..aec99c75 100644 --- a/amplifier_app_cli/utils/update_executor.py +++ b/amplifier_app_cli/utils/update_executor.py @@ -5,7 +5,9 @@ """ import logging +import re import subprocess +import threading from collections.abc import Callable from dataclasses import dataclass from dataclasses import field @@ -390,9 +392,7 @@ def _invalidate_modules_with_missing_deps() -> tuple[int, int]: missing_deps.append(dep) if missing_deps: - logger.debug( - f"Module {module_path.name} has missing deps: {missing_deps}" - ) + logger.debug(f"Module {module_path.name} has missing deps: {missing_deps}") modules_to_invalidate.append(module_path_str) # Remove invalidated entries @@ -400,7 +400,9 @@ def _invalidate_modules_with_missing_deps() -> tuple[int, int]: for path_str in modules_to_invalidate: del state["modules"][path_str] module_name = Path(path_str).name - logger.info(f"Invalidated install state for {module_name} (missing dependencies)") + logger.info( + f"Invalidated install state for {module_name} (missing dependencies)" + ) # Write back the modified state try: @@ -412,7 +414,10 @@ def _invalidate_modules_with_missing_deps() -> tuple[int, int]: return (modules_checked, len(modules_to_invalidate)) -async def execute_self_update(umbrella_info: UmbrellaInfo) -> ExecutionResult: +async def execute_self_update( + umbrella_info: UmbrellaInfo, + progress_callback: Callable[[str, str], None] | None = None, +) -> ExecutionResult: """Delegate to 'uv tool install --force'. Philosophy: uv is designed for this, use it. @@ -421,23 +426,76 @@ async def execute_self_update(umbrella_info: UmbrellaInfo) -> ExecutionResult: dependencies are no longer available in the new Python environment. This avoids unnecessary reinstallation of modules whose dependencies are still satisfied. + + Progress reporting: The progress_callback receives two kinds of phase values: + - Structured keywords: "installing", "checking_deps" (stable, mapped by caller) + - Raw uv output lines: e.g. "Resolved 15 packages in 2.31s" (streamed from stderr) + The caller's format function should handle both (see _format_update_progress). """ url = f"git+{umbrella_info.url}@{umbrella_info.ref}" + if progress_callback: + progress_callback("amplifier", "installing") + + # Matches all CSI sequences: colors, cursor movement, erase, show/hide cursor + ansi_csi = re.compile(r"\x1b\[[0-9;?]*[A-Za-z]") + + process: subprocess.Popen[str] | None = None + stderr_lines: list[str] = [] + + def _drain_stderr() -> None: + """Read uv's stderr line-by-line in a background thread. + + Runs in a daemon thread so the 120s timeout in the main thread + applies to the entire operation, not just the post-drain wait. + """ + assert process is not None and process.stderr is not None + for line in process.stderr: + raw = line.rstrip("\n\r") + if not raw: + continue + stderr_lines.append(raw) + if progress_callback: + # Strip ANSI escape codes for clean spinner display + clean = ansi_csi.sub("", raw).strip() + if clean: + progress_callback("amplifier", clean) + try: - result = subprocess.run( + # Use Popen to stream uv output for progress visibility. + # Previously used subprocess.run(capture_output=True) which silenced + # all output for up to 120 seconds with no feedback. + process = subprocess.Popen( ["uv", "tool", "install", "--force", url], - capture_output=True, + stdout=subprocess.DEVNULL, # uv progress goes to stderr; discard stdout + stderr=subprocess.PIPE, text=True, - timeout=120, ) - if result.returncode == 0: + # Drain stderr in a background thread so the 120s timeout covers + # the entire operation. Without threading, `for line in process.stderr` + # blocks indefinitely if uv stalls mid-output. + reader = threading.Thread(target=_drain_stderr, daemon=True) + reader.start() + reader.join(timeout=120) + + if reader.is_alive(): + # Timeout expired while still reading stderr — uv is hung + process.kill() + process.wait() + raise subprocess.TimeoutExpired(process.args, 120) + + # stderr is fully drained; brief wait for process exit + process.wait(timeout=5) + + if process.returncode == 0: # Surgically invalidate only modules with missing dependencies. # The --force flag may recreate the Python environment, wiping # some module dependencies. Rather than clearing all install state, # we check which modules actually have missing deps and only # invalidate those entries. + if progress_callback: + progress_callback("amplifier", "checking_deps") checked, invalidated = _invalidate_modules_with_missing_deps() if invalidated > 0: logger.info( @@ -449,7 +507,7 @@ async def execute_self_update(umbrella_info: UmbrellaInfo) -> ExecutionResult: updated=["amplifier"], messages=["Amplifier updated successfully"], ) - error_msg = result.stderr.strip() or "Unknown error" + error_msg = "\n".join(stderr_lines).strip() or "Unknown error" return ExecutionResult( success=False, failed=["amplifier"], @@ -458,6 +516,9 @@ async def execute_self_update(umbrella_info: UmbrellaInfo) -> ExecutionResult: ) except subprocess.TimeoutExpired: + if process: + process.kill() + process.wait() # reap zombie, close pipe fds return ExecutionResult( success=False, failed=["amplifier"], @@ -474,6 +535,9 @@ async def execute_self_update(umbrella_info: UmbrellaInfo) -> ExecutionResult: ], ) except Exception as e: + if process and process.poll() is None: + process.kill() + process.wait() # reap zombie, close pipe fds return ExecutionResult( success=False, failed=["amplifier"], @@ -483,7 +547,9 @@ async def execute_self_update(umbrella_info: UmbrellaInfo) -> ExecutionResult: async def execute_updates( - report: UpdateReport, umbrella_info: UmbrellaInfo | None = None + report: UpdateReport, + umbrella_info: UmbrellaInfo | None = None, + progress_callback: Callable[[str, str], None] | None = None, ) -> ExecutionResult: """Orchestrate all updates based on report. @@ -492,6 +558,7 @@ async def execute_updates( Args: report: Update status report from check_all_sources umbrella_info: Optional umbrella info if already checked for updates + progress_callback: Optional callback(name, phase) for progress reporting """ all_updated = [] all_failed = [] @@ -503,7 +570,9 @@ async def execute_updates( modules_needing_update = [s for s in report.cached_git_sources if s.has_update] if modules_needing_update: logger.info(f"Selectively updating {len(modules_needing_update)} module(s)...") - result = await execute_selective_module_update(modules_needing_update) + result = await execute_selective_module_update( + modules_needing_update, progress_callback=progress_callback + ) all_updated.extend(result.updated) all_failed.extend(result.failed) @@ -516,7 +585,9 @@ async def execute_updates( # 2. Execute self-update if umbrella_info provided (already checked by caller) if umbrella_info: logger.info("Updating Amplifier (umbrella dependencies have updates)...") - result = await execute_self_update(umbrella_info) + result = await execute_self_update( + umbrella_info, progress_callback=progress_callback + ) all_updated.extend(result.updated) all_failed.extend(result.failed)