From ba22bf565abd34e5fa81e123b8f70d7aa6b006b5 Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Fri, 9 Jan 2026 17:20:35 -0600 Subject: [PATCH 1/2] refactor: improve ClaudeCodeAdapter for session management and repository handling - Updated ClaudeCodeAdapter to use local variables for message IDs to prevent race conditions during concurrent runs. - Introduced a new `restart_session_tool` for handling session restarts via MCP tools. - Enhanced repository cloning logic to create and checkout feature branches named 'ambient/' when cloning at runtime. - Modified the add_repo function to only trigger notifications for newly cloned repositories, preventing duplicate notifications. - Improved logging for better observability of session and repository operations. This refactor enhances the reliability and clarity of session management and repository interactions within the ClaudeCodeAdapter. --- .../runners/claude-code-runner/adapter.py | 135 ++++++++++++------ components/runners/claude-code-runner/main.py | 114 +++++++++------ 2 files changed, 164 insertions(+), 85 deletions(-) diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index a96a907e..96fce69f 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -70,9 +70,9 @@ def __init__(self): self._skip_resume_on_restart = False self._turn_count = 0 - # AG-UI streaming state - self._current_message_id: Optional[str] = None - self._current_tool_id: Optional[str] = None + # AG-UI streaming state (per-run, not instance state) + # NOTE: _current_message_id and _current_tool_id are now local variables + # in _run_claude_agent_sdk to avoid race conditions with concurrent runs self._current_run_id: Optional[str] = None self._current_thread_id: Optional[str] = None @@ -289,6 +289,9 @@ async def _run_claude_agent_sdk( thread_id: AG-UI thread identifier run_id: AG-UI run identifier """ + # Per-run state - NOT instance variables to avoid race conditions with concurrent runs + current_message_id: Optional[str] = None + logger.info(f"_run_claude_agent_sdk called with prompt length={len(prompt)}, will create fresh client") try: # Check for authentication method @@ -331,6 +334,7 @@ async def _run_claude_agent_sdk( ToolResultBlock, ) from claude_agent_sdk.types import StreamEvent + from claude_agent_sdk import tool as sdk_tool, create_sdk_mcp_server from observability import ObservabilityManager @@ -401,6 +405,31 @@ async def _run_claude_agent_sdk( # Load MCP server configuration (webfetch is included in static .mcp.json) mcp_servers = self._load_mcp_config(cwd_path) or {} + # Create custom session control tools + # Capture self reference for the restart tool closure + adapter_ref = self + + @sdk_tool("restart_session", "Restart the Claude session to recover from issues, clear state, or get a fresh connection. Use this if you detect you're in a broken state or need to reset.", {}) + async def restart_session_tool(args: dict) -> dict: + """Tool that allows Claude to request a session restart.""" + adapter_ref._restart_requested = True + logger.info("🔄 Session restart requested by Claude via MCP tool") + return { + "content": [{ + "type": "text", + "text": "Session restart has been requested. The current run will complete and a fresh session will be established. Your conversation context will be preserved on disk." + }] + } + + # Create SDK MCP server for session tools + session_tools_server = create_sdk_mcp_server( + name="session", + version="1.0.0", + tools=[restart_session_tool] + ) + mcp_servers["session"] = session_tools_server + logger.info("Added custom session control MCP tools (restart_session)") + # Disable built-in WebFetch in favor of WebFetch.MCP from config allowed_tools = ["Read", "Write", "Bash", "Glob", "Grep", "Edit", "MultiEdit", "WebSearch"] if mcp_servers: @@ -428,20 +457,6 @@ async def _run_claude_agent_sdk( include_partial_messages=True, ) - # Enable continue_conversation for session resumption - if not self._first_run or is_continuation: - try: - options.continue_conversation = True - logger.info("Enabled continue_conversation for session resumption") - yield RawEvent( - type=EventType.RAW, - thread_id=thread_id, - run_id=run_id, - event={"type": "system_log", "message": "🔄 Continuing conversation from previous state"} - ) - except Exception as e: - logger.warning(f"Failed to set continue_conversation: {e}") - if self._skip_resume_on_restart: self._skip_resume_on_restart = False @@ -481,9 +496,24 @@ def create_sdk_client(opts, disable_continue=False): opts.continue_conversation = False return ClaudeSDKClient(options=opts) - # Always create a fresh client for each run (simple and reliable) + # Create fresh client for each run + # (Python SDK has issues with client reuse despite docs suggesting it should work) logger.info("Creating new ClaudeSDKClient for this run...") + # Enable continue_conversation to resume from disk state + if not self._first_run or is_continuation: + try: + options.continue_conversation = True + logger.info("Enabled continue_conversation (will resume from disk state)") + yield RawEvent( + type=EventType.RAW, + thread_id=thread_id, + run_id=run_id, + event={"type": "system_log", "message": "🔄 Resuming conversation from disk state"} + ) + except Exception as e: + logger.warning(f"Failed to set continue_conversation: {e}") + try: logger.info("Creating ClaudeSDKClient...") client = create_sdk_client(options) @@ -508,15 +538,6 @@ def create_sdk_client(opts, disable_continue=False): try: # Store client reference for interrupt support self._active_client = client - - if not self._first_run: - yield RawEvent( - type=EventType.RAW, - thread_id=thread_id, - run_id=run_id, - event={"type": "system_log", "message": "✅ Continuing conversation"} - ) - logger.info("SDK continuing conversation from local state") # Process the prompt step_id = str(uuid.uuid4()) @@ -533,8 +554,12 @@ def create_sdk_client(opts, disable_continue=False): logger.info("Query sent, waiting for response stream...") # Process response stream + logger.info("Starting to consume receive_response() iterator...") + message_count = 0 + async for message in client.receive_response(): - logger.info(f"[ClaudeSDKClient]: {message}") + message_count += 1 + logger.info(f"[ClaudeSDKClient Message #{message_count}]: {message}") # Handle StreamEvent for real-time streaming chunks if isinstance(message, StreamEvent): @@ -542,12 +567,12 @@ def create_sdk_client(opts, disable_continue=False): event_type = event_data.get('type') if event_type == 'message_start': - self._current_message_id = str(uuid.uuid4()) + current_message_id = str(uuid.uuid4()) yield TextMessageStartEvent( type=EventType.TEXT_MESSAGE_START, thread_id=thread_id, run_id=run_id, - message_id=self._current_message_id, + message_id=current_message_id, role="assistant", ) @@ -555,12 +580,12 @@ def create_sdk_client(opts, disable_continue=False): delta_data = event_data.get('delta', {}) if delta_data.get('type') == 'text_delta': text_chunk = delta_data.get('text', '') - if text_chunk: + if text_chunk and current_message_id: yield TextMessageContentEvent( type=EventType.TEXT_MESSAGE_CONTENT, thread_id=thread_id, run_id=run_id, - message_id=self._current_message_id, + message_id=current_message_id, delta=text_chunk, ) continue @@ -654,14 +679,14 @@ def create_sdk_client(opts, disable_continue=False): ) # End text message after processing all blocks - if getattr(message, 'content', []) and self._current_message_id: + if getattr(message, 'content', []) and current_message_id: yield TextMessageEndEvent( type=EventType.TEXT_MESSAGE_END, thread_id=thread_id, run_id=run_id, - message_id=self._current_message_id, + message_id=current_message_id, ) - self._current_message_id = None + current_message_id = None elif isinstance(message, SystemMessage): text = getattr(message, 'text', None) @@ -724,15 +749,31 @@ def create_sdk_client(opts, disable_continue=False): step_id=step_id, step_name="processing_prompt", ) + + logger.info(f"Response iterator fully consumed ({message_count} messages total)") # Mark first run complete self._first_run = False + + # Check if restart was requested by Claude + if self._restart_requested: + logger.info("🔄 Restart was requested, emitting restart event") + self._restart_requested = False # Reset flag + yield RawEvent( + type=EventType.RAW, + thread_id=thread_id, + run_id=run_id, + event={ + "type": "session_restart_requested", + "message": "Claude requested a session restart. Reconnecting..." + } + ) finally: - # Clear active client reference (interrupt no longer valid for this run) + # Clear active client reference self._active_client = None - # Always disconnect client at end of run (no persistence) + # Always disconnect client at end of run if client is not None: logger.info("Disconnecting client (end of run)") await client.disconnect() @@ -761,7 +802,6 @@ async def interrupt(self) -> None: except Exception as e: logger.error(f"Failed to interrupt Claude SDK: {e}") - def _setup_workflow_paths(self, active_workflow_url: str, repos_cfg: list) -> tuple[str, list, str]: """Setup paths for workflow mode.""" add_dirs = [] @@ -1300,25 +1340,34 @@ def _build_workspace_context_prompt(self, repos_cfg, workflow_name, artifacts_pa # Repositories if repos_cfg: + session_id = os.getenv('AGENTIC_SESSION_NAME', '').strip() + feature_branch = f"ambient/{session_id}" if session_id else None + repo_names = [repo.get('name', f'repo-{i}') for i, repo in enumerate(repos_cfg)] if len(repo_names) <= 5: - prompt += f"**Repositories**: {', '.join([f'repos/{name}/' for name in repo_names])}\n\n" + prompt += f"**Repositories**: {', '.join([f'repos/{name}/' for name in repo_names])}\n" else: - prompt += f"**Repositories** ({len(repo_names)} total): {', '.join([f'repos/{name}/' for name in repo_names[:5]])}, and {len(repo_names) - 5} more\n\n" + prompt += f"**Repositories** ({len(repo_names)} total): {', '.join([f'repos/{name}/' for name in repo_names[:5]])}, and {len(repo_names) - 5} more\n" + + if feature_branch: + prompt += f"**Working Branch**: `{feature_branch}` (all repos are on this feature branch)\n\n" + else: + prompt += "\n" # Add git push instructions for repos with autoPush enabled auto_push_repos = [repo for repo in repos_cfg if repo.get('autoPush', False)] if auto_push_repos: + push_branch = feature_branch or "ambient/" + prompt += "## Git Push Instructions\n\n" prompt += "The following repositories have auto-push enabled. When you make changes to these repositories, you MUST commit and push your changes:\n\n" for repo in auto_push_repos: repo_name = repo.get('name', 'unknown') - repo_branch = repo.get('branch', 'main') - prompt += f"- **repos/{repo_name}/** (branch: {repo_branch})\n" + prompt += f"- **repos/{repo_name}/**\n" prompt += "\nAfter making changes to any auto-push repository:\n" prompt += "1. Use `git add` to stage your changes\n" prompt += "2. Use `git commit -m \"description\"` to commit with a descriptive message\n" - prompt += "3. Use `git push origin ` to push to the remote repository\n\n" + prompt += f"3. Use `git push origin {push_branch}` to push to the remote repository\n\n" # MCP Integration Setup Instructions prompt += "## MCP Integrations\n" diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index f31f1d8a..079d4001 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -542,12 +542,13 @@ async def change_workflow(request: Request): return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path} -async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[bool, str]: +async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[bool, str, bool]: """ - Clone a repository at runtime. + Clone a repository at runtime and create a feature branch. This mirrors the logic in hydrate.sh but runs when repos are added - after the pod has started. + after the pod has started. After cloning, creates and checks out a + feature branch named 'ambient/'. Args: git_url: Git repository URL @@ -555,14 +556,17 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b name: Name for the cloned directory (derived from URL if empty) Returns: - (success, repo_dir_path) tuple + (success, repo_dir_path, was_newly_cloned) tuple + - success: True if repo is available (either newly cloned or already existed) + - repo_dir_path: Path to the repo directory + - was_newly_cloned: True only if the repo was actually cloned this time """ import tempfile import shutil from pathlib import Path if not git_url: - return False, "" + return False, "", False # Derive repo name from URL if not provided if not name: @@ -576,10 +580,10 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b logger.info(f"Cloning repo '{name}' from {git_url}@{branch}") - # Skip if already cloned + # Skip if already cloned - not newly cloned if repo_final.exists(): logger.info(f"Repo '{name}' already exists at {repo_final}, skipping clone") - return True, str(repo_final) + return True, str(repo_final), False # Already existed, not newly cloned # Create temp directory for clone temp_dir = Path(tempfile.mkdtemp(prefix="repo-clone-")) @@ -600,9 +604,9 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b clone_url = git_url.replace("https://", f"https://oauth2:{gitlab_token}@") logger.info("Using GITLAB_TOKEN for authentication") - # Clone the repository + # Clone the repository (no --depth 1 to allow full branch operations) process = await asyncio.create_subprocess_exec( - "git", "clone", "--branch", branch, "--single-branch", "--depth", "1", + "git", "clone", "--branch", branch, "--single-branch", clone_url, str(temp_dir), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE @@ -617,20 +621,41 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b if gitlab_token: error_msg = error_msg.replace(gitlab_token, "***REDACTED***") logger.error(f"Failed to clone repo: {error_msg}") - return False, "" - - logger.info("Clone successful, moving to final location...") + return False, "", False + + logger.info("Clone successful, creating feature branch...") + + # Create and checkout feature branch: ambient/ + session_id = os.getenv("AGENTIC_SESSION_NAME", "").strip() + if session_id: + feature_branch = f"ambient/{session_id}" + checkout_process = await asyncio.create_subprocess_exec( + "git", "checkout", "-b", feature_branch, + cwd=str(temp_dir), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + checkout_stdout, checkout_stderr = await checkout_process.communicate() + + if checkout_process.returncode != 0: + logger.warning(f"Failed to create feature branch '{feature_branch}': {checkout_stderr.decode()}") + # Continue anyway - repo is still usable on the original branch + else: + logger.info(f"Created and checked out feature branch: {feature_branch}") + else: + logger.warning("AGENTIC_SESSION_NAME not set, skipping feature branch creation") # Move to final location + logger.info("Moving to final location...") repo_final.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(temp_dir), str(repo_final)) logger.info(f"Repo '{name}' ready at {repo_final}") - return True, str(repo_final) + return True, str(repo_final), True # Newly cloned except Exception as e: logger.error(f"Error cloning repo: {e}") - return False, "" + return False, "", False finally: # Cleanup temp directory if it still exists if temp_dir.exists(): @@ -725,38 +750,43 @@ async def add_repo(request: Request): name = url.split("/")[-1].removesuffix(".git") # Clone the repository at runtime - success, repo_path = await clone_repo_at_runtime(url, branch, name) + success, repo_path, was_newly_cloned = await clone_repo_at_runtime(url, branch, name) if not success: raise HTTPException(status_code=500, detail=f"Failed to clone repository: {url}") - # Update REPOS_JSON env var - repos_json = os.getenv("REPOS_JSON", "[]") - try: - repos = json.loads(repos_json) if repos_json else [] - except: - repos = [] - - # Add new repo - repos.append({ - "name": name, - "input": { - "url": url, - "branch": branch - } - }) - - os.environ["REPOS_JSON"] = json.dumps(repos) - - # Reset adapter state to force reinitialization on next run - _adapter_initialized = False - adapter._first_run = True - - logger.info(f"Repo '{name}' added and cloned, adapter will reinitialize on next run") - - # Trigger a notification to Claude about the new repository - asyncio.create_task(trigger_repo_added_notification(name, url)) + # Only update state and trigger notification if repo was newly cloned + # This prevents duplicate notifications when both backend and operator call this endpoint + if was_newly_cloned: + # Update REPOS_JSON env var + repos_json = os.getenv("REPOS_JSON", "[]") + try: + repos = json.loads(repos_json) if repos_json else [] + except: + repos = [] + + # Add new repo + repos.append({ + "name": name, + "input": { + "url": url, + "branch": branch + } + }) + + os.environ["REPOS_JSON"] = json.dumps(repos) + + # Reset adapter state to force reinitialization on next run + _adapter_initialized = False + adapter._first_run = True + + logger.info(f"Repo '{name}' added and cloned, adapter will reinitialize on next run") + + # Trigger a notification to Claude about the new repository + asyncio.create_task(trigger_repo_added_notification(name, url)) + else: + logger.info(f"Repo '{name}' already existed, skipping notification (idempotent call)") - return {"message": "Repository added", "name": name, "path": repo_path} + return {"message": "Repository added", "name": name, "path": repo_path, "newly_cloned": was_newly_cloned} async def trigger_repo_added_notification(repo_name: str, repo_url: str): From 571281ca635b13767accc0702090be5363130fcc Mon Sep 17 00:00:00 2001 From: sallyom Date: Mon, 12 Jan 2026 23:38:43 -0500 Subject: [PATCH 2/2] Display currently active git branches in context & file explorer viewer and allow for multiple branches within a git repo to be added in context modal. Co-Authored-By: Claude Sonnet 4.5 Signed-off-by: sallyom --- components/backend/handlers/helpers.go | 8 + components/backend/handlers/sessions.go | 159 ++++++- components/backend/routes.go | 2 + components/backend/types/session.go | 3 + .../[sessionName]/repos/status/route.ts | 31 ++ .../accordions/repositories-accordion.tsx | 105 ++++- .../components/modals/add-context-modal.tsx | 17 +- .../[sessionName]/hooks/use-git-operations.ts | 2 +- .../[name]/sessions/[sessionName]/page.tsx | 188 ++++++--- .../frontend/src/components/file-tree.tsx | 8 + .../frontend/src/services/api/sessions.ts | 25 ++ .../src/services/queries/use-sessions.ts | 16 + .../frontend/src/types/agentic-session.ts | 5 +- components/frontend/src/types/api/sessions.ts | 9 +- .../base/crds/agenticsessions-crd.yaml | 12 + .../operator/internal/handlers/sessions.go | 81 +++- .../runners/claude-code-runner/adapter.py | 9 +- components/runners/claude-code-runner/main.py | 388 ++++++++++++++---- 18 files changed, 882 insertions(+), 186 deletions(-) create mode 100644 components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/repos/status/route.ts diff --git a/components/backend/handlers/helpers.go b/components/backend/handlers/helpers.go index c251e250..1bf4697f 100644 --- a/components/backend/handlers/helpers.go +++ b/components/backend/handlers/helpers.go @@ -48,6 +48,14 @@ func RetryWithBackoff(maxRetries int, initialDelay, maxDelay time.Duration, oper return fmt.Errorf("operation failed after %d retries: %w", maxRetries, lastErr) } +// ComputeAutoBranch generates the auto-branch name from a session name +// This is the single source of truth for auto-branch naming in the backend +// IMPORTANT: Keep pattern in sync with runner (main.py) +// Pattern: ambient/{session-name} +func ComputeAutoBranch(sessionName string) string { + return fmt.Sprintf("ambient/%s", sessionName) +} + // ValidateSecretAccess checks if the user has permission to perform the given verb on secrets // Returns an error if the user lacks the required permission // Accepts kubernetes.Interface for compatibility with dependency injection in tests diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index fe85b5f5..8f72bc92 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -409,6 +409,8 @@ func ListSessions(c *gin.Context) { session.Status = parseStatus(status) } + session.AutoBranch = ComputeAutoBranch(item.GetName()) + sessions = append(sessions, session) } @@ -553,9 +555,10 @@ func CreateSession(c *gin.Context) { timeout = *req.Timeout } - // Generate unique name + // Generate unique name (timestamp-based) + // Note: Runner will create branch as "ambient/{session-name}" timestamp := time.Now().Unix() - name := fmt.Sprintf("agentic-session-%d", timestamp) + name := fmt.Sprintf("session-%d", timestamp) // Create the custom resource // Metadata @@ -638,8 +641,11 @@ func CreateSession(c *gin.Context) { arr := make([]map[string]interface{}, 0, len(req.Repos)) for _, r := range req.Repos { m := map[string]interface{}{"url": r.URL} - if r.Branch != nil { + // Fill in branch if not provided (auto-generate from session name) + if r.Branch != nil && strings.TrimSpace(*r.Branch) != "" { m["branch"] = *r.Branch + } else { + m["branch"] = ComputeAutoBranch(name) } if r.AutoPush != nil { m["autoPush"] = *r.AutoPush @@ -722,9 +728,10 @@ func CreateSession(c *gin.Context) { // This ensures consistent behavior whether sessions are created via API or kubectl. c.JSON(http.StatusCreated, gin.H{ - "message": "Agentic session created successfully", - "name": name, - "uid": created.GetUID(), + "message": "Agentic session created successfully", + "name": name, + "uid": created.GetUID(), + "autoBranch": ComputeAutoBranch(name), }) } @@ -773,6 +780,8 @@ func GetSession(c *gin.Context) { session.Status = parseStatus(status) } + session.AutoBranch = ComputeAutoBranch(sessionName) + c.JSON(http.StatusOK, session) } @@ -1459,19 +1468,78 @@ func RemoveRepo(c *gin.Context) { repos, _ := spec["repos"].([]interface{}) filteredRepos := []interface{}{} - found := false + foundInSpec := false for _, r := range repos { rm, _ := r.(map[string]interface{}) url, _ := rm["url"].(string) if DeriveRepoFolderFromURL(url) != repoName { filteredRepos = append(filteredRepos, r) } else { - found = true + foundInSpec = true } } - if !found { - c.JSON(http.StatusNotFound, gin.H{"error": "Repository not found in session"}) + // Also check status.reconciledRepos for repos added directly to runner + // Note: status map is read-only here, not persisted back to CR + status, found, err := unstructured.NestedMap(item.Object, "status") + if !found || err != nil { + log.Printf("Failed to get status: %v", err) + status = make(map[string]interface{}) // Local empty map for safe reads + } + + reconciledRepos, found, err := unstructured.NestedSlice(status, "reconciledRepos") + if !found || err != nil { + log.Printf("Failed to get reconciledRepos: %v", err) + reconciledRepos = []interface{}{} + } + + foundInReconciled := false + for _, r := range reconciledRepos { + rm, ok := r.(map[string]interface{}) + if !ok { + continue + } + + name, found, err := unstructured.NestedString(rm, "name") + if found && err == nil && name == repoName { + foundInReconciled = true + break + } + + // Also try matching by URL + url, found, err := unstructured.NestedString(rm, "url") + if found && err == nil && DeriveRepoFolderFromURL(url) == repoName { + foundInReconciled = true + break + } + } + + // Always call runner to remove from filesystem (if session is running) + // Do this BEFORE checking if repo exists in CR, because it might only be on filesystem + phase, _, _ := unstructured.NestedString(status, "phase") + runnerRemoved := false + if phase == "Running" { + runnerURL := fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8001/repos/remove", sessionName, project) + runnerReq := map[string]string{"name": repoName} + reqBody, _ := json.Marshal(runnerReq) + resp, err := http.Post(runnerURL, "application/json", bytes.NewReader(reqBody)) + if err != nil { + log.Printf("Warning: failed to call runner /repos/remove: %v", err) + } else { + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + runnerRemoved = true + log.Printf("Runner successfully removed repo %s from filesystem", repoName) + } else { + body, _ := io.ReadAll(resp.Body) + log.Printf("Runner failed to remove repo %s (status %d): %s", repoName, resp.StatusCode, string(body)) + } + } + } + + // Allow delete if repo is in CR OR was successfully removed from runner + if !foundInSpec && !foundInReconciled && !runnerRemoved { + c.JSON(http.StatusNotFound, gin.H{"error": "Repository not found in session or runner"}) return } @@ -3069,6 +3137,77 @@ func DiffSessionRepo(c *gin.Context) { c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), bodyBytes) } +// GetReposStatus returns current status of all repositories (branches, current branch, etc.) +// GET /api/projects/:projectName/agentic-sessions/:sessionName/repos/status +func GetReposStatus(c *gin.Context) { + project := c.Param("projectName") + session := c.Param("sessionName") + + k8sClt, dynClt := GetK8sClientsForRequest(c) + if k8sClt == nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) + c.Abort() + return + } + + // Verify user has access to the session using user-scoped K8s client + // This ensures RBAC is enforced before we call the runner + gvr := GetAgenticSessionV1Alpha1Resource() + _, err := dynClt.Resource(gvr).Namespace(project).Get(context.TODO(), session, v1.GetOptions{}) + if errors.IsNotFound(err) { + c.JSON(http.StatusNotFound, gin.H{"error": "Session not found"}) + return + } + if err != nil { + log.Printf("GetReposStatus: failed to verify session access: %v", err) + c.JSON(http.StatusForbidden, gin.H{"error": "Access denied"}) + return + } + + // Call runner's /repos/status endpoint directly + // Authentication flow: + // 1. Backend validated user has access to session (above) + // 2. Backend calls runner as trusted internal service (no auth header forwarding) + // 3. Runner trusts backend's validation + // Port 8001 matches AG-UI Service defined in operator (sessions.go:1384) + // If changing this port, also update: operator containerPort, Service port, and AGUI_PORT env + runnerURL := fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8001/repos/status", session, project) + + req, err := http.NewRequestWithContext(c.Request.Context(), http.MethodGet, runnerURL, nil) + if err != nil { + log.Printf("GetReposStatus: failed to create HTTP request: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create request"}) + return + } + // NOTE: Do NOT forward Authorization header to runner (matches pattern of AddWorkflow, AddRepository, RemoveRepo) + // Runner is treated as a trusted backend service; RBAC enforcement happens in backend + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + log.Printf("GetReposStatus: runner not reachable: %v", err) + // Return empty repos list instead of error for better UX + c.JSON(http.StatusOK, gin.H{"repos": []interface{}{}}) + return + } + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("GetReposStatus: failed to read response body: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to read response from runner"}) + return + } + + if resp.StatusCode != http.StatusOK { + log.Printf("GetReposStatus: runner returned status %d", resp.StatusCode) + c.JSON(http.StatusOK, gin.H{"repos": []interface{}{}}) + return + } + + c.Data(http.StatusOK, "application/json", bodyBytes) +} + // GetGitStatus returns git status for a directory in the workspace // GET /api/projects/:projectName/agentic-sessions/:sessionName/git/status?path=artifacts func GetGitStatus(c *gin.Context) { diff --git a/components/backend/routes.go b/components/backend/routes.go index a73c59a7..0561f499 100644 --- a/components/backend/routes.go +++ b/components/backend/routes.go @@ -65,6 +65,8 @@ func registerRoutes(r *gin.Engine) { projectGroup.POST("/agentic-sessions/:sessionName/workflow", handlers.SelectWorkflow) projectGroup.GET("/agentic-sessions/:sessionName/workflow/metadata", handlers.GetWorkflowMetadata) projectGroup.POST("/agentic-sessions/:sessionName/repos", handlers.AddRepo) + // NOTE: /repos/status must come BEFORE /repos/:repoName to avoid wildcard matching + projectGroup.GET("/agentic-sessions/:sessionName/repos/status", handlers.GetReposStatus) projectGroup.DELETE("/agentic-sessions/:sessionName/repos/:repoName", handlers.RemoveRepo) projectGroup.PUT("/agentic-sessions/:sessionName/displayname", handlers.UpdateSessionDisplayName) diff --git a/components/backend/types/session.go b/components/backend/types/session.go index dbf3edd5..bc2253b1 100644 --- a/components/backend/types/session.go +++ b/components/backend/types/session.go @@ -7,6 +7,9 @@ type AgenticSession struct { Metadata map[string]interface{} `json:"metadata"` Spec AgenticSessionSpec `json:"spec"` Status *AgenticSessionStatus `json:"status,omitempty"` + // Computed field: auto-generated branch name if user doesn't provide one + // IMPORTANT: Keep in sync with runner (main.py) and frontend (add-context-modal.tsx) + AutoBranch string `json:"autoBranch,omitempty"` } type AgenticSessionSpec struct { diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/repos/status/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/repos/status/route.ts new file mode 100644 index 00000000..68294013 --- /dev/null +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/repos/status/route.ts @@ -0,0 +1,31 @@ +import { BACKEND_URL } from '@/lib/config'; +import { buildForwardHeadersAsync } from '@/lib/auth'; + +export async function GET( + request: Request, + { params }: { params: Promise<{ name: string; sessionName: string }> }, +) { + const { name, sessionName } = await params; + const headers = await buildForwardHeadersAsync(request); + + const resp = await fetch( + `${BACKEND_URL}/projects/${encodeURIComponent(name)}/agentic-sessions/${encodeURIComponent(sessionName)}/repos/status`, + { + method: 'GET', + headers, + } + ); + + if (!resp.ok) { + return new Response( + JSON.stringify({ error: 'Failed to fetch repos status', repos: [] }), + { status: 200, headers: { 'Content-Type': 'application/json' } } + ); + } + + const data = await resp.text(); + return new Response(data, { + status: resp.status, + headers: { 'Content-Type': 'application/json' } + }); +} diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/accordions/repositories-accordion.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/accordions/repositories-accordion.tsx index 5724a08a..38e8524e 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/accordions/repositories-accordion.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/accordions/repositories-accordion.tsx @@ -1,14 +1,18 @@ "use client"; import { useState } from "react"; -import { GitBranch, X, Link, Loader2, CloudUpload } from "lucide-react"; +import { GitBranch, X, Link, Loader2, CloudUpload, ChevronDown, ChevronRight } from "lucide-react"; import { AccordionItem, AccordionTrigger, AccordionContent } from "@/components/ui/accordion"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; type Repository = { url: string; - branch?: string; + name?: string; + branch?: string; // DEPRECATED: Use currentActiveBranch instead + branches?: string[]; // All local branches available + currentActiveBranch?: string; // Currently checked out branch + defaultBranch?: string; // Default branch of remote }; type UploadedFile = { @@ -34,6 +38,7 @@ export function RepositoriesAccordion({ }: RepositoriesAccordionProps) { const [removingRepo, setRemovingRepo] = useState(null); const [removingFile, setRemovingFile] = useState(null); + const [expandedRepos, setExpandedRepos] = useState>(new Set()); const totalContextItems = repositories.length + uploadedFiles.length; @@ -95,29 +100,87 @@ export function RepositoriesAccordion({
{/* Repositories */} {repositories.map((repo, idx) => { - const repoName = repo.url.split('/').pop()?.replace('.git', '') || `repo-${idx}`; + const repoName = repo.name || repo.url.split('/').pop()?.replace('.git', '') || `repo-${idx}`; const isRemoving = removingRepo === repoName; + const isExpanded = expandedRepos.has(repoName); + const currentBranch = repo.currentActiveBranch || repo.branch; + const hasBranches = repo.branches && repo.branches.length > 0; + + const toggleExpanded = () => { + setExpandedRepos(prev => { + const next = new Set(prev); + if (next.has(repoName)) { + next.delete(repoName); + } else { + next.add(repoName); + } + return next; + }); + }; return ( -
- -
-
{repoName}
-
{repo.url}
-
- )} - + {!hasBranches &&
} + +
+
+
{repoName}
+ {currentBranch && ( + + {currentBranch} + + )} +
+
{repo.url}
+
+ +
+ + {/* Expandable branches list */} + {isExpanded && hasBranches && ( +
+
Available branches:
+ {repo.branches!.map((branch, branchIdx) => ( +
+ + {branch} + {branch === currentBranch && ( + + active + + )} +
+ ))} +
+ )}
); })} diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/modals/add-context-modal.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/modals/add-context-modal.tsx index 19e36d92..9a97869e 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/modals/add-context-modal.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/components/modals/add-context-modal.tsx @@ -16,6 +16,7 @@ type AddContextModalProps = { onAddRepository: (url: string, branch: string, autoPush?: boolean) => Promise; onUploadFile?: () => void; isLoading?: boolean; + autoBranch?: string; // Auto-generated branch from backend (single source of truth) }; export function AddContextModal({ @@ -24,25 +25,28 @@ export function AddContextModal({ onAddRepository, onUploadFile, isLoading = false, + autoBranch, }: AddContextModalProps) { const [contextUrl, setContextUrl] = useState(""); - const [contextBranch, setContextBranch] = useState("main"); + const [contextBranch, setContextBranch] = useState(""); // Empty = use auto-generated branch const [autoPush, setAutoPush] = useState(false); const handleSubmit = async () => { if (!contextUrl.trim()) return; - await onAddRepository(contextUrl.trim(), contextBranch.trim() || 'main', autoPush); + // Use autoBranch from backend (single source of truth), or empty to let runner auto-generate + const defaultBranch = autoBranch || ''; + await onAddRepository(contextUrl.trim(), contextBranch.trim() || defaultBranch, autoPush); // Reset form setContextUrl(""); - setContextBranch("main"); + setContextBranch(""); setAutoPush(false); }; const handleCancel = () => { setContextUrl(""); - setContextBranch("main"); + setContextBranch(""); setAutoPush(false); onOpenChange(false); }; @@ -82,12 +86,13 @@ export function AddContextModal({ setContextBranch(e.target.value)} />

- Leave empty to use the default branch + If left empty, a unique feature branch will be created for this session

diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-git-operations.ts b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-git-operations.ts index f23551bd..c70f5c91 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-git-operations.ts +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-git-operations.ts @@ -37,7 +37,7 @@ export function useGitOperations({ sessionName, path: directoryPath, remoteUrl: remoteUrl.trim(), - branch: branch.trim() || "main", + branch: branch.trim(), }); successToast("Remote configured successfully"); diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx index 207bdf28..355b08ae 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx @@ -69,7 +69,7 @@ import { useFileOperations } from "./hooks/use-file-operations"; import { useSessionQueue } from "@/hooks/use-session-queue"; import type { DirectoryOption, DirectoryRemote } from "./lib/types"; -import type { MessageObject, ToolUseMessages, HierarchicalToolMessage } from "@/types/agentic-session"; +import type { MessageObject, ToolUseMessages, HierarchicalToolMessage, ReconciledRepo, SessionRepo } from "@/types/agentic-session"; import type { AGUIToolCall } from "@/types/agui"; // AG-UI streaming @@ -81,6 +81,7 @@ import { useStopSession, useDeleteSession, useContinueSession, + useReposStatus, } from "@/services/queries"; import { useWorkspaceList, @@ -190,6 +191,13 @@ export default function ProjectSessionDetailPage({ // Extract phase for sidebar state management const phase = session?.status?.phase || "Pending"; + // Fetch repos status directly from runner (real-time branch info) + const { data: reposStatus } = useReposStatus( + projectName, + sessionName, + phase === "Running" // Only poll when session is running + ); + // AG-UI streaming hook - replaces useSessionMessages and useSendChatMessage // Note: autoConnect is intentionally false to avoid SSR hydration mismatch // Connection is triggered manually in useEffect after client hydration @@ -500,7 +508,7 @@ export default function ProjectSessionDetailPage({ // Git operations for selected directory const currentRemote = directoryRemotes[selectedDirectory.path]; - + // Removed: mergeStatus and remoteBranches - agent handles all git operations now // Git operations hook @@ -511,6 +519,26 @@ export default function ProjectSessionDetailPage({ remoteBranch: currentRemote?.branch || "main", }); + // Get repo info from reposStatus for repo-type directories + const repoInfo = selectedDirectory.type === "repo" + ? reposStatus?.repos?.find((r) => r.name === selectedDirectory.name) + : undefined; + + // Get current branch for selected directory (use real-time reposStatus for repos) + const currentBranch = selectedDirectory.type === "repo" + ? repoInfo?.currentActiveBranch || gitOps.gitStatus?.branch || "main" + : gitOps.gitStatus?.branch || "main"; + + // Get hasRemote status for selected directory (use real-time reposStatus for repos) + const hasRemote = selectedDirectory.type === "repo" + ? !!repoInfo?.url + : gitOps.gitStatus?.hasRemote ?? false; + + // Get remote URL for selected directory (use real-time reposStatus for repos) + const remoteUrl = selectedDirectory.type === "repo" + ? repoInfo?.url + : gitOps.gitStatus?.remoteUrl; + // File operations for directory explorer const fileOps = useFileOperations({ projectName, @@ -605,17 +633,27 @@ export default function ProjectSessionDetailPage({ { type: "file-uploads", name: "File Uploads", path: "file-uploads" }, ]; - if (session?.spec?.repos) { - session.spec.repos.forEach((repo, idx) => { - const repoName = repo.url.split('/').pop()?.replace('.git', '') || `repo-${idx}`; - // Repos are cloned to /workspace/repos/{name} - options.push({ - type: "repo", - name: repoName, - path: `repos/${repoName}`, - }); + // Use real-time repos status from runner when available, otherwise fall back to CR status + const reposToDisplay = reposStatus?.repos || session?.status?.reconciledRepos || session?.spec?.repos || []; + + // Deduplicate repos by name - only show one entry per repo directory + const seenRepos = new Set(); + reposToDisplay.forEach((repo: ReconciledRepo | SessionRepo) => { + const repoName = ('name' in repo ? repo.name : undefined) || repo.url?.split('/').pop()?.replace('.git', '') || 'repo'; + + // Skip if we've already added this repo + if (seenRepos.has(repoName)) { + return; + } + seenRepos.add(repoName); + + // Repos are cloned to /workspace/repos/{name} + options.push({ + type: "repo", + name: repoName, + path: `repos/${repoName}`, }); - } + }); if (workflowManagement.activeWorkflow && session?.spec?.activeWorkflow) { const workflowName = @@ -631,7 +669,7 @@ export default function ProjectSessionDetailPage({ } return options; - }, [session, workflowManagement.activeWorkflow]); + }, [session, workflowManagement.activeWorkflow, reposStatus]); // Workflow change handler const handleWorkflowChange = (value: string) => { @@ -1533,7 +1571,7 @@ export default function ProjectSessionDetailPage({ /> ({ name: f.name, path: f.path, @@ -1628,34 +1666,68 @@ export default function ProjectSessionDetailPage({ if (option) setSelectedDirectory(option); }} > - - + +
+ +
- {directoryOptions.map((opt) => ( - -
- {opt.type === "artifacts" && ( - - )} - {opt.type === "file-uploads" && ( - - )} - {opt.type === "repo" && ( - - )} - {opt.type === "workflow" && ( - - )} - - {opt.name} - -
-
- ))} + {directoryOptions.map((opt) => { + // Find branch info for repo directories from real-time status + let branchName: string | undefined; + if (opt.type === "repo") { + // Extract repo name from path (repos/repoName -> repoName) + const repoName = opt.path.replace(/^repos\//, ""); + + // Try real-time repos status first + const realtimeRepo = reposStatus?.repos?.find( + (r) => r.name === repoName + ); + + // Fall back to CR status + const reconciledRepo = session?.status?.reconciledRepos?.find( + (r: ReconciledRepo) => { + const rName = r.name || r.url?.split("/").pop()?.replace(".git", ""); + return rName === repoName; + } + ); + + branchName = realtimeRepo?.currentActiveBranch + || reconciledRepo?.currentActiveBranch + || reconciledRepo?.branch; + } + + return ( + +
+ {opt.type === "artifacts" && ( + + )} + {opt.type === "file-uploads" && ( + + )} + {opt.type === "repo" && ( + + )} + {opt.type === "workflow" && ( + + )} + + {opt.name} + + {branchName && ( + + {branchName} + + )} +
+
+ ); + })}
@@ -1769,14 +1841,21 @@ export default function ProjectSessionDetailPage({ ) : ( ({ - name: item.name, - path: item.path, - type: item.isDir ? "folder" : "file", - sizeKb: item.size - ? item.size / 1024 - : undefined, - }), + (item): FileTreeNode => { + const node: FileTreeNode = { + name: item.name, + path: item.path, + type: item.isDir ? "folder" : "file", + sizeKb: item.size + ? item.size / 1024 + : undefined, + }; + + // Don't add branch badges to individual files/folders + // The branch is already shown in the directory selector dropdown + + return node; + }, )} onSelect={fileOps.handleFileOrFolderSelect} /> @@ -1810,13 +1889,13 @@ export default function ProjectSessionDetailPage({

No git repository. Ask the agent to initialize git if needed.

- ) : !gitOps.gitStatus?.hasRemote ? ( + ) : !hasRemote ? ( /* State 2: Has Git, No Remote */
- {gitOps.gitStatus?.branch || "main"} + {currentBranch} (local only)
@@ -1838,19 +1917,19 @@ export default function ProjectSessionDetailPage({
- {gitOps.gitStatus?.remoteUrl + {remoteUrl ?.split("/") .slice(-2) .join("/") .replace(".git", "") || ""}
- - {/* Branch Tracking - only show arrow if different */} + + {/* Branch Tracking */}
- {gitOps.gitStatus?.branch || "main"} + {currentBranch}
@@ -1937,6 +2016,7 @@ export default function ProjectSessionDetailPage({ }} onUploadFile={() => setUploadModalOpen(true)} isLoading={addRepoMutation.isPending} + autoBranch={session?.autoBranch} /> {node.name} + {node.branch && ( + + {node.branch} + + )} + {typeof node.sizeKb === "number" && ( {node.sizeKb.toFixed(1)}K )} diff --git a/components/frontend/src/services/api/sessions.ts b/components/frontend/src/services/api/sessions.ts index 48044835..610715f8 100644 --- a/components/frontend/src/services/api/sessions.ts +++ b/components/frontend/src/services/api/sessions.ts @@ -221,3 +221,28 @@ export async function getMcpStatus( `/projects/${projectName}/agentic-sessions/${sessionName}/mcp/status` ); } + +export type RepoStatus = { + url: string; + name: string; + branches: string[]; + currentActiveBranch: string; + defaultBranch: string; +}; + +export type ReposStatusResponse = { + repos: RepoStatus[]; +}; + +/** + * Get current status of all repositories (branches, current branch, etc.) + * Fetches directly from runner for real-time updates + */ +export async function getReposStatus( + projectName: string, + sessionName: string +): Promise { + return apiClient.get( + `/projects/${projectName}/agentic-sessions/${sessionName}/repos/status` + ); +} diff --git a/components/frontend/src/services/queries/use-sessions.ts b/components/frontend/src/services/queries/use-sessions.ts index 041ca377..5dc8f011 100644 --- a/components/frontend/src/services/queries/use-sessions.ts +++ b/components/frontend/src/services/queries/use-sessions.ts @@ -27,6 +27,8 @@ export const sessionKeys = { [...sessionKeys.detail(projectName, sessionName), 'messages'] as const, export: (projectName: string, sessionName: string) => [...sessionKeys.detail(projectName, sessionName), 'export'] as const, + reposStatus: (projectName: string, sessionName: string) => + [...sessionKeys.detail(projectName, sessionName), 'repos-status'] as const, }; /** @@ -322,3 +324,17 @@ export function useSessionExport(projectName: string, sessionName: string, enabl staleTime: 60000, // Cache for 1 minute }); } + +/** + * Hook to fetch repository status (branches, current branch) from runner + * Polls every 30 seconds for real-time updates + */ +export function useReposStatus(projectName: string, sessionName: string, enabled: boolean = true) { + return useQuery({ + queryKey: sessionKeys.reposStatus(projectName, sessionName), + queryFn: () => sessionsApi.getReposStatus(projectName, sessionName), + enabled: enabled && !!projectName && !!sessionName, + refetchInterval: 30000, // Poll every 30 seconds + staleTime: 25000, // Consider stale after 25 seconds + }); +} diff --git a/components/frontend/src/types/agentic-session.ts b/components/frontend/src/types/agentic-session.ts index 1c9aa5ce..b2cd6a0f 100644 --- a/components/frontend/src/types/agentic-session.ts +++ b/components/frontend/src/types/agentic-session.ts @@ -38,8 +38,11 @@ export type AgenticSessionSpec = { export type ReconciledRepo = { url: string; - branch: string; + branch: string; // DEPRECATED: Use currentActiveBranch instead name?: string; + branches?: string[]; // All local branches available + currentActiveBranch?: string; // Currently checked out branch + defaultBranch?: string; // Default branch of remote status?: "Cloning" | "Ready" | "Failed"; clonedAt?: string; }; diff --git a/components/frontend/src/types/api/sessions.ts b/components/frontend/src/types/api/sessions.ts index 69ad9115..976de296 100644 --- a/components/frontend/src/types/api/sessions.ts +++ b/components/frontend/src/types/api/sessions.ts @@ -59,8 +59,11 @@ export type AgenticSessionSpec = { export type ReconciledRepo = { url: string; - branch: string; + branch: string; // DEPRECATED: Use currentActiveBranch instead name?: string; + branches?: string[]; // All local branches available + currentActiveBranch?: string; // Currently checked out branch + defaultBranch?: string; // Default branch of remote status?: 'Cloning' | 'Ready' | 'Failed'; clonedAt?: string; }; @@ -106,6 +109,9 @@ export type AgenticSession = { }; spec: AgenticSessionSpec; status?: AgenticSessionStatus; + // Computed field from backend - auto-generated branch name + // IMPORTANT: Keep in sync with backend (sessions.go) and runner (main.py) + autoBranch?: string; }; export type CreateAgenticSessionRequest = { @@ -127,6 +133,7 @@ export type CreateAgenticSessionResponse = { message: string; name: string; uid: string; + autoBranch: string; // Auto-generated branch name (e.g., "ambient/1234567890") }; export type GetAgenticSessionResponse = { diff --git a/components/manifests/base/crds/agenticsessions-crd.yaml b/components/manifests/base/crds/agenticsessions-crd.yaml index 1e508065..3d71a15b 100644 --- a/components/manifests/base/crds/agenticsessions-crd.yaml +++ b/components/manifests/base/crds/agenticsessions-crd.yaml @@ -129,6 +129,18 @@ spec: type: string branch: type: string + description: "DEPRECATED: Use currentActiveBranch instead" + branches: + type: array + description: "All local branches available in this repository" + items: + type: string + currentActiveBranch: + type: string + description: "Currently checked out branch (polled from filesystem)" + defaultBranch: + type: string + description: "Default branch of the remote repository (e.g., main, master)" name: type: string status: diff --git a/components/operator/internal/handlers/sessions.go b/components/operator/internal/handlers/sessions.go index 76a71c8b..28812b3a 100644 --- a/components/operator/internal/handlers/sessions.go +++ b/components/operator/internal/handlers/sessions.go @@ -909,6 +909,8 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error { {Name: "SESSION_ID", Value: name}, {Name: "WORKSPACE_PATH", Value: "/workspace"}, {Name: "ARTIFACTS_DIR", Value: "artifacts"}, + // AG-UI server port (must match containerPort and Service) + {Name: "AGUI_PORT", Value: "8001"}, // Google MCP credentials directory for workspace-mcp server (writable workspace location) {Name: "GOOGLE_MCP_CREDENTIALS_DIR", Value: "/workspace/.google_workspace_mcp/credentials"}, // Google OAuth client credentials for workspace-mcp @@ -1423,8 +1425,10 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[ return nil } - // Parse spec repos - specRepos := make([]map[string]string, 0, len(repoSlice)) + // Parse spec repos and deduplicate by URL (keep last occurrence) + // When the same repo URL appears multiple times with different branches, + // we only keep one entry (the last one) since we only have one physical clone + specReposMap := make(map[string]map[string]string) for _, entry := range repoSlice { if repoMap, ok := entry.(map[string]interface{}); ok { url, _ := repoMap["url"].(string) @@ -1435,13 +1439,20 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[ if b, ok := repoMap["branch"].(string); ok && strings.TrimSpace(b) != "" { branch = b } - specRepos = append(specRepos, map[string]string{ + // Use URL as key to deduplicate - last branch wins + specReposMap[url] = map[string]string{ "url": url, "branch": branch, - }) + } } } + // Convert map back to slice + specRepos := make([]map[string]string, 0, len(specReposMap)) + for _, repo := range specReposMap { + specRepos = append(specRepos, repo) + } + // Get current reconciled repos from status status, _, _ := unstructured.NestedMap(session.Object, "status") reconciledReposRaw, _, _ := unstructured.NestedSlice(status, "reconciledRepos") @@ -1459,16 +1470,22 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[ } } - // Detect drift: repos added or removed + // Detect drift: repos added, removed, or branch changed toAdd := []map[string]string{} toRemove := []map[string]string{} + branchChanged := []map[string]string{} // Find repos in spec but not in reconciled (need to add) + // Also detect branch changes for existing repos for _, specRepo := range specRepos { found := false for _, reconciledRepo := range reconciledRepos { if specRepo["url"] == reconciledRepo["url"] { found = true + // Check if branch changed + if specRepo["branch"] != reconciledRepo["branch"] { + branchChanged = append(branchChanged, specRepo) + } break } } @@ -1491,7 +1508,7 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[ } } - if len(toAdd) == 0 && len(toRemove) == 0 { + if len(toAdd) == 0 && len(toRemove) == 0 && len(branchChanged) == 0 { return nil } @@ -1499,7 +1516,7 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[ // Runner will restart Claude SDK client with new repo configuration runnerBaseURL := fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8001", sessionName, sessionNamespace) - // Add repos + // Add new repos for _, repo := range toAdd { repoName := deriveRepoNameFromURL(repo["url"]) @@ -1530,6 +1547,38 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[ } } + // Handle branch changes for existing repos (checkout different branch) + for _, repo := range branchChanged { + repoName := deriveRepoNameFromURL(repo["url"]) + log.Printf("[Reconcile] Branch changed for repo '%s' to '%s', checking out new branch", repoName, repo["branch"]) + + payload := map[string]interface{}{ + "url": repo["url"], + "branch": repo["branch"], + "name": repoName, + } + payloadBytes, _ := json.Marshal(payload) + + req, err := http.NewRequest("POST", runnerBaseURL+"/repos/add", bytes.NewReader(payloadBytes)) + if err != nil { + log.Printf("[Reconcile] Failed to create branch change request: %v", err) + continue + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + log.Printf("[Reconcile] Failed to change branch via runner: %v", err) + continue + } + resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("[Reconcile] Runner returned %d for branch change", resp.StatusCode) + } + } + // Remove repos for _, repo := range toRemove { repoName := deriveRepoNameFromURL(repo["url"]) @@ -1559,22 +1608,25 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[ } } - // Update status to reflect the reconciled state (via statusPatch) + // Build simple reconciled status (frontend now polls runner directly for real-time branch info) reconciled := make([]interface{}, 0, len(specRepos)) for _, repo := range specRepos { - reconciled = append(reconciled, map[string]interface{}{ + repoName := deriveRepoNameFromURL(repo["url"]) + reconciledEntry := map[string]interface{}{ "url": repo["url"], - "branch": repo["branch"], - "status": "Ready", + "name": repoName, + "branch": repo["branch"], // Intended branch from spec (deprecated) "clonedAt": time.Now().UTC().Format(time.RFC3339), - }) + "status": "Ready", // Simplified - frontend polls runner for detailed status + } + reconciled = append(reconciled, reconciledEntry) } statusPatch.SetField("reconciledRepos", reconciled) statusPatch.AddCondition(conditionUpdate{ Type: conditionReposReconciled, Status: "True", Reason: "Reconciled", - Message: fmt.Sprintf("Reconciled %d repos (added: %d, removed: %d)", len(specRepos), len(toAdd), len(toRemove)), + Message: fmt.Sprintf("Reconciled %d repos (added: %d, removed: %d, branch changed: %d)", len(specRepos), len(toAdd), len(toRemove), len(branchChanged)), }) return nil @@ -2251,6 +2303,9 @@ func deriveRepoNameFromURL(repoURL string) string { return "repo" } +// pollRunnerReposStatus removed - frontend now polls runner directly via backend API +// for real-time branch information. Operator no longer needs to maintain this in CR status. + // regenerateRunnerToken provisions a fresh ServiceAccount, Role, RoleBinding, and token Secret for a session. // This is called when restarting sessions to ensure fresh tokens. func regenerateRunnerToken(sessionNamespace, sessionName string, session *unstructured.Unstructured) error { diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 96fce69f..f48f4f83 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -1234,7 +1234,14 @@ def _get_repos_config(self) -> list[dict]: # Extract simple format fields url = str(it.get('url') or '').strip() - branch = str(it.get('branch') or 'main').strip() + # Auto-generate branch from session name if not provided + branch_from_json = it.get('branch') + if branch_from_json and str(branch_from_json).strip(): + branch = str(branch_from_json).strip() + else: + # Fallback: use AGENTIC_SESSION_NAME to match backend logic + session_id = os.getenv('AGENTIC_SESSION_NAME', '').strip() + branch = f"ambient/{session_id}" if session_id else 'main' # Parse autoPush as boolean, defaulting to False for invalid types auto_push_raw = it.get('autoPush', False) auto_push = auto_push_raw if isinstance(auto_push_raw, bool) else False diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index 079d4001..9bf99ca3 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -542,19 +542,84 @@ async def change_workflow(request: Request): return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path} +async def get_default_branch(repo_path: str) -> str: + """ + Get the default branch of a repository with robust fallback. + + Tries multiple methods in order: + 1. symbolic-ref on origin/HEAD + 2. git remote show origin (more reliable but slower) + 3. Fallback to common defaults: main, master, develop + + Args: + repo_path: Path to the git repository + + Returns: + The default branch name + """ + # Method 1: symbolic-ref (fast but may not be set) + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_path), "symbolic-ref", "refs/remotes/origin/HEAD", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + if process.returncode == 0: + # Output is like "refs/remotes/origin/main" + default_branch = stdout.decode().strip().split("/")[-1] + if default_branch: + logger.info(f"Default branch from symbolic-ref: {default_branch}") + return default_branch + + # Method 2: remote show origin (more reliable) + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_path), "remote", "show", "origin", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + if process.returncode == 0: + # Look for line like " HEAD branch: main" + for line in stdout.decode().split("\n"): + if "HEAD branch:" in line: + default_branch = line.split(":")[-1].strip() + if default_branch and default_branch != "(unknown)": + logger.info(f"Default branch from remote show: {default_branch}") + return default_branch + + # Method 3: Try common default branch names + for candidate in ["main", "master", "develop"]: + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_path), "rev-parse", "--verify", f"origin/{candidate}", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await process.communicate() + if process.returncode == 0: + logger.info(f"Default branch found by trying common names: {candidate}") + return candidate + + # Final fallback + logger.warning("Could not determine default branch, falling back to 'main'") + return "main" + + async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[bool, str, bool]: """ - Clone a repository at runtime and create a feature branch. - - This mirrors the logic in hydrate.sh but runs when repos are added - after the pod has started. After cloning, creates and checks out a - feature branch named 'ambient/'. - + Clone a repository at runtime or add a new branch to existing repo. + + Behavior: + - If repo doesn't exist: clone it (no --single-branch to support multi-branch) + - If repo exists: fetch and checkout the new branch (idempotent) + - If branch is empty/None: auto-generate unique ambient/ branch + - If branch doesn't exist remotely: create it from default branch + + Args: git_url: Git repository URL - branch: Branch to clone + branch: Branch to checkout (or empty/None to auto-generate) name: Name for the cloned directory (derived from URL if empty) - + Returns: (success, repo_dir_path, was_newly_cloned) tuple - success: True if repo is available (either newly cloned or already existed) @@ -564,57 +629,126 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b import tempfile import shutil from pathlib import Path - + if not git_url: return False, "", False - + # Derive repo name from URL if not provided if not name: name = git_url.split("/")[-1].removesuffix(".git") - + + # Generate unique branch name if not specified (only if user didn't provide one) + # IMPORTANT: Keep in sync with backend (sessions.go) and frontend (add-context-modal.tsx) + if not branch or branch.strip() == "": + session_id = os.getenv("AGENTIC_SESSION_NAME", "").strip() or os.getenv("SESSION_ID", "unknown") + branch = f"ambient/{session_id}" + logger.info(f"No branch specified, auto-generated: {branch}") + # Repos are stored in /workspace/repos/{name} (matching hydrate.sh) workspace_path = os.getenv("WORKSPACE_PATH", "/workspace") repos_dir = Path(workspace_path) / "repos" repos_dir.mkdir(parents=True, exist_ok=True) repo_final = repos_dir / name - - logger.info(f"Cloning repo '{name}' from {git_url}@{branch}") - - # Skip if already cloned - not newly cloned + + # Build clone URL with auth token + github_token = os.getenv("GITHUB_TOKEN", "").strip() + gitlab_token = os.getenv("GITLAB_TOKEN", "").strip() + clone_url = git_url + if github_token and "github" in git_url.lower(): + clone_url = git_url.replace("https://", f"https://x-access-token:{github_token}@") + logger.info("Using GITHUB_TOKEN for authentication") + elif gitlab_token and "gitlab" in git_url.lower(): + clone_url = git_url.replace("https://", f"https://oauth2:{gitlab_token}@") + logger.info("Using GITLAB_TOKEN for authentication") + + # Case 1: Repo already exists - add new branch if repo_final.exists(): - logger.info(f"Repo '{name}' already exists at {repo_final}, skipping clone") - return True, str(repo_final), False # Already existed, not newly cloned - + logger.info(f"Repo '{name}' already exists at {repo_final}, adding branch '{branch}'") + try: + # Fetch latest refs + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_final), "fetch", "origin", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await process.communicate() + + # Try to checkout the branch + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_final), "checkout", branch, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + if process.returncode == 0: + logger.info(f"Checked out existing branch '{branch}'") + return True, str(repo_final), False # Already existed, not newly cloned + + # Branch doesn't exist locally, try to checkout from remote + logger.info(f"Branch '{branch}' not found locally, trying origin/{branch}") + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_final), "checkout", "-b", branch, f"origin/{branch}", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + if process.returncode == 0: + logger.info(f"Checked out branch '{branch}' from origin") + return True, str(repo_final), False # Already existed, not newly cloned + + # Branch doesn't exist remotely, create from default branch + logger.info(f"Branch '{branch}' not found on remote, creating from default branch") + + # Get default branch using robust detection + default_branch = await get_default_branch(str(repo_final)) + + # Checkout default branch first + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_final), "checkout", default_branch, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await process.communicate() + + # Create new branch from default + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_final), "checkout", "-b", branch, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + if process.returncode == 0: + logger.info(f"Created new branch '{branch}' from '{default_branch}'") + return True, str(repo_final), False # Already existed, not newly cloned + else: + logger.error(f"Failed to create branch: {stderr.decode()}") + return False, "", False + + except Exception as e: + logger.error(f"Error adding branch to existing repo: {e}") + return False, "", False + + # Case 2: Repo doesn't exist - clone it + logger.info(f"Cloning repo '{name}' from {git_url}@{branch}") + # Create temp directory for clone temp_dir = Path(tempfile.mkdtemp(prefix="repo-clone-")) - + try: - # Build git clone command with optional auth token - github_token = os.getenv("GITHUB_TOKEN", "").strip() - gitlab_token = os.getenv("GITLAB_TOKEN", "").strip() - - # Determine which token to use based on URL - clone_url = git_url - if github_token and "github" in git_url.lower(): - # Add GitHub token to URL - clone_url = git_url.replace("https://", f"https://x-access-token:{github_token}@") - logger.info("Using GITHUB_TOKEN for authentication") - elif gitlab_token and "gitlab" in git_url.lower(): - # Add GitLab token to URL - clone_url = git_url.replace("https://", f"https://oauth2:{gitlab_token}@") - logger.info("Using GITLAB_TOKEN for authentication") - - # Clone the repository (no --depth 1 to allow full branch operations) + # Clone without --single-branch to support multi-branch workflows + # No --depth=1 to allow full branch operations process = await asyncio.create_subprocess_exec( - "git", "clone", "--branch", branch, "--single-branch", + "git", "clone", clone_url, str(temp_dir), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() - + if process.returncode != 0: - # Redact tokens from error message error_msg = stderr.decode() if github_token: error_msg = error_msg.replace(github_token, "***REDACTED***") @@ -622,42 +756,40 @@ async def clone_repo_at_runtime(git_url: str, branch: str, name: str) -> tuple[b error_msg = error_msg.replace(gitlab_token, "***REDACTED***") logger.error(f"Failed to clone repo: {error_msg}") return False, "", False - - logger.info("Clone successful, creating feature branch...") - - # Create and checkout feature branch: ambient/ - session_id = os.getenv("AGENTIC_SESSION_NAME", "").strip() - if session_id: - feature_branch = f"ambient/{session_id}" - checkout_process = await asyncio.create_subprocess_exec( - "git", "checkout", "-b", feature_branch, - cwd=str(temp_dir), + + logger.info("Clone successful, checking out requested branch...") + + # Try to checkout requested/auto-generated branch + process = await asyncio.create_subprocess_exec( + "git", "-C", str(temp_dir), "checkout", branch, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + if process.returncode != 0: + # Branch doesn't exist, create it from default branch + logger.info(f"Branch '{branch}' not found, creating from default branch") + process = await asyncio.create_subprocess_exec( + "git", "-C", str(temp_dir), "checkout", "-b", branch, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) - checkout_stdout, checkout_stderr = await checkout_process.communicate() - - if checkout_process.returncode != 0: - logger.warning(f"Failed to create feature branch '{feature_branch}': {checkout_stderr.decode()}") - # Continue anyway - repo is still usable on the original branch - else: - logger.info(f"Created and checked out feature branch: {feature_branch}") - else: - logger.warning("AGENTIC_SESSION_NAME not set, skipping feature branch creation") - + await process.communicate() + + # Move to final location logger.info("Moving to final location...") repo_final.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(temp_dir), str(repo_final)) - - logger.info(f"Repo '{name}' ready at {repo_final}") + + logger.info(f"Repo '{name}' ready at {repo_final} on branch '{branch}'") return True, str(repo_final), True # Newly cloned - + except Exception as e: logger.error(f"Error cloning repo: {e}") return False, "", False finally: - # Cleanup temp directory if it still exists if temp_dir.exists(): shutil.rmtree(temp_dir, ignore_errors=True) @@ -753,7 +885,7 @@ async def add_repo(request: Request): success, repo_path, was_newly_cloned = await clone_repo_at_runtime(url, branch, name) if not success: raise HTTPException(status_code=500, detail=f"Failed to clone repository: {url}") - + # Only update state and trigger notification if repo was newly cloned # This prevents duplicate notifications when both backend and operator call this endpoint if was_newly_cloned: @@ -763,7 +895,7 @@ async def add_repo(request: Request): repos = json.loads(repos_json) if repos_json else [] except: repos = [] - + # Add new repo repos.append({ "name": name, @@ -772,20 +904,20 @@ async def add_repo(request: Request): "branch": branch } }) - + os.environ["REPOS_JSON"] = json.dumps(repos) - + # Reset adapter state to force reinitialization on next run _adapter_initialized = False adapter._first_run = True - + logger.info(f"Repo '{name}' added and cloned, adapter will reinitialize on next run") - + # Trigger a notification to Claude about the new repository asyncio.create_task(trigger_repo_added_notification(name, url)) else: logger.info(f"Repo '{name}' already existed, skipping notification (idempotent call)") - + return {"message": "Repository added", "name": name, "path": repo_path, "newly_cloned": was_newly_cloned} @@ -849,39 +981,139 @@ async def trigger_repo_added_notification(repo_name: str, repo_url: str): async def remove_repo(request: Request): """ Remove repository - triggers Claude SDK client restart. - + Accepts: {"name": "..."} """ + import shutil + from pathlib import Path + global _adapter_initialized - + if not adapter: raise HTTPException(status_code=503, detail="Adapter not initialized") - + body = await request.json() repo_name = body.get("name", "") logger.info(f"Remove repo request: {repo_name}") - + + # Delete repository from filesystem + workspace_path = os.getenv("WORKSPACE_PATH", "/workspace") + repo_path = Path(workspace_path) / "repos" / repo_name + + if repo_path.exists(): + try: + shutil.rmtree(repo_path) + logger.info(f"Deleted repository directory: {repo_path}") + except Exception as e: + logger.error(f"Failed to delete repository directory {repo_path}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to delete repository: {e}") + else: + logger.warning(f"Repository directory not found: {repo_path}") + # Update REPOS_JSON env var repos_json = os.getenv("REPOS_JSON", "[]") try: repos = json.loads(repos_json) if repos_json else [] except: repos = [] - + # Remove repo by name repos = [r for r in repos if r.get("name") != repo_name] - + os.environ["REPOS_JSON"] = json.dumps(repos) - + # Reset adapter state _adapter_initialized = False adapter._first_run = True - + logger.info(f"Repo removed, adapter will reinitialize on next run") - + return {"message": "Repository removed"} +@app.get("/repos/status") +async def get_repos_status(): + """ + Get current status of all repositories in the workspace. + + Returns for each repo: + - url: Repository URL + - name: Directory name + - branches: All local branches + - currentActiveBranch: Currently checked out branch + - defaultBranch: Default branch of remote + """ + if not adapter: + raise HTTPException(status_code=503, detail="Adapter not initialized") + + import re + from pathlib import Path + + workspace_path = os.getenv("WORKSPACE_PATH", "/workspace") + repos_dir = Path(workspace_path) / "repos" + + if not repos_dir.exists(): + return {"repos": []} + + repos_status = [] + + # Iterate through all directories in repos/ + for repo_path in repos_dir.iterdir(): + if not repo_path.is_dir() or not (repo_path / ".git").exists(): + continue + + try: + repo_name = repo_path.name + + # Get remote URL + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_path), "config", "--get", "remote.origin.url", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + repo_url = stdout.decode().strip() if process.returncode == 0 else "" + + # Strip any embedded tokens from URL before returning (security) + # Remove patterns like: https://x-access-token:TOKEN@github.com -> https://github.com + repo_url = re.sub(r'https://[^:]+:[^@]+@', 'https://', repo_url) + + # Get current active branch + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_path), "rev-parse", "--abbrev-ref", "HEAD", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + current_branch = stdout.decode().strip() if process.returncode == 0 else "unknown" + + # Get all local branches + process = await asyncio.create_subprocess_exec( + "git", "-C", str(repo_path), "branch", "--format=%(refname:short)", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + branches = [b.strip() for b in stdout.decode().split("\n") if b.strip()] if process.returncode == 0 else [] + + # Get default branch using robust detection + default_branch = await get_default_branch(str(repo_path)) + + repos_status.append({ + "url": repo_url, + "name": repo_name, + "branches": branches, + "currentActiveBranch": current_branch, + "defaultBranch": default_branch, + }) + + except Exception as e: + logger.error(f"Error getting status for repo {repo_path}: {e}") + continue + + return {"repos": repos_status} + + @app.get("/health") async def health(): """Health check endpoint."""