
Commit

First commit of copy from one bucket to another with different AWS credentials
sethkor committed Jun 15, 2019
1 parent 7144388 commit 8c19089
Showing 4 changed files with 341 additions and 12 deletions.
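The API-visible change in copy.go is a new destProfile argument on NewBucketCopier: when it is set, the destination bucket is written with credentials from that profile instead of the source session. Below is a minimal sketch (not part of the commit) of how a caller inside this package might use it; the bucket names, thread count, and the "dest-account" profile are placeholders, and the progress-bar and CLI wiring of the real tool is omitted.

// copyAcrossAccounts is a hypothetical helper showing the new destProfile argument.
func copyAcrossAccounts() error {
	// Source-side session; the destination profile is resolved inside NewBucketCopier.
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))

	copier, err := NewBucketCopier(
		"s3://source-bucket/data", // read with the session's (source) credentials
		"s3://dest-bucket/data",   // written with the dest-account credentials
		10,                        // worker threads
		true,                      // quiet: no progress bars
		sess,
		s3manager.UploadInput{},
		"dest-account", // destination profile from the shared AWS config
	)
	if err != nil {
		return err
	}

	copier.copy(true) // recursive; with destSvc set this takes the remote-copy path
	return nil
}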
33 changes: 28 additions & 5 deletions copy.go
@@ -36,7 +36,8 @@ type BucketCopier struct {
sourceLength int
uploadManager s3manager.Uploader
downloadManager s3manager.Downloader
svc s3.S3
svc *s3.S3
destSvc *s3.S3
bars copyPb
wg sync.WaitGroup
files chan fileJob
@@ -288,7 +289,7 @@ func (cp *BucketCopier) copyObjects() (func(object *s3.Object) error, error) {
if *cp.template.ServerSideEncryption != "" {
copyTemplate.ServerSideEncryption = cp.template.ServerSideEncryption
}
cm := NewCopyerWithClient(&cp.svc)
cm := NewCopyerWithClient(cp.svc)

return func(object *s3.Object) error {
defer cp.threads.release(1)
@@ -415,7 +416,12 @@ func (cp *BucketCopier) copy(recursive bool) {
// List Objects
go cp.lister.ListObjects(true)

cp.copyAllObjects()
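// With no separate destination client, reuse the existing copy path; otherwise
// download from the source and re-upload with the destination credentials.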
if cp.destSvc == nil {
cp.copyAllObjects()
} else {
rp := newRemoteCopier(cp)
rp.remoteCopy()
}

if progress != nil {
progress.Wait()
@@ -559,7 +565,7 @@ func (cp *BucketCopier) copy(recursive bool) {

// NewBucketCopier creates a new BucketCopier struct initialized with all variables needed to copy objects in and out of
// a bucket
func NewBucketCopier(source string, dest string, threads int, quiet bool, sess *session.Session, template s3manager.UploadInput) (*BucketCopier, error) {
func NewBucketCopier(source string, dest string, threads int, quiet bool, sess *session.Session, template s3manager.UploadInput, destProfile string) (*BucketCopier, error) {

var svc, destSvc *s3.S3
sourceURL, err := url.Parse(source)
@@ -586,6 +592,18 @@ func NewBucketCopier(source string, dest string, threads int, quiet bool, sess *
}

if destURL.Scheme == "s3" {

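// When a destination profile is given, build a second session so the
// destination bucket is accessed with those credentials.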
if destProfile != "" {
sess = session.Must(session.NewSessionWithOptions(session.Options{
Profile: destProfile,
SharedConfigState: session.SharedConfigEnable,
Config: aws.Config{
CredentialsChainVerboseErrors: aws.Bool(true),
MaxRetries: aws.Int(30),
},
}))
}

wg.Add(1)
go func() {
destSvc, err = checkBucket(sess, destURL.Host, &wg)
@@ -609,13 +627,18 @@ func NewBucketCopier(source string, dest string, threads int, quiet bool, sess *
quiet: quiet,
uploadManager: *s3manager.NewUploaderWithClient(svc),
downloadManager: *s3manager.NewDownloaderWithClient(svc),
svc: *svc,
svc: svc,
destSvc: destSvc,
threads: make(semaphore, threads),
sizeChan: make(chan objectCounter, threads),
wg: sync.WaitGroup{},
template: template,
}

if destProfile != "" {
bc.uploadManager = *s3manager.NewUploaderWithClient(destSvc)
}

if sourceURL.Scheme == "s3" {
bc.lister, err = NewBucketListerWithSvc(source, threads, svc)
//if destURL.Scheme == "s3" {
305 changes: 305 additions & 0 deletions dest.go
@@ -0,0 +1,305 @@
package main

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"sync"

"github.com/aws/aws-sdk-go/aws/awsutil"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"go.uber.org/zap"
)

const chunkSize int64 = 5 * 1024 * 1024 // 5 MiB, the minimum S3 multipart part size

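// chunk is a single byte range of the source object, carrying the ranged GET body to an UploadPart call.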
type chunk struct {
buffer io.ReadCloser
start int64
finish int64
num int64
template s3.UploadPartInput
}

var chunkThreads = make(semaphore, 20)

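// RemoteCopy copies objects between buckets when the source and destination use different AWS credentials.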
type RemoteCopy struct {
cp *BucketCopier
parts int64
chunkThread semaphore
wg sync.WaitGroup
}

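// downloadChunks issues ranged GetObject requests of roughly chunkSize bytes against the source
// bucket and feeds each response body to the uploaders over the chunks channel.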
func (rp *RemoteCopy) downloadChunks(object *s3.Object, chunks chan chunk) error {
var logger = zap.S()

var first int64
last := chunkSize - 1 // byte ranges are inclusive, so the first chunk covers bytes 0..chunkSize-1

downloadInput := s3.GetObjectInput{
Bucket: aws.String(rp.cp.source.Host),
Key: object.Key,
}

for num := int64(1); num < rp.parts+1; num++ {
if last >= *object.Size {
last = *object.Size - 1
}

downloadInput.Range = aws.String(fmt.Sprintf("bytes=%d-%d", first, last))

chunkThreads.acquire(1)

resp, err := rp.cp.downloadManager.S3.GetObject(&downloadInput)

if err != nil {
if aerr, ok := err.(awserr.RequestFailure); ok {
switch aerr.StatusCode() {

default:
logger.Error(*object.Key)
logger.Error(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
logger.Error(err.Error())
}

// Release the slot and close the channel so uploadChunks doesn't block forever.
chunkThreads.release(1)
close(chunks)

return err

}

chunks <- chunk{
buffer: resp.Body,
start: first,
finish: last,
num: num,
}

first = last + 1
last = first + chunkSize - 1
}

close(chunks)

return nil
}

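// uploadChunk returns a closure that uploads one chunk as a part of the multipart upload,
// recording each completed part in the returned CompletedMultipartUpload.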
func (rp *RemoteCopy) uploadChunk(key *string, uploadId *string, wg *sync.WaitGroup) (func(chunk chunk), s3.CompletedMultipartUpload) {
var logger = zap.S()

var cmu s3.CompletedMultipartUpload
cmu.Parts = make([]*s3.CompletedPart, rp.parts)

input := s3.UploadPartInput{
Bucket: aws.String(rp.cp.target.Host),
Key: key,
UploadId: uploadId,
}

return func(chunk chunk) {
defer wg.Done()
defer chunkThreads.release(1)

// Read the ranged GET body into memory and close it before uploading.
buffer, err := ioutil.ReadAll(chunk.buffer)
chunk.buffer.Close()
if err != nil {
logger.Error(*key)
logger.Error(err.Error())
return
}

// Copy the part template so concurrent invocations don't race on the shared struct.
partInput := input
partInput.Body = bytes.NewReader(buffer)
partInput.PartNumber = aws.Int64(chunk.num)

resp, err := rp.cp.uploadManager.S3.UploadPart(&partInput)

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
logger.Error(aerr.Error())
} else {
// Message from an error.
logger.Error(err.Error())
}
return
}

// Record the part only after a successful upload; resp is nil on error.
cmu.Parts[chunk.num-1] = &s3.CompletedPart{
ETag: resp.ETag,
PartNumber: aws.Int64(chunk.num),
}

if !rp.cp.quiet {
rp.cp.bars.fileSize.IncrInt64(int64(len(buffer)))
}
}, cmu
}

// uploadChunks drains the chunk channel, uploading each part concurrently, then completes the multipart upload.
func (rp *RemoteCopy) uploadChunks(bucket *string, key *string, uploadId *string, chunks chan chunk) error {

var wg sync.WaitGroup
uploadChunkFunc, cmu := rp.uploadChunk(key, uploadId, &wg)

for chunk := range chunks {
wg.Add(1)
go uploadChunkFunc(chunk)

}

wg.Wait()
_, err := rp.cp.uploadManager.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
MultipartUpload: &cmu,
Bucket: bucket,
Key: key,
UploadId: uploadId,
})

if !rp.cp.quiet {
rp.cp.bars.count.Increment()
}

return err

}

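// copySingleOperationWithDestinationProfile copies a small object by downloading it into memory
// with the source credentials and re-uploading it with the destination credentials.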
func (rp *RemoteCopy) copySingleOperationWithDestinationProfile(object *s3.Object) error {
var logger = zap.S()

downloadInput := s3.GetObjectInput{
Bucket: aws.String(rp.cp.source.Host),
Key: object.Key,
}

buffer := make([]byte, *object.Size)
writeBuffer := aws.NewWriteAtBuffer(buffer)

_, err := rp.cp.downloadManager.Download(writeBuffer, &downloadInput)

if err != nil {
if aerr, ok := err.(awserr.RequestFailure); ok {
switch aerr.StatusCode() {

default:
logger.Error(*object.Key)
logger.Error(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
logger.Error(err.Error())
}

return err

}

// Upload the file to S3.
input := rp.cp.template
input.Key = aws.String(rp.cp.target.Path + "/" + (*object.Key)[len(rp.cp.source.Path):])
input.Body = bytes.NewReader(writeBuffer.Bytes())
_, err = rp.cp.uploadManager.Upload(&input)

if err != nil {

if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
default:
logger.Error(aerr.Error())
}
} else {
// Message from an error.
logger.Error(err.Error())
}

}

if !rp.cp.quiet {
rp.cp.bars.count.Increment()
rp.cp.bars.fileSize.IncrInt64(*object.Size)
}
return nil
}

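// remoteCopyObject returns the per-object worker: objects no larger than chunkSize are copied in
// a single download/upload, larger ones through a chunked multipart upload.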
func (rp *RemoteCopy) remoteCopyObject() (func(object *s3.Object) error, error) {

return func(object *s3.Object) error {
defer rp.cp.threads.release(1)

if *object.Size <= chunkSize {
chunkThreads.acquire(1)
err := rp.copySingleOperationWithDestinationProfile(object)
chunkThreads.release(1)
return err
}

//Create the multipart upload
params := &s3.CreateMultipartUploadInput{}
awsutil.Copy(params, object)
params.Bucket = aws.String(rp.cp.target.Host)
params.Key = aws.String(rp.cp.target.Path + "/" + (*object.Key)[len(rp.cp.source.Path):])

// Create the multipart
resp, err := rp.cp.uploadManager.S3.CreateMultipartUpload(params)
if err != nil {
return err
}

rp.parts = ((*object.Size) + chunkSize - 1) / chunkSize // ceiling division: number of ranged chunks

chunks := make(chan chunk, 20)

go rp.downloadChunks(object, chunks)

err = rp.uploadChunks(params.Bucket, aws.String(rp.cp.target.Path+"/"+(*object.Key)[len(rp.cp.source.Path):]), resp.UploadId, chunks)

return err
}, nil
}

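// remoteCopy drains the lister's object channel and copies every object to the destination
// bucket, bounded by the threads semaphore.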
func (rp *RemoteCopy) remoteCopy() error {
defer rp.cp.wg.Done()

copyObjectsFunc, err := rp.remoteCopyObject()

if err != nil {
return err
}
allThreads := cap(rp.cp.threads)
if !rp.cp.quiet {
fmt.Printf("0")
}

// Fan out one goroutine per object, bounded by the threads semaphore.

for item := range rp.cp.objects {

for _, object := range item {
rp.cp.threads.acquire(1)
go copyObjectsFunc(object)
}

}
rp.cp.threads.acquire(allThreads)

if !rp.cp.quiet {
rp.cp.bars.count.SetTotal(rp.cp.bars.count.Current(), true)
rp.cp.bars.fileSize.SetTotal(rp.cp.bars.fileSize.Current(), true)
}

return nil
}

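// newRemoteCopier wraps a BucketCopier for copies that need separate destination credentials.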
func newRemoteCopier(cp *BucketCopier) *RemoteCopy {
return &RemoteCopy{
cp: cp,
chunkThread: make(semaphore, 20),
}

}
