Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Runner interface {
BranchExists(ctx context.Context, bareRepo, branch string) (bool, error)
DefaultBranch(ctx context.Context, bareRepo string) (string, error)
EnsureRemoteRef(ctx context.Context, bareRepo, branch string) error
ResetBranch(ctx context.Context, worktreePath, ref string) error
IsClean(ctx context.Context, worktreePath string) (bool, error)
Rebase(ctx context.Context, worktreePath, onto string) error
RebaseAbort(ctx context.Context, worktreePath string) error
Expand Down Expand Up @@ -156,6 +157,13 @@ func (r *RealRunner) EnsureRemoteRef(ctx context.Context, bareRepo, branch strin
"+refs/heads/"+branch+":refs/remotes/origin/"+branch)
}

// ResetBranch resets the current branch in a worktree to the given ref.
// This is used to fast-forward existing worktrees to the latest remote state.
func (r *RealRunner) ResetBranch(ctx context.Context, worktreePath, ref string) error {
r.log().Debug("resetting branch", "path", worktreePath, "ref", ref)
return r.run(ctx, "-C", worktreePath, "reset", "--hard", ref)
}

// IsClean returns true if the worktree has no uncommitted changes.
func (r *RealRunner) IsClean(ctx context.Context, worktreePath string) (bool, error) {
r.log().Debug("checking worktree cleanliness", "path", worktreePath)
Expand Down
221 changes: 158 additions & 63 deletions internal/workspace/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"os"
"path/filepath"
"sync"
"time"

"github.com/milldr/flow/internal/agents"
Expand Down Expand Up @@ -178,7 +179,18 @@ func (s *Service) Resolve(idOrName string) ([]Info, error) {
return matches, nil
}

// repoRenderContext holds pre-computed paths for rendering a single repo.
type repoRenderContext struct {
index int
repo state.Repo
repoPath string
barePath string
worktreePath string
}

// Render materializes a workspace: ensures bare clones and creates worktrees.
// Bare repos are fetched in parallel to ensure we always have the latest remote
// state before creating or updating worktrees.
// progress is called with status messages for each repo.
func (s *Service) Render(ctx context.Context, id string, progress func(msg string)) error {
st, err := s.Find(id)
Expand All @@ -193,75 +205,47 @@ func (s *Service) Render(ctx context.Context, id string, progress func(msg strin
wsDir := s.Config.WorkspacePath(id)
total := len(st.Spec.Repos)

// Build render contexts for all repos
repos := make([]repoRenderContext, total)
for i, repo := range st.Spec.Repos {
repoPath := state.RepoPath(repo)
barePath := s.Config.BareRepoPath(repo.URL)
worktreePath := filepath.Join(wsDir, repoPath)
repos[i] = repoRenderContext{
index: i,
repo: repo,
repoPath: state.RepoPath(repo),
barePath: s.Config.BareRepoPath(repo.URL),
worktreePath: filepath.Join(wsDir, state.RepoPath(repo)),
}
}

progress(fmt.Sprintf("[%d/%d] %s", i+1, total, repo.URL))
// Phase 1: Clone and fetch all bare repos in parallel.
// This ensures every bare clone has the latest remote refs before we
// create or update any worktrees.
fetchErrs := make([]error, total)
var wg sync.WaitGroup
for i := range repos {
wg.Add(1)
go func(rc *repoRenderContext) {
defer wg.Done()
fetchErrs[rc.index] = s.ensureBareRepo(ctx, rc)
}(&repos[i])
}
wg.Wait()

// Ensure bare clone exists
if _, err := os.Stat(barePath); os.IsNotExist(err) {
s.log().Debug("bare clone not found, cloning", "url", repo.URL, "dest", barePath)
if err := os.MkdirAll(filepath.Dir(barePath), 0o755); err != nil {
return err
}
if err := s.Git.BareClone(ctx, repo.URL, barePath); err != nil {
return fmt.Errorf("cloning %s: %w", repo.URL, err)
}
// Fetch after bare clone to create remote tracking refs
// (bare clones don't create refs/remotes/origin/* by default).
if err := s.Git.Fetch(ctx, barePath); err != nil {
return fmt.Errorf("fetching %s: %w", repo.URL, err)
}
} else {
s.log().Debug("bare clone exists, fetching", "url", repo.URL, "path", barePath)
if err := s.Git.Fetch(ctx, barePath); err != nil {
return fmt.Errorf("fetching %s: %w", repo.URL, err)
}
// Check for fetch errors — fail fast on any clone/fetch failure
for i, err := range fetchErrs {
if err != nil {
return fmt.Errorf("%s: %w", repos[i].repo.URL, err)
}
}

// Create worktree if it doesn't exist
if _, err := os.Stat(worktreePath); os.IsNotExist(err) {
exists, err := s.Git.BranchExists(ctx, barePath, repo.Branch)
if err != nil {
return fmt.Errorf("checking branch for %s: %w", repo.URL, err)
}
// Phase 2: Create or update worktrees (sequential — progress messages
// are order-dependent and worktree operations are fast).
for i := range repos {
rc := &repos[i]
progress(fmt.Sprintf("[%d/%d] %s", rc.index+1, total, rc.repo.URL))

if exists {
s.log().Debug("creating worktree from existing branch", "path", worktreePath, "branch", repo.Branch)
if err := s.Git.AddWorktree(ctx, barePath, worktreePath, repo.Branch); err != nil {
return fmt.Errorf("creating worktree for %s: %w", repo.URL, err)
}
progress(fmt.Sprintf(" └── %s (%s) ✓", repoPath, repo.Branch))
} else {
var baseBranch string
if repo.Base != "" {
baseBranch = repo.Base
} else {
baseBranch, err = s.Git.DefaultBranch(ctx, barePath)
if err != nil {
return fmt.Errorf("getting default branch for %s: %w", repo.URL, err)
}
}
// Ensure the remote tracking ref exists for the base branch so
// origin/{baseBranch} resolves (especially when baseBranch differs
// from the default branch, which Fetch only creates refs for).
if err := s.Git.EnsureRemoteRef(ctx, barePath, baseBranch); err != nil {
return fmt.Errorf("ensuring remote ref for %s: %w", repo.URL, err)
}
// Use the remote ref to ensure we branch from the latest fetched state,
// not a potentially stale local branch ref in the bare repo.
startPoint := "origin/" + baseBranch
s.log().Debug("creating worktree with new branch", "path", worktreePath, "branch", repo.Branch, "from", startPoint)
if err := s.Git.AddWorktreeNewBranch(ctx, barePath, worktreePath, repo.Branch, startPoint); err != nil {
return fmt.Errorf("creating worktree for %s: %w", repo.URL, err)
}
progress(fmt.Sprintf(" └── %s (%s, new branch from %s) ✓", repoPath, repo.Branch, baseBranch))
}
} else {
s.log().Debug("worktree already exists, skipping", "path", worktreePath)
progress(fmt.Sprintf(" └── %s (%s) exists", repoPath, repo.Branch))
if err := s.ensureWorktree(ctx, rc, progress); err != nil {
return err
}
}

Expand All @@ -273,6 +257,117 @@ func (s *Service) Render(ctx context.Context, id string, progress func(msg strin
return nil
}

// ensureBareRepo clones (if needed) and fetches a bare repository.
func (s *Service) ensureBareRepo(ctx context.Context, rc *repoRenderContext) error {
if _, err := os.Stat(rc.barePath); os.IsNotExist(err) {
s.log().Debug("bare clone not found, cloning", "url", rc.repo.URL, "dest", rc.barePath)
if err := os.MkdirAll(filepath.Dir(rc.barePath), 0o755); err != nil {
return err
}
if err := s.Git.BareClone(ctx, rc.repo.URL, rc.barePath); err != nil {
return fmt.Errorf("cloning: %w", err)
}
}

s.log().Debug("fetching bare repo", "url", rc.repo.URL, "path", rc.barePath)
if err := s.Git.Fetch(ctx, rc.barePath); err != nil {
return fmt.Errorf("fetching: %w", err)
}
return nil
}

// ensureWorktree creates a new worktree or updates an existing one to the
// latest remote state.
func (s *Service) ensureWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error {
if _, err := os.Stat(rc.worktreePath); os.IsNotExist(err) {
return s.createWorktree(ctx, rc, progress)
}
return s.updateWorktree(ctx, rc, progress)
}

// createWorktree creates a new worktree, either from an existing branch or
// by creating a new branch from the base.
func (s *Service) createWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error {
exists, err := s.Git.BranchExists(ctx, rc.barePath, rc.repo.Branch)
if err != nil {
return fmt.Errorf("checking branch for %s: %w", rc.repo.URL, err)
}

if exists {
s.log().Debug("creating worktree from existing branch", "path", rc.worktreePath, "branch", rc.repo.Branch)
if err := s.Git.AddWorktree(ctx, rc.barePath, rc.worktreePath, rc.repo.Branch); err != nil {
return fmt.Errorf("creating worktree for %s: %w", rc.repo.URL, err)
}
progress(fmt.Sprintf(" └── %s (%s) ✓", rc.repoPath, rc.repo.Branch))
return nil
}

baseBranch, err := s.resolveBaseBranch(ctx, rc)
if err != nil {
return err
}

if err := s.Git.EnsureRemoteRef(ctx, rc.barePath, baseBranch); err != nil {
return fmt.Errorf("ensuring remote ref for %s: %w", rc.repo.URL, err)
}

startPoint := "origin/" + baseBranch
s.log().Debug("creating worktree with new branch", "path", rc.worktreePath, "branch", rc.repo.Branch, "from", startPoint)
if err := s.Git.AddWorktreeNewBranch(ctx, rc.barePath, rc.worktreePath, rc.repo.Branch, startPoint); err != nil {
return fmt.Errorf("creating worktree for %s: %w", rc.repo.URL, err)
}
progress(fmt.Sprintf(" └── %s (%s, new branch from %s) ✓", rc.repoPath, rc.repo.Branch, baseBranch))
return nil
}

// updateWorktree resets an existing worktree to the latest remote ref for its
// branch so that re-rendering always picks up new upstream commits.
func (s *Service) updateWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error {
// Ensure the remote tracking ref exists for this branch so we can
// check if the branch exists on the remote.
if err := s.Git.EnsureRemoteRef(ctx, rc.barePath, rc.repo.Branch); err != nil {
// Branch doesn't exist on remote — this is a local-only feature
// branch. Leave it alone.
s.log().Debug("worktree exists, no remote branch to update from", "path", rc.worktreePath, "branch", rc.repo.Branch)
progress(fmt.Sprintf(" └── %s (%s) exists", rc.repoPath, rc.repo.Branch))
return nil
}

// Check if the worktree is clean before resetting
clean, err := s.Git.IsClean(ctx, rc.worktreePath)
if err != nil {
return fmt.Errorf("checking worktree status for %s: %w", rc.repo.URL, err)
}

if !clean {
s.log().Debug("worktree is dirty, skipping update", "path", rc.worktreePath)
progress(fmt.Sprintf(" └── %s (%s) exists (dirty, skipped update)", rc.repoPath, rc.repo.Branch))
return nil
}

// Reset to the latest remote ref
ref := "origin/" + rc.repo.Branch
s.log().Debug("updating worktree to latest remote", "path", rc.worktreePath, "ref", ref)
if err := s.Git.ResetBranch(ctx, rc.worktreePath, ref); err != nil {
return fmt.Errorf("updating worktree for %s: %w", rc.repo.URL, err)
}

progress(fmt.Sprintf(" └── %s (%s) updated ✓", rc.repoPath, rc.repo.Branch))
return nil
}

// resolveBaseBranch returns the base branch for creating new feature branches.
func (s *Service) resolveBaseBranch(ctx context.Context, rc *repoRenderContext) (string, error) {
if rc.repo.Base != "" {
return rc.repo.Base, nil
}
baseBranch, err := s.Git.DefaultBranch(ctx, rc.barePath)
if err != nil {
return "", fmt.Errorf("getting default branch for %s: %w", rc.repo.URL, err)
}
return baseBranch, nil
}

// Sync fetches and rebases worktrees onto their base branches.
// It continues through failures — one repo failing doesn't block others.
func (s *Service) Sync(ctx context.Context, id string, progress func(msg string)) error {
Expand Down
Loading
Loading