Big refactor to use methods in all operations
Seth Van Buren authored and Seth Van Buren committed May 18, 2019
1 parent e944486 commit 02c28ff
Showing 7 changed files with 334 additions and 263 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -1,5 +1,5 @@
# s3kor
S3 tools built in GoLang using threads for fast parallel actions like copy, list and remove to S3.
S3 tools built in GoLang using threads for fast parallel actions like copy, list and remove to S3. It's intended as a drop-in replacement for the `aws cli s3` set of commands, so all flags and args should be the same, with the exception of a few new ones.

The easiest way to install on macOS or Linux (amd64 or arm64) is with [Homebrew](https://brew.sh/)

@@ -104,7 +104,7 @@ The maximum concurrent uploads (`--concurrent` or `-c`) is dependent not only o

You can check your file limits on Linux, macOS and other flavours of OS with `ulimit -n`. Changing this limit in the OS is possible and not always dangerous; instructions vary between operating systems, so they are not described here. `s3kor` consumes file descriptors both when walking the file system and when uploading files, so there is not a 1:1 correlation between the max limit and the value you pass to `--concurrent`. Try to pass `s3kor` a value that is about 20% less than the system's max limit.
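As an illustration (not part of this README or the commit), a small standalone Go program can read the soft descriptor limit that `ulimit -n` reports and print a `--concurrent` value with roughly 20% headroom; `suggested` is just an illustrative name:

```go
package main

import (
	"fmt"
	"syscall"
)

func main() {
	// Read the soft limit on open file descriptors (what `ulimit -n` shows).
	// Works on Linux and macOS.
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		fmt.Println("could not read file descriptor limit:", err)
		return
	}
	// Keep ~20% headroom for directory walking and other descriptors.
	suggested := rl.Cur * 80 / 100
	fmt.Printf("open file limit: %d, suggested --concurrent: %d\n", rl.Cur, suggested)
}
```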

Currently if you hit a file limit, the error is not reported
Currently if you hit a file limit, the error is not reported.

For optimal throughput, consider using an S3 VPC Gateway endpoint if you are executing s3kor from within an AWS VPC.

7 changes: 0 additions & 7 deletions common.go
@@ -44,15 +44,8 @@ func checkBucket(sess *session.Session, bucket string) (svc *s3.S3, err error) {
        return svc, err
    } //if

    //if autoRegion {
    svc = s3.New(sess, &aws.Config{MaxRetries: aws.Int(30),
        Region: aws.String(bucketLocation)})
    //} else {
    //	if *svc.Config.Region != bucketLocation {
    //		fmt.Println("Bucket exist in region", bucketLocation, "which is different to region passed", *svc.Config.Region, ". Please adjust region on the command line our use --auto-region")
    //		logger.Fatal("Bucket exist in region", bucketLocation, "which is different to region passed", *svc.Config.Region, ". Please adjust region on the command line our use --auto-region")
    //	}
    //}

    return svc, err
}
216 changes: 112 additions & 104 deletions copy.go
Expand Up @@ -2,12 +2,14 @@ package main

import (
    "errors"
    "fmt"
    "net/url"
    "os"
    "path/filepath"
    "sync"
    "time"

    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"

    "github.com/vbauerster/mpb/decor"
@@ -16,56 +18,71 @@ import (

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
    "go.uber.org/zap"
)

//The different types of progress bars. We have one for counting files and another for counting sizes.
type progressBar struct {
type copyPb struct {
    count    *mpb.Bar
    fileSize *mpb.Bar
}

type BucketCopier struct {
    source        url.URL
    target        url.URL
    acl           string `enum:"ObjectCannedACL"`
    sourceLength  int
    uploadManager s3manager.Uploader
    bars          progressBar
    wg            *sync.WaitGroup
    bars          copyPb
    wg            sync.WaitGroup
    files         chan fileJob
    fileCounter   chan int64
    threads       semaphore
    template      s3manager.UploadInput
}

func (copier BucketCopier) uploadFile() func(file fileJob) {
func (copier *BucketCopier) copyFile(file string) {
    var logger = zap.S()

    //Some logic to determine the base path to be used as the prefix for S3. If the source path ends with a "/" then
    //the base of the source path is not used in the S3 prefix, as we assume it's the contents of the directory, not
    //the actual directory, that is needed in the copy
    _, splitFile := filepath.Split(copier.source.Path)
    includeRoot := 0
    if splitFile != "" {
        includeRoot = len(splitFile)
    }
    f, err := os.Open(file)
    if err != nil {
        logger.Errorf("failed to open file %q, %v", file, err)
    } else {
        // Upload the file to S3.
        input := copier.template
        input.Key = aws.String(copier.target.Path + "/" + file[copier.sourceLength:])
        input.Body = f
        _, err = copier.uploadManager.Upload(&input)

    sourceLength := len(copier.source.Path) - includeRoot
    if len(copier.source.Path) == 0 {
        sourceLength++
        if err != nil {
            logger.Error("Object failed to create in S3 ", file)

    }
            if aerr, ok := err.(awserr.Error); ok {
                switch aerr.Code() {
                default:
                    logger.Error(aerr.Error())
                } //switch
            } else {
                // Message from an error.
                logger.Error(err.Error())
            } //else
        }
        _ = f.Close()
        logger.Debug("file>>>s3 ", file)
    } //else
}

func (copier *BucketCopier) uploadFile() func(file fileJob) {
    var logger = zap.S()

    return func(file fileJob) {
        defer copier.threads.release(1)
        start := time.Now()
        input := copier.template
        if file.info.IsDir() {
            //Don't create a prefix for the base dir
            if len(file.path) != sourceLength {
                input.Key = aws.String(copier.target.Path + "/" + file.path[sourceLength:] + "/")
            if len(file.path) != copier.sourceLength {
                input.Key = aws.String(copier.target.Path + "/" + file.path[copier.sourceLength:] + "/")
                _, err := copier.uploadManager.Upload(&input)

                if err != nil {
@@ -85,39 +102,14 @@ func (copier BucketCopier) uploadFile() {
                logger.Info("dir>>>>s3 ", file.path)
            } //if
        } else {

            f, err := os.Open(file.path)
            if err != nil {
                logger.Errorf("failed to open file %q, %v", file.path, err)
            } else {
                // Upload the file to S3.
                input.Key = aws.String(copier.target.Path + "/" + file.path[sourceLength:])
                input.Body = f
                _, err = copier.uploadManager.Upload(&input)

                if err != nil {
                    logger.Error("Object failed to create in S3 ", file.path)

                    if aerr, ok := err.(awserr.Error); ok {
                        switch aerr.Code() {
                        default:
                            logger.Error(aerr.Error())
                        } //switch
                    } else {
                        // Message from an error.
                        logger.Error(err.Error())
                    } //else
                }
                _ = f.Close()
                logger.Debug("file>>>s3 ", file.path)
            } //else
            copier.copyFile(file.path)
        } //else
        copier.bars.count.IncrInt64(1, time.Since(start))
        copier.bars.count.IncrInt64(1)
        copier.bars.fileSize.IncrInt64(file.info.Size(), time.Since(start))
    }
}

func (copier BucketCopier) processFiles() {
func (copier *BucketCopier) processFiles() {
    defer copier.wg.Done()

    allThreads := len(copier.threads)
@@ -130,7 +122,7 @@ func (copier BucketCopier) processFiles() {

}

func (pb progressBar) updateBar(fileSize <-chan int64, wg *sync.WaitGroup) {
func (pb copyPb) updateBar(fileSize <-chan int64, wg *sync.WaitGroup) {
    defer wg.Done()
    var fileCount int64 = 0
    var fileSizeTotal int64 = 0
@@ -141,15 +133,67 @@ func (pb progressBar) updateBar(fileSize <-chan int64, wg *sync.WaitGroup) {
        fileSizeTotal += size
        pb.count.SetTotal(fileCount, false)
        pb.fileSize.SetTotal(fileSizeTotal, false)

    }

}

func ACL(acl string) func(copier *BucketCopier) {
    return func(copier *BucketCopier) {
        copier.acl = acl
func (copier *BucketCopier) copy(recursive bool) {
    //var logger = zap.S()

    if recursive {
        if copier.source.Scheme != "s3" {

            go walkFiles(copier.source.Path, copier.files, copier.fileCounter)
        }

        progress := mpb.New(
            mpb.WithRefreshRate(1000 * time.Millisecond),
        )

        copier.bars.count = progress.AddBar(0,
            mpb.PrependDecorators(
                // simple name decorator
                decor.Name("Files", decor.WC{W: 6, C: decor.DSyncWidth}),
                decor.CountersNoUnit(" %d / %d", decor.WCSyncWidth),
            ),
        )

        copier.bars.fileSize = progress.AddBar(0,
            mpb.PrependDecorators(
                decor.Name("Size ", decor.WC{W: 6, C: decor.DSyncWidth}),
                decor.Counters(decor.UnitKB, "% .1f / % .1f", decor.WCSyncWidth),
            ),
            mpb.AppendDecorators(
                decor.Percentage(decor.WCSyncWidth),
                decor.Name(" "),
                decor.AverageSpeed(decor.UnitKB, "% .1f", decor.WCSyncWidth),
                decor.Name(" ETA: "),
                decor.AverageETA(decor.ET_STYLE_GO),
            ),
        )

        copier.wg.Add(2)

        go copier.bars.updateBar(copier.fileCounter, &copier.wg)

        go copier.processFiles()

        copier.wg.Wait()

        progress.Wait()
    } else {
        //single file copy
        info, err := os.Lstat(copier.source.Path)

        if err == nil {
            if info.IsDir() {
                fmt.Println("Cannot copy a directory without --recursive specified on the command line")
            } else {
                copier.copyFile(copier.source.Path)
            }
        }
    }

}

func NewBucketCopier(source string, dest string, threads int, sess *session.Session, template s3manager.UploadInput) (*BucketCopier, error) {
@@ -186,66 +230,30 @@ func NewBucketCopier(source string, dest string, threads int, sess *session.Sess

    template.Bucket = aws.String(destURL.Host)

    s3manager.NewUploaderWithClient(svc)

    bc := &BucketCopier{
        source:        *sourceURL,
        target:        *destURL,
        uploadManager: *s3manager.NewUploaderWithClient(svc),
        threads:       make(semaphore, threads),
        files:         make(chan fileJob, 10000),
        fileCounter:   make(chan int64, 10000),
        wg:            &sync.WaitGroup{},
        files:         make(chan fileJob, bigChanSize),
        fileCounter:   make(chan int64, threads*2),
        wg:            sync.WaitGroup{},
        template:      template,
    }

    return bc, nil
}

func (myCopier BucketCopier) copy() {
    //var logger = zap.S()

    if myCopier.source.Scheme != "s3" {

        go walkFiles(myCopier.source.Path, myCopier.files, myCopier.fileCounter)
    //Some logic to determine the base path to be used as the prefix for S3. If the source path ends with a "/" then
    //the base of the source path is not used in the S3 prefix, as we assume it's the contents of the directory, not
    //the actual directory, that is needed in the copy
    _, splitFile := filepath.Split(bc.source.Path)
    includeRoot := 0
    if splitFile != "" {
        includeRoot = len(splitFile)
    }

    progress := mpb.New()

    myCopier.bars.count = progress.AddBar(0,
        mpb.PrependDecorators(
            // simple name decorator
            decor.Name("Files", decor.WC{W: 6, C: decor.DSyncWidth}),
            decor.CountersNoUnit(" %d / %d", decor.WCSyncWidth),
        ),

        mpb.AppendDecorators(
            decor.Percentage(decor.WCSyncWidth),
            decor.Name(" "),
            decor.MovingAverageETA(decor.ET_STYLE_GO, decor.NewMedian(), decor.FixedIntervalTimeNormalizer(5000), decor.WCSyncSpaceR),
        ),
    )

    myCopier.bars.fileSize = progress.AddBar(0,
        mpb.PrependDecorators(
            decor.Name("Size ", decor.WC{W: 6, C: decor.DSyncWidth}),
            decor.Counters(decor.UnitKB, "% .1f / % .1f", decor.WCSyncWidth),
        ),
        mpb.AppendDecorators(
            decor.Percentage(decor.WCSyncWidth),
            decor.Name(" "),
            decor.AverageSpeed(decor.UnitKB, "% .1f", decor.WCSyncWidth),
        ),
    )

    myCopier.wg.Add(2)

    go myCopier.bars.updateBar(myCopier.fileCounter, myCopier.wg)

    go myCopier.processFiles()

    myCopier.wg.Wait()

    progress.Wait()
    bc.sourceLength = len(bc.source.Path) - includeRoot
    if len(bc.source.Path) == 0 {
        bc.sourceLength++

    }
    return bc, nil
}
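The trailing-slash rule above decides how much of the local path becomes the S3 key prefix. The following standalone sketch (not part of this commit; `computePrefixLength` is an illustrative helper name) shows how the `filepath.Split` arithmetic plays out for a source with and without a trailing `/`:

```go
package main

import (
	"fmt"
	"path/filepath"
)

// computePrefixLength mirrors the sourceLength logic in NewBucketCopier: if
// the source path ends in "/", filepath.Split returns an empty file
// component, so the directory itself is kept out of the S3 key prefix;
// otherwise the base directory name is included.
func computePrefixLength(source string) int {
	_, splitFile := filepath.Split(source)
	includeRoot := 0
	if splitFile != "" {
		includeRoot = len(splitFile)
	}
	sourceLength := len(source) - includeRoot
	if len(source) == 0 {
		sourceLength++ // guard for an empty source path, as in the original
	}
	return sourceLength
}

func main() {
	// Without a trailing slash the base directory name becomes part of the
	// key; with one, only the directory contents are prefixed.
	for _, src := range []string{"/tmp/mydir", "/tmp/mydir/"} {
		n := computePrefixLength(src)
		file := filepath.Join(src, "sub", "file.txt")
		fmt.Printf("source %-13q -> key suffix %q\n", src, file[n:])
	}
}
```

Running it prints `mydir/sub/file.txt` for `/tmp/mydir` but `sub/file.txt` for `/tmp/mydir/`, matching the behaviour the comment in the diff describes.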