Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions gitindex/catfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gitindex

import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"io"
"os/exec"
"strconv"
"sync"
"syscall"

"github.com/go-git/go-git/v5/plumbing"
)

// catfileReader provides streaming access to git blob objects via a pipelined
// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob
// SHAs to stdin while the caller reads responses one at a time, similar to
// archive/tar.Reader.
//
// The --buffer flag switches git's output from per-object flush (write_or_die)
// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git
// calls fflush(stdout) to deliver any remaining output.
//
// Usage:
//
//	cr, err := newCatfileReader(repoDir, ids)
//	if err != nil { ... }
//	defer cr.Close()
//
//	for {
//		size, missing, err := cr.Next()
//		if err == io.EOF { break }
//		if missing { continue }
//		if size > maxSize { continue } // unread bytes auto-skipped
//		content := make([]byte, size)
//		io.ReadFull(cr, content)
//	}
type catfileReader struct {
	// cmd is the running "git cat-file --batch --buffer" process.
	cmd *exec.Cmd
	// reader buffers the process's stdout; headers and blob content are
	// both consumed through it.
	reader *bufio.Reader
	// writeErr carries the stdin writer goroutine's result and is closed
	// when that goroutine exits, so receives never block after exit.
	writeErr <-chan error

	// pending tracks unread content bytes + trailing LF for the current
	// entry. Next() discards any pending bytes before reading the next header.
	pending int

	// closeOnce/closeErr make Close idempotent and safe to call
	// concurrently; closeErr caches the first Close's result.
	closeOnce sync.Once
	closeErr  error
}

// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds
// all ids to its stdin via a background goroutine. The caller must call Close
// when done.
func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, error) {
cmd := exec.Command("git", "cat-file", "--batch", "--buffer")
cmd.Dir = repoDir

stdin, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("stdin pipe: %w", err)
}

stdout, err := cmd.StdoutPipe()
if err != nil {
stdin.Close()
return nil, fmt.Errorf("stdout pipe: %w", err)
}

if err := cmd.Start(); err != nil {
stdin.Close()
stdout.Close()
return nil, fmt.Errorf("start git cat-file: %w", err)
}

// Writer goroutine: feed all SHAs then close stdin to trigger flush.
writeErr := make(chan error, 1)
go func() {
defer close(writeErr)
defer stdin.Close()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also defer close(writeErr) so if you call Close twice it won't block forever.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added defer close(writeErr) in the goroutine. Also wrapped Close() itself in sync.Once (same pattern as DirectoryWatcher.Stop in search/watcher.go) so that cmd.Wait() is also called exactly once. This makes Close fully idempotent — safe to call multiple times or concurrently. Added TestCatfileReader_DoubleClose and TestCatfileReader_ConcurrentClose that verify this.

bw := bufio.NewWriterSize(stdin, 64*1024)
var hexBuf [41]byte
hexBuf[40] = '\n'
for _, id := range ids {
hex.Encode(hexBuf[:40], id[:])
if _, err := bw.Write(hexBuf[:]); err != nil {
writeErr <- err
return
}
}
writeErr <- bw.Flush()
}()

return &catfileReader{
cmd: cmd,
reader: bufio.NewReaderSize(stdout, 512*1024),
writeErr: writeErr,
}, nil
}

// Next advances to the next blob entry. It returns the blob's size and whether
// it is missing. Any unread content from the previous entry is automatically
// discarded. Returns io.EOF when all entries have been consumed.
//
// After Next returns successfully with missing=false, call Read to consume the
// blob content, or call Next again to skip it.
func (cr *catfileReader) Next() (size int, missing bool, err error) {
	// Skip whatever the caller did not consume from the previous entry.
	if cr.pending > 0 {
		if _, err = cr.reader.Discard(cr.pending); err != nil {
			return 0, false, fmt.Errorf("discard pending bytes: %w", err)
		}
		cr.pending = 0
	}

	line, err := cr.reader.ReadBytes('\n')
	switch {
	case err == io.EOF:
		return 0, false, io.EOF
	case err != nil:
		return 0, false, fmt.Errorf("read header: %w", err)
	}
	header := line[:len(line)-1] // drop the '\n'

	// A nonexistent object produces "<oid> missing".
	if bytes.HasSuffix(header, []byte(" missing")) {
		return 0, true, nil
	}

	// The final space-separated field of "<oid> <type> <size>" is the size.
	i := bytes.LastIndexByte(header, ' ')
	if i < 0 {
		return 0, false, fmt.Errorf("unexpected header: %q", header)
	}
	if size, err = strconv.Atoi(string(header[i+1:])); err != nil {
		return 0, false, fmt.Errorf("parse size from %q: %w", header, err)
	}

	// git terminates each object with an LF; account for it up front so
	// Read/Discard handle it uniformly.
	cr.pending = size + 1
	return size, false, nil
}

// Read reads from the current blob's content. Implements io.Reader. Returns
// io.EOF when the blob's content has been fully read (the trailing LF
// delimiter is consumed automatically).
func (cr *catfileReader) Read(p []byte) (int, error) {
	if cr.pending <= 0 {
		return 0, io.EOF
	}

	// The final pending byte is git's LF delimiter; it is never handed to
	// the caller.
	remaining := cr.pending - 1
	if remaining <= 0 {
		// Content exhausted: swallow the delimiter and report EOF.
		if _, err := cr.reader.ReadByte(); err != nil {
			return 0, fmt.Errorf("read trailing LF: %w", err)
		}
		cr.pending = 0
		return 0, io.EOF
	}

	// Never read past the end of the content into the delimiter.
	if len(p) > remaining {
		p = p[:remaining]
	}
	n, err := cr.reader.Read(p)
	cr.pending -= n
	if err != nil {
		return n, err
	}

	// Content fully consumed: eat the delimiter now so the next call
	// returns a clean io.EOF.
	if cr.pending == 1 {
		if _, err := cr.reader.ReadByte(); err != nil {
			return n, fmt.Errorf("read trailing LF: %w", err)
		}
		cr.pending = 0
	}
	return n, nil
}

// Close shuts down the cat-file process and waits for it to exit.
// It is safe to call Close multiple times or concurrently.
func (cr *catfileReader) Close() error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry about what happens when we call Close before git is finished giving us all its output. eg if we get an error during builder.Add we will call close before reading everything.

Maybe instead this should just always call something like cr.cmd.Kill() (I can't remember offhand what it is). Then I think all the intermediatte goroutines/etc will drain.

I do quite like that you nicely clean up here, but I want to make sure we have good behaviour for early termination.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — switched to kill-first. Close() now calls cmd.Process.Kill() before draining. This matches Gitaly's pattern — their cache eviction code has an explicit comment: "We first cancel the context such that the process gets a SIGKILL signal. Calling close() first may lead to a deadlock given that it waits for the process to exit, which may not happen if it hangs writing data to stdout."

After Kill, the drain completes instantly (stdout pipe gets EOF), the writer goroutine gets EPIPE on its next write and exits, and cmd.Wait() returns. The expected "signal: killed" exit error is suppressed via isKilledErr().

Added TestCatfileReader_CloseBeforeExhausted_ManyBlobs (200 files, reads only 1, then Close) and TestCatfileReader_CloseWithoutReading to exercise this.

cr.closeOnce.Do(func() {
// Kill first to avoid blocking on drain when there are many
// unconsumed entries. Gitaly uses the same kill-first pattern.
_ = cr.cmd.Process.Kill()
// Drain any buffered stdout so the pipe closes cleanly.
// Must complete before cmd.Wait(), which closes the pipe.
_, _ = io.Copy(io.Discard, cr.reader)
// Wait for writer goroutine (unblocks via broken pipe from Kill).
<-cr.writeErr
err := cr.cmd.Wait()
// Suppress the expected "signal: killed" error from our own Kill().
if isKilledErr(err) {
err = nil
}
cr.closeErr = err
})
return cr.closeErr
}

// isKilledErr reports whether err is an exec.ExitError caused by SIGKILL.
func isKilledErr(err error) bool {
	if exitErr, ok := err.(*exec.ExitError); ok {
		if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
			return status.Signal() == syscall.SIGKILL
		}
	}
	return false
}
159 changes: 159 additions & 0 deletions gitindex/catfile_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package gitindex

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"slices"
	"testing"

	"github.com/go-git/go-git/v5/plumbing"
)

// Set ZOEKT_BENCH_REPO to a git checkout to enable these benchmarks.
//
//	git clone --depth=1 https://github.com/kubernetes/kubernetes /tmp/k8s
//	ZOEKT_BENCH_REPO=/tmp/k8s go test ./gitindex/ -bench=BenchmarkBlobRead -benchmem -count=5 -timeout=600s

// requireBenchGitRepo returns the benchmark repo path from ZOEKT_BENCH_REPO,
// skipping the benchmark when the variable is unset.
func requireBenchGitRepo(b *testing.B) string {
	b.Helper()
	if dir := os.Getenv("ZOEKT_BENCH_REPO"); dir != "" {
		return dir
	}
	b.Skip("ZOEKT_BENCH_REPO not set")
	return "" // unreachable: Skip stops the benchmark via Goexit
}

// collectBlobKeys opens the repo, walks HEAD, and returns all fileKeys with
// their BlobLocations plus the repo directory path.
func collectBlobKeys(b *testing.B, repoDir string) (map[fileKey]BlobLocation, string) {
	b.Helper()

	r, closer, err := openRepo(repoDir)
	if err != nil {
		b.Fatalf("openRepo: %v", err)
	}
	b.Cleanup(func() { closer.Close() })

	ref, err := r.Head()
	if err != nil {
		b.Fatalf("Head: %v", err)
	}
	c, err := r.CommitObject(ref.Hash())
	if err != nil {
		b.Fatalf("CommitObject: %v", err)
	}
	t, err := c.Tree()
	if err != nil {
		b.Fatalf("Tree: %v", err)
	}

	// Walk the HEAD tree; the walker accumulates every file it sees.
	walker := NewRepoWalker(r, "https://example.com/repo", nil)
	if _, err := walker.CollectFiles(t, "HEAD", nil); err != nil {
		b.Fatalf("CollectFiles: %v", err)
	}
	return walker.Files, repoDir
}

// sortedBlobKeys returns the map's fileKeys ordered by blob ID so that
// iteration is deterministic across runs. Go map iteration order is
// randomized, so merely collecting the keys (as the previous version did)
// gave a different order — and therefore different keys[:n] benchmark
// subsets — on every process start. Keys with identical blob IDs (the same
// content at multiple paths) still tie; that is acceptable for the
// benchmarks' purposes.
func sortedBlobKeys(files map[fileKey]BlobLocation) []fileKey {
	keys := make([]fileKey, 0, len(files))
	for k := range files {
		keys = append(keys, k)
	}
	slices.SortFunc(keys, func(a, b fileKey) int {
		return bytes.Compare(a.ID[:], b.ID[:])
	})
	return keys
}

// BenchmarkBlobRead_GoGit measures the current go-git BlobObject approach:
// sequential calls to repo.GitRepo.BlobObject(hash) for each file.
func BenchmarkBlobRead_GoGit(b *testing.B) {
	repoDir := requireBenchGitRepo(b)
	files, _ := collectBlobKeys(b, repoDir)
	keys := sortedBlobKeys(files)
	b.Logf("collected %d blob keys", len(keys))

	for _, limit := range []int{1_000, 5_000, len(keys)} {
		subset := keys[:min(limit, len(keys))]

		b.Run(fmt.Sprintf("files=%d", len(subset)), func(b *testing.B) {
			b.ReportAllocs()
			var totalBytes int64
			for b.Loop() {
				totalBytes = 0
				for _, key := range subset {
					loc := files[key]
					blob, err := loc.GitRepo.BlobObject(key.ID)
					if err != nil {
						b.Fatalf("BlobObject(%s): %v", key.ID, err)
					}
					r, err := blob.Reader()
					if err != nil {
						b.Fatalf("Reader: %v", err)
					}
					copied, err := io.Copy(io.Discard, r)
					r.Close()
					if err != nil {
						b.Fatalf("Read: %v", err)
					}
					totalBytes += copied
				}
			}
			b.ReportMetric(float64(totalBytes), "content-bytes/op")
			b.ReportMetric(float64(len(subset)), "files/op")
		})
	}
}

// BenchmarkBlobRead_CatfileReader measures the streaming catfileReader approach:
// all SHAs written to stdin at once via --buffer, responses read one at a time.
// This is the production path used by indexGitRepo.
func BenchmarkBlobRead_CatfileReader(b *testing.B) {
	repoDir := requireBenchGitRepo(b)
	files, gitDir := collectBlobKeys(b, repoDir)
	keys := sortedBlobKeys(files)
	b.Logf("collected %d blob keys", len(keys))

	// Flatten the keys to plain hashes: that is all catfileReader needs.
	ids := make([]plumbing.Hash, len(keys))
	for i, k := range keys {
		ids[i] = k.ID
	}

	for _, n := range []int{1_000, 5_000, len(keys)} {
		n = min(n, len(keys))
		subset := ids[:n]

		b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) {
			b.ReportAllocs()
			var totalBytes int64
			for b.Loop() {
				totalBytes = 0
				// A fresh process per iteration: startup cost is part
				// of what is measured, matching production usage.
				cr, err := newCatfileReader(gitDir, subset)
				if err != nil {
					b.Fatalf("newCatfileReader: %v", err)
				}
				for range subset {
					size, missing, err := cr.Next()
					if err != nil {
						// Close before Fatalf so the git process
						// does not outlive the benchmark.
						cr.Close()
						b.Fatalf("Next: %v", err)
					}
					if missing {
						continue
					}
					content := make([]byte, size)
					if _, err := io.ReadFull(cr, content); err != nil {
						cr.Close()
						b.Fatalf("ReadFull: %v", err)
					}
					totalBytes += int64(len(content))
				}
				cr.Close()
			}
			b.ReportMetric(float64(totalBytes), "content-bytes/op")
			b.ReportMetric(float64(len(subset)), "files/op")
		})
	}
}
Loading
Loading