Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions agent/agentserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
if err != nil {
return err
}

d, err := s.tags.Get(tag)
if err == tagclient.ErrTagNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
if err != nil {
if err == tagclient.ErrTagNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("get tag: %s", err)
}

io.WriteString(w, d.String())
return nil
}
Expand All @@ -139,23 +141,38 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err
if err != nil {
return err
}

f, err := s.cads.Cache().GetFileReader(d.Hex())
if err != nil {
if os.IsNotExist(err) || s.cads.InDownloadError(err) {
if err := s.sched.Download(namespace, d); err != nil {
if err == scheduler.ErrTorrentNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("download torrent: %s", err)
}
f, err = s.cads.Cache().GetFileReader(d.Hex())
if err != nil {
return handler.Errorf("store: %s", err)
}
} else {
return handler.Errorf("store: %s", err)

// Happy path: file already exists in cache
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove these comments that repeat the code logic? IMO they get stale with time and the cost of maintaining them is higher than the value they provide by clarifying the code. Were they written by AI and forgotten after? I think this happens a lot with AI code :D

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also love the nesting reduction here too! Code is much more readable like this

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

if err == nil {
if _, err := io.Copy(w, f); err != nil {
return fmt.Errorf("copy file: %s", err)
}
return nil
}

// If error is not recoverable, return error
if !os.IsNotExist(err) && !s.cads.InDownloadError(err) {
return handler.Errorf("store: %s", err)
}

// File doesn't exist or is in wrong state, trigger P2P download
if err := s.sched.Download(namespace, d); err != nil {
if err == scheduler.ErrTorrentNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("download torrent: %s", err)
}

// Get file reader after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
f, err = s.cads.Any().GetFileReader(d.Hex())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. What happens when a file is in the download dir, but is partially downloaded? Does it get returned?
  2. If we can serve blobs directly from the download dir, what is the purpose of having a download and a cache dir separately? Aren't we violating any atomicity invariants by serving data from the download dir?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. s.sched.Download() blocks until download is complete, concurrent requests are deduplicated, so it will not be returned
  2. I guess the purpose is:
  • Download dir: Incomplete files being assembled piece-by-piece
  • Cache dir: Complete, verified files ready for serving

But Any() is just handling the microsecond window where the move operation is in flight

if err != nil {
return handler.Errorf("store: %s", err)
}

if _, err := io.Copy(w, f); err != nil {
return fmt.Errorf("copy file: %s", err)
}
Expand Down
17 changes: 12 additions & 5 deletions lib/dockerregistry/storage_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,20 @@ func (e InvalidRequestError) Error() string {
}

func toDriverError(err error, path string) error {
// Check for "not found" errors -> return 404
if errors.Is(err, os.ErrNotExist) ||
errors.Is(err, transfer.ErrBlobNotFound) ||
errors.Is(err, transfer.ErrTagNotFound) {
transfer.IsBlobNotFound(err) ||
transfer.IsTagNotFound(err) {
return driver.PathNotFoundError{
DriverName: Name,
Path: path,
}
}
return err
log.Errorf("Storage driver error for path %s: %v", path, err)
return driver.Error{
DriverName: Name,
Enclosed: err,
}
}

type krakenStorageDriverFactory struct {
Expand Down Expand Up @@ -132,7 +137,8 @@ func NewReadWriteStorageDriver(
config Config,
cas *store.CAStore,
transferer transfer.ImageTransferer,
verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error)) *KrakenStorageDriver {
verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error),
) *KrakenStorageDriver {
return &KrakenStorageDriver{
config: config,
transferer: transferer,
Expand All @@ -147,7 +153,8 @@ func NewReadOnlyStorageDriver(
config Config,
bs BlobStore,
transferer transfer.ImageTransferer,
verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error)) *KrakenStorageDriver {
verification func(repo string, digest core.Digest, blob store.FileReader) (SignatureVerificationDecision, error),
) *KrakenStorageDriver {
return &KrakenStorageDriver{
config: config,
transferer: transferer,
Expand Down
46 changes: 43 additions & 3 deletions lib/dockerregistry/transfer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,50 @@
// limitations under the License.
package transfer

import "errors"
import "fmt"

// ErrBlobNotFound is returned when a blob is not found by transferer.
var ErrBlobNotFound = errors.New("blob not found")
type ErrBlobNotFound struct {
Digest string
Reason string
}

func (e ErrBlobNotFound) Error() string {
if e.Reason != "" {
return fmt.Sprintf("blob %s not found: %s", e.Digest, e.Reason)
}
return fmt.Sprintf("blob %s not found", e.Digest)
}

// ErrTagNotFound is returned when a tag is not found by transferer.
var ErrTagNotFound = errors.New("tag not found")
type ErrTagNotFound struct {
Tag string
Reason string
}

func (e ErrTagNotFound) Error() string {
if e.Reason != "" {
return fmt.Sprintf("tag %s not found: %s", e.Tag, e.Reason)
}
return fmt.Sprintf("tag %s not found", e.Tag)
}

// IsBlobNotFound checks if an error is ErrBlobNotFound.
func IsBlobNotFound(err error) bool {
_, ok := err.(ErrBlobNotFound)
if ok {
return true
}
_, ok = err.(*ErrBlobNotFound)
return ok
}

// IsTagNotFound checks if an error is ErrTagNotFound.
func IsTagNotFound(err error) bool {
_, ok := err.(ErrTagNotFound)
if ok {
return true
}
_, ok = err.(*ErrTagNotFound)
return ok
}
123 changes: 95 additions & 28 deletions lib/dockerregistry/transfer/ro_transferer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,47 +40,111 @@ func NewReadOnlyTransferer(
stats tally.Scope,
cads *store.CADownloadStore,
tags tagclient.Client,
sched scheduler.Scheduler) *ReadOnlyTransferer {

sched scheduler.Scheduler,
) *ReadOnlyTransferer {
stats = stats.Tagged(map[string]string{
"module": "rotransferer",
})

return &ReadOnlyTransferer{stats, cads, tags, sched}
}

// mapSchedulerError converts scheduler errors to appropriate transferer errors.
func mapSchedulerError(err error, d core.Digest) error {
switch err {
case scheduler.ErrTorrentNotFound:
return ErrBlobNotFound{
Digest: d.Hex(),
Reason: "torrent not found in tracker",
}
case scheduler.ErrTorrentTimeout:
return ErrBlobNotFound{
Digest: d.Hex(),
Reason: "download timed out",
}
case scheduler.ErrTorrentRemoved:
return ErrBlobNotFound{
Digest: d.Hex(),
Reason: "torrent was removed",
}
case scheduler.ErrSchedulerStopped:
return fmt.Errorf("scheduler unavailable: scheduler is stopped")
case scheduler.ErrSendEventTimedOut:
return fmt.Errorf("scheduler unavailable: event loop busy")
default:
return fmt.Errorf("scheduler download failed: %w", err)
}
}

// Stat returns blob info from local cache, and triggers download if the blob is
// not available locally.
func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
fi, err := t.cads.Cache().GetFileStat(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
fi, err = t.cads.Cache().GetFileStat(d.Hex())
if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)

// Happy path: file already exists in cache
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same note about these comments as above

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

if err == nil {
return core.NewBlobInfo(fi.Size()), nil
}

// If error is not recoverable, return error
if !os.IsNotExist(err) && !t.cads.InDownloadError(err) {
return nil, fmt.Errorf("stat cache: %w", err)
}

// File doesn't exist or is in wrong state, trigger P2P download
if err := t.sched.Download(namespace, d); err != nil {
return nil, mapSchedulerError(err, d)
}

// Stat file after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
fi, err = t.cads.Any().GetFileStat(d.Hex())
if err == nil {
return core.NewBlobInfo(fi.Size()), nil
}
if os.IsNotExist(err) {
return nil, ErrBlobNotFound{
Digest: d.Hex(),
Reason: "file not found after download",
}
} else if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)
}
return core.NewBlobInfo(fi.Size()), nil
return nil, fmt.Errorf("stat cache after download: %w", err)
}

// Download downloads blobs as torrent.
func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) {
f, err := t.cads.Cache().GetFileReader(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
f, err = t.cads.Cache().GetFileReader(d.Hex())
if err != nil {
return nil, fmt.Errorf("cache: %s", err)

// Happy path: file already exists in cache
if err == nil {
return f, nil
}

// If error is not recoverable, return error
if !os.IsNotExist(err) && !t.cads.InDownloadError(err) {
return nil, fmt.Errorf("get cache file: %w", err)
}

// File doesn't exist or is in wrong state, trigger P2P download
if err := t.sched.Download(namespace, d); err != nil {
return nil, mapSchedulerError(err, d)
}

// Get file reader after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
f, err = t.cads.Any().GetFileReader(d.Hex())
if err != nil {
if os.IsNotExist(err) {
return nil, ErrBlobNotFound{
Digest: d.Hex(),
Reason: "file not found after download",
}
}
} else if err != nil {
return nil, fmt.Errorf("cache: %s", err)
return nil, fmt.Errorf("get file reader after download: %w", err)
}

return f, nil
}

Expand All @@ -92,15 +156,18 @@ func (t *ReadOnlyTransferer) Upload(namespace string, d core.Digest, blob store.
// GetTag gets manifest digest for tag.
func (t *ReadOnlyTransferer) GetTag(tag string) (core.Digest, error) {
d, err := t.tags.Get(tag)
if err != nil {
if err == tagclient.ErrTagNotFound {
t.stats.Counter("tag_not_found").Inc(1)
return core.Digest{}, ErrTagNotFound
if err == nil {
return d, nil
}
if err == tagclient.ErrTagNotFound {
t.stats.Counter("tag_not_found").Inc(1)
return core.Digest{}, ErrTagNotFound{
Tag: tag,
Reason: "not found in build-index",
}
t.stats.Counter("get_tag_error").Inc(1)
return core.Digest{}, fmt.Errorf("client get tag: %s", err)
}
return d, nil
t.stats.Counter("get_tag_error").Inc(1)
return core.Digest{}, fmt.Errorf("client get tag: %w", err)
}

// PutTag is not supported.
Expand Down
2 changes: 1 addition & 1 deletion lib/dockerregistry/transfer/ro_transferer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestReadOnlyTransfererGetTagNotFound(t *testing.T) {

_, err := transferer.GetTag(tag)
require.Error(err)
require.Equal(ErrTagNotFound, err)
require.True(IsTagNotFound(err))
}

// TODO(codyg): This is a particularly ugly test that is a symptom of the lack
Expand Down
Loading
Loading