diff --git a/internal/git/git.go b/internal/git/git.go index 8db063b..f71c15d 100644 --- a/internal/git/git.go +++ b/internal/git/git.go @@ -21,6 +21,7 @@ type Runner interface { BranchExists(ctx context.Context, bareRepo, branch string) (bool, error) DefaultBranch(ctx context.Context, bareRepo string) (string, error) EnsureRemoteRef(ctx context.Context, bareRepo, branch string) error + ResetBranch(ctx context.Context, worktreePath, ref string) error IsClean(ctx context.Context, worktreePath string) (bool, error) Rebase(ctx context.Context, worktreePath, onto string) error RebaseAbort(ctx context.Context, worktreePath string) error @@ -156,6 +157,13 @@ func (r *RealRunner) EnsureRemoteRef(ctx context.Context, bareRepo, branch strin "+refs/heads/"+branch+":refs/remotes/origin/"+branch) } +// ResetBranch resets the current branch in a worktree to the given ref. +// This is used to fast-forward existing worktrees to the latest remote state. +func (r *RealRunner) ResetBranch(ctx context.Context, worktreePath, ref string) error { + r.log().Debug("resetting branch", "path", worktreePath, "ref", ref) + return r.run(ctx, "-C", worktreePath, "reset", "--hard", ref) +} + // IsClean returns true if the worktree has no uncommitted changes. func (r *RealRunner) IsClean(ctx context.Context, worktreePath string) (bool, error) { r.log().Debug("checking worktree cleanliness", "path", worktreePath) diff --git a/internal/workspace/workspace.go b/internal/workspace/workspace.go index e53046c..febf90f 100644 --- a/internal/workspace/workspace.go +++ b/internal/workspace/workspace.go @@ -8,6 +8,7 @@ import ( "log/slog" "os" "path/filepath" + "sync" "time" "github.com/milldr/flow/internal/agents" @@ -178,7 +179,18 @@ func (s *Service) Resolve(idOrName string) ([]Info, error) { return matches, nil } +// repoRenderContext holds pre-computed paths for rendering a single repo. +type repoRenderContext struct { + index int + repo state.Repo + repoPath string + barePath string + worktreePath string +} + // Render materializes a workspace: ensures bare clones and creates worktrees. +// Bare repos are fetched in parallel to ensure we always have the latest remote +// state before creating or updating worktrees. // progress is called with status messages for each repo. func (s *Service) Render(ctx context.Context, id string, progress func(msg string)) error { st, err := s.Find(id) @@ -193,75 +205,47 @@ func (s *Service) Render(ctx context.Context, id string, progress func(msg strin wsDir := s.Config.WorkspacePath(id) total := len(st.Spec.Repos) + // Build render contexts for all repos + repos := make([]repoRenderContext, total) for i, repo := range st.Spec.Repos { - repoPath := state.RepoPath(repo) - barePath := s.Config.BareRepoPath(repo.URL) - worktreePath := filepath.Join(wsDir, repoPath) + repos[i] = repoRenderContext{ + index: i, + repo: repo, + repoPath: state.RepoPath(repo), + barePath: s.Config.BareRepoPath(repo.URL), + worktreePath: filepath.Join(wsDir, state.RepoPath(repo)), + } + } - progress(fmt.Sprintf("[%d/%d] %s", i+1, total, repo.URL)) + // Phase 1: Clone and fetch all bare repos in parallel. + // This ensures every bare clone has the latest remote refs before we + // create or update any worktrees. + fetchErrs := make([]error, total) + var wg sync.WaitGroup + for i := range repos { + wg.Add(1) + go func(rc *repoRenderContext) { + defer wg.Done() + fetchErrs[rc.index] = s.ensureBareRepo(ctx, rc) + }(&repos[i]) + } + wg.Wait() - // Ensure bare clone exists - if _, err := os.Stat(barePath); os.IsNotExist(err) { - s.log().Debug("bare clone not found, cloning", "url", repo.URL, "dest", barePath) - if err := os.MkdirAll(filepath.Dir(barePath), 0o755); err != nil { - return err - } - if err := s.Git.BareClone(ctx, repo.URL, barePath); err != nil { - return fmt.Errorf("cloning %s: %w", repo.URL, err) - } - // Fetch after bare clone to create remote tracking refs - // (bare clones don't create refs/remotes/origin/* by default). - if err := s.Git.Fetch(ctx, barePath); err != nil { - return fmt.Errorf("fetching %s: %w", repo.URL, err) - } - } else { - s.log().Debug("bare clone exists, fetching", "url", repo.URL, "path", barePath) - if err := s.Git.Fetch(ctx, barePath); err != nil { - return fmt.Errorf("fetching %s: %w", repo.URL, err) - } + // Check for fetch errors — fail fast on any clone/fetch failure + for i, err := range fetchErrs { + if err != nil { + return fmt.Errorf("%s: %w", repos[i].repo.URL, err) } + } - // Create worktree if it doesn't exist - if _, err := os.Stat(worktreePath); os.IsNotExist(err) { - exists, err := s.Git.BranchExists(ctx, barePath, repo.Branch) - if err != nil { - return fmt.Errorf("checking branch for %s: %w", repo.URL, err) - } + // Phase 2: Create or update worktrees (sequential — progress messages + // are order-dependent and worktree operations are fast). + for i := range repos { + rc := &repos[i] + progress(fmt.Sprintf("[%d/%d] %s", rc.index+1, total, rc.repo.URL)) - if exists { - s.log().Debug("creating worktree from existing branch", "path", worktreePath, "branch", repo.Branch) - if err := s.Git.AddWorktree(ctx, barePath, worktreePath, repo.Branch); err != nil { - return fmt.Errorf("creating worktree for %s: %w", repo.URL, err) - } - progress(fmt.Sprintf(" └── %s (%s) ✓", repoPath, repo.Branch)) - } else { - var baseBranch string - if repo.Base != "" { - baseBranch = repo.Base - } else { - baseBranch, err = s.Git.DefaultBranch(ctx, barePath) - if err != nil { - return fmt.Errorf("getting default branch for %s: %w", repo.URL, err) - } - } - // Ensure the remote tracking ref exists for the base branch so - // origin/{baseBranch} resolves (especially when baseBranch differs - // from the default branch, which Fetch only creates refs for). - if err := s.Git.EnsureRemoteRef(ctx, barePath, baseBranch); err != nil { - return fmt.Errorf("ensuring remote ref for %s: %w", repo.URL, err) - } - // Use the remote ref to ensure we branch from the latest fetched state, - // not a potentially stale local branch ref in the bare repo. - startPoint := "origin/" + baseBranch - s.log().Debug("creating worktree with new branch", "path", worktreePath, "branch", repo.Branch, "from", startPoint) - if err := s.Git.AddWorktreeNewBranch(ctx, barePath, worktreePath, repo.Branch, startPoint); err != nil { - return fmt.Errorf("creating worktree for %s: %w", repo.URL, err) - } - progress(fmt.Sprintf(" └── %s (%s, new branch from %s) ✓", repoPath, repo.Branch, baseBranch)) - } - } else { - s.log().Debug("worktree already exists, skipping", "path", worktreePath) - progress(fmt.Sprintf(" └── %s (%s) exists", repoPath, repo.Branch)) + if err := s.ensureWorktree(ctx, rc, progress); err != nil { + return err } } @@ -273,6 +257,117 @@ func (s *Service) Render(ctx context.Context, id string, progress func(msg strin return nil } +// ensureBareRepo clones (if needed) and fetches a bare repository. +func (s *Service) ensureBareRepo(ctx context.Context, rc *repoRenderContext) error { + if _, err := os.Stat(rc.barePath); os.IsNotExist(err) { + s.log().Debug("bare clone not found, cloning", "url", rc.repo.URL, "dest", rc.barePath) + if err := os.MkdirAll(filepath.Dir(rc.barePath), 0o755); err != nil { + return err + } + if err := s.Git.BareClone(ctx, rc.repo.URL, rc.barePath); err != nil { + return fmt.Errorf("cloning: %w", err) + } + } + + s.log().Debug("fetching bare repo", "url", rc.repo.URL, "path", rc.barePath) + if err := s.Git.Fetch(ctx, rc.barePath); err != nil { + return fmt.Errorf("fetching: %w", err) + } + return nil +} + +// ensureWorktree creates a new worktree or updates an existing one to the +// latest remote state. +func (s *Service) ensureWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error { + if _, err := os.Stat(rc.worktreePath); os.IsNotExist(err) { + return s.createWorktree(ctx, rc, progress) + } + return s.updateWorktree(ctx, rc, progress) +} + +// createWorktree creates a new worktree, either from an existing branch or +// by creating a new branch from the base. +func (s *Service) createWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error { + exists, err := s.Git.BranchExists(ctx, rc.barePath, rc.repo.Branch) + if err != nil { + return fmt.Errorf("checking branch for %s: %w", rc.repo.URL, err) + } + + if exists { + s.log().Debug("creating worktree from existing branch", "path", rc.worktreePath, "branch", rc.repo.Branch) + if err := s.Git.AddWorktree(ctx, rc.barePath, rc.worktreePath, rc.repo.Branch); err != nil { + return fmt.Errorf("creating worktree for %s: %w", rc.repo.URL, err) + } + progress(fmt.Sprintf(" └── %s (%s) ✓", rc.repoPath, rc.repo.Branch)) + return nil + } + + baseBranch, err := s.resolveBaseBranch(ctx, rc) + if err != nil { + return err + } + + if err := s.Git.EnsureRemoteRef(ctx, rc.barePath, baseBranch); err != nil { + return fmt.Errorf("ensuring remote ref for %s: %w", rc.repo.URL, err) + } + + startPoint := "origin/" + baseBranch + s.log().Debug("creating worktree with new branch", "path", rc.worktreePath, "branch", rc.repo.Branch, "from", startPoint) + if err := s.Git.AddWorktreeNewBranch(ctx, rc.barePath, rc.worktreePath, rc.repo.Branch, startPoint); err != nil { + return fmt.Errorf("creating worktree for %s: %w", rc.repo.URL, err) + } + progress(fmt.Sprintf(" └── %s (%s, new branch from %s) ✓", rc.repoPath, rc.repo.Branch, baseBranch)) + return nil +} + +// updateWorktree resets an existing worktree to the latest remote ref for its +// branch so that re-rendering always picks up new upstream commits. +func (s *Service) updateWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error { + // Ensure the remote tracking ref exists for this branch so we can + // check if the branch exists on the remote. + if err := s.Git.EnsureRemoteRef(ctx, rc.barePath, rc.repo.Branch); err != nil { + // Branch doesn't exist on remote — this is a local-only feature + // branch. Leave it alone. + s.log().Debug("worktree exists, no remote branch to update from", "path", rc.worktreePath, "branch", rc.repo.Branch) + progress(fmt.Sprintf(" └── %s (%s) exists", rc.repoPath, rc.repo.Branch)) + return nil + } + + // Check if the worktree is clean before resetting + clean, err := s.Git.IsClean(ctx, rc.worktreePath) + if err != nil { + return fmt.Errorf("checking worktree status for %s: %w", rc.repo.URL, err) + } + + if !clean { + s.log().Debug("worktree is dirty, skipping update", "path", rc.worktreePath) + progress(fmt.Sprintf(" └── %s (%s) exists (dirty, skipped update)", rc.repoPath, rc.repo.Branch)) + return nil + } + + // Reset to the latest remote ref + ref := "origin/" + rc.repo.Branch + s.log().Debug("updating worktree to latest remote", "path", rc.worktreePath, "ref", ref) + if err := s.Git.ResetBranch(ctx, rc.worktreePath, ref); err != nil { + return fmt.Errorf("updating worktree for %s: %w", rc.repo.URL, err) + } + + progress(fmt.Sprintf(" └── %s (%s) updated ✓", rc.repoPath, rc.repo.Branch)) + return nil +} + +// resolveBaseBranch returns the base branch for creating new feature branches. +func (s *Service) resolveBaseBranch(ctx context.Context, rc *repoRenderContext) (string, error) { + if rc.repo.Base != "" { + return rc.repo.Base, nil + } + baseBranch, err := s.Git.DefaultBranch(ctx, rc.barePath) + if err != nil { + return "", fmt.Errorf("getting default branch for %s: %w", rc.repo.URL, err) + } + return baseBranch, nil +} + // Sync fetches and rebases worktrees onto their base branches. // It continues through failures — one repo failing doesn't block others. func (s *Service) Sync(ctx context.Context, id string, progress func(msg string)) error { diff --git a/internal/workspace/workspace_test.go b/internal/workspace/workspace_test.go index bf61b6a..d211539 100644 --- a/internal/workspace/workspace_test.go +++ b/internal/workspace/workspace_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "github.com/milldr/flow/internal/agents" @@ -14,13 +15,16 @@ import ( ) // mockRunner records calls without executing git. +// All slice fields are guarded by mu for concurrent access during parallel render. type mockRunner struct { + mu sync.Mutex clones []string fetches []string worktrees []string removed []string startPoints []string remoteRefs []string + resets []string rebases []string aborts []string @@ -30,44 +34,61 @@ type mockRunner struct { branchExists bool isClean bool rebaseErr error + resetErr error } func (m *mockRunner) BareClone(_ context.Context, url, dest string) error { + m.mu.Lock() m.clones = append(m.clones, url) - if m.cloneErr != nil { - return m.cloneErr + cloneErr := m.cloneErr + m.mu.Unlock() + if cloneErr != nil { + return cloneErr } return os.MkdirAll(dest, 0o755) // create dir so stat checks pass } func (m *mockRunner) Fetch(_ context.Context, repoPath string) error { + m.mu.Lock() m.fetches = append(m.fetches, repoPath) - return m.fetchErr + fetchErr := m.fetchErr + m.mu.Unlock() + return fetchErr } func (m *mockRunner) AddWorktree(_ context.Context, _, worktreePath, _ string) error { + m.mu.Lock() m.worktrees = append(m.worktrees, worktreePath) - if m.addWTErr != nil { - return m.addWTErr + addWTErr := m.addWTErr + m.mu.Unlock() + if addWTErr != nil { + return addWTErr } return os.MkdirAll(worktreePath, 0o755) } func (m *mockRunner) AddWorktreeNewBranch(_ context.Context, _, worktreePath, _, startPoint string) error { + m.mu.Lock() m.startPoints = append(m.startPoints, startPoint) m.worktrees = append(m.worktrees, worktreePath) - if m.addWTErr != nil { - return m.addWTErr + addWTErr := m.addWTErr + m.mu.Unlock() + if addWTErr != nil { + return addWTErr } return os.MkdirAll(worktreePath, 0o755) } func (m *mockRunner) RemoveWorktree(_ context.Context, _, worktreePath string) error { + m.mu.Lock() m.removed = append(m.removed, worktreePath) + m.mu.Unlock() return os.RemoveAll(worktreePath) } func (m *mockRunner) BranchExists(_ context.Context, _, _ string) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() return m.branchExists, nil } @@ -76,21 +97,38 @@ func (m *mockRunner) DefaultBranch(_ context.Context, _ string) (string, error) } func (m *mockRunner) EnsureRemoteRef(_ context.Context, _, branch string) error { + m.mu.Lock() m.remoteRefs = append(m.remoteRefs, branch) + m.mu.Unlock() return nil } +func (m *mockRunner) ResetBranch(_ context.Context, _, ref string) error { + m.mu.Lock() + m.resets = append(m.resets, ref) + resetErr := m.resetErr + m.mu.Unlock() + return resetErr +} + func (m *mockRunner) IsClean(_ context.Context, _ string) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() return m.isClean, nil } func (m *mockRunner) Rebase(_ context.Context, _, onto string) error { + m.mu.Lock() m.rebases = append(m.rebases, onto) - return m.rebaseErr + rebaseErr := m.rebaseErr + m.mu.Unlock() + return rebaseErr } func (m *mockRunner) RebaseAbort(_ context.Context, worktreePath string) error { + m.mu.Lock() m.aborts = append(m.aborts, worktreePath) + m.mu.Unlock() return nil } @@ -227,10 +265,13 @@ func TestRender(t *testing.T) { t.Errorf("worktrees = %d, want 2", len(mock.worktrees)) } - // Second render should fetch, not clone + // Second render should fetch (not clone) and update existing worktrees mock.clones = nil mock.fetches = nil mock.worktrees = nil + mock.remoteRefs = nil + mock.resets = nil + mock.isClean = true err = svc.Render(ctx, "render-ws", noop) if err != nil { t.Fatalf("Render (2nd): %v", err) @@ -244,6 +285,13 @@ func TestRender(t *testing.T) { if len(mock.worktrees) != 0 { t.Errorf("second render worktrees = %d, want 0 (already exist)", len(mock.worktrees)) } + // Existing worktrees should be updated via EnsureRemoteRef + ResetBranch + if len(mock.remoteRefs) != 2 { + t.Errorf("second render remoteRefs = %d, want 2", len(mock.remoteRefs)) + } + if len(mock.resets) != 2 { + t.Errorf("second render resets = %d, want 2", len(mock.resets)) + } } func TestDelete(t *testing.T) { @@ -378,6 +426,76 @@ func TestListMultipleWorkspaces(t *testing.T) { } } +func TestRenderExistingWorktreeDirtySkip(t *testing.T) { + svc, mock := testService(t) + ctx := context.Background() + + st := state.NewState("Dirty skip test", "Dirty skip test", []state.Repo{ + {URL: "github.com/org/repo", Branch: "main", Path: "./repo"}, + }) + if err := svc.Create("dirty-ws", st); err != nil { + t.Fatal(err) + } + + // First render creates worktree + if err := svc.Render(ctx, "dirty-ws", noop); err != nil { + t.Fatal(err) + } + + // Second render with dirty worktree should skip reset + mock.resets = nil + mock.isClean = false + var messages []string + err := svc.Render(ctx, "dirty-ws", func(msg string) { messages = append(messages, msg) }) + if err != nil { + t.Fatalf("Render (dirty): %v", err) + } + if len(mock.resets) != 0 { + t.Errorf("resets = %d, want 0 (dirty worktree should skip)", len(mock.resets)) + } + // Check progress message mentions dirty + found := false + for _, msg := range messages { + if strings.Contains(msg, "dirty") { + found = true + break + } + } + if !found { + t.Error("expected progress message mentioning dirty worktree") + } +} + +func TestRenderParallelFetch(t *testing.T) { + svc, mock := testService(t) + ctx := context.Background() + + st := state.NewState("Parallel fetch", "Parallel fetch test", []state.Repo{ + {URL: "github.com/org/repo-a", Branch: "main", Path: "./repo-a"}, + {URL: "github.com/org/repo-b", Branch: "main", Path: "./repo-b"}, + {URL: "github.com/org/repo-c", Branch: "main", Path: "./repo-c"}, + }) + if err := svc.Create("parallel-ws", st); err != nil { + t.Fatal(err) + } + + err := svc.Render(ctx, "parallel-ws", noop) + if err != nil { + t.Fatalf("Render: %v", err) + } + + // All repos should have been cloned and fetched + if len(mock.clones) != 3 { + t.Errorf("clones = %d, want 3", len(mock.clones)) + } + if len(mock.fetches) != 3 { + t.Errorf("fetches = %d, want 3", len(mock.fetches)) + } + if len(mock.worktrees) != 3 { + t.Errorf("worktrees = %d, want 3", len(mock.worktrees)) + } +} + func TestRenderCloneError(t *testing.T) { svc, mock := testService(t) ctx := context.Background()