From 3706b08a4fc79bc72ac5956d2d9d06c3f91ab6bb Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Thu, 29 Jan 2026 12:19:42 +0100 Subject: [PATCH 1/3] tar/asm: add NewInputTarStreamWithDone + tests Refactor tar disassembly into a shared `runInputTarStream`, `runInputTarStreamGoroutine` and `newInputTarStreamCommon` setup helper. Add `NewInputTarStreamWithDone`, which returns an `io.ReadCloser` together with a `done` channel that is signaled when the internal goroutine finishes (or fails), including the final padding-draining phase. Preserve existing `NewInputTarStream` behavior and API. Add unit tests covering: - successful full reads - early consumer close - packer error propagation - underlying reader failure while the tar-split goroutine is still running Motivation: callers need a reliable way to (1) abort consumption when they fail early and (2) block until the background goroutine has terminated so the underlying input reader can be safely released/reused. The wrapper guarantees the protocol (close pipe + send exactly one done value) even on panics, while preserving prompt termination when the consumer closes early. Relates: https://github.com/containers/container-libs/issues/148 Signed-off-by: Jan Kaluza --- tar/asm/disassemble.go | 320 ++++++++++++++++++++------------ tar/asm/disassemble_test.go | 359 ++++++++++++++++++++++++++++++++++++ 2 files changed, 560 insertions(+), 119 deletions(-) diff --git a/tar/asm/disassemble.go b/tar/asm/disassemble.go index 80c2522..727c8ef 100644 --- a/tar/asm/disassemble.go +++ b/tar/asm/disassemble.go @@ -1,23 +1,168 @@ package asm import ( + "errors" "io" "github.com/vbatts/tar-split/archive/tar" "github.com/vbatts/tar-split/tar/storage" ) -// NewInputTarStream wraps the Reader stream of a tar archive and provides a -// Reader stream of the same. +// runInputTarStreamGoroutine is the goroutine entrypoint. // -// In the middle it will pack the segments and file metadata to storage.Packer -// `p`. 
+// It centralizes the goroutine protocol so the core parsing logic can be +// written as ordinary Go code that just "returns an error". // -// The the storage.FilePutter is where payload of files in the stream are -// stashed. If this stashing is not needed, you can provide a nil -// storage.FilePutter. Since the checksumming is still needed, then a default -// of NewDiscardFilePutter will be used internally -func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) { +// Protocol guarantees: +// - pW is always closed exactly once (CloseWithError(nil) == Close()). +// - if done != nil, exactly one value is sent (nil on success, non-nil on failure). +// - panics are converted into a non-nil error (and the panic is rethrown). +func runInputTarStreamGoroutine(outputRdr io.Reader, pW *io.PipeWriter, p storage.Packer, fp storage.FilePutter, done chan<- error) { + // Default to a non-nil error so a panic can't accidentally look like success. + err := errors.New("panic in runInputTarStream") + defer func() { + // CloseWithError(nil) is equivalent to Close(). + pW.CloseWithError(err) + + if done != nil { + done <- err + } + + // Preserve panic semantics while still ensuring the protocol above runs. + if r := recover(); r != nil { + panic(r) + } + }() + + err = runInputTarStream(outputRdr, p, fp) +} + +// runInputTarStream drives tar-split parsing. +// +// It reads a tar stream from outputRdr and records tar-split metadata into the +// provided storage.Packer. +// +// Abort behavior: if the consumer closes the read end early, the tee reader will +// stop producing bytes (due to pipe write failure) and tar parsing will return +// an error. We propagate that error so the goroutine terminates promptly rather +// than draining the input stream for no benefit. 
+func runInputTarStream(outputRdr io.Reader, p storage.Packer, fp storage.FilePutter) error { + tr := tar.NewReader(outputRdr) + tr.RawAccounting = true + + for { + hdr, err := tr.Next() + if err != nil { + if err != io.EOF { + return err + } + // Even when EOF is reached, there are often 1024 null bytes at the end + // of an archive. Collect them too. + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err + } + } + break // Not return: we still need to drain any additional padding. + } + if hdr == nil { + break // Not return: we still need to drain any additional padding. + } + + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err + } + } + + var csum []byte + if hdr.Size > 0 { + _, csum, err = fp.Put(hdr.Name, tr) + if err != nil { + return err + } + } + + entry := storage.Entry{ + Type: storage.FileType, + Size: hdr.Size, + Payload: csum, + } + // For proper marshalling of non-utf8 characters + entry.SetName(hdr.Name) + + // File entries added, regardless of size + if _, err := p.AddEntry(entry); err != nil { + return err + } + + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err + } + } + } + + // It is allowable, and not uncommon that there is further padding on + // the end of an archive, apart from the expected 1024 null bytes. We + // do this in chunks rather than in one go to avoid cases where a + // maliciously crafted tar file tries to trick us into reading many GBs + // into memory. 
+ const paddingChunkSize = 1024 * 1024 + var paddingChunk [paddingChunkSize]byte + for { + n, err := outputRdr.Read(paddingChunk[:]) + if n != 0 { + if _, aerr := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: paddingChunk[:n], + }); aerr != nil { + return aerr + } + } + if err != nil { + if err == io.EOF { + break + } + return err + } + } + + return nil +} + +// newInputTarStreamCommon sets up the shared plumbing for NewInputTarStream and +// NewInputTarStreamWithDone. +// +// It constructs an io.Pipe and an io.TeeReader such that: +// +// - The caller reads tar bytes from the returned *io.PipeReader. +// - The background goroutine simultaneously reads the same stream from the +// TeeReader to perform tar-split parsing and metadata packing. +// +// Abort and synchronization semantics: +// +// - Closing the returned PipeReader causes the TeeReader to fail its write to +// the pipe, which in turn causes the background goroutine to exit promptly. +// - If the caller asks for a done channel, that channel receives exactly one +// error value (nil on success) once the background goroutine has fully +// terminated. This allows callers to safely wait until the input reader `r` +// is no longer in use. +func newInputTarStreamCommon( + r io.Reader, + p storage.Packer, + fp storage.FilePutter, + withDone bool, +) (pr *io.PipeReader, done <-chan error) { // What to do here... folks will want their own access to the Reader that is // their tar archive stream, but we'll need that same stream to use our // forked 'archive/tar'. @@ -34,123 +179,60 @@ func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io // only read what the outputRdr Read's. Since Tar archives have padding on // the end, we want to be the one reading the padding, even if the user's // `archive/tar` doesn't care. 
- pR, pW := io.Pipe() - outputRdr := io.TeeReader(r, pW) + pr, pw := io.Pipe() - // we need a putter that will generate the crc64 sums of file payloads + // We need a putter that will generate the crc64 sums of file payloads. if fp == nil { fp = storage.NewDiscardFilePutter() } - go func() { - tr := tar.NewReader(outputRdr) - tr.RawAccounting = true - for { - hdr, err := tr.Next() - if err != nil { - if err != io.EOF { - pW.CloseWithError(err) - return - } - // even when an EOF is reached, there is often 1024 null bytes on - // the end of an archive. Collect them too. - if b := tr.RawBytes(); len(b) > 0 { - _, err := p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - break // not return. We need the end of the reader. - } - if hdr == nil { - break // not return. We need the end of the reader. - } - - if b := tr.RawBytes(); len(b) > 0 { - _, err := p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } + outputRdr := io.TeeReader(r, pw) - var csum []byte - if hdr.Size > 0 { - var err error - _, csum, err = fp.Put(hdr.Name, tr) - if err != nil { - pW.CloseWithError(err) - return - } - } - - entry := storage.Entry{ - Type: storage.FileType, - Size: hdr.Size, - Payload: csum, - } - // For proper marshalling of non-utf8 characters - entry.SetName(hdr.Name) - - // File entries added, regardless of size - _, err = p.AddEntry(entry) - if err != nil { - pW.CloseWithError(err) - return - } + if withDone { + ch := make(chan error, 1) + done = ch + go runInputTarStreamGoroutine(outputRdr, pw, p, fp, ch) + return pr, done + } - if b := tr.RawBytes(); len(b) > 0 { - _, err = p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - } + go runInputTarStreamGoroutine(outputRdr, pw, p, fp, nil) + return pr, nil +} - // It is allowable, and not 
uncommon that there is further padding on - // the end of an archive, apart from the expected 1024 null bytes. We - // do this in chunks rather than in one go to avoid cases where a - // maliciously crafted tar file tries to trick us into reading many GBs - // into memory. - const paddingChunkSize = 1024 * 1024 - var paddingChunk [paddingChunkSize]byte - for { - var isEOF bool - n, err := outputRdr.Read(paddingChunk[:]) - if err != nil { - if err != io.EOF { - pW.CloseWithError(err) - return - } - isEOF = true - } - if n != 0 { - _, err = p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: paddingChunk[:n], - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - if isEOF { - break - } - } - pW.Close() - }() +// NewInputTarStream wraps the Reader stream of a tar archive and provides a +// Reader stream of the same. +// +// In the middle it will pack the segments and file metadata to storage.Packer +// `p`. +// +// The storage.FilePutter is where payload of files in the stream are +// stashed. If this stashing is not needed, you can provide a nil +// storage.FilePutter. Since the checksumming is still needed, then a default +// of NewDiscardFilePutter will be used internally +// +// If callers need to be able to abort early and/or wait for goroutine termination, +// prefer NewInputTarStreamWithDone. +// +// Deprecated: Use NewInputTarStreamWithDone instead. +func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) { + pr, _ := newInputTarStreamCommon(r, p, fp, false) + return pr, nil +} - return pR, nil +// NewInputTarStreamWithDone wraps the Reader stream of a tar archive and provides a +// Reader stream of the same. +// +// In the middle it will pack the segments and file metadata to storage.Packer `p`. 
+// +// It also returns a done channel that will receive exactly one error value +// (nil on success) when the internal goroutine has fully completed parsing +// the tar stream (including the final paddingChunk draining loop) and has +// finished writing all entries to `p`. +// +// The returned reader is an io.ReadCloser so callers can stop early; closing it +// aborts the pipe so the internal goroutine can terminate promptly (rather than +// hanging on a blocked pipe write). +func NewInputTarStreamWithDone(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.ReadCloser, <-chan error, error) { + pr, done := newInputTarStreamCommon(r, p, fp, true) + return pr, done, nil } diff --git a/tar/asm/disassemble_test.go b/tar/asm/disassemble_test.go index 84a3e77..2d3bcb0 100644 --- a/tar/asm/disassemble_test.go +++ b/tar/asm/disassemble_test.go @@ -2,10 +2,15 @@ package asm import ( "archive/tar" + "bytes" + "errors" "fmt" "io" "os" + "sync" + "sync/atomic" "testing" + "time" "github.com/vbatts/tar-split/tar/storage" ) @@ -66,3 +71,357 @@ func TestLargeJunkPadding(t *testing.T) { // At this point, if we haven't crashed then we are not vulnerable to // CVE-2017-14992. } + +// Mocked Packer storing entries and returning an error on demand. +type recordingPacker struct { + mu sync.Mutex + entries []storage.Entry + errAt int + err error + callNum int +} + +func (p *recordingPacker) AddEntry(e storage.Entry) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + // Return an artificial error if we are instructed to do so. + p.callNum++ + if p.errAt > 0 && p.callNum == p.errAt { + if p.err == nil { + p.err = errors.New("packer error") + } + return 0, p.err + } + + // Copy payload because callers may reuse buffers. + cp := e + if e.Payload != nil { + cp.Payload = append([]byte(nil), e.Payload...) 
+ } + p.entries = append(p.entries, cp) + return len(cp.Payload), nil +} + +func (p *recordingPacker) snapshot() []storage.Entry { + p.mu.Lock() + defer p.mu.Unlock() + out := make([]storage.Entry, len(p.entries)) + copy(out, p.entries) + return out +} + +// Mocked FilePutter +type recordingFilePutter struct { + mu sync.Mutex + puts []string +} + +func (fp *recordingFilePutter) Put(name string, r io.Reader) (int64, []byte, error) { + b, err := io.ReadAll(r) + if err != nil { + return 0, nil, err + } + fp.mu.Lock() + fp.puts = append(fp.puts, name) + fp.mu.Unlock() + + // Return a deterministic "checksum" based on content length. + csum := []byte(fmt.Sprintf("len=%d", len(b))) + return int64(len(b)), csum, nil +} + +// Helper function to generate the tar with optional extra padding. +func makeTarWithExtraPadding(t *testing.T, name string, content []byte, extraPadding int) []byte { + t.Helper() + + var buf bytes.Buffer + tw := tar.NewWriter(&buf) + + hdr := &tar.Header{ + Name: name, + Mode: 0o644, + Size: int64(len(content)), + } + if err := tw.WriteHeader(hdr); err != nil { + t.Fatalf("WriteHeader: %v", err) + } + if _, err := tw.Write(content); err != nil { + t.Fatalf("Write: %v", err) + } + if err := tw.Close(); err != nil { + t.Fatalf("Close tar writer: %v", err) + } + + out := buf.Bytes() + if extraPadding > 0 { + out = append(append([]byte(nil), out...), make([]byte, extraPadding)...) + } + return out +} + +// Helper function to wait until "done" for specific time. +func waitDone(t *testing.T, done <-chan error) error { + t.Helper() + select { + case err := <-done: + return err + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for done") + return errors.New("timeout") + } +} + +// closableBlockingReader simulates an io.Reader that can be "closed" while a Read is +// blocked. +// +// Behavior: +// - It serves bytes from data. 
+// - After it has served at least blockAfter bytes, the next Read blocks until either: +// - Unblock() is called, or +// - Close() is called (which also unblocks) and subsequent reads fail with errUnderlyingClosed. +type closableBlockingReader struct { + data []byte + pos int + blockAfter int + + closed atomic.Bool + + blockOnce sync.Once + blockCh chan struct{} // used to block/unblock reads + blockedCh chan struct{} // closed when we start blocking +} + +var errUnderlyingClosed = errors.New("underlying reader closed") + +func newClosableBlockingReader(data []byte, blockAfter int) *closableBlockingReader { + return &closableBlockingReader{ + data: data, + blockAfter: blockAfter, + blockCh: make(chan struct{}), + blockedCh: make(chan struct{}), + } +} + +func (r *closableBlockingReader) Read(p []byte) (int, error) { + if r.closed.Load() { + return 0, errUnderlyingClosed + } + if r.pos >= len(r.data) { + return 0, io.EOF + } + + // If we've reached the point where we should block, block before producing + // more data (simulates "reader got closed while goroutine is still running"). + if r.pos >= r.blockAfter { + r.blockOnce.Do(func() { close(r.blockedCh) }) + <-r.blockCh + if r.closed.Load() { + return 0, errUnderlyingClosed + } + } + + n := copy(p, r.data[r.pos:]) + r.pos += n + return n, nil +} + +func (r *closableBlockingReader) Close() error { + r.closed.Store(true) + // ensure blocked goroutine wakes up + select { + case <-r.blockCh: + // already closed/unblocked + default: + close(r.blockCh) + } + // signal blocked state even if we closed early + r.blockOnce.Do(func() { close(r.blockedCh) }) + return nil +} + +func (r *closableBlockingReader) Unblock() { + select { + case <-r.blockCh: + default: + close(r.blockCh) + } +} + +// Test that NewInputTarStreamWithDone signals done when we read everything. 
+func TestNewInputTarStreamWithDone(t *testing.T) { + input := makeTarWithExtraPadding(t, "file.txt", []byte("hello"), 4096) + + p := &recordingPacker{} + fp := &recordingFilePutter{} + + payload, done, err := NewInputTarStreamWithDone(bytes.NewReader(input), p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + defer payload.Close() + + got, rerr := io.ReadAll(payload) + if rerr != nil { + t.Fatalf("ReadAll(payload): %v", rerr) + } + if !bytes.Equal(got, input) { + t.Fatalf("payload bytes differ: got=%d bytes, want=%d bytes", len(got), len(input)) + } + + if derr := waitDone(t, done); derr != nil { + t.Fatalf("done returned error: %v", derr) + } + + entries := p.snapshot() + if len(entries) == 0 { + t.Fatalf("expected entries to be recorded") + } + + var ( + foundFile bool + foundSegment bool + ) + for _, e := range entries { + switch e.Type { + case storage.FileType: + foundFile = true + // We set size to len("hello") + if e.Size != int64(len("hello")) { + t.Fatalf("file entry size=%d, want=%d", e.Size, len("hello")) + } + case storage.SegmentType: + if len(e.Payload) > 0 { + foundSegment = true + } + } + } + if !foundFile { + t.Fatalf("expected at least one FileType entry") + } + if !foundSegment { + t.Fatalf("expected at least one SegmentType entry with payload") + } +} + +// Test that NewInputTarStreamWithDone works when underlying reader is closed while +// the NewInputTarStreamWithDone go-routine still runs. +func TestNewInputTarStreamWithDonUnderlyingClosed(t *testing.T) { + // Make a tar stream that is large enough that parsing won't finish in one tiny read. + input := makeTarWithExtraPadding(t, "file.txt", bytes.Repeat([]byte("A"), 64*1024), 0) + + // Block the underlying reader after it has produced some bytes. + // This ensures the tar-split goroutine will be mid-flight and will need more data. 
+ under := newClosableBlockingReader(input, 4096) + + p := &recordingPacker{} + fp := storage.NewDiscardFilePutter() + + payload, done, err := NewInputTarStreamWithDone(under, p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + defer payload.Close() + + // Start draining payload in a separate goroutine so the internal goroutine is forced to read. + readErrCh := make(chan error, 1) + go func() { + _, rerr := io.ReadAll(payload) + readErrCh <- rerr + }() + + // Wait until the underlying reader starts blocking (i.e., internal goroutine progressed + // far enough to need more bytes). + select { + case <-under.blockedCh: + // good + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for underlying reader to enter blocked state") + } + + // Now "close" the underlying reader while the tar-split goroutine is still running. + under.Close() + + // The tar-split goroutine should treat this as a non-EOF error, call fail(err), + // CloseWithError on the pipe, and signal done with the same error. + derr := waitDone(t, done) + if derr == nil { + t.Fatalf("expected done error, got nil") + } + if !errors.Is(derr, errUnderlyingClosed) { + t.Fatalf("done error=%v, want errors.Is(..., errUnderlyingClosed)=true", derr) + } + + // The consumer side should also observe an error (from the pipe). + select { + case rerr := <-readErrCh: + if rerr == nil { + t.Fatalf("expected reader error, got nil") + } + if !errors.Is(rerr, errUnderlyingClosed) { + t.Fatalf("reader error=%v, want errors.Is(..., errUnderlyingClosed)=true", rerr) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for payload read to finish") + } +} + +// Test that if the caller closes the returned reader without draining it, +// the background goroutine terminates promptly and the done channel is signaled. 
+func TestNewInputTarStreamWithDoneEarlyClose(t *testing.T) { + input := makeTarWithExtraPadding(t, "file.txt", []byte("hello"), 2048) + + p := &recordingPacker{} + fp := &recordingFilePutter{} + + payload, done, err := NewInputTarStreamWithDone(bytes.NewReader(input), p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + + // Close immediately without draining: this should abort the pipe and allow + // the goroutine to exit (rather than hanging). + if err := payload.Close(); err != nil { + t.Fatalf("payload.Close(): %v", err) + } + + // The goroutine should terminate and signal done (likely with a non-nil error + // due to the induced abort). + derr := waitDone(t, done) + if !errors.Is(derr, io.ErrClosedPipe) { + t.Fatalf("done error=%v, want io.ErrClosedPipe", derr) + } +} + +// Test that Packer error propagates to waitDone(). +func TestNewInputTarStreamWithDonePackerError(t *testing.T) { + input := makeTarWithExtraPadding(t, "file.txt", []byte("hello"), 0) + + packerErr := errors.New("boom") + p := &recordingPacker{errAt: 2, err: packerErr} // fail early during AddEntry + fp := &recordingFilePutter{} + + payload, done, err := NewInputTarStreamWithDone(bytes.NewReader(input), p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + defer payload.Close() + + // Reading should eventually return the packer error via CloseWithError. + _, rerr := io.ReadAll(payload) + if rerr == nil { + t.Fatalf("expected reader error, got nil") + } + // The error returned from an io.PipeReader may wrap; check with errors.Is. 
+ if !errors.Is(rerr, packerErr) { + t.Fatalf("reader error=%v, want errors.Is(...,%v)=true", rerr, packerErr) + } + + derr := waitDone(t, done) + if derr == nil { + t.Fatalf("expected done error, got nil") + } + if !errors.Is(derr, packerErr) { + t.Fatalf("done error=%v, want errors.Is(...,%v)=true", derr, packerErr) + } +} From 2237f5ae8f03d6f772cd4eb43fc968f73a6155cb Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Wed, 18 Feb 2026 10:57:58 +0100 Subject: [PATCH 2/3] tar/asm: simplify done handling Refactor `newInputTarStreamCommon` to accept an optional `done chan<- error` instead of a `withDone` bool. Clarify documentation to state that callers must fully read the returned reader. Signed-off-by: Jan Kaluza --- tar/asm/disassemble.go | 29 ++++++++++++++--------------- tar/asm/disassemble_test.go | 14 ++++++++------ 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/tar/asm/disassemble.go b/tar/asm/disassemble.go index 727c8ef..a17b6ea 100644 --- a/tar/asm/disassemble.go +++ b/tar/asm/disassemble.go @@ -161,8 +161,8 @@ func newInputTarStreamCommon( r io.Reader, p storage.Packer, fp storage.FilePutter, - withDone bool, -) (pr *io.PipeReader, done <-chan error) { + done chan<- error, +) (pr *io.PipeReader) { // What to do here... folks will want their own access to the Reader that is // their tar archive stream, but we'll need that same stream to use our // forked 'archive/tar'. @@ -181,22 +181,14 @@ func newInputTarStreamCommon( // `archive/tar` doesn't care. pr, pw := io.Pipe() - // We need a putter that will generate the crc64 sums of file payloads. 
if fp == nil { fp = storage.NewDiscardFilePutter() } outputRdr := io.TeeReader(r, pw) + go runInputTarStreamGoroutine(outputRdr, pw, p, fp, done) - if withDone { - ch := make(chan error, 1) - done = ch - go runInputTarStreamGoroutine(outputRdr, pw, p, fp, ch) - return pr, done - } - - go runInputTarStreamGoroutine(outputRdr, pw, p, fp, nil) - return pr, nil + return pr } // NewInputTarStream wraps the Reader stream of a tar archive and provides a @@ -213,9 +205,11 @@ func newInputTarStreamCommon( // If callers need to be able to abort early and/or wait for goroutine termination, // prefer NewInputTarStreamWithDone. // -// Deprecated: Use NewInputTarStreamWithDone instead. +// Deprecated: This leaves a goroutine around if the consumer aborts without consuming +// the whole stream, and does not allow the caller to know when r is safe to deallocate +// or when p has written everything. Use NewInputTarStreamWithDone instead. func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) { - pr, _ := newInputTarStreamCommon(r, p, fp, false) + pr := newInputTarStreamCommon(r, p, fp, nil) return pr, nil } @@ -232,7 +226,12 @@ func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io // The returned reader is an io.ReadCloser so callers can stop early; closing it // aborts the pipe so the internal goroutine can terminate promptly (rather than // hanging on a blocked pipe write). +// +// The caller is expected to consume the returned reader fully until EOF +// (not just the tar EOF marker); closing the returned reader earlier will +// cause the done channel to return a failure. 
func NewInputTarStreamWithDone(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.ReadCloser, <-chan error, error) { - pr, done := newInputTarStreamCommon(r, p, fp, true) + done := make(chan error, 1) + pr := newInputTarStreamCommon(r, p, fp, done) return pr, done, nil } diff --git a/tar/asm/disassemble_test.go b/tar/asm/disassemble_test.go index 2d3bcb0..51f7cfc 100644 --- a/tar/asm/disassemble_test.go +++ b/tar/asm/disassemble_test.go @@ -97,10 +97,10 @@ func (p *recordingPacker) AddEntry(e storage.Entry) (int, error) { // Copy payload because callers may reuse buffers. cp := e if e.Payload != nil { - cp.Payload = append([]byte(nil), e.Payload...) + cp.Payload = bytes.Clone(e.Payload) } p.entries = append(p.entries, cp) - return len(cp.Payload), nil + return len(p.entries), nil } func (p *recordingPacker) snapshot() []storage.Entry { @@ -118,17 +118,18 @@ type recordingFilePutter struct { } func (fp *recordingFilePutter) Put(name string, r io.Reader) (int64, []byte, error) { - b, err := io.ReadAll(r) + dataLen, err := io.Copy(io.Discard, r) if err != nil { return 0, nil, err } + fp.mu.Lock() fp.puts = append(fp.puts, name) fp.mu.Unlock() // Return a deterministic "checksum" based on content length. - csum := []byte(fmt.Sprintf("len=%d", len(b))) - return int64(len(b)), csum, nil + csum := []byte(fmt.Sprintf("len=%d", dataLen)) + return dataLen, csum, nil } // Helper function to generate the tar with optional extra padding. @@ -339,7 +340,8 @@ func TestNewInputTarStreamWithDonUnderlyingClosed(t *testing.T) { t.Fatalf("timeout waiting for underlying reader to enter blocked state") } - // Now "close" the underlying reader while the tar-split goroutine is still running. + // Now trigger an error from the underlying closableBlockingReader while the tar-split + // goroutine is still running. 
under.Close() // The tar-split goroutine should treat this as a non-EOF error, call fail(err), From a293733cf6e544c9df0eaf2cb0c0d2b705e12245 Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Wed, 18 Mar 2026 15:05:45 +0100 Subject: [PATCH 3/3] Use NewInputTarStreamWithDone everywhere. Signed-off-by: Jan Kaluza --- cmd/tar-split/checksize.go | 6 +++++- cmd/tar-split/disasm.go | 6 +++++- tar/asm/assemble_test.go | 12 ++++++++++-- tar/asm/iterate.go | 2 +- tar/asm/iterate_test.go | 4 +++- 5 files changed, 24 insertions(+), 6 deletions(-) diff --git a/cmd/tar-split/checksize.go b/cmd/tar-split/checksize.go index 0343682..49c2924 100644 --- a/cmd/tar-split/checksize.go +++ b/cmd/tar-split/checksize.go @@ -43,10 +43,11 @@ func CommandChecksize(c *cli.Context) { sp := storage.NewJSONPacker(packFh) fp := storage.NewDiscardFilePutter() - dissam, err := asm.NewInputTarStream(fh, sp, fp) + dissam, done, err := asm.NewInputTarStreamWithDone(fh, sp, fp) if err != nil { log.Fatal(err) } + defer dissam.Close() var num int tr := tar.NewReader(dissam) @@ -64,6 +65,9 @@ func CommandChecksize(c *cli.Context) { } } fmt.Printf(" -- number of files: %d\n", num) + if derr := <-done; derr != nil { + log.Fatal(derr) + } if err := packFh.Sync(); err != nil { log.Fatal(err) diff --git a/cmd/tar-split/disasm.go b/cmd/tar-split/disasm.go index 3c8b5b8..7bb49f7 100644 --- a/cmd/tar-split/disasm.go +++ b/cmd/tar-split/disasm.go @@ -44,10 +44,11 @@ func CommandDisasm(c *cli.Context) { // we're passing nil here for the file putter, because the ApplyDiff will // handle the extraction of the archive - its, err := asm.NewInputTarStream(inputStream, metaPacker, nil) + its, done, err := asm.NewInputTarStreamWithDone(inputStream, metaPacker, nil) if err != nil { logrus.Fatal(err) } + defer its.Close() var out io.Writer if c.Bool("no-stdout") { out = io.Discard @@ -58,5 +59,8 @@ func CommandDisasm(c *cli.Context) { if err != nil { logrus.Fatal(err) } + if derr := <-done; derr != nil { + logrus.Fatal(derr) 
+ } logrus.Infof("created %s from %s (read %d bytes)", c.String("output"), c.Args()[0], i) } diff --git a/tar/asm/assemble_test.go b/tar/asm/assemble_test.go index cfbcca6..caf1f4e 100644 --- a/tar/asm/assemble_test.go +++ b/tar/asm/assemble_test.go @@ -163,10 +163,11 @@ func TestTarStream(t *testing.T) { fgp := storage.NewBufferFileGetPutter() // wrap the disassembly stream - tarStream, err := NewInputTarStream(gzRdr, sp, fgp) + tarStream, done, err := NewInputTarStreamWithDone(gzRdr, sp, fgp) if err != nil { t.Fatal(err) } + defer tarStream.Close() // get a sum of the stream after it has passed through to ensure it's the same. h0 := sha1.New() @@ -174,6 +175,9 @@ func TestTarStream(t *testing.T) { if err != nil { t.Fatal(err) } + if derr := <-done; derr != nil { + t.Fatal(derr) + } if i != tc.expectedSize { t.Errorf("size of tar: expected %d; got %d", tc.expectedSize, i) @@ -227,15 +231,19 @@ func BenchmarkAsm(b *testing.B) { fgp := storage.NewBufferFileGetPutter() // wrap the disassembly stream - tarStream, err := NewInputTarStream(gzRdr, sp, fgp) + tarStream, done, err := NewInputTarStreamWithDone(gzRdr, sp, fgp) if err != nil { b.Fatal(err) } + defer tarStream.Close() // read it all to the bit bucket i1, err := io.Copy(io.Discard, tarStream) if err != nil { b.Fatal(err) } + if derr := <-done; derr != nil { + b.Fatal(derr) + } r := bytes.NewBuffer(w.Bytes()) sup := storage.NewJSONUnpacker(r) diff --git a/tar/asm/iterate.go b/tar/asm/iterate.go index 8a65887..9db3ab5 100644 --- a/tar/asm/iterate.go +++ b/tar/asm/iterate.go @@ -11,7 +11,7 @@ import ( // IterateHeaders calls handler for each tar header provided by Unpacker func IterateHeaders(unpacker storage.Unpacker, handler func(hdr *tar.Header) error) error { - // We assume about NewInputTarStream: + // We assume about NewInputTarStreamWithDone: // - There is a separate SegmentType entry for every tar header, but only one SegmentType entry for the full header incl. 
any extensions // - (There is a FileType entry for every tar header, we ignore it) // - Trailing padding of a file, if any, is included in the next SegmentType entry diff --git a/tar/asm/iterate_test.go b/tar/asm/iterate_test.go index 884c019..1d10f07 100644 --- a/tar/asm/iterate_test.go +++ b/tar/asm/iterate_test.go @@ -75,10 +75,12 @@ func TestIterateHeaders(t *testing.T) { require.NoError(t, err) var tarSplit bytes.Buffer - tsReader, err := NewInputTarStream(&tarball, storage.NewJSONPacker(&tarSplit), storage.NewDiscardFilePutter()) + tsReader, done, err := NewInputTarStreamWithDone(&tarball, storage.NewJSONPacker(&tarSplit), storage.NewDiscardFilePutter()) require.NoError(t, err) + defer tsReader.Close() _, err = io.Copy(io.Discard, tsReader) require.NoError(t, err) + require.NoError(t, <-done) unpacker := storage.NewJSONUnpacker(&tarSplit) var actual []tar.Header