From fc571f4c956f09157ec1257642b7225aa7fed177 Mon Sep 17 00:00:00 2001 From: hweawer Date: Fri, 21 Nov 2025 15:50:35 +0100 Subject: [PATCH 1/7] fix: resolve race condition in file state verification and improve error handling This commit fixes critical issues with blob downloads in the agent: 1. Race Condition Fix: - Use Any() instead of Cache() after sched.Download() completes - Tolerates files in download state during brief move window - Eliminates spurious 500 errors when serving newly downloaded blobs - Safe due to Unix file descriptor semantics (fd stays valid after rename) 2. Improved Error Handling: - Add typed errors (ErrBlobNotFound, ErrTagNotFound) with detailed reasons - Map scheduler errors to appropriate response codes (404 vs 500) - Use %w for error wrapping to preserve error chains - Extract mapSchedulerError() helper to centralize error mapping 3. Code Quality: - Fix double slash in error messages (use filepath.Join) - Reduce nesting with early return patterns - Consistent error handling across methods --- agent/agentserver/server.go | 51 +++++--- lib/dockerregistry/storage_driver.go | 17 ++- lib/dockerregistry/transfer/errors.go | 46 ++++++- lib/dockerregistry/transfer/ro_transferer.go | 123 ++++++++++++++---- .../transfer/ro_transferer_test.go | 2 +- lib/dockerregistry/transfer/rw_transferer.go | 87 +++++++------ .../transfer/rw_transferer_test.go | 4 +- lib/dockerregistry/transfer/testing.go | 5 +- lib/store/base/errors.go | 9 +- lib/store/base/file_entry.go | 2 +- 10 files changed, 246 insertions(+), 100 deletions(-) diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go index 93376021b..2211a5507 100644 --- a/agent/agentserver/server.go +++ b/agent/agentserver/server.go @@ -118,13 +118,15 @@ func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error { if err != nil { return err } + d, err := s.tags.Get(tag) + if err == tagclient.ErrTagNotFound { + return handler.ErrorStatus(http.StatusNotFound) + } if err != nil { - if err == tagclient.ErrTagNotFound { - return handler.ErrorStatus(http.StatusNotFound) - } return handler.Errorf("get tag: %s", err) } + io.WriteString(w, d.String()) return nil } @@ -139,23 +141,38 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err if err != nil { return err } + f, err := s.cads.Cache().GetFileReader(d.Hex()) - if err != nil { - if os.IsNotExist(err) || s.cads.InDownloadError(err) { - if err := s.sched.Download(namespace, d); err != nil { - if err == scheduler.ErrTorrentNotFound { - return handler.ErrorStatus(http.StatusNotFound) - } - return handler.Errorf("download torrent: %s", err) - } - f, err = s.cads.Cache().GetFileReader(d.Hex()) - if err != nil { - return handler.Errorf("store: %s", err) - } - } else { - return handler.Errorf("store: %s", err) + + // Happy path: file already exists in cache + if err == nil { + if _, err := io.Copy(w, f); err != nil { + return fmt.Errorf("copy file: %s", err) + } + return nil + } + + // If error is not recoverable, return error + if !os.IsNotExist(err) && !s.cads.InDownloadError(err) { + return handler.Errorf("store: %s", err) + } + + // File doesn't exist or is in wrong state, trigger P2P download + if err := s.sched.Download(namespace, d); err != nil { + if err == scheduler.ErrTorrentNotFound { + return handler.ErrorStatus(http.StatusNotFound) } + return handler.Errorf("download torrent: %s", err) } + + // Get file reader after download completes + // Use Any() to check both download and cache directories, as the file + // might still be in the process of being moved from download to cache. + f, err = s.cads.Any().GetFileReader(d.Hex()) + if err != nil { + return handler.Errorf("store: %s", err) + } + if _, err := io.Copy(w, f); err != nil { return fmt.Errorf("copy file: %s", err) } diff --git a/lib/dockerregistry/storage_driver.go b/lib/dockerregistry/storage_driver.go index 9f90eb075..e0887a3eb 100644 --- a/lib/dockerregistry/storage_driver.go +++ b/lib/dockerregistry/storage_driver.go @@ -77,15 +77,20 @@ func (e InvalidRequestError) Error() string { } func toDriverError(err error, path string) error { + // Check for "not found" errors -> return 404 if errors.Is(err, os.ErrNotExist) || - errors.Is(err, transfer.ErrBlobNotFound) || - errors.Is(err, transfer.ErrTagNotFound) { + transfer.IsBlobNotFound(err) || + transfer.IsTagNotFound(err) { return driver.PathNotFoundError{ DriverName: Name, Path: path, } } - return err + log.Errorf("Storage driver error for path %s: %v", path, err) + return driver.Error{ + DriverName: Name, + Enclosed: err, + } } type krakenStorageDriverFactory struct { @@ -132,7 +137,8 @@ func NewReadWriteStorageDriver( config Config, cas *store.CAStore, transferer transfer.ImageTransferer, - verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error)) *KrakenStorageDriver { + verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error), +) *KrakenStorageDriver { return &KrakenStorageDriver{ config: config, transferer: transferer, @@ -147,7 +153,8 @@ func NewReadOnlyStorageDriver( config Config, bs BlobStore, transferer transfer.ImageTransferer, - verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error)) *KrakenStorageDriver { + verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error), +) *KrakenStorageDriver { return &KrakenStorageDriver{ config: config, transferer: transferer, diff --git a/lib/dockerregistry/transfer/errors.go b/lib/dockerregistry/transfer/errors.go index 9aff892f9..f7987bffc 100644 --- a/lib/dockerregistry/transfer/errors.go +++ b/lib/dockerregistry/transfer/errors.go @@ -13,10 +13,50 @@ // limitations under the License. package transfer -import "errors" +import "fmt" // ErrBlobNotFound is returned when a blob is not found by transferer. -var ErrBlobNotFound = errors.New("blob not found") +type ErrBlobNotFound struct { + Digest string + Reason string +} + +func (e ErrBlobNotFound) Error() string { + if e.Reason != "" { + return fmt.Sprintf("blob %s not found: %s", e.Digest, e.Reason) + } + return fmt.Sprintf("blob %s not found", e.Digest) +} // ErrTagNotFound is returned when a tag is not found by transferer. -var ErrTagNotFound = errors.New("tag not found") +type ErrTagNotFound struct { + Tag string + Reason string +} + +func (e ErrTagNotFound) Error() string { + if e.Reason != "" { + return fmt.Sprintf("tag %s not found: %s", e.Tag, e.Reason) + } + return fmt.Sprintf("tag %s not found", e.Tag) +} + +// IsBlobNotFound checks if an error is ErrBlobNotFound. +func IsBlobNotFound(err error) bool { + _, ok := err.(ErrBlobNotFound) + if ok { + return true + } + _, ok = err.(*ErrBlobNotFound) + return ok +} + +// IsTagNotFound checks if an error is ErrTagNotFound. +func IsTagNotFound(err error) bool { + _, ok := err.(ErrTagNotFound) + if ok { + return true + } + _, ok = err.(*ErrTagNotFound) + return ok +} diff --git a/lib/dockerregistry/transfer/ro_transferer.go b/lib/dockerregistry/transfer/ro_transferer.go index 5913034de..2c4ab8011 100644 --- a/lib/dockerregistry/transfer/ro_transferer.go +++ b/lib/dockerregistry/transfer/ro_transferer.go @@ -40,8 +40,8 @@ func NewReadOnlyTransferer( stats tally.Scope, cads *store.CADownloadStore, tags tagclient.Client, - sched scheduler.Scheduler) *ReadOnlyTransferer { - + sched scheduler.Scheduler, +) *ReadOnlyTransferer { stats = stats.Tagged(map[string]string{ "module": "rotransferer", }) @@ -49,38 +49,102 @@ func NewReadOnlyTransferer( return &ReadOnlyTransferer{stats, cads, tags, sched} } +// mapSchedulerError converts scheduler errors to appropriate transferer errors. +func mapSchedulerError(err error, d core.Digest) error { + switch err { + case scheduler.ErrTorrentNotFound: + return ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "torrent not found in tracker", + } + case scheduler.ErrTorrentTimeout: + return ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "download timed out", + } + case scheduler.ErrTorrentRemoved: + return ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "torrent was removed", + } + case scheduler.ErrSchedulerStopped: + return fmt.Errorf("scheduler unavailable: scheduler is stopped") + case scheduler.ErrSendEventTimedOut: + return fmt.Errorf("scheduler unavailable: event loop busy") + default: + return fmt.Errorf("scheduler download failed: %w", err) + } +} + // Stat returns blob info from local cache, and triggers download if the blob is // not available locally. func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) { fi, err := t.cads.Cache().GetFileStat(d.Hex()) - if os.IsNotExist(err) || t.cads.InDownloadError(err) { - if err := t.sched.Download(namespace, d); err != nil { - return nil, fmt.Errorf("scheduler: %s", err) - } - fi, err = t.cads.Cache().GetFileStat(d.Hex()) - if err != nil { - return nil, fmt.Errorf("stat cache: %s", err) + + // Happy path: file already exists in cache + if err == nil { + return core.NewBlobInfo(fi.Size()), nil + } + + // If error is not recoverable, return error + if !os.IsNotExist(err) && !t.cads.InDownloadError(err) { + return nil, fmt.Errorf("stat cache: %w", err) + } + + // File doesn't exist or is in wrong state, trigger P2P download + if err := t.sched.Download(namespace, d); err != nil { + return nil, mapSchedulerError(err, d) + } + + // Stat file after download completes + // Use Any() to check both download and cache directories, as the file + // might still be in the process of being moved from download to cache. + fi, err = t.cads.Any().GetFileStat(d.Hex()) + if err == nil { + return core.NewBlobInfo(fi.Size()), nil + } + if os.IsNotExist(err) { + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "file not found after download", } - } else if err != nil { - return nil, fmt.Errorf("stat cache: %s", err) } - return core.NewBlobInfo(fi.Size()), nil + return nil, fmt.Errorf("stat cache after download: %w", err) } // Download downloads blobs as torrent. func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) { f, err := t.cads.Cache().GetFileReader(d.Hex()) - if os.IsNotExist(err) || t.cads.InDownloadError(err) { - if err := t.sched.Download(namespace, d); err != nil { - return nil, fmt.Errorf("scheduler: %s", err) - } - f, err = t.cads.Cache().GetFileReader(d.Hex()) - if err != nil { - return nil, fmt.Errorf("cache: %s", err) + + // Happy path: file already exists in cache + if err == nil { + return f, nil + } + + // If error is not recoverable, return error + if !os.IsNotExist(err) && !t.cads.InDownloadError(err) { + return nil, fmt.Errorf("get cache file: %w", err) + } + + // File doesn't exist or is in wrong state, trigger P2P download + if err := t.sched.Download(namespace, d); err != nil { + return nil, mapSchedulerError(err, d) + } + + // Get file reader after download completes + // Use Any() to check both download and cache directories, as the file + // might still be in the process of being moved from download to cache. + f, err = t.cads.Any().GetFileReader(d.Hex()) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "file not found after download", + } } - } else if err != nil { - return nil, fmt.Errorf("cache: %s", err) + return nil, fmt.Errorf("get file reader after download: %w", err) } + return f, nil } @@ -92,15 +156,18 @@ func (t *ReadOnlyTransferer) Upload(namespace string, d core.Digest, blob store. // GetTag gets manifest digest for tag. func (t *ReadOnlyTransferer) GetTag(tag string) (core.Digest, error) { d, err := t.tags.Get(tag) - if err != nil { - if err == tagclient.ErrTagNotFound { - t.stats.Counter("tag_not_found").Inc(1) - return core.Digest{}, ErrTagNotFound + if err == nil { + return d, nil + } + if err == tagclient.ErrTagNotFound { + t.stats.Counter("tag_not_found").Inc(1) + return core.Digest{}, ErrTagNotFound{ + Tag: tag, + Reason: "not found in build-index", } - t.stats.Counter("get_tag_error").Inc(1) - return core.Digest{}, fmt.Errorf("client get tag: %s", err) } - return d, nil + t.stats.Counter("get_tag_error").Inc(1) + return core.Digest{}, fmt.Errorf("client get tag: %w", err) } // PutTag is not supported. diff --git a/lib/dockerregistry/transfer/ro_transferer_test.go b/lib/dockerregistry/transfer/ro_transferer_test.go index 5ffb61ede..08a06430f 100644 --- a/lib/dockerregistry/transfer/ro_transferer_test.go +++ b/lib/dockerregistry/transfer/ro_transferer_test.go @@ -143,7 +143,7 @@ func TestReadOnlyTransfererGetTagNotFound(t *testing.T) { _, err := transferer.GetTag(tag) require.Error(err) - require.Equal(ErrTagNotFound, err) + require.True(IsTagNotFound(err)) } // TODO(codyg): This is a particularly ugly test that is a symptom of the lack diff --git a/lib/dockerregistry/transfer/rw_transferer.go b/lib/dockerregistry/transfer/rw_transferer.go index e79fafd90..6b853c156 100644 --- a/lib/dockerregistry/transfer/rw_transferer.go +++ b/lib/dockerregistry/transfer/rw_transferer.go @@ -41,8 +41,8 @@ func NewReadWriteTransferer( stats tally.Scope, tags tagclient.Client, originCluster blobclient.ClusterClient, - cas *store.CAStore) *ReadWriteTransferer { - + cas *store.CAStore, +) *ReadWriteTransferer { stats = stats.Tagged(map[string]string{ "module": "rwtransferer", }) @@ -53,94 +53,103 @@ func NewReadWriteTransferer( // Stat returns blob info from origin cluster or local cache. func (t *ReadWriteTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) { fi, err := t.cas.GetCacheFileStat(d.Hex()) - if err != nil { - if os.IsNotExist(err) { - return t.originStat(namespace, d) - } - return nil, fmt.Errorf("stat cache file: %s", err) + if err == nil { + return core.NewBlobInfo(fi.Size()), nil + } + if os.IsNotExist(err) { + return t.originStat(namespace, d) } - return core.NewBlobInfo(fi.Size()), nil + return nil, fmt.Errorf("stat cache file: %w", err) } func (t *ReadWriteTransferer) originStat(namespace string, d core.Digest) (*core.BlobInfo, error) { bi, err := t.originCluster.Stat(namespace, d) - if err != nil { - // `docker push` stats blobs before uploading them. If the blob is not - // found, it will upload it. However if remote blob storage is unavailable, - // this will be a 5XX error, and will short-circuit push. We must consider - // this class of error to be a 404 to allow pushes to succeed while remote - // storage is down (write-back will eventually persist the blobs). - if err != blobclient.ErrBlobNotFound { - log.With("digest", d).Info("Error stat-ing origin blob: %s", err) - } - return nil, ErrBlobNotFound + if err == nil { + return bi, nil + } + // `docker push` stats blobs before uploading them. If the blob is not + // found, it will upload it. However if remote blob storage is unavailable, + // this will be a 5XX error, and will short-circuit push. We must consider + // this class of error to be a 404 to allow pushes to succeed while remote + // storage is down (write-back will eventually persist the blobs). + if err != blobclient.ErrBlobNotFound { + log.With("digest", d).Info("Error stat-ing origin blob: %s", err) + } + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "not found in origin cluster", } - return bi, nil } // Download downloads the blob of name into the file store and returns a reader // to the newly downloaded file. func (t *ReadWriteTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) { blob, err := t.cas.GetCacheFileReader(d.Hex()) - if err != nil { - if os.IsNotExist(err) { - return t.downloadFromOrigin(namespace, d) - } - return nil, fmt.Errorf("get cache file: %s", err) + if err == nil { + return blob, nil } - return blob, nil + if os.IsNotExist(err) { + return t.downloadFromOrigin(namespace, d) + } + return nil, fmt.Errorf("get cache file: %w", err) } func (t *ReadWriteTransferer) downloadFromOrigin(namespace string, d core.Digest) (store.FileReader, error) { tmp := fmt.Sprintf("%s.%s", d.Hex(), uuid.Generate().String()) if err := t.cas.CreateUploadFile(tmp, 0); err != nil { - return nil, fmt.Errorf("create upload file: %s", err) + return nil, fmt.Errorf("create upload file: %w", err) } w, err := t.cas.GetUploadFileReadWriter(tmp) if err != nil { - return nil, fmt.Errorf("get upload writer: %s", err) + return nil, fmt.Errorf("get upload writer: %w", err) } defer w.Close() if err := t.originCluster.DownloadBlob(namespace, d, w); err != nil { if err == blobclient.ErrBlobNotFound { - return nil, ErrBlobNotFound + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "not found in origin cluster", + } } - return nil, fmt.Errorf("origin: %s", err) + return nil, fmt.Errorf("origin download: %w", err) } if err := t.cas.MoveUploadFileToCache(tmp, d.Hex()); err != nil && !os.IsExist(err) { - return nil, fmt.Errorf("move upload file to cache: %s", err) + return nil, fmt.Errorf("move upload file to cache: %w", err) } blob, err := t.cas.GetCacheFileReader(d.Hex()) if err != nil { - return nil, fmt.Errorf("get cache file: %s", err) + return nil, fmt.Errorf("get cache file: %w", err) } return blob, nil } // Upload uploads blob to the origin cluster. func (t *ReadWriteTransferer) Upload( - namespace string, d core.Digest, blob store.FileReader) error { - + namespace string, d core.Digest, blob store.FileReader, +) error { return t.originCluster.UploadBlob(namespace, d, blob) } // GetTag returns the manifest digest for tag. func (t *ReadWriteTransferer) GetTag(tag string) (core.Digest, error) { d, err := t.tags.Get(tag) - if err != nil { - if err == tagclient.ErrTagNotFound { - return core.Digest{}, ErrTagNotFound + if err == nil { + return d, nil + } + if err == tagclient.ErrTagNotFound { + return core.Digest{}, ErrTagNotFound{ + Tag: tag, + Reason: "not found in build-index", } - return core.Digest{}, fmt.Errorf("client get tag: %s", err) } - return d, nil + return core.Digest{}, fmt.Errorf("client get tag: %w", err) } // PutTag uploads d as the manifest digest for tag. func (t *ReadWriteTransferer) PutTag(tag string, d core.Digest) error { if err := t.tags.PutAndReplicate(tag, d); err != nil { t.stats.Counter("put_tag_error").Inc(1) - return fmt.Errorf("put and replicate tag: %s", err) + return fmt.Errorf("put and replicate tag: %w", err) } return nil } diff --git a/lib/dockerregistry/transfer/rw_transferer_test.go b/lib/dockerregistry/transfer/rw_transferer_test.go index 681949fe0..e7c5169cb 100644 --- a/lib/dockerregistry/transfer/rw_transferer_test.go +++ b/lib/dockerregistry/transfer/rw_transferer_test.go @@ -115,7 +115,7 @@ func TestReadWriteTransfererGetTagNotFound(t *testing.T) { _, err := transferer.GetTag(tag) require.Error(err) - require.Equal(ErrTagNotFound, err) + require.True(IsTagNotFound(err)) } func TestReadWriteTransfererPutTag(t *testing.T) { @@ -191,5 +191,5 @@ func TestReadWriteTransfererStatNotFoundOnAnyOriginError(t *testing.T) { mocks.originCluster.EXPECT().Stat(namespace, blob.Digest).Return(nil, errors.New("any error")) _, err := transferer.Stat(namespace, blob.Digest) - require.Equal(ErrBlobNotFound, err) + require.True(IsBlobNotFound(err)) } diff --git a/lib/dockerregistry/transfer/testing.go b/lib/dockerregistry/transfer/testing.go index 362b5be52..b6ae68bd0 100644 --- a/lib/dockerregistry/transfer/testing.go +++ b/lib/dockerregistry/transfer/testing.go @@ -67,7 +67,10 @@ func (t *testTransferer) GetTag(tag string) (core.Digest, error) { } d, ok := t.tags[p] if !ok { - return core.Digest{}, ErrTagNotFound + return core.Digest{}, ErrTagNotFound{ + Tag: tag, + Reason: "not found in test transferer", + } } return d, nil } diff --git a/lib/store/base/errors.go b/lib/store/base/errors.go index e5785ac16..b28868d23 100644 --- a/lib/store/base/errors.go +++ b/lib/store/base/errors.go @@ -13,7 +13,10 @@ // limitations under the License. package base -import "fmt" +import ( + "fmt" + "path/filepath" +) // FileStateError represents errors related to file state. // It's used when a file is not in the state it was supposed to be in. @@ -25,8 +28,8 @@ type FileStateError struct { } func (e *FileStateError) Error() string { - return fmt.Sprintf("failed to perform \"%s\" on %s/%s: %s", - e.Op, e.State.GetDirectory(), e.Name, e.Msg) + return fmt.Sprintf("failed to perform \"%s\" on %s: %s", + e.Op, filepath.Join(e.State.GetDirectory(), e.Name), e.Msg) } // IsFileStateError returns true if the param is of FileStateError type. diff --git a/lib/store/base/file_entry.go b/lib/store/base/file_entry.go index db0431999..15bc55ce6 100644 --- a/lib/store/base/file_entry.go +++ b/lib/store/base/file_entry.go @@ -304,7 +304,7 @@ func (entry *localFileEntry) Create(targetState FileState, size int64) error { return err } - return f.Close() + return nil } // Reload tries to reload a file that doesn't exist in memory from disk. From ce235f85face37ae650e26a3782909d13f63e99e Mon Sep 17 00:00:00 2001 From: hweawer Date: Fri, 21 Nov 2025 16:17:56 +0100 Subject: [PATCH 2/7] Fix --- agent/agentserver/server.go | 10 ++++++---- lib/dockerregistry/transfer/errors.go | 21 ++++++++++++--------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go index 2211a5507..a30fc1e71 100644 --- a/agent/agentserver/server.go +++ b/agent/agentserver/server.go @@ -32,6 +32,7 @@ import ( "github.com/uber/kraken/lib/store" "github.com/uber/kraken/lib/torrent/scheduler" "github.com/uber/kraken/tracker/announceclient" + "github.com/uber/kraken/utils/closers" "github.com/uber/kraken/utils/handler" "github.com/uber/kraken/utils/httputil" @@ -65,8 +66,8 @@ func New( sched scheduler.ReloadableScheduler, tags tagclient.Client, ac announceclient.Client, - containerRuntime containerruntime.Factory) *Server { - + containerRuntime containerruntime.Factory, +) *Server { stats = stats.Tagged(map[string]string{ "module": "agentserver", }) @@ -143,11 +144,12 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err } f, err := s.cads.Cache().GetFileReader(d.Hex()) + defer closers.Close(f) // Happy path: file already exists in cache if err == nil { if _, err := io.Copy(w, f); err != nil { - return fmt.Errorf("copy file: %s", err) + return fmt.Errorf("copy file: %w", err) } return nil } @@ -174,7 +176,7 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err } if _, err := io.Copy(w, f); err != nil { - return fmt.Errorf("copy file: %s", err) + return fmt.Errorf("copy file: %w", err) } return nil } diff --git a/lib/dockerregistry/transfer/errors.go b/lib/dockerregistry/transfer/errors.go index f7987bffc..2567d242e 100644 --- a/lib/dockerregistry/transfer/errors.go +++ b/lib/dockerregistry/transfer/errors.go @@ -13,7 +13,10 @@ // limitations under the License. package transfer -import "fmt" +import ( + "errors" + "fmt" +) // ErrBlobNotFound is returned when a blob is not found by transferer. type ErrBlobNotFound struct { @@ -43,20 +46,20 @@ func (e ErrTagNotFound) Error() string { // IsBlobNotFound checks if an error is ErrBlobNotFound. func IsBlobNotFound(err error) bool { - _, ok := err.(ErrBlobNotFound) - if ok { + var e ErrBlobNotFound + if errors.As(err, &e) { return true } - _, ok = err.(*ErrBlobNotFound) - return ok + var ePtr *ErrBlobNotFound + return errors.As(err, &ePtr) } // IsTagNotFound checks if an error is ErrTagNotFound. func IsTagNotFound(err error) bool { - _, ok := err.(ErrTagNotFound) - if ok { + var e ErrTagNotFound + if errors.As(err, &e) { return true } - _, ok = err.(*ErrTagNotFound) - return ok + var ePtr *ErrTagNotFound + return errors.As(err, &ePtr) } From ff65c417d4b905a119d9b0ee1f4fa4ac8a0471d0 Mon Sep 17 00:00:00 2001 From: hweawer Date: Fri, 21 Nov 2025 16:34:33 +0100 Subject: [PATCH 3/7] Simplify errors --- lib/dockerregistry/transfer/ro_transferer.go | 23 +++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/lib/dockerregistry/transfer/ro_transferer.go b/lib/dockerregistry/transfer/ro_transferer.go index 2c4ab8011..20a8df306 100644 --- a/lib/dockerregistry/transfer/ro_transferer.go +++ b/lib/dockerregistry/transfer/ro_transferer.go @@ -51,29 +51,16 @@ func NewReadOnlyTransferer( // mapSchedulerError converts scheduler errors to appropriate transferer errors. func mapSchedulerError(err error, d core.Digest) error { - switch err { - case scheduler.ErrTorrentNotFound: + // torrent not found → 404 + if err == scheduler.ErrTorrentNotFound { return ErrBlobNotFound{ Digest: d.Hex(), Reason: "torrent not found in tracker", } - case scheduler.ErrTorrentTimeout: - return ErrBlobNotFound{ - Digest: d.Hex(), - Reason: "download timed out", - } - case scheduler.ErrTorrentRemoved: - return ErrBlobNotFound{ - Digest: d.Hex(), - Reason: "torrent was removed", - } - case scheduler.ErrSchedulerStopped: - return fmt.Errorf("scheduler unavailable: scheduler is stopped") - case scheduler.ErrSendEventTimedOut: - return fmt.Errorf("scheduler unavailable: event loop busy") - default: - return fmt.Errorf("scheduler download failed: %w", err) } + + // All other scheduler errors → 500 with context + return fmt.Errorf("download blob %s: %w", d.Hex(), err) } // Stat returns blob info from local cache, and triggers download if the blob is From 97ffb2e95b2f332f17c6e1767ecd30bb1c29590c Mon Sep 17 00:00:00 2001 From: hweawer Date: Fri, 21 Nov 2025 16:43:50 +0100 Subject: [PATCH 4/7] Fix --- agent/agentserver/server.go | 3 ++- lib/dockerregistry/transfer/errors.go | 12 ++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go index a30fc1e71..da1b96320 100644 --- a/agent/agentserver/server.go +++ b/agent/agentserver/server.go @@ -144,10 +144,10 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err } f, err := s.cads.Cache().GetFileReader(d.Hex()) - defer closers.Close(f) // Happy path: file already exists in cache if err == nil { + defer closers.Close(f) if _, err := io.Copy(w, f); err != nil { return fmt.Errorf("copy file: %w", err) } @@ -174,6 +174,7 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err if err != nil { return handler.Errorf("store: %s", err) } + defer closers.Close(f) if _, err := io.Copy(w, f); err != nil { return fmt.Errorf("copy file: %w", err) diff --git a/lib/dockerregistry/transfer/errors.go b/lib/dockerregistry/transfer/errors.go index 2567d242e..5e219de13 100644 --- a/lib/dockerregistry/transfer/errors.go +++ b/lib/dockerregistry/transfer/errors.go @@ -47,19 +47,11 @@ func (e ErrTagNotFound) Error() string { // IsBlobNotFound checks if an error is ErrBlobNotFound. func IsBlobNotFound(err error) bool { var e ErrBlobNotFound - if errors.As(err, &e) { - return true - } - var ePtr *ErrBlobNotFound - return errors.As(err, &ePtr) + return errors.As(err, &e) } // IsTagNotFound checks if an error is ErrTagNotFound. func IsTagNotFound(err error) bool { var e ErrTagNotFound - if errors.As(err, &e) { - return true - } - var ePtr *ErrTagNotFound - return errors.As(err, &ePtr) + return errors.As(err, &e) } From d1cc3f3181ac8ac0cfd74e20aeabb9ae5588fe4c Mon Sep 17 00:00:00 2001 From: hweawer Date: Fri, 21 Nov 2025 16:58:10 +0100 Subject: [PATCH 5/7] Fix formatting --- agent/agentserver/server.go | 3 +-- lib/dockerregistry/storage_driver.go | 6 ++---- lib/dockerregistry/transfer/ro_transferer.go | 3 +-- lib/dockerregistry/transfer/rw_transferer.go | 6 ++---- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go index da1b96320..6920f2e9f 100644 --- a/agent/agentserver/server.go +++ b/agent/agentserver/server.go @@ -66,8 +66,7 @@ func New( sched scheduler.ReloadableScheduler, tags tagclient.Client, ac announceclient.Client, - containerRuntime containerruntime.Factory, -) *Server { + containerRuntime containerruntime.Factory) *Server { stats = stats.Tagged(map[string]string{ "module": "agentserver", }) diff --git a/lib/dockerregistry/storage_driver.go b/lib/dockerregistry/storage_driver.go index e0887a3eb..8972144be 100644 --- a/lib/dockerregistry/storage_driver.go +++ b/lib/dockerregistry/storage_driver.go @@ -137,8 +137,7 @@ func NewReadWriteStorageDriver( config Config, cas *store.CAStore, transferer transfer.ImageTransferer, - verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error), -) *KrakenStorageDriver { + verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error)) *KrakenStorageDriver { return &KrakenStorageDriver{ config: config, transferer: transferer, @@ -153,8 +152,7 @@ func NewReadOnlyStorageDriver( config Config, bs BlobStore, transferer transfer.ImageTransferer, - verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error), -) *KrakenStorageDriver { + verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error)) *KrakenStorageDriver { return &KrakenStorageDriver{ config: config, transferer: transferer, diff --git a/lib/dockerregistry/transfer/ro_transferer.go b/lib/dockerregistry/transfer/ro_transferer.go index 20a8df306..6ff35e1e5 100644 --- a/lib/dockerregistry/transfer/ro_transferer.go +++ b/lib/dockerregistry/transfer/ro_transferer.go @@ -40,8 +40,7 @@ func NewReadOnlyTransferer( stats tally.Scope, cads *store.CADownloadStore, tags tagclient.Client, - sched scheduler.Scheduler, -) *ReadOnlyTransferer { + sched scheduler.Scheduler) *ReadOnlyTransferer { stats = stats.Tagged(map[string]string{ "module": "rotransferer", }) diff --git a/lib/dockerregistry/transfer/rw_transferer.go b/lib/dockerregistry/transfer/rw_transferer.go index 6b853c156..ed393f61f 100644 --- a/lib/dockerregistry/transfer/rw_transferer.go +++ b/lib/dockerregistry/transfer/rw_transferer.go @@ -41,8 +41,7 @@ func NewReadWriteTransferer( stats tally.Scope, tags tagclient.Client, originCluster blobclient.ClusterClient, - cas *store.CAStore, -) *ReadWriteTransferer { + cas *store.CAStore) *ReadWriteTransferer { stats = stats.Tagged(map[string]string{ "module": "rwtransferer", }) @@ -125,8 +124,7 @@ func (t *ReadWriteTransferer) downloadFromOrigin(namespace string, d core.Digest // Upload uploads blob to the origin cluster. func (t *ReadWriteTransferer) Upload( - namespace string, d core.Digest, blob store.FileReader, -) error { + namespace string, d core.Digest, blob store.FileReader) error { return t.originCluster.UploadBlob(namespace, d, blob) } From b036ac51caf7b261ed4adc1327c8fb7a208fb63b Mon Sep 17 00:00:00 2001 From: hweawer Date: Mon, 24 Nov 2025 16:49:06 +0100 Subject: [PATCH 6/7] Update lib/dockerregistry/transfer/ro_transferer.go Co-authored-by: akalpakchiev <99887781+Anton-Kalpakchiev@users.noreply.github.com> --- lib/dockerregistry/transfer/ro_transferer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dockerregistry/transfer/ro_transferer.go b/lib/dockerregistry/transfer/ro_transferer.go index 6ff35e1e5..ffd76c27d 100644 --- a/lib/dockerregistry/transfer/ro_transferer.go +++ b/lib/dockerregistry/transfer/ro_transferer.go @@ -125,7 +125,7 @@ func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.Fi if os.IsNotExist(err) { return nil, ErrBlobNotFound{ Digest: d.Hex(), - Reason: "file not found after download", + Reason: "file not found on disk after download", } } return nil, fmt.Errorf("get file reader after download: %w", err) From d12ed8b0917118682b482ecce4c118e0c8aa3bd2 Mon Sep 17 00:00:00 2001 From: hweawer Date: Wed, 26 Nov 2025 16:16:19 +0100 Subject: [PATCH 7/7] Fix --- agent/agentserver/server.go | 6 ++---- lib/dockerregistry/transfer/ro_transferer.go | 9 ++------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go index 6920f2e9f..e3e0fb89b 100644 --- a/agent/agentserver/server.go +++ b/agent/agentserver/server.go @@ -66,7 +66,8 @@ func New( sched scheduler.ReloadableScheduler, tags tagclient.Client, ac announceclient.Client, - containerRuntime containerruntime.Factory) *Server { + containerRuntime containerruntime.Factory, +) *Server { stats = stats.Tagged(map[string]string{ "module": "agentserver", }) @@ -144,7 +145,6 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err f, err := s.cads.Cache().GetFileReader(d.Hex()) - // Happy path: file already exists in cache if err == nil { defer closers.Close(f) if _, err := io.Copy(w, f); err != nil { @@ -153,12 +153,10 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err return nil } - // If error is not recoverable, return error if !os.IsNotExist(err) && !s.cads.InDownloadError(err) { return handler.Errorf("store: %s", err) } - // File doesn't exist or is in wrong state, trigger P2P download if err := s.sched.Download(namespace, d); err != nil { if err == scheduler.ErrTorrentNotFound { return handler.ErrorStatus(http.StatusNotFound) diff --git a/lib/dockerregistry/transfer/ro_transferer.go b/lib/dockerregistry/transfer/ro_transferer.go index ffd76c27d..21503477a 100644 --- a/lib/dockerregistry/transfer/ro_transferer.go +++ b/lib/dockerregistry/transfer/ro_transferer.go @@ -40,7 +40,8 @@ func NewReadOnlyTransferer( stats tally.Scope, cads *store.CADownloadStore, tags tagclient.Client, - sched scheduler.Scheduler) *ReadOnlyTransferer { + sched scheduler.Scheduler, +) *ReadOnlyTransferer { stats = stats.Tagged(map[string]string{ "module": "rotransferer", }) @@ -67,17 +68,14 @@ func mapSchedulerError(err error, d core.Digest) error { func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) { fi, err := t.cads.Cache().GetFileStat(d.Hex()) - // Happy path: file already exists in cache if err == nil { return core.NewBlobInfo(fi.Size()), nil } - // If error is not recoverable, return error if !os.IsNotExist(err) && !t.cads.InDownloadError(err) { return nil, fmt.Errorf("stat cache: %w", err) } - // File doesn't exist or is in wrong state, trigger P2P download if err := t.sched.Download(namespace, d); err != nil { return nil, mapSchedulerError(err, d) } @@ -102,17 +100,14 @@ func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobIn func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) { f, err := t.cads.Cache().GetFileReader(d.Hex()) - // Happy path: file already exists in cache if err == nil { return f, nil } - // If error is not recoverable, return error if !os.IsNotExist(err) && !t.cads.InDownloadError(err) { return nil, fmt.Errorf("get cache file: %w", err) } - // File doesn't exist or is in wrong state, trigger P2P download if err := t.sched.Download(namespace, d); err != nil { return nil, mapSchedulerError(err, d) }