Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feat-crc
Browse files Browse the repository at this point in the history
  • Loading branch information
Yang Kaiyong committed Feb 24, 2025
2 parents c5d46ba + 9193160 commit fcaf26f
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 36 deletions.
2 changes: 1 addition & 1 deletion contrib/nydus-overlayfs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ GIT_COMMIT := $(shell git rev-parse --verify HEAD --short=7)
BUILD_TIME := $(shell date -u +%Y%m%d.%H%M)
PACKAGES ?= $(shell go list ./... | grep -v /vendor/)
GOARCH ?= $(shell go env GOARCH)
GOPROXY ?= https://goproxy.io
GOPROXY ?=

ifdef GOPROXY
PROXY := GOPROXY=${GOPROXY}
Expand Down
2 changes: 1 addition & 1 deletion contrib/nydusify/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
PACKAGES ?= $(shell go list ./... | grep -v /vendor/)
GOARCH ?= $(shell go env GOARCH)
GOPROXY ?= https://goproxy.io
GOPROXY ?=

ifdef GOPROXY
PROXY := GOPROXY=${GOPROXY}
Expand Down
2 changes: 1 addition & 1 deletion contrib/nydusify/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,4 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/containerd/containerd => github.com/nydusaccelerator/containerd v0.0.0-20240605070649-62e0d4d66f9f
replace github.com/containerd/containerd => github.com/nydusaccelerator/containerd v1.7.18-nydus.10
4 changes: 2 additions & 2 deletions contrib/nydusify/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ github.com/moby/sys/signal v0.7.0 h1:25RW3d5TnQEoKvRbEKUGay6DCQ46IxAVTT9CUMgmsSI
github.com/moby/sys/signal v0.7.0/go.mod h1:GQ6ObYZfqacOwTtlXvcmh9A26dVRul/hbOZn88Kg8Tg=
github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg=
github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU=
github.com/nydusaccelerator/containerd v0.0.0-20240605070649-62e0d4d66f9f h1:jbWfZohlnnbKXcYykpfw0VT8baJpI90sWg0hxvD596g=
github.com/nydusaccelerator/containerd v0.0.0-20240605070649-62e0d4d66f9f/go.mod h1:IYEk9/IO6wAPUz2bCMVUbsfXjzw5UNP5fLz4PsUygQ4=
github.com/nydusaccelerator/containerd v1.7.18-nydus.10 h1:ir28uQOPtYtFP+gry7sbiwaOHUISC1viPeogTDTff+Q=
github.com/nydusaccelerator/containerd v1.7.18-nydus.10/go.mod h1:IYEk9/IO6wAPUz2bCMVUbsfXjzw5UNP5fLz4PsUygQ4=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down
2 changes: 2 additions & 0 deletions contrib/nydusify/pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"

"github.com/containerd/containerd/remotes"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/remote"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/utils"
"github.com/opencontainers/go-digest"
Expand All @@ -27,6 +28,7 @@ type Backend interface {
Check(blobID string) (bool, error)
Type() Type
Reader(blobID string) (io.ReadCloser, error)
RangeReader(blobID string) (remotes.RangeReadCloser, error)
Size(blobID string) (int64, error)
}

Expand Down
15 changes: 15 additions & 0 deletions contrib/nydusify/pkg/backend/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/containerd/containerd/remotes"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -259,6 +260,20 @@ func (b *OSSBackend) Type() Type {
return OssBackend
}

type RangeReader struct {
b *OSSBackend
blobID string
}

func (rr *RangeReader) Reader(offset int64, size int64) (io.ReadCloser, error) {
return rr.b.bucket.GetObject(rr.blobID, oss.Range(offset, offset+size-1))
}

func (b *OSSBackend) RangeReader(blobID string) (remotes.RangeReadCloser, error) {
blobID = b.objectPrefix + blobID
return &RangeReader{b: b, blobID: blobID}, nil
}

func (b *OSSBackend) Reader(blobID string) (io.ReadCloser, error) {
blobID = b.objectPrefix + blobID
rc, err := b.bucket.GetObject(blobID)
Expand Down
5 changes: 5 additions & 0 deletions contrib/nydusify/pkg/backend/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"os"

"github.com/containerd/containerd/remotes"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/remote"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -47,6 +48,10 @@ func (r *Registry) Type() Type {
return RegistryBackend
}

func (r *Registry) RangeReader(_ string) (remotes.RangeReadCloser, error) {
panic("not implemented")
}

func (r *Registry) Reader(_ string) (io.ReadCloser, error) {
panic("not implemented")
}
Expand Down
20 changes: 20 additions & 0 deletions contrib/nydusify/pkg/backend/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/containerd/containerd/remotes"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -160,6 +161,25 @@ func (b *S3Backend) blobObjectKey(blobID string) string {
return b.objectPrefix + blobID
}

type rangeReader struct {
b *S3Backend
objectKey string
}

func (rr *rangeReader) Reader(offset int64, size int64) (io.ReadCloser, error) {
output, err := rr.b.client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: &rr.b.bucketName,
Key: &rr.objectKey,
Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
})
return output.Body, err
}

func (b *S3Backend) RangeReader(blobID string) (remotes.RangeReadCloser, error) {
objectKey := b.blobObjectKey(blobID)
return &rangeReader{b: b, objectKey: objectKey}, nil
}

func (b *S3Backend) Reader(blobID string) (io.ReadCloser, error) {
objectKey := b.blobObjectKey(blobID)
output, err := b.client.GetObject(context.TODO(), &s3.GetObjectInput{
Expand Down
107 changes: 77 additions & 30 deletions contrib/nydusify/pkg/copier/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import (

"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
containerdErrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/remotes"
containerdErrdefs "github.com/containerd/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/converter"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/backend"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/checker/tool"
Expand Down Expand Up @@ -66,6 +67,23 @@ type output struct {
Blobs []string
}

func withRetry(handle func() error, total int) error {
for {
total--
err := handle()
if err == nil {
return nil
}

if total > 0 && !errors.Is(err, context.Canceled) {
logrus.WithError(err).Warnf("retry (remain %d times)", total)
continue
}

return err
}
}

func hosts(opt Opt) remote.HostFunc {
maps := map[string]bool{
opt.Source: opt.SourceInsecure,
Expand All @@ -76,7 +94,7 @@ func hosts(opt Opt) remote.HostFunc {
}
}

func getPushWriter(ctx context.Context, pvd *provider.Provider, desc ocispec.Descriptor, opt Opt) (content.Writer, error) {
func getPusherInChunked(ctx context.Context, pvd *provider.Provider, desc ocispec.Descriptor, opt Opt) (remotes.PusherInChunked, error) {
resolver, err := pvd.Resolver(opt.Target)
if err != nil {
return nil, errors.Wrap(err, "get resolver")
Expand All @@ -85,18 +103,13 @@ func getPushWriter(ctx context.Context, pvd *provider.Provider, desc ocispec.Des
if !strings.Contains(ref, "@") {
ref = ref + "@" + desc.Digest.String()
}
pusher, err := resolver.Pusher(ctx, ref)
if err != nil {
return nil, errors.Wrap(err, "create pusher")
}
writer, err := pusher.Push(ctx, desc)

pusherInChunked, err := resolver.PusherInChunked(ctx, ref)
if err != nil {
if containerdErrdefs.IsAlreadyExists(err) {
return nil, nil
}
return nil, err
return nil, errors.Wrap(err, "create pusher in chunked")
}
return writer, nil

return pusherInChunked, nil
}

func pushBlobFromBackend(
Expand Down Expand Up @@ -167,11 +180,6 @@ func pushBlobFromBackend(
blobSizeStr := humanize.Bytes(uint64(blobSize))

logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushing blob from backend")
rc, err := backend.Reader(blobID)
if err != nil {
return errors.Wrap(err, "get blob reader")
}
defer rc.Close()
blobDescs[idx] = ocispec.Descriptor{
Digest: blobDigest,
Size: blobSize,
Expand All @@ -180,22 +188,61 @@ func pushBlobFromBackend(
converter.LayerAnnotationNydusBlob: "true",
},
}
writer, err := getPushWriter(ctx, pvd, blobDescs[idx], opt)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
writer, err = getPushWriter(ctx, pvd, blobDescs[idx], opt)
}

if err := withRetry(func() error {
pusher, err := getPusherInChunked(ctx, pvd, blobDescs[idx], opt)
if err != nil {
return errors.Wrap(err, "get push writer")
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
pusher, err = getPusherInChunked(ctx, pvd, blobDescs[idx], opt)
}
if err != nil {
return errors.Wrapf(err, "get push writer: %s", blobDigest)
}
}

push := func() error {
if blobSize > opt.PushChunkSize {
rr, err := backend.RangeReader(blobID)
if err != nil {
return errors.Wrapf(err, "get push reader: %s", blobDigest)
}
if err := pusher.PushInChunked(ctx, blobDescs[idx], rr); err != nil {
return errors.Wrapf(err, "push blob in chunked: %s", blobDigest)
}
} else {
rc, err := backend.Reader(blobID)
if err != nil {
return errors.Wrap(err, "get blob reader")
}
defer rc.Close()
writer, err := pusher.Push(ctx, blobDescs[idx])
if err != nil {
return errors.Wrapf(err, "get push writer: %s", blobDigest)
}
if writer != nil {
defer writer.Close()
if err := content.Copy(ctx, writer, rc, blobSize, blobDigest); err != nil {
return errors.Wrapf(err, "push blob: %s", blobDigest)
}
}
}
return nil
}
}
if writer != nil {
defer writer.Close()
return content.Copy(ctx, writer, rc, blobSize, blobDigest)
}

logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend")
if err := push(); err != nil {
if containerdErrdefs.IsAlreadyExists(err) {
logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend (exists)")
return nil
}
return errors.Wrapf(err, "copy blob content: %s", blobDigest)
}
logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend")

return nil
}, 3); err != nil {
return errors.Wrapf(err, "push blob: %s", blobDigest)
}

return nil
})
Expand Down
5 changes: 5 additions & 0 deletions contrib/nydusify/pkg/packer/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"testing"

"github.com/containerd/containerd/remotes"
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/backend"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -44,6 +45,10 @@ func (m *mockBackend) Reader(_ string) (io.ReadCloser, error) {
panic("not implemented")
}

func (m *mockBackend) RangeReader(_ string) (remotes.RangeReadCloser, error) {
panic("not implemented")
}

func (m *mockBackend) Size(_ string) (int64, error) {
panic("not implemented")
}
Expand Down
2 changes: 1 addition & 1 deletion smoke/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
PACKAGES ?= $(shell go list ./... | grep -v /vendor/)
GOPROXY ?= https://goproxy.io
GOPROXY ?=
TESTS ?= .*

ifdef GOPROXY
Expand Down

0 comments on commit fcaf26f

Please sign in to comment.