From dfbedcf873a6e16ccbf167d8f4ea2e5d7ac591e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Fri, 20 Mar 2026 14:33:09 +0100 Subject: [PATCH 1/3] gitindex: replace go-git blob reading with pipelined git cat-file --batch Replace the serial go-git BlobObject calls in indexGitRepo with a single pipelined "git cat-file --batch --buffer" subprocess. A writer goroutine feeds all blob SHAs to stdin while the main goroutine reads responses from stdout, forming a concurrent pipeline that eliminates per-object packfile seek overhead and leverages git's internal delta base cache. Submodule blobs fall back to the existing go-git createDocument path. Benchmarked on kubernetes (29,188 files, 261 MB), Apple M1 Max, 5 runs: go-git BlobObject (before): Time: 2.94s Allocs: 685K Memory: 691 MB cat-file pipelined (after): Time: 0.60s Allocs: 58K Memory: 276 MB Speedup: 4.9x time, 12x fewer allocs, 2.5x less memory --- gitindex/catfile.go | 172 +++++++++++++++++++++++++++++++++ gitindex/catfile_bench_test.go | 148 ++++++++++++++++++++++++++++ gitindex/catfile_test.go | 138 ++++++++++++++++++++++++++ gitindex/index.go | 77 ++++++++++++--- 4 files changed, 524 insertions(+), 11 deletions(-) create mode 100644 gitindex/catfile.go create mode 100644 gitindex/catfile_bench_test.go create mode 100644 gitindex/catfile_test.go diff --git a/gitindex/catfile.go b/gitindex/catfile.go new file mode 100644 index 000000000..df9252ae5 --- /dev/null +++ b/gitindex/catfile.go @@ -0,0 +1,172 @@ +// Copyright 2016 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gitindex + +import ( + "bufio" + "bytes" + "encoding/hex" + "fmt" + "io" + "os/exec" + "strconv" + + "github.com/go-git/go-git/v5/plumbing" +) + +// blobResult holds the result of reading a single blob from a pipelined +// cat-file --batch --buffer process. +type blobResult struct { + ID plumbing.Hash + Content []byte + Size int64 + Missing bool + Err error +} + +// readBlobsPipelined reads all blobs for the given IDs using a single +// "git cat-file --batch --buffer" process. A writer goroutine feeds SHAs +// to stdin while the main goroutine reads responses from stdout, forming a +// concurrent pipeline. The --buffer flag switches git's output from per-object +// flush (write_or_die) to libc stdio buffering (fwrite), reducing syscalls. +// After stdin EOF, git calls fflush(stdout) to deliver any remaining output. +// Results are returned in the same order as ids. +func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, error) { + cmd := exec.Command("git", "cat-file", "--batch", "--buffer") + cmd.Dir = repoDir + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("stdin pipe: %w", err) + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + stdin.Close() + return nil, fmt.Errorf("stdout pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + stdin.Close() + stdout.Close() + return nil, fmt.Errorf("start git cat-file: %w", err) + } + + // Writer goroutine: feed all SHAs then close stdin to trigger flush. + // Uses bufio.Writer to coalesce small writes into fewer syscalls. 
+ // Stack-allocated hex buffer avoids per-SHA heap allocation. + writeErr := make(chan error, 1) + go func() { + defer stdin.Close() + bw := bufio.NewWriterSize(stdin, 64*1024) // 64KB write buffer + var hexBuf [41]byte + hexBuf[40] = '\n' + for _, id := range ids { + hex.Encode(hexBuf[:40], id[:]) + if _, err := bw.Write(hexBuf[:]); err != nil { + writeErr <- err + return + } + } + writeErr <- bw.Flush() + }() + + // Reader: consume all responses in order. + // Manual header parsing avoids SplitN allocation. + reader := bufio.NewReaderSize(stdout, 512*1024) + results := make([]blobResult, len(ids)) + var readErr error + + for i, id := range ids { + results[i].ID = id + + headerBytes, err := reader.ReadBytes('\n') + if err != nil { + readErr = fmt.Errorf("read header for %s: %w", id, err) + results[i].Err = readErr + break + } + header := headerBytes[:len(headerBytes)-1] // trim \n + + if bytes.HasSuffix(header, []byte(" missing")) { + results[i].Missing = true + continue + } + + // Parse size from " ". + lastSpace := bytes.LastIndexByte(header, ' ') + if lastSpace == -1 { + readErr = fmt.Errorf("unexpected header: %q", header) + results[i].Err = readErr + break + } + size, err := strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) + if err != nil { + readErr = fmt.Errorf("parse size from %q: %w", header, err) + results[i].Err = readErr + break + } + results[i].Size = size + + // Read exactly size bytes into a dedicated slice (must survive + // until consumed by builder.Add). Exact-size avoids allocator + // rounding waste (e.g. make(4097) → 8192 bytes). + content := make([]byte, size) + if _, err := io.ReadFull(reader, content); err != nil { + readErr = fmt.Errorf("read content (%d bytes): %w", size, err) + results[i].Err = readErr + break + } + results[i].Content = content + + // Consume trailing LF delimiter. 
+ if _, err := reader.ReadByte(); err != nil { + readErr = fmt.Errorf("read trailing LF: %w", err) + results[i].Err = readErr + break + } + } + + // Mark all unprocessed results as failed if we broke out early. + if readErr != nil { + for j := range results { + if results[j].Err == nil && results[j].Content == nil && !results[j].Missing { + results[j].Err = readErr + } + } + } + + // Drain stdout so git can exit without blocking on a full pipe buffer. + _, _ = io.Copy(io.Discard, reader) + + // Wait for writer goroutine to finish. + wErr := <-writeErr + + // Wait for the git process to exit. + waitErr := cmd.Wait() + + // Return the first meaningful error. + if readErr != nil { + return results, readErr + } + if wErr != nil { + return results, fmt.Errorf("write to cat-file: %w", wErr) + } + if waitErr != nil { + return results, waitErr + } + + return results, nil +} diff --git a/gitindex/catfile_bench_test.go b/gitindex/catfile_bench_test.go new file mode 100644 index 000000000..fe7f96892 --- /dev/null +++ b/gitindex/catfile_bench_test.go @@ -0,0 +1,148 @@ +package gitindex + +import ( + "fmt" + "io" + "os" + "testing" + + "github.com/go-git/go-git/v5/plumbing" +) + +// Set ZOEKT_BENCH_REPO to a git checkout to enable these benchmarks. +// +// git clone --depth=1 https://github.com/kubernetes/kubernetes /tmp/k8s +// ZOEKT_BENCH_REPO=/tmp/k8s go test ./gitindex/ -bench=BenchmarkBlobRead -benchmem -count=5 -timeout=600s + +func requireBenchGitRepo(b *testing.B) string { + b.Helper() + dir := os.Getenv("ZOEKT_BENCH_REPO") + if dir == "" { + b.Skip("ZOEKT_BENCH_REPO not set") + } + return dir +} + +// collectBlobKeys opens the repo, walks HEAD, and returns all fileKeys with +// their BlobLocations plus the repo directory path. 
+func collectBlobKeys(b *testing.B, repoDir string) (map[fileKey]BlobLocation, string) { + b.Helper() + + repo, closer, err := openRepo(repoDir) + if err != nil { + b.Fatalf("openRepo: %v", err) + } + b.Cleanup(func() { closer.Close() }) + + head, err := repo.Head() + if err != nil { + b.Fatalf("Head: %v", err) + } + + commit, err := repo.CommitObject(head.Hash()) + if err != nil { + b.Fatalf("CommitObject: %v", err) + } + + tree, err := commit.Tree() + if err != nil { + b.Fatalf("Tree: %v", err) + } + + rw := NewRepoWalker(repo, "https://example.com/repo", nil) + if _, err := rw.CollectFiles(tree, "HEAD", nil); err != nil { + b.Fatalf("CollectFiles: %v", err) + } + + return rw.Files, repoDir +} + +// sortedBlobKeys returns fileKeys for deterministic iteration. +func sortedBlobKeys(files map[fileKey]BlobLocation) []fileKey { + keys := make([]fileKey, 0, len(files)) + for k := range files { + keys = append(keys, k) + } + return keys +} + +// BenchmarkBlobRead_GoGit measures the current go-git BlobObject approach: +// sequential calls to repo.GitRepo.BlobObject(hash) for each file. 
+func BenchmarkBlobRead_GoGit(b *testing.B) { + repoDir := requireBenchGitRepo(b) + files, _ := collectBlobKeys(b, repoDir) + keys := sortedBlobKeys(files) + b.Logf("collected %d blob keys", len(keys)) + + for _, n := range []int{1_000, 5_000, len(keys)} { + n = min(n, len(keys)) + subset := keys[:n] + + b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) { + b.ReportAllocs() + var totalBytes int64 + for b.Loop() { + totalBytes = 0 + for _, key := range subset { + loc := files[key] + blob, err := loc.GitRepo.BlobObject(key.ID) + if err != nil { + b.Fatalf("BlobObject(%s): %v", key.ID, err) + } + r, err := blob.Reader() + if err != nil { + b.Fatalf("Reader: %v", err) + } + n, err := io.Copy(io.Discard, r) + r.Close() + if err != nil { + b.Fatalf("Read: %v", err) + } + totalBytes += n + } + } + b.ReportMetric(float64(totalBytes), "content-bytes/op") + b.ReportMetric(float64(len(subset)), "files/op") + }) + } +} + +// BenchmarkBlobRead_CatfilePipelined measures the pipelined approach: +// all SHAs written to stdin at once via --buffer, then all responses read. +// This is the production path used by indexGitRepo. 
+func BenchmarkBlobRead_CatfilePipelined(b *testing.B) { + repoDir := requireBenchGitRepo(b) + files, gitDir := collectBlobKeys(b, repoDir) + keys := sortedBlobKeys(files) + b.Logf("collected %d blob keys", len(keys)) + + ids := make([]plumbing.Hash, len(keys)) + for i, k := range keys { + ids[i] = k.ID + } + + for _, n := range []int{1_000, 5_000, len(keys)} { + n = min(n, len(keys)) + subset := ids[:n] + + b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) { + b.ReportAllocs() + var totalBytes int64 + for b.Loop() { + totalBytes = 0 + results, err := readBlobsPipelined(gitDir, subset) + if err != nil { + b.Fatalf("readBlobsPipelined: %v", err) + } + for _, r := range results { + if r.Err != nil { + b.Fatalf("blob %s: %v", r.ID, r.Err) + } + totalBytes += int64(len(r.Content)) + } + } + b.ReportMetric(float64(totalBytes), "content-bytes/op") + b.ReportMetric(float64(len(subset)), "files/op") + }) + } +} diff --git a/gitindex/catfile_test.go b/gitindex/catfile_test.go new file mode 100644 index 000000000..b6eb99841 --- /dev/null +++ b/gitindex/catfile_test.go @@ -0,0 +1,138 @@ +package gitindex + +import ( + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/go-git/go-git/v5/plumbing" +) + +// createTestRepo creates a git repo with various test files and returns +// the repo path and a map of filename -> blob SHA. 
+func createTestRepo(t *testing.T) (string, map[string]plumbing.Hash) { + t.Helper() + dir := t.TempDir() + repoDir := filepath.Join(dir, "repo") + + script := ` +set -e +git init -b main repo +cd repo +git config user.email "test@test.com" +git config user.name "Test" + +# Normal text file +echo "hello world" > hello.txt + +# Empty file +touch empty.txt + +# Binary file with newlines embedded +printf '\x00\x01\x02\nhello\nworld\n\x03\x04' > binary.bin + +# Large-ish file (64KB of data) +dd if=/dev/urandom bs=1024 count=64 of=large.bin 2>/dev/null + +git add -A +git commit -m "initial" +` + cmd := exec.Command("/bin/sh", "-c", script) + cmd.Dir = dir + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + t.Fatalf("create test repo: %v", err) + } + + // Get blob SHAs for each file. + blobs := map[string]plumbing.Hash{} + for _, name := range []string{"hello.txt", "empty.txt", "binary.bin", "large.bin"} { + out, err := exec.Command("git", "-C", repoDir, "rev-parse", "HEAD:"+name).Output() + if err != nil { + t.Fatalf("rev-parse %s: %v", name, err) + } + sha := string(out[:len(out)-1]) // trim newline + blobs[name] = plumbing.NewHash(sha) + } + + return repoDir, blobs +} + +func TestReadBlobsPipelined(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["empty.txt"], + blobs["binary.bin"], + blobs["large.bin"], + } + + results, err := readBlobsPipelined(repoDir, ids) + if err != nil { + t.Fatalf("readBlobsPipelined: %v", err) + } + + if len(results) != 4 { + t.Fatalf("got %d results, want 4", len(results)) + } + + // hello.txt + if results[0].Err != nil { + t.Fatalf("hello.txt err: %v", results[0].Err) + } + if string(results[0].Content) != "hello world\n" { + t.Errorf("hello.txt = %q", results[0].Content) + } + + // empty.txt + if results[1].Err != nil { + t.Fatalf("empty.txt err: %v", results[1].Err) + } + if len(results[1].Content) != 0 { + t.Errorf("empty.txt len = %d, want 0", 
len(results[1].Content)) + } + + // binary.bin — verify exact content survived the pipeline. + if results[2].Err != nil { + t.Fatalf("binary.bin err: %v", results[2].Err) + } + if results[2].Content[0] != 0x00 { + t.Errorf("binary.bin first byte = %x, want 0x00", results[2].Content[0]) + } + + // large.bin + if results[3].Err != nil { + t.Fatalf("large.bin err: %v", results[3].Err) + } + if results[3].Size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", results[3].Size, 64*1024) + } +} + +func TestReadBlobsPipelined_WithMissing(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + ids := []plumbing.Hash{ + blobs["hello.txt"], + fakeHash, + blobs["empty.txt"], + } + + results, err := readBlobsPipelined(repoDir, ids) + if err != nil { + t.Fatalf("readBlobsPipelined: %v", err) + } + + if !results[1].Missing { + t.Errorf("expected result[1] to be missing") + } + if string(results[0].Content) != "hello world\n" { + t.Errorf("hello.txt = %q", results[0].Content) + } + if len(results[2].Content) != 0 { + t.Errorf("empty.txt len = %d, want 0", len(results[2].Content)) + } +} diff --git a/gitindex/index.go b/gitindex/index.go index 5fbeba0d0..77de23d1b 100644 --- a/gitindex/index.go +++ b/gitindex/index.go @@ -585,23 +585,78 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) { sort.Strings(names) names = uniq(names) - log.Printf("attempting to index %d total files", totalFiles) - for idx, name := range names { - keys := fileKeys[name] + // Build the ordered list of (name, key) pairs and collect blob SHAs for + // the main repo so we can read them all at once via git cat-file --batch. 
+ type indexEntry struct { + key fileKey + blobIndex int // index into blobResults, or -1 for submodule blobs + } + entries := make([]indexEntry, 0, totalFiles) + mainRepoIDs := make([]plumbing.Hash, 0, totalFiles) + + for _, name := range names { + for _, key := range fileKeys[name] { + if key.SubRepoPath == "" { + entries = append(entries, indexEntry{key: key, blobIndex: len(mainRepoIDs)}) + mainRepoIDs = append(mainRepoIDs, key.ID) + } else { + entries = append(entries, indexEntry{key: key, blobIndex: -1}) + } + } + } + + // Bulk-read all main-repo blobs via pipelined cat-file --batch --buffer. + var blobResults []blobResult + if len(mainRepoIDs) > 0 { + var err error + blobResults, err = readBlobsPipelined(opts.RepoDir, mainRepoIDs) + if err != nil { + return false, fmt.Errorf("readBlobsPipelined: %w", err) + } + } + + log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), totalFiles-len(mainRepoIDs)) + for idx, entry := range entries { + var doc index.Document + + if entry.blobIndex >= 0 { + // Main repo blob: use pipelined result. + r := blobResults[entry.blobIndex] + if r.Err != nil { + return false, fmt.Errorf("read blob %s: %w", r.ID, r.Err) + } - for _, key := range keys { - doc, err := createDocument(key, repos, opts.BuildOptions) + branches := repos[entry.key].Branches + if r.Missing { + doc = skippedLargeDoc(entry.key, branches) + } else { + keyFullPath := entry.key.FullPath() + if r.Size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { + doc = skippedLargeDoc(entry.key, branches) + } else { + doc = index.Document{ + SubRepositoryPath: entry.key.SubRepoPath, + Name: keyFullPath, + Content: r.Content, + Branches: branches, + } + } + } + } else { + // Submodule blob: fall back to go-git. 
+ var err error + doc, err = createDocument(entry.key, repos, opts.BuildOptions) if err != nil { return false, err } + } - if err := builder.Add(doc); err != nil { - return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) - } + if err := builder.Add(doc); err != nil { + return false, fmt.Errorf("error adding document with name %s: %w", entry.key.FullPath(), err) + } - if idx%10_000 == 0 { - builder.CheckMemoryUsage() - } + if idx%10_000 == 0 { + builder.CheckMemoryUsage() } } return true, builder.Finish() From dec1a91f40ddd9bbbc40d9a92f3823def8d2f564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Fri, 20 Mar 2026 18:06:50 +0100 Subject: [PATCH 2/3] gitindex: streaming catfileReader API, skip large blobs without reading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the bulk readBlobsPipelined (which read all blobs into a []blobResult slice) with a streaming catfileReader modeled after archive/tar.Reader: cr, _ := newCatfileReader(repoDir, ids) for { size, missing, err := cr.Next() if size > maxSize { continue } // auto-skipped, never read content := make([]byte, size) io.ReadFull(cr, content) } Next() reads the cat-file header and returns the blob's size. The caller decides whether to Read the content or skip it — calling Next() again automatically discards unread bytes via bufio.Reader.Discard. Large blobs over SizeMax are never allocated or read into Go memory. Also split the single interleaved loop into two: one for main-repo blobs streamed via cat-file, one for submodule blobs via go-git's createDocument. The builder sorts documents internally so ordering between the loops does not matter. Peak memory is now bounded by ShardMax (one shard's worth of content) rather than total repository size. 
--- gitindex/catfile.go | 206 +++++++++++++++++++-------------- gitindex/catfile_bench_test.go | 29 +++-- gitindex/catfile_test.go | 158 +++++++++++++++++++------ gitindex/index.go | 94 ++++++++------- 4 files changed, 317 insertions(+), 170 deletions(-) diff --git a/gitindex/catfile.go b/gitindex/catfile.go index df9252ae5..89418fdcc 100644 --- a/gitindex/catfile.go +++ b/gitindex/catfile.go @@ -26,24 +26,43 @@ import ( "github.com/go-git/go-git/v5/plumbing" ) -// blobResult holds the result of reading a single blob from a pipelined -// cat-file --batch --buffer process. -type blobResult struct { - ID plumbing.Hash - Content []byte - Size int64 - Missing bool - Err error +// catfileReader provides streaming access to git blob objects via a pipelined +// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob +// SHAs to stdin while the caller reads responses one at a time, similar to +// archive/tar.Reader. +// +// The --buffer flag switches git's output from per-object flush (write_or_die) +// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git +// calls fflush(stdout) to deliver any remaining output. +// +// Usage: +// +// cr, err := newCatfileReader(repoDir, ids) +// if err != nil { ... } +// defer cr.Close() +// +// for { +// size, missing, err := cr.Next() +// if err == io.EOF { break } +// if missing { continue } +// if size > maxSize { continue } // unread bytes auto-skipped +// content := make([]byte, size) +// io.ReadFull(cr, content) +// } +type catfileReader struct { + cmd *exec.Cmd + reader *bufio.Reader + writeErr <-chan error + + // pending tracks unread content bytes + trailing LF for the current + // entry. Next() discards any pending bytes before reading the next header. + pending int64 } -// readBlobsPipelined reads all blobs for the given IDs using a single -// "git cat-file --batch --buffer" process. 
A writer goroutine feeds SHAs -// to stdin while the main goroutine reads responses from stdout, forming a -// concurrent pipeline. The --buffer flag switches git's output from per-object -// flush (write_or_die) to libc stdio buffering (fwrite), reducing syscalls. -// After stdin EOF, git calls fflush(stdout) to deliver any remaining output. -// Results are returned in the same order as ids. -func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, error) { +// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds +// all ids to its stdin via a background goroutine. The caller must call Close +// when done. +func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, error) { cmd := exec.Command("git", "cat-file", "--batch", "--buffer") cmd.Dir = repoDir @@ -65,12 +84,10 @@ func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, erro } // Writer goroutine: feed all SHAs then close stdin to trigger flush. - // Uses bufio.Writer to coalesce small writes into fewer syscalls. - // Stack-allocated hex buffer avoids per-SHA heap allocation. writeErr := make(chan error, 1) go func() { defer stdin.Close() - bw := bufio.NewWriterSize(stdin, 64*1024) // 64KB write buffer + bw := bufio.NewWriterSize(stdin, 64*1024) var hexBuf [41]byte hexBuf[40] = '\n' for _, id := range ids { @@ -83,90 +100,101 @@ func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, erro writeErr <- bw.Flush() }() - // Reader: consume all responses in order. - // Manual header parsing avoids SplitN allocation. 
- reader := bufio.NewReaderSize(stdout, 512*1024) - results := make([]blobResult, len(ids)) - var readErr error - - for i, id := range ids { - results[i].ID = id - - headerBytes, err := reader.ReadBytes('\n') - if err != nil { - readErr = fmt.Errorf("read header for %s: %w", id, err) - results[i].Err = readErr - break - } - header := headerBytes[:len(headerBytes)-1] // trim \n + return &catfileReader{ + cmd: cmd, + reader: bufio.NewReaderSize(stdout, 512*1024), + writeErr: writeErr, + }, nil +} - if bytes.HasSuffix(header, []byte(" missing")) { - results[i].Missing = true - continue +// Next advances to the next blob entry. It returns the blob's size and whether +// it is missing. Any unread content from the previous entry is automatically +// discarded. Returns io.EOF when all entries have been consumed. +// +// After Next returns successfully with missing=false, call Read to consume the +// blob content, or call Next again to skip it. +func (cr *catfileReader) Next() (size int64, missing bool, err error) { + // Discard unread content from the previous entry. + if cr.pending > 0 { + if _, err := cr.reader.Discard(int(cr.pending)); err != nil { + return 0, false, fmt.Errorf("discard pending bytes: %w", err) } + cr.pending = 0 + } - // Parse size from " ". - lastSpace := bytes.LastIndexByte(header, ' ') - if lastSpace == -1 { - readErr = fmt.Errorf("unexpected header: %q", header) - results[i].Err = readErr - break - } - size, err := strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) - if err != nil { - readErr = fmt.Errorf("parse size from %q: %w", header, err) - results[i].Err = readErr - break - } - results[i].Size = size - - // Read exactly size bytes into a dedicated slice (must survive - // until consumed by builder.Add). Exact-size avoids allocator - // rounding waste (e.g. make(4097) → 8192 bytes). 
- content := make([]byte, size) - if _, err := io.ReadFull(reader, content); err != nil { - readErr = fmt.Errorf("read content (%d bytes): %w", size, err) - results[i].Err = readErr - break + headerBytes, err := cr.reader.ReadBytes('\n') + if err != nil { + if err == io.EOF { + return 0, false, io.EOF } - results[i].Content = content + return 0, false, fmt.Errorf("read header: %w", err) + } + header := headerBytes[:len(headerBytes)-1] // trim \n - // Consume trailing LF delimiter. - if _, err := reader.ReadByte(); err != nil { - readErr = fmt.Errorf("read trailing LF: %w", err) - results[i].Err = readErr - break - } + if bytes.HasSuffix(header, []byte(" missing")) { + return 0, true, nil } - // Mark all unprocessed results as failed if we broke out early. - if readErr != nil { - for j := range results { - if results[j].Err == nil && results[j].Content == nil && !results[j].Missing { - results[j].Err = readErr - } - } + // Parse size from " ". + lastSpace := bytes.LastIndexByte(header, ' ') + if lastSpace == -1 { + return 0, false, fmt.Errorf("unexpected header: %q", header) + } + size, err = strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) + if err != nil { + return 0, false, fmt.Errorf("parse size from %q: %w", header, err) } - // Drain stdout so git can exit without blocking on a full pipe buffer. - _, _ = io.Copy(io.Discard, reader) + // Track pending bytes: content + trailing LF. + cr.pending = size + 1 + return size, false, nil +} - // Wait for writer goroutine to finish. - wErr := <-writeErr +// Read reads from the current blob's content. Implements io.Reader. Returns +// io.EOF when the blob's content has been fully read (the trailing LF +// delimiter is consumed automatically). +func (cr *catfileReader) Read(p []byte) (int, error) { + if cr.pending <= 0 { + return 0, io.EOF + } - // Wait for the git process to exit. - waitErr := cmd.Wait() + // Don't read into the trailing LF byte — reserve it. 
+ contentRemaining := cr.pending - 1 + if contentRemaining <= 0 { + // Only the trailing LF remains; consume it and signal EOF. + if _, err := cr.reader.ReadByte(); err != nil { + return 0, fmt.Errorf("read trailing LF: %w", err) + } + cr.pending = 0 + return 0, io.EOF + } - // Return the first meaningful error. - if readErr != nil { - return results, readErr + // Limit the read to the remaining content bytes. + if int64(len(p)) > contentRemaining { + p = p[:contentRemaining] } - if wErr != nil { - return results, fmt.Errorf("write to cat-file: %w", wErr) + n, err := cr.reader.Read(p) + cr.pending -= int64(n) + if err != nil { + return n, err } - if waitErr != nil { - return results, waitErr + + // If we've consumed all content bytes, also consume the trailing LF. + if cr.pending == 1 { + if _, err := cr.reader.ReadByte(); err != nil { + return n, fmt.Errorf("read trailing LF: %w", err) + } + cr.pending = 0 } - return results, nil + return n, nil +} + +// Close shuts down the cat-file process and waits for it to exit. +func (cr *catfileReader) Close() error { + // Drain stdout so git can flush and exit without blocking. + _, _ = io.Copy(io.Discard, cr.reader) + // Wait for writer goroutine. + <-cr.writeErr + return cr.cmd.Wait() } diff --git a/gitindex/catfile_bench_test.go b/gitindex/catfile_bench_test.go index fe7f96892..ec2626a7b 100644 --- a/gitindex/catfile_bench_test.go +++ b/gitindex/catfile_bench_test.go @@ -107,10 +107,10 @@ func BenchmarkBlobRead_GoGit(b *testing.B) { } } -// BenchmarkBlobRead_CatfilePipelined measures the pipelined approach: -// all SHAs written to stdin at once via --buffer, then all responses read. +// BenchmarkBlobRead_CatfileReader measures the streaming catfileReader approach: +// all SHAs written to stdin at once via --buffer, responses read one at a time. // This is the production path used by indexGitRepo. 
-func BenchmarkBlobRead_CatfilePipelined(b *testing.B) { +func BenchmarkBlobRead_CatfileReader(b *testing.B) { repoDir := requireBenchGitRepo(b) files, gitDir := collectBlobKeys(b, repoDir) keys := sortedBlobKeys(files) @@ -130,16 +130,27 @@ func BenchmarkBlobRead_CatfilePipelined(b *testing.B) { var totalBytes int64 for b.Loop() { totalBytes = 0 - results, err := readBlobsPipelined(gitDir, subset) + cr, err := newCatfileReader(gitDir, subset) if err != nil { - b.Fatalf("readBlobsPipelined: %v", err) + b.Fatalf("newCatfileReader: %v", err) } - for _, r := range results { - if r.Err != nil { - b.Fatalf("blob %s: %v", r.ID, r.Err) + for range subset { + size, missing, err := cr.Next() + if err != nil { + cr.Close() + b.Fatalf("Next: %v", err) + } + if missing { + continue + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + cr.Close() + b.Fatalf("ReadFull: %v", err) } - totalBytes += int64(len(r.Content)) + totalBytes += int64(len(content)) } + cr.Close() } b.ReportMetric(float64(totalBytes), "content-bytes/op") b.ReportMetric(float64(len(subset)), "files/op") diff --git a/gitindex/catfile_test.go b/gitindex/catfile_test.go index b6eb99841..c871bd7d2 100644 --- a/gitindex/catfile_test.go +++ b/gitindex/catfile_test.go @@ -1,6 +1,7 @@ package gitindex import ( + "io" "os" "os/exec" "path/filepath" @@ -59,7 +60,7 @@ git commit -m "initial" return repoDir, blobs } -func TestReadBlobsPipelined(t *testing.T) { +func TestCatfileReader(t *testing.T) { repoDir, blobs := createTestRepo(t) ids := []plumbing.Hash{ @@ -69,49 +70,118 @@ func TestReadBlobsPipelined(t *testing.T) { blobs["large.bin"], } - results, err := readBlobsPipelined(repoDir, ids) + cr, err := newCatfileReader(repoDir, ids) if err != nil { - t.Fatalf("readBlobsPipelined: %v", err) - } - - if len(results) != 4 { - t.Fatalf("got %d results, want 4", len(results)) + t.Fatalf("newCatfileReader: %v", err) } + defer cr.Close() // hello.txt - if results[0].Err != nil { - 
t.Fatalf("hello.txt err: %v", results[0].Err) + size, missing, err := cr.Next() + if err != nil { + t.Fatalf("Next hello.txt: %v", err) + } + if missing { + t.Fatal("hello.txt unexpectedly missing") + } + if size != 12 { + t.Errorf("hello.txt size = %d, want 12", size) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatalf("ReadFull hello.txt: %v", err) } - if string(results[0].Content) != "hello world\n" { - t.Errorf("hello.txt = %q", results[0].Content) + if string(content) != "hello world\n" { + t.Errorf("hello.txt content = %q", content) } // empty.txt - if results[1].Err != nil { - t.Fatalf("empty.txt err: %v", results[1].Err) + size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next empty.txt: %v", err) } - if len(results[1].Content) != 0 { - t.Errorf("empty.txt len = %d, want 0", len(results[1].Content)) + if size != 0 { + t.Errorf("empty.txt size = %d, want 0", size) } - // binary.bin — verify exact content survived the pipeline. - if results[2].Err != nil { - t.Fatalf("binary.bin err: %v", results[2].Err) + // binary.bin — read content and verify binary data survives. 
+ size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next binary.bin: %v", err) + } + binContent := make([]byte, size) + if _, err := io.ReadFull(cr, binContent); err != nil { + t.Fatalf("ReadFull binary.bin: %v", err) } - if results[2].Content[0] != 0x00 { - t.Errorf("binary.bin first byte = %x, want 0x00", results[2].Content[0]) + if binContent[0] != 0x00 || binContent[3] != '\n' { + t.Errorf("binary.bin unexpected leading bytes: %x", binContent[:5]) } // large.bin - if results[3].Err != nil { - t.Fatalf("large.bin err: %v", results[3].Err) + size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next large.bin: %v", err) + } + if size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", size, 64*1024) + } + largeContent := make([]byte, size) + if _, err := io.ReadFull(cr, largeContent); err != nil { + t.Fatalf("ReadFull large.bin: %v", err) + } + + // EOF after all entries. + _, _, err = cr.Next() + if err != io.EOF { + t.Errorf("expected io.EOF after last entry, got %v", err) + } +} + +func TestCatfileReader_Skip(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["large.bin"], + blobs["binary.bin"], + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatalf("newCatfileReader: %v", err) + } + defer cr.Close() + + // Skip hello.txt by calling Next again without reading. + _, _, err = cr.Next() + if err != nil { + t.Fatalf("Next hello.txt: %v", err) + } + + // Skip large.bin too. + size, _, err := cr.Next() + if err != nil { + t.Fatalf("Next large.bin: %v", err) + } + if size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", size, 64*1024) } - if results[3].Size != 64*1024 { - t.Errorf("large.bin size = %d, want %d", results[3].Size, 64*1024) + + // Read binary.bin after skipping two entries. 
+ size, _, err = cr.Next() + if err != nil { + t.Fatalf("Next binary.bin: %v", err) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatalf("ReadFull binary.bin: %v", err) + } + if content[0] != 0x00 { + t.Errorf("binary.bin first byte = %x, want 0x00", content[0]) } } -func TestReadBlobsPipelined_WithMissing(t *testing.T) { +func TestCatfileReader_Missing(t *testing.T) { repoDir, blobs := createTestRepo(t) fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") @@ -121,18 +191,40 @@ func TestReadBlobsPipelined_WithMissing(t *testing.T) { blobs["empty.txt"], } - results, err := readBlobsPipelined(repoDir, ids) + cr, err := newCatfileReader(repoDir, ids) if err != nil { - t.Fatalf("readBlobsPipelined: %v", err) + t.Fatalf("newCatfileReader: %v", err) } + defer cr.Close() - if !results[1].Missing { - t.Errorf("expected result[1] to be missing") + // hello.txt — read normally. + size, missing, err := cr.Next() + if err != nil || missing { + t.Fatalf("Next hello.txt: err=%v missing=%v", err, missing) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatalf("ReadFull hello.txt: %v", err) + } + if string(content) != "hello world\n" { + t.Errorf("hello.txt = %q", content) } - if string(results[0].Content) != "hello world\n" { - t.Errorf("hello.txt = %q", results[0].Content) + + // fakeHash — missing. + _, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next fakeHash: %v", err) + } + if !missing { + t.Error("expected fakeHash to be missing") + } + + // empty.txt — still works after missing entry. 
+ size, missing, err = cr.Next() + if err != nil || missing { + t.Fatalf("Next empty.txt: err=%v missing=%v", err, missing) } - if len(results[2].Content) != 0 { - t.Errorf("empty.txt len = %d, want 0", len(results[2].Content)) + if size != 0 { + t.Errorf("empty.txt size = %d, want 0", size) } } diff --git a/gitindex/index.go b/gitindex/index.go index 77de23d1b..0d8bc6697 100644 --- a/gitindex/index.go +++ b/gitindex/index.go @@ -585,80 +585,96 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) { sort.Strings(names) names = uniq(names) - // Build the ordered list of (name, key) pairs and collect blob SHAs for - // the main repo so we can read them all at once via git cat-file --batch. - type indexEntry struct { - key fileKey - blobIndex int // index into blobResults, or -1 for submodule blobs - } - entries := make([]indexEntry, 0, totalFiles) + // Separate main-repo keys from submodule keys, collecting blob SHAs + // for the main repo so we can stream them via git cat-file --batch. + mainRepoKeys := make([]fileKey, 0, totalFiles) mainRepoIDs := make([]plumbing.Hash, 0, totalFiles) + var submoduleKeys []fileKey for _, name := range names { for _, key := range fileKeys[name] { if key.SubRepoPath == "" { - entries = append(entries, indexEntry{key: key, blobIndex: len(mainRepoIDs)}) + mainRepoKeys = append(mainRepoKeys, key) mainRepoIDs = append(mainRepoIDs, key.ID) } else { - entries = append(entries, indexEntry{key: key, blobIndex: -1}) + submoduleKeys = append(submoduleKeys, key) } } } - // Bulk-read all main-repo blobs via pipelined cat-file --batch --buffer. - var blobResults []blobResult + log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), len(submoduleKeys)) + + // Stream main-repo blobs via pipelined cat-file --batch --buffer. + // Large blobs are skipped without reading content into memory. 
if len(mainRepoIDs) > 0 { - var err error - blobResults, err = readBlobsPipelined(opts.RepoDir, mainRepoIDs) + cr, err := newCatfileReader(opts.RepoDir, mainRepoIDs) if err != nil { - return false, fmt.Errorf("readBlobsPipelined: %w", err) + return false, fmt.Errorf("newCatfileReader: %w", err) } - } - log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), totalFiles-len(mainRepoIDs)) - for idx, entry := range entries { - var doc index.Document - - if entry.blobIndex >= 0 { - // Main repo blob: use pipelined result. - r := blobResults[entry.blobIndex] - if r.Err != nil { - return false, fmt.Errorf("read blob %s: %w", r.ID, r.Err) + for idx, key := range mainRepoKeys { + size, missing, err := cr.Next() + if err != nil { + cr.Close() + return false, fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err) } - branches := repos[entry.key].Branches - if r.Missing { - doc = skippedLargeDoc(entry.key, branches) + branches := repos[key].Branches + var doc index.Document + + if missing { + doc = skippedLargeDoc(key, branches) } else { - keyFullPath := entry.key.FullPath() - if r.Size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { - doc = skippedLargeDoc(entry.key, branches) + keyFullPath := key.FullPath() + if size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { + // Skip without reading content into memory. + doc = skippedLargeDoc(key, branches) } else { + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + cr.Close() + return false, fmt.Errorf("read blob %s: %w", keyFullPath, err) + } doc = index.Document{ - SubRepositoryPath: entry.key.SubRepoPath, + SubRepositoryPath: key.SubRepoPath, Name: keyFullPath, - Content: r.Content, + Content: content, Branches: branches, } } } - } else { - // Submodule blob: fall back to go-git. 
- var err error - doc, err = createDocument(entry.key, repos, opts.BuildOptions) - if err != nil { - return false, err + + if err := builder.Add(doc); err != nil { + cr.Close() + return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) + } + + if idx%10_000 == 0 { + builder.CheckMemoryUsage() } } + if err := cr.Close(); err != nil { + return false, fmt.Errorf("close cat-file: %w", err) + } + } + + // Index submodule blobs via go-git. + for idx, key := range submoduleKeys { + doc, err := createDocument(key, repos, opts.BuildOptions) + if err != nil { + return false, err + } + if err := builder.Add(doc); err != nil { - return false, fmt.Errorf("error adding document with name %s: %w", entry.key.FullPath(), err) + return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) } if idx%10_000 == 0 { builder.CheckMemoryUsage() } } + return true, builder.Finish() } From e0d964bb27752fbecff6abb3ccc13056e0f0c84d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Tue, 24 Mar 2026 14:47:05 +0100 Subject: [PATCH 3/3] gitindex: harden catfileReader Close, add kill switch and SkipReasonMissing Address review feedback on PR #1021: - Make Close() idempotent via sync.Once; kill the git process first (matching Gitaly's pattern) instead of draining all remaining stdout, so early termination is fast. Suppress the expected SIGKILL exit error. Add defer close(writeErr) in the writer goroutine to prevent deadlock on double-close. - Change Next() return and pending field from int64 to int, use strconv.Atoi. Removes casts at all call sites; SizeMax is already int. - Add SkipReasonMissing for blobs that git cat-file reports as missing, instead of reusing SkipReasonTooLarge. Missing is unexpected for local repos (corruption, shallow clone, gc race) so log a warning. - Extract indexCatfileBlobs() with defer cr.Close(), eliminating four manual Close() calls on error paths. 
- Add ZOEKT_DISABLE_CATFILE_BATCH env var kill switch following the existing ZOEKT_DISABLE_GOGIT_OPTIMIZATION pattern. When set, all blobs fall back to the go-git createDocument path. - Deduplicate skippedLargeDoc/skippedMissingDoc into skippedDoc(reason). - Add 19 hardening tests covering Close lifecycle (double close, concurrent close, early termination), Read edge cases (partial reads, 1-byte buffer, empty blobs, read-without-next), missing object sequences, large blob byte precision, and duplicate SHAs. Benchmarked on kubernetes (29,188 files): no performance regression (geomean -0.89%, within noise). --- gitindex/catfile.go | 51 +- gitindex/catfile_hardening_test.go | 813 +++++++++++++++++++++++++++++ gitindex/index.go | 121 +++-- index/document.go | 3 + index/file_category.go | 7 +- 5 files changed, 932 insertions(+), 63 deletions(-) create mode 100644 gitindex/catfile_hardening_test.go diff --git a/gitindex/catfile.go b/gitindex/catfile.go index 89418fdcc..0c7c7753c 100644 --- a/gitindex/catfile.go +++ b/gitindex/catfile.go @@ -22,6 +22,8 @@ import ( "io" "os/exec" "strconv" + "sync" + "syscall" "github.com/go-git/go-git/v5/plumbing" ) @@ -56,7 +58,10 @@ type catfileReader struct { // pending tracks unread content bytes + trailing LF for the current // entry. Next() discards any pending bytes before reading the next header. - pending int64 + pending int + + closeOnce sync.Once + closeErr error } // newCatfileReader starts a "git cat-file --batch --buffer" process and feeds @@ -86,6 +91,7 @@ func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, erro // Writer goroutine: feed all SHAs then close stdin to trigger flush. 
writeErr := make(chan error, 1) go func() { + defer close(writeErr) defer stdin.Close() bw := bufio.NewWriterSize(stdin, 64*1024) var hexBuf [41]byte @@ -113,10 +119,10 @@ func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, erro // // After Next returns successfully with missing=false, call Read to consume the // blob content, or call Next again to skip it. -func (cr *catfileReader) Next() (size int64, missing bool, err error) { +func (cr *catfileReader) Next() (size int, missing bool, err error) { // Discard unread content from the previous entry. if cr.pending > 0 { - if _, err := cr.reader.Discard(int(cr.pending)); err != nil { + if _, err := cr.reader.Discard(cr.pending); err != nil { return 0, false, fmt.Errorf("discard pending bytes: %w", err) } cr.pending = 0 @@ -140,7 +146,7 @@ func (cr *catfileReader) Next() (size int64, missing bool, err error) { if lastSpace == -1 { return 0, false, fmt.Errorf("unexpected header: %q", header) } - size, err = strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) + size, err = strconv.Atoi(string(header[lastSpace+1:])) if err != nil { return 0, false, fmt.Errorf("parse size from %q: %w", header, err) } @@ -170,11 +176,11 @@ func (cr *catfileReader) Read(p []byte) (int, error) { } // Limit the read to the remaining content bytes. - if int64(len(p)) > contentRemaining { + if len(p) > contentRemaining { p = p[:contentRemaining] } n, err := cr.reader.Read(p) - cr.pending -= int64(n) + cr.pending -= n if err != nil { return n, err } @@ -191,10 +197,33 @@ func (cr *catfileReader) Read(p []byte) (int, error) { } // Close shuts down the cat-file process and waits for it to exit. +// It is safe to call Close multiple times or concurrently. func (cr *catfileReader) Close() error { - // Drain stdout so git can flush and exit without blocking. - _, _ = io.Copy(io.Discard, cr.reader) - // Wait for writer goroutine. 
- <-cr.writeErr - return cr.cmd.Wait() + cr.closeOnce.Do(func() { + // Kill first to avoid blocking on drain when there are many + // unconsumed entries. Gitaly uses the same kill-first pattern. + _ = cr.cmd.Process.Kill() + // Drain any buffered stdout so the pipe closes cleanly. + // Must complete before cmd.Wait(), which closes the pipe. + _, _ = io.Copy(io.Discard, cr.reader) + // Wait for writer goroutine (unblocks via broken pipe from Kill). + <-cr.writeErr + err := cr.cmd.Wait() + // Suppress the expected "signal: killed" error from our own Kill(). + if isKilledErr(err) { + err = nil + } + cr.closeErr = err + }) + return cr.closeErr +} + +// isKilledErr reports whether err is an exec.ExitError caused by SIGKILL. +func isKilledErr(err error) bool { + exitErr, ok := err.(*exec.ExitError) + if !ok { + return false + } + ws, ok := exitErr.Sys().(syscall.WaitStatus) + return ok && ws.Signal() == syscall.SIGKILL } diff --git a/gitindex/catfile_hardening_test.go b/gitindex/catfile_hardening_test.go new file mode 100644 index 000000000..06a730a25 --- /dev/null +++ b/gitindex/catfile_hardening_test.go @@ -0,0 +1,813 @@ +package gitindex + +import ( + "bytes" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/go-git/go-git/v5/plumbing" +) + +// --- Close lifecycle tests --- + +// TestCatfileReader_DoubleClose verifies that Close is idempotent. +// Calling Close twice must not deadlock or panic. +func TestCatfileReader_DoubleClose(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{blobs["hello.txt"]} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + + // Consume the entry so the process can exit cleanly. + if _, _, err := cr.Next(); err != nil { + t.Fatal(err) + } + + if err := cr.Close(); err != nil { + t.Fatalf("first Close: %v", err) + } + + // Second Close must not deadlock or panic. 
+ done := make(chan error, 1) + go func() { + done <- cr.Close() + }() + + select { + case <-done: + // Success — whether err is nil or not, it didn't block. + case <-time.After(5 * time.Second): + t.Fatal("second Close() deadlocked — writeErr channel was never closed") + } +} + +// TestCatfileReader_ConcurrentClose verifies that calling Close from +// multiple goroutines simultaneously does not panic, deadlock, or +// corrupt state. +func TestCatfileReader_ConcurrentClose(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["large.bin"], + blobs["binary.bin"], + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + + // Read one entry, leave two unconsumed. + if _, _, err := cr.Next(); err != nil { + t.Fatal(err) + } + + const goroutines = 5 + var wg sync.WaitGroup + wg.Add(goroutines) + barrier := make(chan struct{}) + + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + <-barrier // all start at once + cr.Close() + }() + } + + done := make(chan struct{}) + go func() { + close(barrier) + wg.Wait() + close(done) + }() + + select { + case <-done: + // All goroutines returned. + case <-time.After(10 * time.Second): + t.Fatal("concurrent Close() deadlocked") + } +} + +// TestCatfileReader_CloseWithoutReading verifies that closing +// immediately after creation (without reading any entries) completes +// without hanging. 
+func TestCatfileReader_CloseWithoutReading(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["large.bin"], + blobs["binary.bin"], + blobs["empty.txt"], + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { + done <- cr.Close() + }() + + select { + case err := <-done: + if err != nil { + t.Fatalf("Close: %v", err) + } + case <-time.After(10 * time.Second): + t.Fatal("Close() without reading any entries hung") + } +} + +// TestCatfileReader_CloseBeforeExhausted_ManyBlobs simulates early +// termination (e.g., builder.Add error) with many unconsumed blobs. +// Close should complete promptly — not drain the entire git output. +func TestCatfileReader_CloseBeforeExhausted_ManyBlobs(t *testing.T) { + // Create a repo with many non-trivial files. + dir := t.TempDir() + repoDir := filepath.Join(dir, "repo") + + script := ` +set -e +git init -b main repo +cd repo +git config user.email "test@test.com" +git config user.name "Test" +for i in $(seq 1 200); do + dd if=/dev/urandom bs=1024 count=10 of="file_$i.bin" 2>/dev/null +done +git add -A +git commit -m "many files" +` + cmd := exec.Command("/bin/sh", "-c", script) + cmd.Dir = dir + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + t.Fatalf("create test repo: %v", err) + } + + var ids []plumbing.Hash + for i := 1; i <= 200; i++ { + name := fmt.Sprintf("file_%d.bin", i) + out, err := exec.Command("git", "-C", repoDir, "rev-parse", "HEAD:"+name).Output() + if err != nil { + t.Fatalf("rev-parse %s: %v", name, err) + } + ids = append(ids, plumbing.NewHash(string(out[:len(out)-1]))) + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + + // Read only 1 of 200 entries. + if _, _, err := cr.Next(); err != nil { + t.Fatal(err) + } + + // Close should be fast (kill, not drain). With drain it still works but + // is slow — we enforce a generous bound. 
+ start := time.Now() + done := make(chan error, 1) + go func() { + done <- cr.Close() + }() + + select { + case <-done: + elapsed := time.Since(start) + // With Kill: sub-millisecond. Draining 200×10KB is fast too, so we + // use a generous 3s bound that still catches pathological stalls. + if elapsed > 3*time.Second { + t.Errorf("Close took %v after reading 1 of 200 entries — consider killing instead of draining", elapsed) + } + case <-time.After(30 * time.Second): + t.Fatal("Close() deadlocked with many unconsumed blobs") + } +} + +// --- Read edge-case tests --- + +// TestCatfileReader_ReadWithoutNext verifies that calling Read +// before calling Next returns io.EOF, not a panic or garbage data. +func TestCatfileReader_ReadWithoutNext(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{blobs["hello.txt"]} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + buf := make([]byte, 10) + n, err := cr.Read(buf) + if n != 0 || err != io.EOF { + t.Fatalf("Read without Next: n=%d err=%v, want n=0 err=io.EOF", n, err) + } +} + +// TestCatfileReader_ReadAfterFullConsumption verifies that extra Read +// calls after a blob is fully consumed return io.EOF, not duplicate +// data or trailing LF bytes. +func TestCatfileReader_ReadAfterFullConsumption(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{blobs["hello.txt"]} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + size, _, _ := cr.Next() + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatal(err) + } + + // Blob is fully read — additional Reads must return EOF. 
+ for i := 0; i < 3; i++ { + buf := make([]byte, 10) + n, err := cr.Read(buf) + if n != 0 || err != io.EOF { + t.Fatalf("Read #%d after full consumption: n=%d err=%v, want n=0 err=io.EOF", i, n, err) + } + } +} + +// TestCatfileReader_SmallBufferReads reads a blob one byte at a time +// and verifies the entire content is reconstructed correctly without +// any trailing LF leaking into user content. +func TestCatfileReader_SmallBufferReads(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{blobs["hello.txt"]} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + size, _, _ := cr.Next() + + var result []byte + buf := make([]byte, 1) + for { + n, err := cr.Read(buf) + if n > 0 { + result = append(result, buf[:n]...) + } + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + } + + if len(result) != size { + t.Fatalf("read %d bytes, want %d", len(result), size) + } + if string(result) != "hello world\n" { + t.Errorf("content = %q, want %q", result, "hello world\n") + } +} + +// TestCatfileReader_PartialReadThenNext reads only part of a blob's +// content, then advances to the next entry. Verifies that the discard +// of pending bytes doesn't corrupt the stream. +func TestCatfileReader_PartialReadThenNext(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{ + blobs["hello.txt"], // 12 bytes: "hello world\n" + blobs["binary.bin"], // variable, starts with 0x00 + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + // Read only 5 of 12 bytes from hello.txt. 
+ size, _, _ := cr.Next() + if size != 12 { + t.Fatalf("hello.txt size = %d, want 12", size) + } + partial := make([]byte, 5) + if _, err := io.ReadFull(cr, partial); err != nil { + t.Fatal(err) + } + if string(partial) != "hello" { + t.Fatalf("partial = %q, want %q", partial, "hello") + } + + // Advance — must discard remaining 7 content bytes + trailing LF. + size, _, err = cr.Next() + if err != nil { + t.Fatalf("Next binary.bin after partial read: %v", err) + } + + // Verify binary.bin content is intact. + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatal(err) + } + if content[0] != 0x00 { + t.Errorf("binary.bin first byte = 0x%02x after partial-read skip, want 0x00", content[0]) + } +} + +// TestCatfileReader_PartialReadExactlyOneByteShort reads size-1 bytes +// from a blob. The pending field should be exactly 2 (1 content byte + +// 1 trailing LF). This stresses the boundary between content and LF +// in the discard path. +func TestCatfileReader_PartialReadExactlyOneByteShort(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{ + blobs["hello.txt"], // 12 bytes + blobs["binary.bin"], // starts with 0x00 + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + size, _, _ := cr.Next() + // Read exactly size-1 bytes — leaves 1 content byte + trailing LF. + buf := make([]byte, size-1) + if _, err := io.ReadFull(cr, buf); err != nil { + t.Fatal(err) + } + if string(buf) != "hello world" { // missing final \n + t.Fatalf("partial = %q", buf) + } + + // Advance — pending should be 2 (1 content byte + 1 LF). The + // Discard call must handle this exact boundary correctly. + size, missing, err := cr.Next() + if err != nil { + t.Fatalf("Next after size-1 partial read: %v", err) + } + if missing { + t.Fatal("binary.bin unexpectedly missing") + } + + // Read binary.bin to verify stream integrity. 
+ content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatal(err) + } + if content[0] != 0x00 { + t.Errorf("binary.bin[0] = 0x%02x after boundary skip, want 0x00", content[0]) + } +} + +// --- Empty / degenerate input tests --- + +// TestCatfileReader_EmptyIds verifies that an empty id slice produces +// immediate EOF without errors. +func TestCatfileReader_EmptyIds(t *testing.T) { + repoDir, _ := createTestRepo(t) + + cr, err := newCatfileReader(repoDir, nil) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + _, _, err = cr.Next() + if err != io.EOF { + t.Fatalf("expected io.EOF for empty ids, got %v", err) + } +} + +// TestCatfileReader_MultipleEmptyBlobs stresses the trailing-LF +// handling for size-0 blobs. Git still outputs a LF after a 0-byte +// blob body. Repeated empty blobs test the pending=1 discard path. +func TestCatfileReader_MultipleEmptyBlobs(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + // Send the empty blob SHA 5 times — git outputs each independently. + emptyID := blobs["empty.txt"] + ids := []plumbing.Hash{emptyID, emptyID, emptyID, emptyID, emptyID} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + for i := range ids { + size, missing, err := cr.Next() + if err != nil { + t.Fatalf("Next #%d: %v", i, err) + } + if missing { + t.Fatalf("#%d unexpectedly missing", i) + } + if size != 0 { + t.Fatalf("#%d size = %d, want 0", i, size) + } + // Don't read — Next should discard the trailing LF for us. + } + + _, _, err = cr.Next() + if err != io.EOF { + t.Fatalf("expected EOF after %d empty blobs, got %v", len(ids), err) + } +} + +// TestCatfileReader_EmptyBlobRead verifies that reading a 0-byte blob +// through the io.Reader interface returns 0 bytes and io.EOF, and that +// the trailing LF is consumed transparently. 
+func TestCatfileReader_EmptyBlobRead(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{ + blobs["empty.txt"], // 0 bytes + blobs["hello.txt"], // 12 bytes — sentinel + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + size, _, _ := cr.Next() + if size != 0 { + t.Fatalf("empty.txt size = %d", size) + } + + // Explicitly Read on the 0-byte blob. + buf := make([]byte, 10) + n, err := cr.Read(buf) + if n != 0 || err != io.EOF { + t.Fatalf("Read empty blob: n=%d err=%v, want n=0 err=io.EOF", n, err) + } + + // The trailing LF must have been consumed. Verify by reading the + // next entry — if the LF leaked, the header parse would fail. + size, _, err = cr.Next() + if err != nil { + t.Fatalf("Next hello.txt after empty blob Read: %v", err) + } + if size != 12 { + t.Fatalf("hello.txt size = %d, want 12", size) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatal(err) + } + if string(content) != "hello world\n" { + t.Errorf("hello.txt = %q", content) + } +} + +// --- Missing object edge cases --- + +// TestCatfileReader_AllMissing verifies that a sequence of entirely +// missing objects is handled gracefully — no errors, no panics, just +// missing=true for each followed by EOF. 
+func TestCatfileReader_AllMissing(t *testing.T) { + repoDir, _ := createTestRepo(t) + + ids := []plumbing.Hash{ + plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef"), + plumbing.NewHash("1111111111111111111111111111111111111111"), + plumbing.NewHash("2222222222222222222222222222222222222222"), + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + for i, id := range ids { + _, missing, err := cr.Next() + if err != nil { + t.Fatalf("Next #%d (%s): %v", i, id, err) + } + if !missing { + t.Errorf("expected #%d (%s) to be missing", i, id) + } + } + + _, _, err = cr.Next() + if err != io.EOF { + t.Fatalf("expected EOF after all missing, got %v", err) + } +} + +// TestCatfileReader_AlternatingMissingPresent interleaves missing and +// present objects, verifying that stream alignment is maintained. +func TestCatfileReader_AlternatingMissingPresent(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + fake1 := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + fake2 := plumbing.NewHash("1111111111111111111111111111111111111111") + + ids := []plumbing.Hash{ + fake1, + blobs["hello.txt"], + fake2, + blobs["empty.txt"], + blobs["binary.bin"], + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + // fake1 — missing + _, missing, err := cr.Next() + if err != nil || !missing { + t.Fatalf("fake1: err=%v missing=%v", err, missing) + } + + // hello.txt — present, read it + size, missing, err := cr.Next() + if err != nil || missing { + t.Fatalf("hello.txt: err=%v missing=%v", err, missing) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatal(err) + } + if string(content) != "hello world\n" { + t.Errorf("hello.txt = %q", content) + } + + // fake2 — missing + _, missing, err = cr.Next() + if err != nil || !missing { + t.Fatalf("fake2: err=%v missing=%v", err, missing) + } + + // empty.txt — present, skip 
it + size, missing, err = cr.Next() + if err != nil || missing { + t.Fatalf("empty.txt: err=%v missing=%v", err, missing) + } + if size != 0 { + t.Errorf("empty.txt size = %d", size) + } + + // binary.bin — present, read it + size, missing, err = cr.Next() + if err != nil || missing { + t.Fatalf("binary.bin: err=%v missing=%v", err, missing) + } + binContent := make([]byte, size) + if _, err := io.ReadFull(cr, binContent); err != nil { + t.Fatal(err) + } + if binContent[0] != 0x00 { + t.Errorf("binary.bin[0] = 0x%02x, want 0x00", binContent[0]) + } + + _, _, err = cr.Next() + if err != io.EOF { + t.Fatalf("expected EOF, got %v", err) + } +} + +// TestCatfileReader_MissingThenSkip verifies that a missing object +// followed by a present but skipped (unread) object doesn't corrupt +// the stream. Missing objects have no content body, so there must be +// no stale pending bytes interfering with the next header read. +func TestCatfileReader_MissingThenSkip(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + fake := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + ids := []plumbing.Hash{ + fake, + blobs["large.bin"], // 64KB — skip without reading + blobs["hello.txt"], // sentinel — read to verify integrity + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + // missing + _, missing, _ := cr.Next() + if !missing { + t.Fatal("expected missing") + } + + // large.bin — skip + size, missing, err := cr.Next() + if err != nil || missing { + t.Fatalf("large.bin: err=%v missing=%v", err, missing) + } + if size != 64*1024 { + t.Fatalf("large.bin size = %d", size) + } + // deliberately don't read + + // hello.txt — read after missing+skip + size, missing, err = cr.Next() + if err != nil || missing { + t.Fatalf("hello.txt: err=%v missing=%v", err, missing) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatal(err) + } + if string(content) != "hello world\n" { 
+ t.Errorf("hello.txt = %q", content) + } +} + +// --- Next() edge cases --- + +// TestCatfileReader_RepeatedNextAfterEOF verifies that calling Next +// after EOF keeps returning EOF — not a panic, not a different error. +func TestCatfileReader_RepeatedNextAfterEOF(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{blobs["hello.txt"]} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + // Consume and skip the only entry. + if _, _, err := cr.Next(); err != nil { + t.Fatal(err) + } + + // First EOF. + _, _, err = cr.Next() + if err != io.EOF { + t.Fatalf("first post-exhaust Next: %v, want io.EOF", err) + } + + // Second and third EOF — must be stable. + for i := 0; i < 2; i++ { + _, _, err = cr.Next() + if err != io.EOF { + t.Fatalf("Next #%d after EOF: %v, want io.EOF", i+2, err) + } + } +} + +// --- Large blob precision tests --- + +// TestCatfileReader_LargeBlobBytePrecision verifies that a 64KB blob +// is read with byte-exact precision — no off-by-one from trailing LF +// handling, no truncation, no extra bytes. +func TestCatfileReader_LargeBlobBytePrecision(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{blobs["large.bin"]} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + size, _, err := cr.Next() + if err != nil { + t.Fatal(err) + } + if size != 64*1024 { + t.Fatalf("size = %d, want %d", size, 64*1024) + } + + // Read the full blob content. + content := make([]byte, size) + n, err := io.ReadFull(cr, content) + if err != nil { + t.Fatalf("ReadFull: %v (read %d of %d)", err, n, size) + } + if n != size { + t.Fatalf("read %d bytes, want %d", n, size) + } + + // Verify git agrees on the content via cat-file -p. 
+ expected, err := exec.Command("git", "-C", repoDir, "cat-file", "-p", blobs["large.bin"].String()).Output() + if err != nil { + t.Fatalf("git cat-file -p: %v", err) + } + if !bytes.Equal(content, expected) { + t.Errorf("content mismatch: got %d bytes, git says %d bytes", len(content), len(expected)) + // Find first divergence. + for i := range content { + if i >= len(expected) || content[i] != expected[i] { + t.Errorf("first diff at byte %d: got 0x%02x, want 0x%02x", i, content[i], expected[i]) + break + } + } + } +} + +// TestCatfileReader_LargeBlobChunkedRead reads a 64KB blob in 997-byte +// chunks (a prime number that doesn't align with any power-of-2 buffer) +// to verify no byte is lost or duplicated across read boundaries. +func TestCatfileReader_LargeBlobChunkedRead(t *testing.T) { + repoDir, blobs := createTestRepo(t) + ids := []plumbing.Hash{blobs["large.bin"]} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + size, _, _ := cr.Next() + if size != 64*1024 { + t.Fatalf("size = %d", size) + } + + var result bytes.Buffer + buf := make([]byte, 997) // prime-sized chunks + for { + n, err := cr.Read(buf) + if n > 0 { + result.Write(buf[:n]) + } + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + } + + if result.Len() != size { + t.Fatalf("total read = %d, want %d", result.Len(), size) + } + + // Cross-check with git. + expected, _ := exec.Command("git", "-C", repoDir, "cat-file", "-p", blobs["large.bin"].String()).Output() + if !bytes.Equal(result.Bytes(), expected) { + t.Error("chunked read content differs from git cat-file -p output") + } +} + +// --- Duplicate SHA test --- + +// TestCatfileReader_DuplicateSHAs verifies that requesting the same +// SHA multiple times works — git cat-file --batch outputs the object +// for each request independently. 
+func TestCatfileReader_DuplicateSHAs(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + sha := blobs["hello.txt"] + ids := []plumbing.Hash{sha, sha, sha} + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatal(err) + } + defer cr.Close() + + for i := 0; i < 3; i++ { + size, missing, err := cr.Next() + if err != nil { + t.Fatalf("Next #%d: %v", i, err) + } + if missing { + t.Fatalf("#%d unexpectedly missing", i) + } + if size != 12 { + t.Fatalf("#%d size = %d, want 12", i, size) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatal(err) + } + if string(content) != "hello world\n" { + t.Errorf("#%d content = %q", i, content) + } + } + + _, _, err = cr.Next() + if err != io.EOF { + t.Fatalf("expected EOF, got %v", err) + } +} diff --git a/gitindex/index.go b/gitindex/index.go index 0d8bc6697..595df28cb 100644 --- a/gitindex/index.go +++ b/gitindex/index.go @@ -587,13 +587,22 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) { // Separate main-repo keys from submodule keys, collecting blob SHAs // for the main repo so we can stream them via git cat-file --batch. + // ZOEKT_DISABLE_CATFILE_BATCH=true falls back to the go-git path for + // all files, useful as a kill switch if the cat-file path causes issues. 
+ catfileBatchDisabled := cmp.Or(os.Getenv("ZOEKT_DISABLE_CATFILE_BATCH"), "false") + useCatfileBatch := true + if disabled, _ := strconv.ParseBool(catfileBatchDisabled); disabled { + useCatfileBatch = false + log.Printf("cat-file batch disabled via ZOEKT_DISABLE_CATFILE_BATCH, using go-git") + } + mainRepoKeys := make([]fileKey, 0, totalFiles) mainRepoIDs := make([]plumbing.Hash, 0, totalFiles) var submoduleKeys []fileKey for _, name := range names { for _, key := range fileKeys[name] { - if key.SubRepoPath == "" { + if useCatfileBatch && key.SubRepoPath == "" { mainRepoKeys = append(mainRepoKeys, key) mainRepoIDs = append(mainRepoIDs, key.ID) } else { @@ -612,50 +621,8 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) { return false, fmt.Errorf("newCatfileReader: %w", err) } - for idx, key := range mainRepoKeys { - size, missing, err := cr.Next() - if err != nil { - cr.Close() - return false, fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err) - } - - branches := repos[key].Branches - var doc index.Document - - if missing { - doc = skippedLargeDoc(key, branches) - } else { - keyFullPath := key.FullPath() - if size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { - // Skip without reading content into memory. 
- doc = skippedLargeDoc(key, branches) - } else { - content := make([]byte, size) - if _, err := io.ReadFull(cr, content); err != nil { - cr.Close() - return false, fmt.Errorf("read blob %s: %w", keyFullPath, err) - } - doc = index.Document{ - SubRepositoryPath: key.SubRepoPath, - Name: keyFullPath, - Content: content, - Branches: branches, - } - } - } - - if err := builder.Add(doc); err != nil { - cr.Close() - return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) - } - - if idx%10_000 == 0 { - builder.CheckMemoryUsage() - } - } - - if err := cr.Close(); err != nil { - return false, fmt.Errorf("close cat-file: %w", err) + if err := indexCatfileBlobs(cr, mainRepoKeys, repos, opts, builder); err != nil { + return false, err } } @@ -678,6 +645,61 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) { return true, builder.Finish() } +// indexCatfileBlobs streams main-repo blobs from the catfileReader into the +// builder. Large blobs are skipped without reading content into memory. +// keys must correspond 1:1 (in order) with the ids passed to newCatfileReader. +// The reader is always closed when this function returns. +func indexCatfileBlobs(cr *catfileReader, keys []fileKey, repos map[fileKey]BlobLocation, opts Options, builder *index.Builder) error { + defer cr.Close() + + for idx, key := range keys { + size, missing, err := cr.Next() + if err != nil { + return fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err) + } + + branches := repos[key].Branches + var doc index.Document + + if missing { + // Unexpected for local repos — may indicate corruption, shallow + // clone, or a race with git gc. Log a warning and skip. 
+			log.Printf("warning: blob %s missing for %s", key.ID, key.FullPath())
+			doc = skippedDoc(key, branches, index.SkipReasonMissing)
+		} else {
+			keyFullPath := key.FullPath()
+			if size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) {
+				// Skip without reading content into memory.
+				doc = skippedDoc(key, branches, index.SkipReasonTooLarge)
+			} else {
+				// Pre-allocate and read the full blob content in one call.
+				// io.ReadFull is preferred over io.LimitedReader here as it
+				// avoids the intermediate allocation and the size is known.
+				content := make([]byte, size)
+				if _, err := io.ReadFull(cr, content); err != nil {
+					return fmt.Errorf("read blob %s: %w", keyFullPath, err)
+				}
+				doc = index.Document{
+					SubRepositoryPath: key.SubRepoPath,
+					Name:              keyFullPath,
+					Content:           content,
+					Branches:          branches,
+				}
+			}
+		}
+
+		if err := builder.Add(doc); err != nil {
+			return fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
+		}
+
+		if idx%10_000 == 0 {
+			builder.CheckMemoryUsage()
+		}
+	}
+
+	return nil
+}
+
 // openRepo opens a git repository in a way that's optimized for indexing.
 //
 // It copies the relevant logic from git.PlainOpen, and tweaks certain filesystem options.
@@ -1058,7 +1080,7 @@ func createDocument(key fileKey,
 	// We filter out large documents when fetching the repo. So if an object is too large, it will not be found.
if errors.Is(err, plumbing.ErrObjectNotFound) { - return skippedLargeDoc(key, branches), nil + return skippedDoc(key, branches, index.SkipReasonTooLarge), nil } if err != nil { @@ -1067,7 +1089,7 @@ func createDocument(key fileKey, keyFullPath := key.FullPath() if blob.Size > int64(opts.SizeMax) && !opts.IgnoreSizeMax(keyFullPath) { - return skippedLargeDoc(key, branches), nil + return skippedDoc(key, branches, index.SkipReasonTooLarge), nil } contents, err := blobContents(blob) @@ -1083,9 +1105,10 @@ func createDocument(key fileKey, }, nil } -func skippedLargeDoc(key fileKey, branches []string) index.Document { +// skippedDoc creates a Document placeholder for a blob that was not indexed. +func skippedDoc(key fileKey, branches []string, reason index.SkipReason) index.Document { return index.Document{ - SkipReason: index.SkipReasonTooLarge, + SkipReason: reason, Name: key.FullPath(), Branches: branches, SubRepositoryPath: key.SubRepoPath, diff --git a/index/document.go b/index/document.go index 68a5b1e98..25efa461e 100644 --- a/index/document.go +++ b/index/document.go @@ -26,6 +26,7 @@ const ( SkipReasonTooSmall SkipReasonBinary SkipReasonTooManyTrigrams + SkipReasonMissing ) func (s SkipReason) explanation() string { @@ -40,6 +41,8 @@ func (s SkipReason) explanation() string { return "contains binary content" case SkipReasonTooManyTrigrams: return "contains too many trigrams" + case SkipReasonMissing: + return "object missing from repository" default: return "unknown skip reason" } diff --git a/index/file_category.go b/index/file_category.go index 6ca4800af..fa365ccd0 100644 --- a/index/file_category.go +++ b/index/file_category.go @@ -35,9 +35,10 @@ func DetermineFileCategory(doc *Document) { name := doc.Name content := doc.Content - // If this document was skipped because it was too large, just guess the category based on the filename to avoid - // examining the contents. Note: passing nil content is allowed by the go-enry contract. 
- if doc.SkipReason == SkipReasonTooLarge || doc.SkipReason == SkipReasonBinary { + // If this document was skipped (too large, binary, or missing from the repo), + // guess the category based on the filename to avoid examining the contents. + // Note: passing nil content is allowed by the go-enry contract. + if doc.SkipReason == SkipReasonTooLarge || doc.SkipReason == SkipReasonBinary || doc.SkipReason == SkipReasonMissing { content = nil }