diff --git a/image/docker/docker_image_src.go b/image/docker/docker_image_src.go index 4003af5d27..4ec9f53e31 100644 --- a/image/docker/docker_image_src.go +++ b/image/docker/docker_image_src.go @@ -8,15 +8,20 @@ import ( "fmt" "io" "math" + "math/rand/v2" "mime" "mime/multipart" + "net" "net/http" "net/url" "os" "os/exec" "strings" "sync" + "time" + "github.com/docker/distribution/registry/api/errcode" + v2 "github.com/docker/distribution/registry/api/v2" digest "github.com/opencontainers/go-digest" "github.com/sirupsen/logrus" "go.podman.io/image/v5/docker/reference" @@ -48,6 +53,19 @@ type dockerImageSource struct { // State cachedManifest []byte // nil if not loaded yet cachedManifestMIMEType string // Only valid if cachedManifest != nil + + // Mirror fallback: when blob fetch fails with a retryable error, try + // remaining mirrors before giving up. Protected by mirrorMu. + mirrorMu sync.Mutex + mirrorOverride *mirrorSource // If non-nil, this is the mirror and physicalRef for the override client + prevOverrides []*dockerClient // old overrides not yet closed + remainingSources []sysregistriesv2.PullSource + fallbackSys *types.SystemContext +} + +type mirrorSource struct { + client *dockerClient + ref dockerReference } // newImageSource creates a new ImageSource for the specified image reference. @@ -91,7 +109,7 @@ func newImageSource(ctx context.Context, sys *types.SystemContext, ref dockerRef err error } attempts := []attempt{} - for _, pullSource := range pullSources { + for i, pullSource := range pullSources { if sys != nil && sys.DockerLogMirrorChoice { logrus.Infof("Trying to access %q", pullSource.Reference) } else { @@ -99,6 +117,10 @@ func newImageSource(ctx context.Context, sys *types.SystemContext, ref dockerRef } s, err := newImageSourceAttempt(ctx, sys, ref, pullSource, registryConfig) if err == nil { + if i+1 < len(pullSources) { + s.remainingSources = pullSources[i+1:] + s.fallbackSys = sys + } return s, nil } logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, err) @@ -205,6 +227,18 @@ func (s *dockerImageSource) Reference() types.ImageReference { // Close removes resources associated with an initialized ImageSource, if any. func (s *dockerImageSource) Close() error { + s.mirrorMu.Lock() + prev := s.prevOverrides + override := s.mirrorOverride + s.prevOverrides = nil + s.mirrorOverride = nil + s.mirrorMu.Unlock() + for _, m := range prev { + m.Close() + } + if override != nil { + override.client.Close() + } return s.c.Close() } @@ -455,7 +489,161 @@ func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, // The Digest field in BlobInfo is guaranteed to be provided, Size may be -1 and MediaType may be optionally provided. // May update BlobInfoCache, preferably after it knows for certain that a blob truly exists at a specific location. func (s *dockerImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { - return s.c.getBlob(ctx, s.physicalRef, info, cache) + return s.getBlob(ctx, info, cache) +} + +func (s *dockerImageSource) getBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { + client, physRef, hasRemaining := s.getActiveSource() + reader, size, err := tryGetBlob(ctx, client, physRef, info, cache) + if err == nil || !hasRemaining { + return reader, size, err + } + if isMirrorTransientError(err) || isMirrorFallbackError(err) { + logrus.Debugf("Blob %s fetch from %q failed (%v), trying fallback sources", info.Digest, physRef.ref, err) + return s.getBlobWithMirrorFallback(ctx, info, cache, err, client) + } + return reader, size, err +} + +func (s *dockerImageSource) getActiveSource() (*dockerClient, dockerReference, bool) { + s.mirrorMu.Lock() + defer s.mirrorMu.Unlock() + hasRemaining := len(s.remainingSources) > 0 + if s.mirrorOverride != nil { + return s.mirrorOverride.client, s.mirrorOverride.ref, hasRemaining + } + return s.c, s.physicalRef, hasRemaining +} + +func tryGetBlob(ctx context.Context, client *dockerClient, physRef dockerReference, + info types.BlobInfo, cache types.BlobInfoCache, +) (io.ReadCloser, int64, error) { + reader, size, err := client.getBlob(ctx, physRef, info, cache) + if err != nil && isMirrorTransientError(err) { + logrus.Debugf("Transient error fetching blob %s from %q, retrying: %v", info.Digest, physRef.ref, err) + delay := time.Second + rand.N(time.Second/10) // 1s + 10% jitter + select { + case <-ctx.Done(): + return nil, 0, fmt.Errorf("%w (while retrying after: %v)", ctx.Err(), err) + case <-time.After(delay): + } + reader, size, err = client.getBlob(ctx, physRef, info, cache) + } + return reader, size, err +} + +// isMirrorFallbackError returns true for errors where the blob is not present +// on this mirror but may be on another — warranting a fallback attempt. +// All tested registries (docker.io, registry.redhat.io, quay.io, registry.access.redhat.com) +// return BLOB_UNKNOWN for blob 404s, matching the OCI distribution spec. +func isMirrorFallbackError(err error) bool { + // Blob endpoint returns HTTP 404 with errcode JSON body + // {"errors":[{"code":"BLOB_UNKNOWN",...}]}. + // registryHTTPResponseToError → handleErrorResponse → parseHTTPErrorResponse + // returns errcode.Errors containing errcode.Error{Code: v2.ErrorCodeBlobUnknown}. + var ec errcode.ErrorCoder + if errors.As(err, &ec) && ec.ErrorCode() == v2.ErrorCodeBlobUnknown { + return true + } + + // UnexpectedHTTPStatusError (capital U) is NOT matched here — for regular blobs, + // handleErrorResponse never produces it for 4xx (only for 5xx). For external blobs, + // getExternalBlob() produces it via newUnexpectedHTTPStatusError() directly for any + // non-200 including 404, but external URLs are fixed and mirror-independent — + // a different registry mirror cannot serve them. + return false +} + +// isMirrorTransientError returns true for errors that are transient — the mirror +// probably has the blob but temporarily cannot serve it. +func isMirrorTransientError(err error) bool { + // HTTP 5xx: handleErrorResponse returns UnexpectedHTTPStatusError for status + // codes outside 400–499. Server-side error, another mirror may succeed. + var httpErr UnexpectedHTTPStatusError + if errors.As(err, &httpErr) && httpErr.StatusCode >= 500 { + return true + } + + // Network timeout: makeRequest returns net.Error with Timeout() == true. + // The mirror is reachable but slow — worth retrying on another. + var netErr net.Error + return errors.As(err, &netErr) && netErr.Timeout() +} + +func (s *dockerImageSource) getBlobWithMirrorFallback(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, originalErr error, failedClient *dockerClient) (io.ReadCloser, int64, error) { + // Held for the full fallback loop, including network I/O. This serializes + // concurrent blob fetchers during fallback — only one goroutine probes + // sources while the rest block on getActiveSource(). Acceptable trade-off: + // fallback is rare and serialization prevents thundering-herd probing. + s.mirrorMu.Lock() + defer s.mirrorMu.Unlock() + + // If another goroutine already switched to a working mirror, try it first. + if s.mirrorOverride != nil && s.mirrorOverride.client != failedClient { + client, physRef := s.mirrorOverride.client, s.mirrorOverride.ref + reader, size, err := tryGetBlob(ctx, client, physRef, info, cache) + if err == nil { + return reader, size, nil + } + // Override failed — retire it so subsequent goroutines don't retry it. + s.prevOverrides = append(s.prevOverrides, s.mirrorOverride.client) + s.mirrorOverride = nil + } + + registryConf, regErr := loadRegistryConfiguration(s.fallbackSys) + if regErr != nil { + logrus.Debugf("Mirror fallback: failed to load registry config: %v", regErr) + return nil, 0, originalErr + } + + type attempt struct { + ref reference.Named + err error + } + attempts := []attempt{} + for len(s.remainingSources) > 0 { + pullSource := s.remainingSources[0] + s.remainingSources = s.remainingSources[1:] + + logrus.Debugf("Trying to access %q", pullSource.Reference) + + // newImageSourceAttempt calls ensureManifestIsLoaded(), which preserves + // the manifest-level filtering: only mirrors that can serve the manifest + // are accepted. + fallback, fallbackErr := newImageSourceAttempt(ctx, s.fallbackSys, s.logicalRef, pullSource, registryConf) + if fallbackErr != nil { + logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, fallbackErr) + attempts = append(attempts, attempt{ref: pullSource.Reference, err: fallbackErr}) + continue + } + + reader, size, err := tryGetBlob(ctx, fallback.c, fallback.physicalRef, info, cache) + if err != nil { + fallback.c.Close() + logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, err) + attempts = append(attempts, attempt{ref: pullSource.Reference, err: err}) + if !isMirrorTransientError(err) && !isMirrorFallbackError(err) { + break + } + continue + } + + if s.mirrorOverride != nil { + s.prevOverrides = append(s.prevOverrides, s.mirrorOverride.client) + } + s.mirrorOverride = &mirrorSource{client: fallback.c, ref: fallback.physicalRef} + logrus.Debugf("Blob fetch succeeded from fallback source %q, switching to it for future requests", pullSource.Reference) + + return reader, size, nil + } + if len(attempts) > 0 { + extras := []string{} + for _, a := range attempts { + extras = append(extras, fmt.Sprintf("[%s: %v]", a.ref.String(), a.err)) + } + logrus.Debugf("(Fallback sources also failed: %s): %v", strings.Join(extras, "\n"), originalErr) + } + return nil, 0, originalErr } // GetSignaturesWithFormat returns the image's signatures. It may use a remote (= slow) service.