diff --git a/image/copy/blob.go b/image/copy/blob.go index 9db6338d75..c69627c0ab 100644 --- a/image/copy/blob.go +++ b/image/copy/blob.go @@ -19,6 +19,7 @@ import ( func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo, getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer, isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool, + reporter progressReporter, ) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers; // that pipeline is built by updating stream. @@ -84,16 +85,11 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read return types.BlobInfo{}, err } - // === Report progress using the ic.c.options.Progress channel, if required. + // === Report progress using the reporter, if required. if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { - progressReader := newProgressReader( - stream.reader, - ic.c.options.Progress, - ic.c.options.ProgressInterval, - srcInfo, - ) - defer progressReader.reportDone() - stream.reader = progressReader + // Note: the reporter can be a no-op if the condition above evaluates + // false and in that case there's no reason to wrap the reader here. + stream.reader = newProgressReader(stream.reader, reporter) } // === Finally, send the layer stream to dest. diff --git a/image/copy/progress.go b/image/copy/progress.go new file mode 100644 index 0000000000..150f9dea48 --- /dev/null +++ b/image/copy/progress.go @@ -0,0 +1,44 @@ +package copy + +import ( + "context" + "io" + "math" + "time" + + "go.podman.io/image/v5/internal/private" + "go.podman.io/image/v5/types" +) + +// blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar +// and optionally *progressReporter (if non-nil) with the number of received bytes. +type blobChunkAccessorProxy struct { + wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor + bar *progressBar // A progress bar updated with the number of bytes read so far + reporter progressReporter // A progress reporter updated with the number of bytes read so far +} + +// GetBlobAt returns a sequential channel of readers that contain data for the requested +// blob chunks, and a channel that might get a single error value. +// The specified chunks must be not overlapping and sorted by their offset. +// The readers must be fully consumed, in the order they are returned, before blocking +// to read the next chunk. +// If the Length for the last chunk is set to math.MaxUint64, then it +// fully fetches the remaining data from the offset to the end of the blob. +func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { + start := time.Now() + rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks) + if err == nil { + total := int64(0) + for _, c := range chunks { + // do not update the progress bar if there is a chunk with unknown length. + if c.Length == math.MaxUint64 { + return rc, errs, err + } + total += int64(c.Length) + } + s.reporter.reportRead(uint64(total)) + s.bar.EwmaIncrInt64(total, time.Since(start)) + } + return rc, errs, err +} diff --git a/image/copy/progress_bars.go b/image/copy/progress_bars.go index 5336346825..ed95896bf5 100644 --- a/image/copy/progress_bars.go +++ b/image/copy/progress_bars.go @@ -1,15 +1,11 @@ package copy import ( - "context" "fmt" "io" - "math" - "time" "github.com/vbauerster/mpb/v8" "github.com/vbauerster/mpb/v8/decor" - "go.podman.io/image/v5/internal/private" "go.podman.io/image/v5/types" ) @@ -144,34 +140,3 @@ func (bar *progressBar) mark100PercentComplete() { bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete. } } - -// blobChunkAccessorProxy wraps a BlobChunkAccessor and updates a *progressBar -// with the number of received bytes. -type blobChunkAccessorProxy struct { - wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor - bar *progressBar // A progress bar updated with the number of bytes read so far -} - -// GetBlobAt returns a sequential channel of readers that contain data for the requested -// blob chunks, and a channel that might get a single error value. -// The specified chunks must be not overlapping and sorted by their offset. -// The readers must be fully consumed, in the order they are returned, before blocking -// to read the next chunk. -// If the Length for the last chunk is set to math.MaxUint64, then it -// fully fetches the remaining data from the offset to the end of the blob. -func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { - start := time.Now() - rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks) - if err == nil { - total := int64(0) - for _, c := range chunks { - // do not update the progress bar if there is a chunk with unknown length. - if c.Length == math.MaxUint64 { - return rc, errs, err - } - total += int64(c.Length) - } - s.bar.EwmaIncrInt64(total, time.Since(start)) - } - return rc, errs, err -} diff --git a/image/copy/progress_channel.go b/image/copy/progress_channel.go index f57646156e..b49810b207 100644 --- a/image/copy/progress_channel.go +++ b/image/copy/progress_channel.go @@ -7,73 +7,118 @@ import ( "go.podman.io/image/v5/types" ) -// progressReader is a reader that reports its progress to a types.ProgressProperties channel on an interval. -type progressReader struct { - source io.Reader - channel chan<- types.ProgressProperties - interval time.Duration - artifact types.BlobInfo - lastUpdate time.Time - offset uint64 - offsetUpdate uint64 +// progressReporter is an interface for reporting progress about a single blob. +type progressReporter interface { + reportRead(bytesRead uint64) + reportDone() + reset() } -// newProgressReader creates a new progress reader for: -// `source`: The source when internally reading bytes -// `channel`: The reporter channel to which the progress will be sent -// `interval`: The update interval to indicate how often the progress should update -// `artifact`: The blob metadata which is currently being progressed -func newProgressReader( - source io.Reader, +// noopProgressReporter is a no-op implementation of progressReporter. +type noopProgressReporter struct{} + +func (r *noopProgressReporter) reportRead(uint64) {} +func (r *noopProgressReporter) reportDone() {} +func (r *noopProgressReporter) reset() {} + +// channelProgressReporter reports progress about a single blob to a +// types.ProgressProperties channel and supports re-starting from zero +// without reporting the progress through the channel unless +// it's higher than the offset reached before the restart to +// avoid confusing behavior in consumers of the events +// (skipping back). +type channelProgressReporter struct { + channel chan<- types.ProgressProperties // The reporter channel to which the progress will be sent + interval time.Duration // The update interval to indicate how often the progress should update + artifact types.BlobInfo // The blob metadata which is currently being progressed + lastUpdate time.Time // The last time a progress channel event was sent + offset uint64 // The currently downloaded size in bytes + maxReportedOffset uint64 // The high-water mark for offset already sent to the channel +} + +// newChannelProgressReporter creates a new progress reporter +// and immediately reports a new artifact event. +func newChannelProgressReporter( channel chan<- types.ProgressProperties, interval time.Duration, artifact types.BlobInfo, -) *progressReader { - // The progress reader constructor informs the progress channel - // that a new artifact will be read +) progressReporter { channel <- types.ProgressProperties{ Event: types.ProgressEventNewArtifact, Artifact: artifact, } - return &progressReader{ - source: source, - channel: channel, - interval: interval, - artifact: artifact, - lastUpdate: time.Now(), - offset: 0, - offsetUpdate: 0, + return &channelProgressReporter{ + channel: channel, + interval: interval, + artifact: artifact, + lastUpdate: time.Now(), + offset: 0, + maxReportedOffset: 0, } } -// reportDone indicates to the internal channel that the progress has been -// finished -func (r *progressReader) reportDone() { - r.channel <- types.ProgressProperties{ - Event: types.ProgressEventDone, - Artifact: r.artifact, - Offset: r.offset, - OffsetUpdate: r.offsetUpdate, - } +// reset resets the reporter's progress. +// +// It's meant to be used on error when +// the processing has to be re-started +// (e.g. ErrFallbackToOrdinaryLayerDownload). +func (r *channelProgressReporter) reset() { + r.offset = 0 } -// Read continuously reads bytes into the progress reader and reports the -// status via the internal channel -func (r *progressReader) Read(p []byte) (int, error) { - n, err := r.source.Read(p) - r.offset += uint64(n) - r.offsetUpdate += uint64(n) - - // Fire the progress reader in the provided interval - if time.Since(r.lastUpdate) > r.interval { +// reportRead reports progress with the number of `bytesRead` +// while keeping track of a current high-water mark in case +// of reset(). It never skips back below the already reported +// offset and does not report the progress unless +// the configured `interval` elapses. +func (r *channelProgressReporter) reportRead(bytesRead uint64) { + r.offset += bytesRead + if r.offset > r.maxReportedOffset && time.Since(r.lastUpdate) > r.interval { r.channel <- types.ProgressProperties{ Event: types.ProgressEventRead, Artifact: r.artifact, Offset: r.offset, - OffsetUpdate: r.offsetUpdate, + OffsetUpdate: r.offset - r.maxReportedOffset, } + r.maxReportedOffset = r.offset r.lastUpdate = time.Now() - r.offsetUpdate = 0 } +} + +// reportDone reports successful completion. +func (r *channelProgressReporter) reportDone() { + offset := max(r.offset, r.maxReportedOffset) + r.channel <- types.ProgressProperties{ + Event: types.ProgressEventDone, + Artifact: r.artifact, + Offset: offset, + OffsetUpdate: offset - r.maxReportedOffset, + } +} + +// progressReader extends a wrapped io.Reader +// with additional reporting of its progress. +type progressReader struct { + source io.Reader + progressReporter +} + +// newProgressReader creates a new progress reader that wraps source +// and reports progress through the given reporter. +func newProgressReader( + source io.Reader, + reporter progressReporter, +) *progressReader { + return &progressReader{ + source: source, + progressReporter: reporter, + } +} + +// Read continuously reads bytes into the progress reader and reports the +// status via the internal channel. +func (r *progressReader) Read(p []byte) (int, error) { + n, err := r.source.Read(p) + r.reportRead(uint64(n)) return n, err } diff --git a/image/copy/progress_channel_test.go b/image/copy/progress_channel_test.go index 6b4bb16863..11f71b581e 100644 --- a/image/copy/progress_channel_test.go +++ b/image/copy/progress_channel_test.go @@ -4,12 +4,180 @@ import ( "bytes" "io" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" "go.podman.io/image/v5/types" ) +// consumeNonblocking drains all currently buffered events +// from a channel and returns them. It never blocks. +func consumeNonblocking(channel <-chan types.ProgressProperties) []types.ProgressProperties { + var result []types.ProgressProperties + for { + select { + case p := <-channel: + result = append(result, p) + default: + return result + } + } +} + +// TestNewProgressReporter verifies that constructing a reporter +// signals a new artifact event. +func TestNewProgressReporter(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + + r := newChannelProgressReporter(channel, time.Second, artifact) + assert.NotNil(t, r, "newChannelProgressReporter should return a progress reporter") + + // Verify only a new artifact event was received. + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventNewArtifact, + Artifact: artifact, + }}, events, "constructor should send exactly one new artifact event") +} + +// TestProgressReporterReportRead verifies that a read event is sent +// after the interval elapses and not before. +func TestProgressReporterReportRead(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + interval := 5 * time.Second + + // Create a reporter and consume the new artifact event. + r := newChannelProgressReporter(channel, interval, artifact) + assert.NotEmpty(t, consumeNonblocking(channel), "should send new artifact event") + + // Verify that before the interval elapses: no event is sent. + r.reportRead(5) + assert.Empty(t, consumeNonblocking(channel), "should not send before interval") + + // Verify that after the interval: read event is sent. + time.Sleep(2 * interval) + r.reportRead(10) + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventRead, + Artifact: artifact, + Offset: 15, + OffsetUpdate: 15, + }}, events, "should send a read event after interval elapses") + }) +} + +// TestProgressReporterReportDone verifies that a done event +// includes the accumulated offset and the not-yet-reported offset update. +func TestProgressReporterReportDone(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + interval := 5 * time.Second + + // Create a reporter and consume the new artifact event. + r := newChannelProgressReporter(channel, interval, artifact) + assert.NotEmpty(t, consumeNonblocking(channel), "should send new artifact event") + + // Simulate progress with three reported reads. + const ( + read1 = 30 + read2 = 20 + read3 = 15 + ) + r.reportRead(read1) + assert.Empty(t, consumeNonblocking(channel), "should not send before interval") + time.Sleep(2 * interval) + r.reportRead(read2) + assert.NotEmpty(t, consumeNonblocking(channel), "should send after interval elapses") + r.reportRead(read3) + + // Verify that the done event includes the + // accumulated offset from all three reads + // and the OffsetUpdate is the final read + // that happened before the update interval + // elapsed. + r.reportDone() + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventDone, + Artifact: artifact, + Offset: read1 + read2 + read3, + OffsetUpdate: read3, + }}, events, "should send a done event with accumulated offsets") + }) +} + +// TestProgressReporterReset verifies that reset does not report +// negative progress and suppresses reports until the offset +// exceeds the previously reported high-water mark. +func TestProgressReporterReset(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + interval := 5 * time.Second + + // Create a reporter and consume the new artifact event. + r := newChannelProgressReporter(channel, interval, artifact) + assert.NotEmpty(t, consumeNonblocking(channel), "should send new artifact event") + + // reportRead(10): no event before interval. + r.reportRead(10) + assert.Empty(t, consumeNonblocking(channel)) + + // After interval, reportRead(10): reports +20=20. + time.Sleep(2 * interval) + r.reportRead(10) + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventRead, + Artifact: artifact, + Offset: 20, + OffsetUpdate: 20, + }}, events) + + // reportRead(10): no event (interval not elapsed). + r.reportRead(10) + assert.Empty(t, consumeNonblocking(channel)) + + // reset: no event sent. + r.reset() + assert.Empty(t, consumeNonblocking(channel), "reset should not send an event") + + // After interval, reportRead(15): nothing (15 < 20 already reported). + time.Sleep(2 * interval) + r.reportRead(15) + assert.Empty(t, consumeNonblocking(channel), "should not report below high-water mark") + + // After interval, reportRead(10): reports +5=25. + time.Sleep(2 * interval) + r.reportRead(10) + events = consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventRead, + Artifact: artifact, + Offset: 25, + OffsetUpdate: 5, + }}, events) + }) +} + +// TestNoopProgressReporter verifies that +// a noopProgressReporter can be called without panicking. +func TestNoopProgressReporter(t *testing.T) { + r := &noopProgressReporter{} + + // Make sure that no progressReporter method panics. + r.reportRead(100) + r.reset() + r.reportRead(50) + r.reportDone() +} + func newSUT( t *testing.T, reader io.Reader, @@ -18,19 +186,17 @@ func newSUT( ) *progressReader { artifact := types.BlobInfo{Size: 10} - go func() { - res := <-channel - assert.Equal(t, res.Event, types.ProgressEventNewArtifact) - assert.Equal(t, res.Artifact, artifact) - }() - res := newProgressReader(reader, channel, duration, artifact) + reporter := newChannelProgressReporter(channel, duration, artifact) + res := <-channel + assert.Equal(t, res.Event, types.ProgressEventNewArtifact) + assert.Equal(t, res.Artifact, artifact) - return res + return newProgressReader(reader, reporter) } func TestNewProgressReader(t *testing.T) { // Given - channel := make(chan types.ProgressProperties) + channel := make(chan types.ProgressProperties, 1) sut := newSUT(t, nil, time.Second, channel) assert.NotNil(t, sut) @@ -44,7 +210,7 @@ func TestNewProgressReader(t *testing.T) { func TestReadWithoutEvent(t *testing.T) { // Given - channel := make(chan types.ProgressProperties) + channel := make(chan types.ProgressProperties, 1) reader := bytes.NewReader([]byte{0, 1, 2}) sut := newSUT(t, reader, time.Second, channel) assert.NotNil(t, sut) @@ -60,7 +226,7 @@ func TestReadWithoutEvent(t *testing.T) { func TestReadWithEvent(t *testing.T) { // Given - channel := make(chan types.ProgressProperties) + channel := make(chan types.ProgressProperties, 1) reader := bytes.NewReader([]byte{0, 1, 2, 3, 4, 5, 6}) sut := newSUT(t, reader, time.Nanosecond, channel) assert.NotNil(t, sut) diff --git a/image/copy/single.go b/image/copy/single.go index 9f15c77e05..88ff9265d0 100644 --- a/image/copy/single.go +++ b/image/copy/single.go @@ -634,10 +634,15 @@ func (ic *imageCopier) copyConfig(ctx context.Context, src types.Image) error { return types.BlobInfo{}, fmt.Errorf("reading config blob %s: %w", srcInfo.Digest, err) } - destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false) + var reporter progressReporter = &noopProgressReporter{} + if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { + reporter = newChannelProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) + } + destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false, reporter) if err != nil { return types.BlobInfo{}, err } + reporter.reportDone() bar.mark100PercentComplete() return destInfo, nil @@ -789,6 +794,10 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to // of the source file are not known yet and must be fetched. // Attempt a partial only when the source allows to retrieve a blob partially and // the destination has support for it. + var reporter progressReporter = &noopProgressReporter{} + if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { + reporter = newChannelProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) + } if canAvoidProcessingCompleteLayer && ic.c.rawSource.SupportsGetBlobAt() && ic.c.dest.SupportsPutBlobPartial() { reused, blobInfo, err := func() (bool, types.BlobInfo, error) { // A scope for defer bar, err := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done") @@ -801,8 +810,9 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to }() proxy := blobChunkAccessorProxy{ - wrapped: ic.c.rawSource, - bar: bar, + wrapped: ic.c.rawSource, + bar: bar, + reporter: reporter, } uploadedBlob, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, private.PutBlobPartialOptions{ Cache: ic.c.blobInfoCache, @@ -817,12 +827,17 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } bar.mark100PercentComplete() hideProgressBar = false + + reporter.reportDone() + logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest) return true, updatedBlobInfoFromUpload(srcInfo, uploadedBlob), nil } // On a "partial content not available" error, ignore it and retrieve the whole layer. var perr private.ErrFallbackToOrdinaryLayerDownload if errors.As(err, &perr) { + // Reset progress, the reporter is reused for the fallback. + reporter.reset() logrus.Debugf("Failed to retrieve partial blob: %v", err) return false, types.BlobInfo{}, nil } @@ -850,7 +865,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } defer srcStream.Close() - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer, reporter) if err != nil { return types.BlobInfo{}, "", err } @@ -880,6 +895,8 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } } + // Only report completion on success. + reporter.reportDone() bar.mark100PercentComplete() return blobInfo, diffID, nil }() @@ -929,6 +946,7 @@ func updatedBlobInfoFromReuse(inputInfo types.BlobInfo, reusedBlob private.Reuse // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool, + reporter progressReporter, ) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compressiontypes.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -954,7 +972,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea } } - blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, emptyLayer) // Sets err to nil on success + blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, emptyLayer, reporter) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } diff --git a/image/types/types.go b/image/types/types.go index 1c0007e6e4..20a73cd7d6 100644 --- a/image/types/types.go +++ b/image/types/types.go @@ -707,7 +707,8 @@ type SystemContext struct { type ProgressEvent uint const ( - // ProgressEventNewArtifact will be fired on progress reader setup + // ProgressEventNewArtifact will be fired when starting to process + // a new artifact ProgressEventNewArtifact ProgressEvent = iota // ProgressEventRead indicates that the artifact download is currently in @@ -719,7 +720,7 @@ const ( ProgressEventDone // ProgressEventSkipped is fired when the artifact has been skipped because - // its already available at the destination + // it's already available at the destination ProgressEventSkipped )