From fdf8af4f5fff4d87f27c30c32eac0fa3f6f85a04 Mon Sep 17 00:00:00 2001 From: Marek Simek Date: Thu, 14 May 2026 14:24:43 +0200 Subject: [PATCH 1/5] image/copy: Fix missing progress reporting for chunked layers The `PutBlobPartial` code path only updated a progress bar, but it did not report its progress to the `copy.Options.Progress` channel for chunked layers. Add missing reporting and refactor parts shared with `progressReader`. Fixes: #469 Signed-off-by: Marek Simek --- image/copy/blob_chunk_accessor_proxy.go | 50 +++++++++++ image/copy/progress_bars.go | 35 -------- image/copy/progress_channel.go | 114 +++++++++++++----------- image/copy/single.go | 16 ++++ image/types/types.go | 5 +- 5 files changed, 133 insertions(+), 87 deletions(-) create mode 100644 image/copy/blob_chunk_accessor_proxy.go diff --git a/image/copy/blob_chunk_accessor_proxy.go b/image/copy/blob_chunk_accessor_proxy.go new file mode 100644 index 0000000000..14343abdc8 --- /dev/null +++ b/image/copy/blob_chunk_accessor_proxy.go @@ -0,0 +1,50 @@ +package copy + +import ( + "context" + "io" + "math" + "time" + + "go.podman.io/image/v5/internal/private" + "go.podman.io/image/v5/types" +) + +// blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar +// and optionally *progressReporter (if non-nil) with the number of received bytes. +type blobChunkAccessorProxy struct { + wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor + bar *progressBar // A progress bar updated with the number of bytes read so far + reporter *progressReporter // A progress reporter updated with the number of bytes read so far +} + +// GetBlobAt returns a sequential channel of readers that contain data for the requested +// blob chunks, and a channel that might get a single error value. +// The specified chunks must be not overlapping and sorted by their offset. +// The readers must be fully consumed, in the order they are returned, before blocking +// to read the next chunk. +// If the Length for the last chunk is set to math.MaxUint64, then it +// fully fetches the remaining data from the offset to the end of the blob. +// +// blobChunkAccessorProxy.GetBlobAt also updates a *progressBar +// and *progressReporter (if non-nil) with the number of bytes read. +func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { + start := time.Now() + rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks) + if err == nil { + total := int64(0) + for _, c := range chunks { + // do not update the progress bar if there is a chunk with unknown length. + if c.Length == math.MaxUint64 { + return rc, errs, err + } + total += int64(c.Length) + } + // Report read bytes if possible. + if s.reporter != nil { + s.reporter.reportRead(uint64(total)) + } + s.bar.EwmaIncrInt64(total, time.Since(start)) + } + return rc, errs, err +} diff --git a/image/copy/progress_bars.go b/image/copy/progress_bars.go index 5336346825..ed95896bf5 100644 --- a/image/copy/progress_bars.go +++ b/image/copy/progress_bars.go @@ -1,15 +1,11 @@ package copy import ( - "context" "fmt" "io" - "math" - "time" "github.com/vbauerster/mpb/v8" "github.com/vbauerster/mpb/v8/decor" - "go.podman.io/image/v5/internal/private" "go.podman.io/image/v5/types" ) @@ -144,34 +140,3 @@ func (bar *progressBar) mark100PercentComplete() { bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete. } } - -// blobChunkAccessorProxy wraps a BlobChunkAccessor and updates a *progressBar -// with the number of received bytes. -type blobChunkAccessorProxy struct { - wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor - bar *progressBar // A progress bar updated with the number of bytes read so far -} - -// GetBlobAt returns a sequential channel of readers that contain data for the requested -// blob chunks, and a channel that might get a single error value. -// The specified chunks must be not overlapping and sorted by their offset. -// The readers must be fully consumed, in the order they are returned, before blocking -// to read the next chunk. -// If the Length for the last chunk is set to math.MaxUint64, then it -// fully fetches the remaining data from the offset to the end of the blob. -func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { - start := time.Now() - rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks) - if err == nil { - total := int64(0) - for _, c := range chunks { - // do not update the progress bar if there is a chunk with unknown length. - if c.Length == math.MaxUint64 { - return rc, errs, err - } - total += int64(c.Length) - } - s.bar.EwmaIncrInt64(total, time.Since(start)) - } - return rc, errs, err -} diff --git a/image/copy/progress_channel.go b/image/copy/progress_channel.go index f57646156e..27262c5ee5 100644 --- a/image/copy/progress_channel.go +++ b/image/copy/progress_channel.go @@ -7,48 +7,45 @@ import ( "go.podman.io/image/v5/types" ) -// progressReader is a reader that reports its progress to a types.ProgressProperties channel on an interval. -type progressReader struct { - source io.Reader - channel chan<- types.ProgressProperties - interval time.Duration - artifact types.BlobInfo - lastUpdate time.Time - offset uint64 - offsetUpdate uint64 +// progressReporter facilitates progress reporting through its +// underlying types.ProgressProperties channel on an interval. +type progressReporter struct { + channel chan<- types.ProgressProperties // The reporter channel to which the progress will be sent + interval time.Duration // The update interval to indicate how often the progress should update + artifact types.BlobInfo // The blob metadata which is currently being progressed + lastUpdate time.Time // The last time a progress channel event was sent + offset uint64 // The currently downloaded size in bytes + offsetUpdate uint64 // The number of bytes downloaded within the last update interval } -// newProgressReader creates a new progress reader for: -// `source`: The source when internally reading bytes -// `channel`: The reporter channel to which the progress will be sent -// `interval`: The update interval to indicate how often the progress should update -// `artifact`: The blob metadata which is currently being progressed -func newProgressReader( - source io.Reader, - channel chan<- types.ProgressProperties, - interval time.Duration, - artifact types.BlobInfo, -) *progressReader { - // The progress reader constructor informs the progress channel - // that a new artifact will be read - channel <- types.ProgressProperties{ +// reportNewArtifact fires types.ProgressEventNewArtifact to its progress channel. +func (r *progressReporter) reportNewArtifact() { + r.channel <- types.ProgressProperties{ Event: types.ProgressEventNewArtifact, - Artifact: artifact, + Artifact: r.artifact, } - return &progressReader{ - source: source, - channel: channel, - interval: interval, - artifact: artifact, - lastUpdate: time.Now(), - offset: 0, - offsetUpdate: 0, + r.lastUpdate = time.Now() +} + +// reportRead fires the types.ProgressEventRead event with `bytesRead` +// to its progress channel. +func (r *progressReporter) reportRead(bytesRead uint64) { + r.offset += bytesRead + r.offsetUpdate += bytesRead + if time.Since(r.lastUpdate) > r.interval { + r.channel <- types.ProgressProperties{ + Event: types.ProgressEventRead, + Artifact: r.artifact, + Offset: r.offset, + OffsetUpdate: r.offsetUpdate, + } + r.lastUpdate = time.Now() + r.offsetUpdate = 0 } } -// reportDone indicates to the internal channel that the progress has been -// finished -func (r *progressReader) reportDone() { +// reportDone fires the ProgressEventDone to its progress channel. +func (r *progressReporter) reportDone() { r.channel <- types.ProgressProperties{ Event: types.ProgressEventDone, Artifact: r.artifact, @@ -57,23 +54,40 @@ func (r *progressReader) reportDone() { } } +// progressReader is an io.Reader that reports its progress to +// an underlying *progressReporter. +type progressReader struct { + source io.Reader + *progressReporter +} + +// newProgressReader creates a new progress reader for +// `source`: The source when internally reading bytes +// `channel`: The reporter channel to which the progress will be sent +// `interval`: The update interval to indicate how often the progress should update +// `artifact`: The blob metadata which is currently being progressed. +func newProgressReader( + source io.Reader, + channel chan<- types.ProgressProperties, + interval time.Duration, + artifact types.BlobInfo, +) *progressReader { + r := &progressReader{ + source: source, + progressReporter: &progressReporter{ + channel: channel, + interval: interval, + artifact: artifact, + }, + } + r.reportNewArtifact() + return r +} + // Read continuously reads bytes into the progress reader and reports the -// status via the internal channel +// status via the internal channel. func (r *progressReader) Read(p []byte) (int, error) { n, err := r.source.Read(p) - r.offset += uint64(n) - r.offsetUpdate += uint64(n) - - // Fire the progress reader in the provided interval - if time.Since(r.lastUpdate) > r.interval { - r.channel <- types.ProgressProperties{ - Event: types.ProgressEventRead, - Artifact: r.artifact, - Offset: r.offset, - OffsetUpdate: r.offsetUpdate, - } - r.lastUpdate = time.Now() - r.offsetUpdate = 0 - } + r.reportRead(uint64(n)) return n, err } diff --git a/image/copy/single.go b/image/copy/single.go index 9f15c77e05..22fb96784f 100644 --- a/image/copy/single.go +++ b/image/copy/single.go @@ -804,6 +804,16 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to wrapped: ic.c.rawSource, bar: bar, } + // Setup progress reporting and report a new artifact event + // if the channel available and a non-zero interval set. + if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { + proxy.reporter = &progressReporter{ + channel: ic.c.options.Progress, + interval: ic.c.options.ProgressInterval, + artifact: srcInfo, + } + proxy.reporter.reportNewArtifact() + } uploadedBlob, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, private.PutBlobPartialOptions{ Cache: ic.c.blobInfoCache, EmptyLayer: emptyLayer, @@ -817,6 +827,12 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } bar.mark100PercentComplete() hideProgressBar = false + + // Report completion for an artifact if possible. + if proxy.reporter != nil { + proxy.reporter.reportDone() + } + logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest) return true, updatedBlobInfoFromUpload(srcInfo, uploadedBlob), nil } diff --git a/image/types/types.go b/image/types/types.go index 1c0007e6e4..161df100fb 100644 --- a/image/types/types.go +++ b/image/types/types.go @@ -707,7 +707,8 @@ type SystemContext struct { type ProgressEvent uint const ( - // ProgressEventNewArtifact will be fired on progress reader setup + // ProgressEventNewArtifact will be fired when starting processing a new + // artifact ProgressEventNewArtifact ProgressEvent = iota // ProgressEventRead indicates that the artifact download is currently in @@ -719,7 +720,7 @@ const ( ProgressEventDone // ProgressEventSkipped is fired when the artifact has been skipped because - // its already available at the destination + // it's already available at the destination ProgressEventSkipped ) From 80883b6d86b96a265d3772ae3f22fad0d180654b Mon Sep 17 00:00:00 2001 From: Marek Simek Date: Wed, 20 May 2026 12:49:52 +0200 Subject: [PATCH 2/5] fixup! image/copy: Fix missing progress reporting for chunked layers Signed-off-by: Marek Simek --- image/copy/blob.go | 14 +- image/copy/blob_chunk_accessor_proxy.go | 50 -------- image/copy/{progress_bars.go => progress.go} | 39 ++++++ image/copy/progress_bars_test.go | 24 ---- image/copy/progress_channel.go | 70 ++++++---- image/copy/progress_channel_test.go | 18 ++- image/copy/progress_test.go | 128 +++++++++++++++++++ image/copy/single.go | 31 +++-- 8 files changed, 239 insertions(+), 135 deletions(-) delete mode 100644 image/copy/blob_chunk_accessor_proxy.go rename image/copy/{progress_bars.go => progress.go} (77%) delete mode 100644 image/copy/progress_bars_test.go create mode 100644 image/copy/progress_test.go diff --git a/image/copy/blob.go b/image/copy/blob.go index 9db6338d75..873eec1599 100644 --- a/image/copy/blob.go +++ b/image/copy/blob.go @@ -19,6 +19,7 @@ import ( func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo, getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer, isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool, + reporter *progressReporter, ) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers; // that pipeline is built by updating stream. @@ -84,16 +85,9 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read return types.BlobInfo{}, err } - // === Report progress using the ic.c.options.Progress channel, if required. - if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { - progressReader := newProgressReader( - stream.reader, - ic.c.options.Progress, - ic.c.options.ProgressInterval, - srcInfo, - ) - defer progressReader.reportDone() - stream.reader = progressReader + // === Wrap stream with progress reporting if a reporter was provided. + if reporter != nil { + stream.reader = newProgressReader(stream.reader, reporter) } // === Finally, send the layer stream to dest. diff --git a/image/copy/blob_chunk_accessor_proxy.go b/image/copy/blob_chunk_accessor_proxy.go deleted file mode 100644 index 14343abdc8..0000000000 --- a/image/copy/blob_chunk_accessor_proxy.go +++ /dev/null @@ -1,50 +0,0 @@ -package copy - -import ( - "context" - "io" - "math" - "time" - - "go.podman.io/image/v5/internal/private" - "go.podman.io/image/v5/types" -) - -// blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar -// and optionally *progressReporter (if non-nil) with the number of received bytes. -type blobChunkAccessorProxy struct { - wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor - bar *progressBar // A progress bar updated with the number of bytes read so far - reporter *progressReporter // A progress reporter updated with the number of bytes read so far -} - -// GetBlobAt returns a sequential channel of readers that contain data for the requested -// blob chunks, and a channel that might get a single error value. -// The specified chunks must be not overlapping and sorted by their offset. -// The readers must be fully consumed, in the order they are returned, before blocking -// to read the next chunk. -// If the Length for the last chunk is set to math.MaxUint64, then it -// fully fetches the remaining data from the offset to the end of the blob. -// -// blobChunkAccessorProxy.GetBlobAt also updates a *progressBar -// and *progressReporter (if non-nil) with the number of bytes read. -func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { - start := time.Now() - rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks) - if err == nil { - total := int64(0) - for _, c := range chunks { - // do not update the progress bar if there is a chunk with unknown length. - if c.Length == math.MaxUint64 { - return rc, errs, err - } - total += int64(c.Length) - } - // Report read bytes if possible. - if s.reporter != nil { - s.reporter.reportRead(uint64(total)) - } - s.bar.EwmaIncrInt64(total, time.Since(start)) - } - return rc, errs, err -} diff --git a/image/copy/progress_bars.go b/image/copy/progress.go similarity index 77% rename from image/copy/progress_bars.go rename to image/copy/progress.go index ed95896bf5..e377873bd3 100644 --- a/image/copy/progress_bars.go +++ b/image/copy/progress.go @@ -1,11 +1,15 @@ package copy import ( + "context" "fmt" "io" + "math" + "time" "github.com/vbauerster/mpb/v8" "github.com/vbauerster/mpb/v8/decor" + "go.podman.io/image/v5/internal/private" "go.podman.io/image/v5/types" ) @@ -140,3 +144,38 @@ func (bar *progressBar) mark100PercentComplete() { bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete. } } + +// blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar +// and optionally *progressReporter (if non-nil) with the number of received bytes. +type blobChunkAccessorProxy struct { + wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor + bar *progressBar // A progress bar updated with the number of bytes read so far + reporter *progressReporter // A progress reporter updated with the number of bytes read so far +} + +// GetBlobAt returns a sequential channel of readers that contain data for the requested +// blob chunks, and a channel that might get a single error value. +// The specified chunks must be not overlapping and sorted by their offset. +// The readers must be fully consumed, in the order they are returned, before blocking +// to read the next chunk. +// If the Length for the last chunk is set to math.MaxUint64, then it +// fully fetches the remaining data from the offset to the end of the blob. +func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { + start := time.Now() + rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks) + if err == nil { + total := int64(0) + for _, c := range chunks { + // do not update the progress bar if there is a chunk with unknown length. + if c.Length == math.MaxUint64 { + return rc, errs, err + } + total += int64(c.Length) + } + if s.reporter != nil { + s.reporter.reportRead(uint64(total)) + } + s.bar.EwmaIncrInt64(total, time.Since(start)) + } + return rc, errs, err +} diff --git a/image/copy/progress_bars_test.go b/image/copy/progress_bars_test.go deleted file mode 100644 index 0c863cacce..0000000000 --- a/image/copy/progress_bars_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package copy - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/vbauerster/mpb/v8/decor" -) - -func TestCustomPartialBlobDecorFunc(t *testing.T) { - // A stub test - s := decor.Statistics{} - assert.Equal(t, "0.0b / 0.0b (skipped: 0.0b)", customPartialBlobDecorFunc(s)) - // Partial pull in progress - s = decor.Statistics{} - s.Current = 1097653 - s.Total = 8329917 - s.Refill = 509722 - assert.Equal(t, "1.0MiB / 7.9MiB (skipped: 497.8KiB = 6.12%)", customPartialBlobDecorFunc(s)) - // Almost complete, but no reuse - s.Current = int64(float64(s.Total) * 0.95) - s.Refill = 0 - assert.Equal(t, "7.5MiB / 7.9MiB", customPartialBlobDecorFunc(s)) -} diff --git a/image/copy/progress_channel.go b/image/copy/progress_channel.go index 27262c5ee5..0cc1cfa1aa 100644 --- a/image/copy/progress_channel.go +++ b/image/copy/progress_channel.go @@ -15,20 +15,47 @@ type progressReporter struct { artifact types.BlobInfo // The blob metadata which is currently being progressed lastUpdate time.Time // The last time a progress channel event was sent offset uint64 // The currently downloaded size in bytes - offsetUpdate uint64 // The number of bytes downloaded within the last update interval + offsetUpdate uint64 // The number of bytes downloaded since lastUpdate } -// reportNewArtifact fires types.ProgressEventNewArtifact to its progress channel. -func (r *progressReporter) reportNewArtifact() { - r.channel <- types.ProgressProperties{ +// newProgressReporter creates a new progress reporter +// and immediately reports a new artifact event. +func newProgressReporter( + channel chan<- types.ProgressProperties, + interval time.Duration, + artifact types.BlobInfo, +) *progressReporter { + channel <- types.ProgressProperties{ Event: types.ProgressEventNewArtifact, - Artifact: r.artifact, + Artifact: artifact, + } + return &progressReporter{ + channel: channel, + interval: interval, + artifact: artifact, + lastUpdate: time.Now(), + } +} + +// reset resets the reporters progress +// and reports its zeroed state. +// It's meant to be used on error when +// the processing has to be re-started +// (e.g. ErrFallbackToOrdinaryLayerDownload). +func (r *progressReporter) reset() { + r.offset = 0 + r.offsetUpdate = 0 + + r.channel <- types.ProgressProperties{ + Event: types.ProgressEventRead, + Artifact: r.artifact, + Offset: r.offset, + OffsetUpdate: r.offsetUpdate, } r.lastUpdate = time.Now() } -// reportRead fires the types.ProgressEventRead event with `bytesRead` -// to its progress channel. +// reportRead reports progress with the number of `bytesRead`. func (r *progressReporter) reportRead(bytesRead uint64) { r.offset += bytesRead r.offsetUpdate += bytesRead @@ -44,7 +71,7 @@ func (r *progressReporter) reportRead(bytesRead uint64) { } } -// reportDone fires the ProgressEventDone to its progress channel. +// reportDone reports completion. func (r *progressReporter) reportDone() { r.channel <- types.ProgressProperties{ Event: types.ProgressEventDone, @@ -54,34 +81,23 @@ func (r *progressReporter) reportDone() { } } -// progressReader is an io.Reader that reports its progress to -// an underlying *progressReporter. +// progressReader extends a wrapped io.Reader +// with additional reporting of its progress. type progressReader struct { source io.Reader *progressReporter } -// newProgressReader creates a new progress reader for -// `source`: The source when internally reading bytes -// `channel`: The reporter channel to which the progress will be sent -// `interval`: The update interval to indicate how often the progress should update -// `artifact`: The blob metadata which is currently being progressed. +// newProgressReader creates a new progress reader that wraps source +// and reports progress through the given reporter. func newProgressReader( source io.Reader, - channel chan<- types.ProgressProperties, - interval time.Duration, - artifact types.BlobInfo, + reporter *progressReporter, ) *progressReader { - r := &progressReader{ - source: source, - progressReporter: &progressReporter{ - channel: channel, - interval: interval, - artifact: artifact, - }, + return &progressReader{ + source: source, + progressReporter: reporter, } - r.reportNewArtifact() - return r } // Read continuously reads bytes into the progress reader and reports the diff --git a/image/copy/progress_channel_test.go b/image/copy/progress_channel_test.go index 6b4bb16863..4fdca01c98 100644 --- a/image/copy/progress_channel_test.go +++ b/image/copy/progress_channel_test.go @@ -18,19 +18,17 @@ func newSUT( ) *progressReader { artifact := types.BlobInfo{Size: 10} - go func() { - res := <-channel - assert.Equal(t, res.Event, types.ProgressEventNewArtifact) - assert.Equal(t, res.Artifact, artifact) - }() - res := newProgressReader(reader, channel, duration, artifact) + reporter := newProgressReporter(channel, duration, artifact) + res := <-channel + assert.Equal(t, res.Event, types.ProgressEventNewArtifact) + assert.Equal(t, res.Artifact, artifact) - return res + return newProgressReader(reader, reporter) } func TestNewProgressReader(t *testing.T) { // Given - channel := make(chan types.ProgressProperties) + channel := make(chan types.ProgressProperties, 1) sut := newSUT(t, nil, time.Second, channel) assert.NotNil(t, sut) @@ -44,7 +42,7 @@ func TestNewProgressReader(t *testing.T) { func TestReadWithoutEvent(t *testing.T) { // Given - channel := make(chan types.ProgressProperties) + channel := make(chan types.ProgressProperties, 1) reader := bytes.NewReader([]byte{0, 1, 2}) sut := newSUT(t, reader, time.Second, channel) assert.NotNil(t, sut) @@ -60,7 +58,7 @@ func TestReadWithoutEvent(t *testing.T) { func TestReadWithEvent(t *testing.T) { // Given - channel := make(chan types.ProgressProperties) + channel := make(chan types.ProgressProperties, 1) reader := bytes.NewReader([]byte{0, 1, 2, 3, 4, 5, 6}) sut := newSUT(t, reader, time.Nanosecond, channel) assert.NotNil(t, sut) diff --git a/image/copy/progress_test.go b/image/copy/progress_test.go new file mode 100644 index 0000000000..5e4cca81d1 --- /dev/null +++ b/image/copy/progress_test.go @@ -0,0 +1,128 @@ +package copy + +import ( + "testing" + "testing/synctest" + "time" + + "github.com/stretchr/testify/assert" + "github.com/vbauerster/mpb/v8/decor" + "go.podman.io/image/v5/types" +) + +// TestNewProgressReporter verifies that constructing a reporter +// signals a new artifact event. +func TestNewProgressReporter(t *testing.T) { + channel := make(chan types.ProgressProperties, 1) + artifact := types.BlobInfo{} + + r := newProgressReporter(channel, time.Second, artifact) + assert.NotNil(t, r) + assert.Equal(t, types.ProgressProperties{ + Event: types.ProgressEventNewArtifact, + Artifact: artifact, + }, <-channel, "constructor should send a new artifact event") +} + +// TestProgressReporterReportRead verifies that a read event is sent +// after the interval elapses and not before. +func TestProgressReporterReportRead(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + channel := make(chan types.ProgressProperties, 1) + artifact := types.BlobInfo{} + interval := 5 * time.Second + + r := newProgressReporter(channel, interval, artifact) + <-channel + + // Before the interval: offset is accumulated, but no event was sent. + r.reportRead(5) + assert.Equal(t, uint64(5), r.offset, "offset should be accumulated") + assert.Equal(t, uint64(5), r.offsetUpdate, "offsetUpdate should be accumulated") + + // Verify that after the interval event was sent. + time.Sleep(2 * interval) + go func() { + r.reportRead(10) + }() + res := <-channel + assert.Equal(t, types.ProgressProperties{ + Event: types.ProgressEventRead, + Artifact: artifact, + Offset: 15, + OffsetUpdate: 15, + }, res, "should send a read event after interval elapses") + }) +} + +// TestProgressReporterReportDone verifies that a done event +// includes the accumulated offset. +func TestProgressReporterReportDone(t *testing.T) { + channel := make(chan types.ProgressProperties, 1) + artifact := types.BlobInfo{} + + r := newProgressReporter(channel, time.Hour, artifact) + <-channel + + // Simulate progress. + r.offset = 50 + r.offsetUpdate = 10 + + // Complete. + go func() { + r.reportDone() + }() + + // Verify that the done event was received. + res := <-channel + assert.Equal(t, types.ProgressProperties{ + Event: types.ProgressEventDone, + Artifact: artifact, + Offset: 50, + OffsetUpdate: 10, + }, res, "should send a done event with accumulated offsets") +} + +// TestProgressReporterReset verifies that reset zeroes the offsets and +// reports a read event. +func TestProgressReporterReset(t *testing.T) { + channel := make(chan types.ProgressProperties, 1) + artifact := types.BlobInfo{} + + r := newProgressReporter(channel, time.Hour, artifact) + <-channel + + // Simulate progress. + r.offset = 30 + r.offsetUpdate = 15 + + // Reset the reporter. + go func() { + r.reset() + }() + + // Verify that a read event was received with zero values. + res := <-channel + assert.Equal(t, types.ProgressProperties{ + Event: types.ProgressEventRead, + Artifact: artifact, + }, res, "should send a read event with zeroed offsets") + assert.Equal(t, uint64(0), r.offset, "offset should be zeroed after reset") + assert.Equal(t, uint64(0), r.offsetUpdate, "offsetUpdate should be zeroed after reset") +} + +func TestCustomPartialBlobDecorFunc(t *testing.T) { + // A stub test + s := decor.Statistics{} + assert.Equal(t, "0.0b / 0.0b (skipped: 0.0b)", customPartialBlobDecorFunc(s)) + // Partial pull in progress + s = decor.Statistics{} + s.Current = 1097653 + s.Total = 8329917 + s.Refill = 509722 + assert.Equal(t, "1.0MiB / 7.9MiB (skipped: 497.8KiB = 6.12%)", customPartialBlobDecorFunc(s)) + // Almost complete, but no reuse + s.Current = int64(float64(s.Total) * 0.95) + s.Refill = 0 + assert.Equal(t, "7.5MiB / 7.9MiB", customPartialBlobDecorFunc(s)) +} diff --git a/image/copy/single.go b/image/copy/single.go index 22fb96784f..6edec752a3 100644 --- a/image/copy/single.go +++ b/image/copy/single.go @@ -634,7 +634,7 @@ func (ic *imageCopier) copyConfig(ctx context.Context, src types.Image) error { return types.BlobInfo{}, fmt.Errorf("reading config blob %s: %w", srcInfo.Digest, err) } - destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false) + destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false, nil) if err != nil { return types.BlobInfo{}, err } @@ -789,6 +789,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to // of the source file are not known yet and must be fetched. // Attempt a partial only when the source allows to retrieve a blob partially and // the destination has support for it. + var reporter *progressReporter if canAvoidProcessingCompleteLayer && ic.c.rawSource.SupportsGetBlobAt() && ic.c.dest.SupportsPutBlobPartial() { reused, blobInfo, err := func() (bool, types.BlobInfo, error) { // A scope for defer bar, err := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done") @@ -804,15 +805,9 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to wrapped: ic.c.rawSource, bar: bar, } - // Setup progress reporting and report a new artifact event - // if the channel available and a non-zero interval set. if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { - proxy.reporter = &progressReporter{ - channel: ic.c.options.Progress, - interval: ic.c.options.ProgressInterval, - artifact: srcInfo, - } - proxy.reporter.reportNewArtifact() + reporter = newProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) + proxy.reporter = reporter } uploadedBlob, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, private.PutBlobPartialOptions{ Cache: ic.c.blobInfoCache, @@ -828,9 +823,8 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to bar.mark100PercentComplete() hideProgressBar = false - // Report completion for an artifact if possible. - if proxy.reporter != nil { - proxy.reporter.reportDone() + if reporter != nil { + reporter.reportDone() } logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest) @@ -839,6 +833,10 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to // On a "partial content not available" error, ignore it and retrieve the whole layer. var perr private.ErrFallbackToOrdinaryLayerDownload if errors.As(err, &perr) { + // Reset progress, the reporter is reused for the fallback. + if reporter != nil { + reporter.reset() + } logrus.Debugf("Failed to retrieve partial blob: %v", err) return false, types.BlobInfo{}, nil } @@ -866,7 +864,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } defer srcStream.Close() - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer, reporter) if err != nil { return types.BlobInfo{}, "", err } @@ -896,6 +894,10 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } } + // Only report completion on success. + if reporter != nil { + reporter.reportDone() + } bar.mark100PercentComplete() return blobInfo, diffID, nil }() @@ -945,6 +947,7 @@ func updatedBlobInfoFromReuse(inputInfo types.BlobInfo, reusedBlob private.Reuse // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool, + reporter *progressReporter, ) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compressiontypes.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -970,7 +973,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea } } - blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, emptyLayer) // Sets err to nil on success + blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, emptyLayer, reporter) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } From 1b83a413e3306b2ff432f23af3fe204099c0db09 Mon Sep 17 00:00:00 2001 From: Marek Simek Date: Wed, 20 May 2026 14:53:26 +0200 Subject: [PATCH 3/5] fixup! image/copy: Fix missing progress reporting for chunked layers Signed-off-by: Marek Simek --- image/copy/single.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/image/copy/single.go b/image/copy/single.go index 6edec752a3..44627b2022 100644 --- a/image/copy/single.go +++ b/image/copy/single.go @@ -864,6 +864,10 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } defer srcStream.Close() + // Create a reporter if not re-used on ErrFallbackToOrdinaryLayerDownload. + if reporter == nil && ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { + reporter = newProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) + } blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer, reporter) if err != nil { return types.BlobInfo{}, "", err From a8e5f3a957abb3f9624202d32d9e8810dd9bf5a7 Mon Sep 17 00:00:00 2001 From: Marek Simek Date: Wed, 20 May 2026 14:55:20 +0200 Subject: [PATCH 4/5] fixup! fixup! image/copy: Fix missing progress reporting for chunked layers Signed-off-by: Marek Simek --- image/copy/single.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/image/copy/single.go b/image/copy/single.go index 44627b2022..11450e4d5d 100644 --- a/image/copy/single.go +++ b/image/copy/single.go @@ -864,7 +864,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } defer srcStream.Close() - // Create a reporter if not re-used on ErrFallbackToOrdinaryLayerDownload. + // Create a reporter if not reused on ErrFallbackToOrdinaryLayerDownload. if reporter == nil && ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { reporter = newProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) } From 1c7ef0eef67392316efea7cee852ba8ac0f96d29 Mon Sep 17 00:00:00 2001 From: Marek Simek Date: Thu, 21 May 2026 12:36:20 +0200 Subject: [PATCH 5/5] fixup! image/copy: Fix missing progress reporting for chunked layers Signed-off-by: Marek Simek --- image/copy/blob.go | 8 +- image/copy/progress.go | 141 +---------------------- image/copy/progress_bars.go | 142 +++++++++++++++++++++++ image/copy/progress_bars_test.go | 24 ++++ image/copy/progress_channel.go | 97 +++++++++------- image/copy/progress_channel_test.go | 170 +++++++++++++++++++++++++++- image/copy/progress_test.go | 128 --------------------- image/copy/single.go | 39 +++---- image/types/types.go | 4 +- 9 files changed, 417 insertions(+), 336 deletions(-) create mode 100644 image/copy/progress_bars.go create mode 100644 image/copy/progress_bars_test.go delete mode 100644 image/copy/progress_test.go diff --git a/image/copy/blob.go b/image/copy/blob.go index 873eec1599..c69627c0ab 100644 --- a/image/copy/blob.go +++ b/image/copy/blob.go @@ -19,7 +19,7 @@ import ( func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo, getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer, isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool, - reporter *progressReporter, + reporter progressReporter, ) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers; // that pipeline is built by updating stream. @@ -85,8 +85,10 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read return types.BlobInfo{}, err } - // === Wrap stream with progress reporting if a reporter was provided. - if reporter != nil { + // === Report progress using the reporter, if required. + if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { + // Note: the reporter can be a no-op if the condition above evaluates + // false and in that case there's no reason to wrap the reader here. stream.reader = newProgressReader(stream.reader, reporter) } diff --git a/image/copy/progress.go b/image/copy/progress.go index e377873bd3..150f9dea48 100644 --- a/image/copy/progress.go +++ b/image/copy/progress.go @@ -2,155 +2,20 @@ package copy import ( "context" - "fmt" "io" "math" "time" - "github.com/vbauerster/mpb/v8" - "github.com/vbauerster/mpb/v8/decor" "go.podman.io/image/v5/internal/private" "go.podman.io/image/v5/types" ) -// newProgressPool creates a *mpb.Progress. -// The caller must eventually call pool.Wait() after the pool will no longer be updated. -// NOTE: Every progress bar created within the progress pool must either successfully -// complete or be aborted, or pool.Wait() will hang. That is typically done -// using "defer bar.Abort(false)", which must be called BEFORE pool.Wait() is called. -func (c *copier) newProgressPool() *mpb.Progress { - return mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput)) -} - -// customPartialBlobDecorFunc implements mpb.DecorFunc for the partial blobs retrieval progress bar -func customPartialBlobDecorFunc(s decor.Statistics) string { - current := decor.SizeB1024(s.Current) - total := decor.SizeB1024(s.Total) - refill := decor.SizeB1024(s.Refill) - if s.Total == 0 { - return fmt.Sprintf("%.1f / %.1f (skipped: %.1f)", current, total, refill) - } - // If we didn't do a partial fetch then let's not output a distracting ("skipped: 0.0b = 0.00%") - if s.Refill == 0 { - return fmt.Sprintf("%.1f / %.1f", current, total) - } - percentage := 100.0 * float64(s.Refill) / float64(s.Total) - return fmt.Sprintf("%.1f / %.1f (skipped: %.1f = %.2f%%)", current, total, refill, percentage) -} - -// progressBar wraps a *mpb.Bar, allowing us to add extra state and methods. -type progressBar struct { - *mpb.Bar - originalSize int64 // or -1 if unknown -} - -// createProgressBar creates a progressBar in pool. Note that if the copier's reportWriter -// is io.Discard, the progress bar's output will be discarded. Callers may call printCopyInfo() -// to print a single line instead. -// -// NOTE: Every progress bar created within a progress pool must either successfully -// complete or be aborted, or pool.Wait() will hang. That is typically done -// using "defer bar.Abort(false)", which must happen BEFORE pool.Wait() is called. -// -// As a convention, most users of progress bars should call mark100PercentComplete on full success; -// by convention, we don't leave progress bars in partial state when fully done -// (even if we copied much less data than anticipated). -func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) (*progressBar, error) { - // shortDigestLen is the length of the digest used for blobs. - const shortDigestLen = 12 - - if err := info.Digest.Validate(); err != nil { // digest.Digest.Encoded() panics on failure, so validate explicitly. - return nil, err - } - prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded()) - // Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column. - maxPrefixLen := len("Copying blob ") + shortDigestLen - if len(prefix) > maxPrefixLen { - prefix = prefix[:maxPrefixLen] - } - - // onComplete will replace prefix once the bar/spinner has completed - onComplete = prefix + " " + onComplete - - // Use a normal progress bar when we know the size (i.e., size > 0). - // Otherwise, use a spinner to indicate that something's happening. - var bar *mpb.Bar - if info.Size > 0 { - if partial { - bar = pool.AddBar(info.Size, - mpb.BarFillerClearOnComplete(), - mpb.PrependDecorators( - decor.OnComplete(decor.Name(prefix), onComplete), - ), - mpb.AppendDecorators( - decor.Any(customPartialBlobDecorFunc), - ), - ) - } else { - bar = pool.AddBar(info.Size, - mpb.BarFillerClearOnComplete(), - mpb.PrependDecorators( - decor.OnComplete(decor.Name(prefix), onComplete), - ), - mpb.AppendDecorators( - decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""), - decor.Name(" | "), - decor.OnComplete(decor.EwmaSpeed(decor.SizeB1024(0), "% .1f", 30), ""), - ), - ) - } - } else { - bar = pool.New(0, - mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft(), - mpb.BarFillerClearOnComplete(), - mpb.PrependDecorators( - decor.OnComplete(decor.Name(prefix), onComplete), - ), - mpb.AppendDecorators( - decor.OnComplete(decor.EwmaSpeed(decor.SizeB1024(0), "% .1f", 30), ""), - ), - ) - } - return &progressBar{ - Bar: bar, - originalSize: info.Size, - }, nil -} - -// printCopyInfo prints a "Copying ..." message on the copier if the output is -// set to `io.Discard`. In that case, the progress bars won't be rendered but -// we still want to indicate when blobs and configs are copied. -func (c *copier) printCopyInfo(kind string, info types.BlobInfo) { - if c.progressOutput == io.Discard { - c.Printf("Copying %s %s\n", kind, info.Digest) - } -} - -// mark100PercentComplete marks the progress bars as 100% complete; -// it may do so by possibly advancing the current state if it is below the known total. -func (bar *progressBar) mark100PercentComplete() { - if bar.originalSize > 0 { - // We can't call bar.SetTotal even if we wanted to; the total can not be changed - // after a progress bar is created with a definite total. - bar.SetCurrent(bar.originalSize) // This triggers the completion condition. - } else { - // -1 = unknown size - // 0 is somewhat of a special case: Unlike c/image, where 0 is a definite known - // size (possible at least in theory), in mpb, zero-sized progress bars are treated - // as unknown size, in particular they are not configured to be marked as - // complete on bar.Current() reaching bar.total (because that would happen already - // when creating the progress bar). - // That means that we are both _allowed_ to call SetTotal, and we _have to_. - bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete. - } -} - // blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar // and optionally *progressReporter (if non-nil) with the number of received bytes. type blobChunkAccessorProxy struct { wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor bar *progressBar // A progress bar updated with the number of bytes read so far - reporter *progressReporter // A progress reporter updated with the number of bytes read so far + reporter progressReporter // A progress reporter updated with the number of bytes read so far } // GetBlobAt returns a sequential channel of readers that contain data for the requested @@ -172,9 +37,7 @@ func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobI } total += int64(c.Length) } - if s.reporter != nil { - s.reporter.reportRead(uint64(total)) - } + s.reporter.reportRead(uint64(total)) s.bar.EwmaIncrInt64(total, time.Since(start)) } return rc, errs, err diff --git a/image/copy/progress_bars.go b/image/copy/progress_bars.go new file mode 100644 index 0000000000..ed95896bf5 --- /dev/null +++ b/image/copy/progress_bars.go @@ -0,0 +1,142 @@ +package copy + +import ( + "fmt" + "io" + + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" + "go.podman.io/image/v5/types" +) + +// newProgressPool creates a *mpb.Progress. +// The caller must eventually call pool.Wait() after the pool will no longer be updated. +// NOTE: Every progress bar created within the progress pool must either successfully +// complete or be aborted, or pool.Wait() will hang. That is typically done +// using "defer bar.Abort(false)", which must be called BEFORE pool.Wait() is called. +func (c *copier) newProgressPool() *mpb.Progress { + return mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput)) +} + +// customPartialBlobDecorFunc implements mpb.DecorFunc for the partial blobs retrieval progress bar +func customPartialBlobDecorFunc(s decor.Statistics) string { + current := decor.SizeB1024(s.Current) + total := decor.SizeB1024(s.Total) + refill := decor.SizeB1024(s.Refill) + if s.Total == 0 { + return fmt.Sprintf("%.1f / %.1f (skipped: %.1f)", current, total, refill) + } + // If we didn't do a partial fetch then let's not output a distracting ("skipped: 0.0b = 0.00%") + if s.Refill == 0 { + return fmt.Sprintf("%.1f / %.1f", current, total) + } + percentage := 100.0 * float64(s.Refill) / float64(s.Total) + return fmt.Sprintf("%.1f / %.1f (skipped: %.1f = %.2f%%)", current, total, refill, percentage) +} + +// progressBar wraps a *mpb.Bar, allowing us to add extra state and methods. +type progressBar struct { + *mpb.Bar + originalSize int64 // or -1 if unknown +} + +// createProgressBar creates a progressBar in pool. Note that if the copier's reportWriter +// is io.Discard, the progress bar's output will be discarded. Callers may call printCopyInfo() +// to print a single line instead. +// +// NOTE: Every progress bar created within a progress pool must either successfully +// complete or be aborted, or pool.Wait() will hang. That is typically done +// using "defer bar.Abort(false)", which must happen BEFORE pool.Wait() is called. +// +// As a convention, most users of progress bars should call mark100PercentComplete on full success; +// by convention, we don't leave progress bars in partial state when fully done +// (even if we copied much less data than anticipated). +func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) (*progressBar, error) { + // shortDigestLen is the length of the digest used for blobs. + const shortDigestLen = 12 + + if err := info.Digest.Validate(); err != nil { // digest.Digest.Encoded() panics on failure, so validate explicitly. + return nil, err + } + prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded()) + // Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column. + maxPrefixLen := len("Copying blob ") + shortDigestLen + if len(prefix) > maxPrefixLen { + prefix = prefix[:maxPrefixLen] + } + + // onComplete will replace prefix once the bar/spinner has completed + onComplete = prefix + " " + onComplete + + // Use a normal progress bar when we know the size (i.e., size > 0). + // Otherwise, use a spinner to indicate that something's happening. + var bar *mpb.Bar + if info.Size > 0 { + if partial { + bar = pool.AddBar(info.Size, + mpb.BarFillerClearOnComplete(), + mpb.PrependDecorators( + decor.OnComplete(decor.Name(prefix), onComplete), + ), + mpb.AppendDecorators( + decor.Any(customPartialBlobDecorFunc), + ), + ) + } else { + bar = pool.AddBar(info.Size, + mpb.BarFillerClearOnComplete(), + mpb.PrependDecorators( + decor.OnComplete(decor.Name(prefix), onComplete), + ), + mpb.AppendDecorators( + decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""), + decor.Name(" | "), + decor.OnComplete(decor.EwmaSpeed(decor.SizeB1024(0), "% .1f", 30), ""), + ), + ) + } + } else { + bar = pool.New(0, + mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft(), + mpb.BarFillerClearOnComplete(), + mpb.PrependDecorators( + decor.OnComplete(decor.Name(prefix), onComplete), + ), + mpb.AppendDecorators( + decor.OnComplete(decor.EwmaSpeed(decor.SizeB1024(0), "% .1f", 30), ""), + ), + ) + } + return &progressBar{ + Bar: bar, + originalSize: info.Size, + }, nil +} + +// printCopyInfo prints a "Copying ..." message on the copier if the output is +// set to `io.Discard`. In that case, the progress bars won't be rendered but +// we still want to indicate when blobs and configs are copied. +func (c *copier) printCopyInfo(kind string, info types.BlobInfo) { + if c.progressOutput == io.Discard { + c.Printf("Copying %s %s\n", kind, info.Digest) + } +} + +// mark100PercentComplete marks the progress bars as 100% complete; +// it may do so by possibly advancing the current state if it is below the known total. +func (bar *progressBar) mark100PercentComplete() { + if bar.originalSize > 0 { + // We can't call bar.SetTotal even if we wanted to; the total can not be changed + // after a progress bar is created with a definite total. + bar.SetCurrent(bar.originalSize) // This triggers the completion condition. + } else { + // -1 = unknown size + // 0 is somewhat of a special case: Unlike c/image, where 0 is a definite known + // size (possible at least in theory), in mpb, zero-sized progress bars are treated + // as unknown size, in particular they are not configured to be marked as + // complete on bar.Current() reaching bar.total (because that would happen already + // when creating the progress bar). + // That means that we are both _allowed_ to call SetTotal, and we _have to_. + bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete. + } +} diff --git a/image/copy/progress_bars_test.go b/image/copy/progress_bars_test.go new file mode 100644 index 0000000000..0c863cacce --- /dev/null +++ b/image/copy/progress_bars_test.go @@ -0,0 +1,24 @@ +package copy + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/vbauerster/mpb/v8/decor" +) + +func TestCustomPartialBlobDecorFunc(t *testing.T) { + // A stub test + s := decor.Statistics{} + assert.Equal(t, "0.0b / 0.0b (skipped: 0.0b)", customPartialBlobDecorFunc(s)) + // Partial pull in progress + s = decor.Statistics{} + s.Current = 1097653 + s.Total = 8329917 + s.Refill = 509722 + assert.Equal(t, "1.0MiB / 7.9MiB (skipped: 497.8KiB = 6.12%)", customPartialBlobDecorFunc(s)) + // Almost complete, but no reuse + s.Current = int64(float64(s.Total) * 0.95) + s.Refill = 0 + assert.Equal(t, "7.5MiB / 7.9MiB", customPartialBlobDecorFunc(s)) +} diff --git a/image/copy/progress_channel.go b/image/copy/progress_channel.go index 0cc1cfa1aa..b49810b207 100644 --- a/image/copy/progress_channel.go +++ b/image/copy/progress_channel.go @@ -7,77 +7,92 @@ import ( "go.podman.io/image/v5/types" ) -// progressReporter facilitates progress reporting through its -// underlying types.ProgressProperties channel on an interval. -type progressReporter struct { - channel chan<- types.ProgressProperties // The reporter channel to which the progress will be sent - interval time.Duration // The update interval to indicate how often the progress should update - artifact types.BlobInfo // The blob metadata which is currently being progressed - lastUpdate time.Time // The last time a progress channel event was sent - offset uint64 // The currently downloaded size in bytes - offsetUpdate uint64 // The number of bytes downloaded since lastUpdate +// progressReporter is an interface for reporting progress about a single blob. +type progressReporter interface { + reportRead(bytesRead uint64) + reportDone() + reset() } -// newProgressReporter creates a new progress reporter +// noopProgressReporter is a no-op implementation of progressReporter. +type noopProgressReporter struct{} + +func (r *noopProgressReporter) reportRead(uint64) {} +func (r *noopProgressReporter) reportDone() {} +func (r *noopProgressReporter) reset() {} + +// channelProgressReporter reports progress about a single blob to a +// types.ProgressProperties channel and supports re-starting from zero +// without reporting the progress through the channel unless +// it's higher than the offset reached before the restart to +// avoid confusing behavior in consumers of the events +// (skipping back). +type channelProgressReporter struct { + channel chan<- types.ProgressProperties // The reporter channel to which the progress will be sent + interval time.Duration // The update interval to indicate how often the progress should update + artifact types.BlobInfo // The blob metadata which is currently being progressed + lastUpdate time.Time // The last time a progress channel event was sent + offset uint64 // The currently downloaded size in bytes + maxReportedOffset uint64 // The high-water mark for offset already sent to the channel +} + +// newChannelProgressReporter creates a new progress reporter // and immediately reports a new artifact event. -func newProgressReporter( +func newChannelProgressReporter( channel chan<- types.ProgressProperties, interval time.Duration, artifact types.BlobInfo, -) *progressReporter { +) progressReporter { channel <- types.ProgressProperties{ Event: types.ProgressEventNewArtifact, Artifact: artifact, } - return &progressReporter{ - channel: channel, - interval: interval, - artifact: artifact, - lastUpdate: time.Now(), + return &channelProgressReporter{ + channel: channel, + interval: interval, + artifact: artifact, + lastUpdate: time.Now(), + offset: 0, + maxReportedOffset: 0, } } -// reset resets the reporters progress -// and reports its zeroed state. +// reset resets the reporter's progress. +// // It's meant to be used on error when // the processing has to be re-started // (e.g. ErrFallbackToOrdinaryLayerDownload). -func (r *progressReporter) reset() { +func (r *channelProgressReporter) reset() { r.offset = 0 - r.offsetUpdate = 0 - - r.channel <- types.ProgressProperties{ - Event: types.ProgressEventRead, - Artifact: r.artifact, - Offset: r.offset, - OffsetUpdate: r.offsetUpdate, - } - r.lastUpdate = time.Now() } -// reportRead reports progress with the number of `bytesRead`. -func (r *progressReporter) reportRead(bytesRead uint64) { +// reportRead reports progress with the number of `bytesRead` +// while keeping track of a current high-water mark in case +// of reset(). It never skips back below the already reported +// offset and does not report the progress unless +// the configured `interval` elapses. +func (r *channelProgressReporter) reportRead(bytesRead uint64) { r.offset += bytesRead - r.offsetUpdate += bytesRead - if time.Since(r.lastUpdate) > r.interval { + if r.offset > r.maxReportedOffset && time.Since(r.lastUpdate) > r.interval { r.channel <- types.ProgressProperties{ Event: types.ProgressEventRead, Artifact: r.artifact, Offset: r.offset, - OffsetUpdate: r.offsetUpdate, + OffsetUpdate: r.offset - r.maxReportedOffset, } + r.maxReportedOffset = r.offset r.lastUpdate = time.Now() - r.offsetUpdate = 0 } } -// reportDone reports completion. -func (r *progressReporter) reportDone() { +// reportDone reports successful completion. +func (r *channelProgressReporter) reportDone() { + offset := max(r.offset, r.maxReportedOffset) r.channel <- types.ProgressProperties{ Event: types.ProgressEventDone, Artifact: r.artifact, - Offset: r.offset, - OffsetUpdate: r.offsetUpdate, + Offset: offset, + OffsetUpdate: offset - r.maxReportedOffset, } } @@ -85,14 +100,14 @@ func (r *progressReporter) reportDone() { // with additional reporting of its progress. type progressReader struct { source io.Reader - *progressReporter + progressReporter } // newProgressReader creates a new progress reader that wraps source // and reports progress through the given reporter. func newProgressReader( source io.Reader, - reporter *progressReporter, + reporter progressReporter, ) *progressReader { return &progressReader{ source: source, diff --git a/image/copy/progress_channel_test.go b/image/copy/progress_channel_test.go index 4fdca01c98..11f71b581e 100644 --- a/image/copy/progress_channel_test.go +++ b/image/copy/progress_channel_test.go @@ -4,12 +4,180 @@ import ( "bytes" "io" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" "go.podman.io/image/v5/types" ) +// consumeNonblocking drains all currently buffered events +// from a channel and returns them. It never blocks. +func consumeNonblocking(channel <-chan types.ProgressProperties) []types.ProgressProperties { + var result []types.ProgressProperties + for { + select { + case p := <-channel: + result = append(result, p) + default: + return result + } + } +} + +// TestNewProgressReporter verifies that constructing a reporter +// signals a new artifact event. +func TestNewProgressReporter(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + + r := newChannelProgressReporter(channel, time.Second, artifact) + assert.NotNil(t, r, "newChannelProgressReporter should return a progress reporter") + + // Verify only a new artifact event was received. + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventNewArtifact, + Artifact: artifact, + }}, events, "constructor should send exactly one new artifact event") +} + +// TestProgressReporterReportRead verifies that a read event is sent +// after the interval elapses and not before. +func TestProgressReporterReportRead(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + interval := 5 * time.Second + + // Create a reporter and consume the new artifact event. + r := newChannelProgressReporter(channel, interval, artifact) + assert.NotEmpty(t, consumeNonblocking(channel), "should send new artifact event") + + // Verify that before the interval elapses: no event is sent. + r.reportRead(5) + assert.Empty(t, consumeNonblocking(channel), "should not send before interval") + + // Verify that after the interval: read event is sent. + time.Sleep(2 * interval) + r.reportRead(10) + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventRead, + Artifact: artifact, + Offset: 15, + OffsetUpdate: 15, + }}, events, "should send a read event after interval elapses") + }) +} + +// TestProgressReporterReportDone verifies that a done event +// includes the accumulated offset and the not-yet-reported offset update. +func TestProgressReporterReportDone(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + interval := 5 * time.Second + + // Create a reporter and consume the new artifact event. + r := newChannelProgressReporter(channel, interval, artifact) + assert.NotEmpty(t, consumeNonblocking(channel), "should send new artifact event") + + // Simulate progress with three reported reads. + const ( + read1 = 30 + read2 = 20 + read3 = 15 + ) + r.reportRead(read1) + assert.Empty(t, consumeNonblocking(channel), "should not send before interval") + time.Sleep(2 * interval) + r.reportRead(read2) + assert.NotEmpty(t, consumeNonblocking(channel), "should send after interval elapses") + r.reportRead(read3) + + // Verify that the done event includes the + // accumulated offset from all three reads + // and the OffsetUpdate is the final read + // that happened before the update interval + // elapsed. + r.reportDone() + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventDone, + Artifact: artifact, + Offset: read1 + read2 + read3, + OffsetUpdate: read3, + }}, events, "should send a done event with accumulated offsets") + }) +} + +// TestProgressReporterReset verifies that reset does not report +// negative progress and suppresses reports until the offset +// exceeds the previously reported high-water mark. +func TestProgressReporterReset(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + channel := make(chan types.ProgressProperties, 10) + artifact := types.BlobInfo{} + interval := 5 * time.Second + + // Create a reporter and consume the new artifact event. + r := newChannelProgressReporter(channel, interval, artifact) + assert.NotEmpty(t, consumeNonblocking(channel), "should send new artifact event") + + // reportRead(10): no event before interval. + r.reportRead(10) + assert.Empty(t, consumeNonblocking(channel)) + + // After interval, reportRead(10): reports +20=20. + time.Sleep(2 * interval) + r.reportRead(10) + events := consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventRead, + Artifact: artifact, + Offset: 20, + OffsetUpdate: 20, + }}, events) + + // reportRead(10): no event (interval not elapsed). + r.reportRead(10) + assert.Empty(t, consumeNonblocking(channel)) + + // reset: no event sent. + r.reset() + assert.Empty(t, consumeNonblocking(channel), "reset should not send an event") + + // After interval, reportRead(15): nothing (15 < 20 already reported). + time.Sleep(2 * interval) + r.reportRead(15) + assert.Empty(t, consumeNonblocking(channel), "should not report below high-water mark") + + // After interval, reportRead(10): reports +5=25. + time.Sleep(2 * interval) + r.reportRead(10) + events = consumeNonblocking(channel) + assert.Equal(t, []types.ProgressProperties{{ + Event: types.ProgressEventRead, + Artifact: artifact, + Offset: 25, + OffsetUpdate: 5, + }}, events) + }) +} + +// TestNoopProgressReporter verifies that +// a noopProgressReporter can be called without panicking. +func TestNoopProgressReporter(t *testing.T) { + r := &noopProgressReporter{} + + // Make sure that no progressReporter method panics. + r.reportRead(100) + r.reset() + r.reportRead(50) + r.reportDone() +} + func newSUT( t *testing.T, reader io.Reader, @@ -18,7 +186,7 @@ func newSUT( ) *progressReader { artifact := types.BlobInfo{Size: 10} - reporter := newProgressReporter(channel, duration, artifact) + reporter := newChannelProgressReporter(channel, duration, artifact) res := <-channel assert.Equal(t, res.Event, types.ProgressEventNewArtifact) assert.Equal(t, res.Artifact, artifact) diff --git a/image/copy/progress_test.go b/image/copy/progress_test.go deleted file mode 100644 index 5e4cca81d1..0000000000 --- a/image/copy/progress_test.go +++ /dev/null @@ -1,128 +0,0 @@ -package copy - -import ( - "testing" - "testing/synctest" - "time" - - "github.com/stretchr/testify/assert" - "github.com/vbauerster/mpb/v8/decor" - "go.podman.io/image/v5/types" -) - -// TestNewProgressReporter verifies that constructing a reporter -// signals a new artifact event. -func TestNewProgressReporter(t *testing.T) { - channel := make(chan types.ProgressProperties, 1) - artifact := types.BlobInfo{} - - r := newProgressReporter(channel, time.Second, artifact) - assert.NotNil(t, r) - assert.Equal(t, types.ProgressProperties{ - Event: types.ProgressEventNewArtifact, - Artifact: artifact, - }, <-channel, "constructor should send a new artifact event") -} - -// TestProgressReporterReportRead verifies that a read event is sent -// after the interval elapses and not before. -func TestProgressReporterReportRead(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - channel := make(chan types.ProgressProperties, 1) - artifact := types.BlobInfo{} - interval := 5 * time.Second - - r := newProgressReporter(channel, interval, artifact) - <-channel - - // Before the interval: offset is accumulated, but no event was sent. - r.reportRead(5) - assert.Equal(t, uint64(5), r.offset, "offset should be accumulated") - assert.Equal(t, uint64(5), r.offsetUpdate, "offsetUpdate should be accumulated") - - // Verify that after the interval event was sent. - time.Sleep(2 * interval) - go func() { - r.reportRead(10) - }() - res := <-channel - assert.Equal(t, types.ProgressProperties{ - Event: types.ProgressEventRead, - Artifact: artifact, - Offset: 15, - OffsetUpdate: 15, - }, res, "should send a read event after interval elapses") - }) -} - -// TestProgressReporterReportDone verifies that a done event -// includes the accumulated offset. -func TestProgressReporterReportDone(t *testing.T) { - channel := make(chan types.ProgressProperties, 1) - artifact := types.BlobInfo{} - - r := newProgressReporter(channel, time.Hour, artifact) - <-channel - - // Simulate progress. - r.offset = 50 - r.offsetUpdate = 10 - - // Complete. - go func() { - r.reportDone() - }() - - // Verify that the done event was received. - res := <-channel - assert.Equal(t, types.ProgressProperties{ - Event: types.ProgressEventDone, - Artifact: artifact, - Offset: 50, - OffsetUpdate: 10, - }, res, "should send a done event with accumulated offsets") -} - -// TestProgressReporterReset verifies that reset zeroes the offsets and -// reports a read event. -func TestProgressReporterReset(t *testing.T) { - channel := make(chan types.ProgressProperties, 1) - artifact := types.BlobInfo{} - - r := newProgressReporter(channel, time.Hour, artifact) - <-channel - - // Simulate progress. - r.offset = 30 - r.offsetUpdate = 15 - - // Reset the reporter. - go func() { - r.reset() - }() - - // Verify that a read event was received with zero values. - res := <-channel - assert.Equal(t, types.ProgressProperties{ - Event: types.ProgressEventRead, - Artifact: artifact, - }, res, "should send a read event with zeroed offsets") - assert.Equal(t, uint64(0), r.offset, "offset should be zeroed after reset") - assert.Equal(t, uint64(0), r.offsetUpdate, "offsetUpdate should be zeroed after reset") -} - -func TestCustomPartialBlobDecorFunc(t *testing.T) { - // A stub test - s := decor.Statistics{} - assert.Equal(t, "0.0b / 0.0b (skipped: 0.0b)", customPartialBlobDecorFunc(s)) - // Partial pull in progress - s = decor.Statistics{} - s.Current = 1097653 - s.Total = 8329917 - s.Refill = 509722 - assert.Equal(t, "1.0MiB / 7.9MiB (skipped: 497.8KiB = 6.12%)", customPartialBlobDecorFunc(s)) - // Almost complete, but no reuse - s.Current = int64(float64(s.Total) * 0.95) - s.Refill = 0 - assert.Equal(t, "7.5MiB / 7.9MiB", customPartialBlobDecorFunc(s)) -} diff --git a/image/copy/single.go b/image/copy/single.go index 11450e4d5d..88ff9265d0 100644 --- a/image/copy/single.go +++ b/image/copy/single.go @@ -634,10 +634,15 @@ func (ic *imageCopier) copyConfig(ctx context.Context, src types.Image) error { return types.BlobInfo{}, fmt.Errorf("reading config blob %s: %w", srcInfo.Digest, err) } - destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false, nil) + var reporter progressReporter = &noopProgressReporter{} + if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { + reporter = newChannelProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) + } + destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false, reporter) if err != nil { return types.BlobInfo{}, err } + reporter.reportDone() bar.mark100PercentComplete() return destInfo, nil @@ -789,7 +794,10 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to // of the source file are not known yet and must be fetched. // Attempt a partial only when the source allows to retrieve a blob partially and // the destination has support for it. - var reporter *progressReporter + var reporter progressReporter = &noopProgressReporter{} + if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { + reporter = newChannelProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) + } if canAvoidProcessingCompleteLayer && ic.c.rawSource.SupportsGetBlobAt() && ic.c.dest.SupportsPutBlobPartial() { reused, blobInfo, err := func() (bool, types.BlobInfo, error) { // A scope for defer bar, err := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done") @@ -802,12 +810,9 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to }() proxy := blobChunkAccessorProxy{ - wrapped: ic.c.rawSource, - bar: bar, - } - if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { - reporter = newProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) - proxy.reporter = reporter + wrapped: ic.c.rawSource, + bar: bar, + reporter: reporter, } uploadedBlob, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, private.PutBlobPartialOptions{ Cache: ic.c.blobInfoCache, @@ -823,9 +828,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to bar.mark100PercentComplete() hideProgressBar = false - if reporter != nil { - reporter.reportDone() - } + reporter.reportDone() logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest) return true, updatedBlobInfoFromUpload(srcInfo, uploadedBlob), nil @@ -834,9 +837,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to var perr private.ErrFallbackToOrdinaryLayerDownload if errors.As(err, &perr) { // Reset progress, the reporter is reused for the fallback. - if reporter != nil { - reporter.reset() - } + reporter.reset() logrus.Debugf("Failed to retrieve partial blob: %v", err) return false, types.BlobInfo{}, nil } @@ -864,10 +865,6 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } defer srcStream.Close() - // Create a reporter if not reused on ErrFallbackToOrdinaryLayerDownload. - if reporter == nil && ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 { - reporter = newProgressReporter(ic.c.options.Progress, ic.c.options.ProgressInterval, srcInfo) - } blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer, reporter) if err != nil { return types.BlobInfo{}, "", err @@ -899,9 +896,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } // Only report completion on success. - if reporter != nil { - reporter.reportDone() - } + reporter.reportDone() bar.mark100PercentComplete() return blobInfo, diffID, nil }() @@ -951,7 +946,7 @@ func updatedBlobInfoFromReuse(inputInfo types.BlobInfo, reusedBlob private.Reuse // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool, - reporter *progressReporter, + reporter progressReporter, ) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compressiontypes.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult diff --git a/image/types/types.go b/image/types/types.go index 161df100fb..20a73cd7d6 100644 --- a/image/types/types.go +++ b/image/types/types.go @@ -707,8 +707,8 @@ type SystemContext struct { type ProgressEvent uint const ( - // ProgressEventNewArtifact will be fired when starting processing a new - // artifact + // ProgressEventNewArtifact will be fired when starting to process + // a new artifact ProgressEventNewArtifact ProgressEvent = iota // ProgressEventRead indicates that the artifact download is currently in