From e86b997b0c40712cedb629d5ef251a4347a88f02 Mon Sep 17 00:00:00 2001
From: Dave Brotherstone
Date: Mon, 2 Dec 2024 12:24:09 +0100
Subject: [PATCH] Add parallel uploads

This adds a parallel-uploads flag that enables uploading multiple files in
parallel, which is considerably faster when uploading many files.
---
 main.go   |   8 ++-
 plugin.go | 186 +++++++++++++++++++++++++++++++-----------------------
 2 files changed, 113 insertions(+), 81 deletions(-)

diff --git a/main.go b/main.go
index 91deb7e..bb72011 100644
--- a/main.go
+++ b/main.go
@@ -151,6 +151,11 @@ func main() {
 			Usage:  "OIDC token for assuming role via web identity",
 			EnvVar: "PLUGIN_OIDC_TOKEN_ID",
 		},
+		cli.IntFlag{
+			Name:   "parallel-uploads",
+			Usage:  "number of parallel uploads",
+			EnvVar: "PLUGIN_PARALLEL_UPLOADS",
+		},
 	}
 
 	if err := app.Run(os.Args); err != nil {
@@ -163,7 +168,6 @@ func run(c *cli.Context) error {
 		_ = godotenv.Load(c.String("env-file"))
 	}
 
-
 	plugin := Plugin{
 		Endpoint: c.String("endpoint"),
 		Key:      c.String("access-key"),
@@ -189,8 +193,8 @@ func run(c *cli.Context) error {
 		DryRun:     c.Bool("dry-run"),
 		ExternalID: c.String("external-id"),
 		IdToken:    c.String("oidc-token-id"),
+		ParallelUploads: c.Int("parallel-uploads"),
 	}
 
 	return plugin.Exec()
 }
-
diff --git a/plugin.go b/plugin.go
index a6dd797..af0a9f3 100644
--- a/plugin.go
+++ b/plugin.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -102,6 +103,94 @@ type Plugin struct {
 
 	// set OIDC ID Token to retrieve temporary credentials
 	IdToken string
+
+	ParallelUploads int
+}
+
+func (p *Plugin) uploadFile(client *s3.S3, match string) error {
+
+	target := resolveKey(p.Target, match, p.StripPrefix)
+
+	contentType := matchExtension(match, p.ContentType)
+	contentEncoding := matchExtension(match, p.ContentEncoding)
+	cacheControl := matchExtension(match, p.CacheControl)
+
+	if contentType == "" {
+		contentType = mime.TypeByExtension(filepath.Ext(match))
+
+		if contentType == "" {
+			contentType = "application/octet-stream"
+		}
+	}
+
+	// log file for debug purposes.
+	log.WithFields(log.Fields{
+		"name":   match,
+		"bucket": p.Bucket,
+		"target": target,
+	}).Info("Uploading file")
+
+	// when executing a dry-run we exit because we don't actually want to
+	// upload the file to S3.
+	if p.DryRun {
+		return nil
+	}
+
+	f, err := os.Open(match)
+	if err != nil {
+		log.WithFields(log.Fields{
+			"error": err,
+			"file":  match,
+		}).Error("Problem opening file")
+		return err
+	}
+	defer f.Close()
+
+	putObjectInput := &s3.PutObjectInput{
+		Body:   f,
+		Bucket: &(p.Bucket),
+		Key:    &target,
+	}
+
+	if contentType != "" {
+		putObjectInput.ContentType = aws.String(contentType)
+	}
+
+	if contentEncoding != "" {
+		putObjectInput.ContentEncoding = aws.String(contentEncoding)
+	}
+
+	if cacheControl != "" {
+		putObjectInput.CacheControl = aws.String(cacheControl)
+	}
+
+	if p.Encryption != "" {
+		putObjectInput.ServerSideEncryption = aws.String(p.Encryption)
+	}
+
+	if p.StorageClass != "" {
+		putObjectInput.StorageClass = &(p.StorageClass)
+	}
+
+	if p.Access != "" {
+		putObjectInput.ACL = &(p.Access)
+	}
+
+	_, err = client.PutObject(putObjectInput)
+
+	if err != nil {
+		log.WithFields(log.Fields{
+			"name":   match,
+			"bucket": p.Bucket,
+			"target": target,
+			"error":  err,
+		}).Error("Could not upload file")
+
+		return err
+	}
+	f.Close()
+
+	return nil
 }
 
 // Exec runs the plugin
@@ -137,96 +226,35 @@ func (p *Plugin) Exec() error {
 		}).Error("Could not match files")
 		return err
 	}
+	sem := make(chan bool, max(1, p.ParallelUploads))
+	errChan := make(chan error)
+	var wg sync.WaitGroup
 
 	for _, match := range matches {
+		match := match
 
 		// skip directories
 		if isDir(match, matches) {
 			continue
 		}
-
-		target := resolveKey(p.Target, match, p.StripPrefix)
-
-		contentType := matchExtension(match, p.ContentType)
-		contentEncoding := matchExtension(match, p.ContentEncoding)
-		cacheControl := matchExtension(match, p.CacheControl)
-
-		if contentType == "" {
-			contentType = mime.TypeByExtension(filepath.Ext(match))
-
-			if contentType == "" {
-				contentType = "application/octet-stream"
-			}
-		}
-
-		// log file for debug purposes.
-		log.WithFields(log.Fields{
-			"name":   match,
-			"bucket": p.Bucket,
-			"target": target,
-		}).Info("Uploading file")
-
-		// when executing a dry-run we exit because we don't actually want to
-		// upload the file to S3.
-		if p.DryRun {
-			continue
-		}
-
-		f, err := os.Open(match)
-		if err != nil {
-			log.WithFields(log.Fields{
-				"error": err,
-				"file":  match,
-			}).Error("Problem opening file")
+		select {
+		case err := <-errChan:
 			return err
+		case sem <- true:
 		}
-		defer f.Close()
-		putObjectInput := &s3.PutObjectInput{
-			Body:   f,
-			Bucket: &(p.Bucket),
-			Key:    &target,
-		}
-
-		if contentType != "" {
-			putObjectInput.ContentType = aws.String(contentType)
-		}
-
-		if contentEncoding != "" {
-			putObjectInput.ContentEncoding = aws.String(contentEncoding)
-		}
-
-		if cacheControl != "" {
-			putObjectInput.CacheControl = aws.String(cacheControl)
-		}
-
-		if p.Encryption != "" {
-			putObjectInput.ServerSideEncryption = aws.String(p.Encryption)
-		}
-
-		if p.StorageClass != "" {
-			putObjectInput.StorageClass = &(p.StorageClass)
-		}
-
-		if p.Access != "" {
-			putObjectInput.ACL = &(p.Access)
-		}
-
-		_, err = client.PutObject(putObjectInput)
-
-		if err != nil {
-			log.WithFields(log.Fields{
-				"name":   match,
-				"bucket": p.Bucket,
-				"target": target,
-				"error":  err,
-			}).Error("Could not upload file")
-
-			return err
-		}
-		f.Close()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			defer func() { <-sem }() // release
+			err := p.uploadFile(client, match)
+			if err != nil {
+				errChan <- err
+			}
+		}()
 	}
-
+	wg.Wait()
 	return nil
+
 }
 
 // matches is a helper function that returns a list of all files matching the
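
Note on the concurrency pattern: the new Exec loop bounds parallelism with a channel used as a counting semaphore plus a sync.WaitGroup. The standalone sketch below illustrates that same pattern outside the plugin; it is illustrative only, uploadOne is a hypothetical stand-in for p.uploadFile, and the error channel here is buffered to len(files) (an assumption, not what the patch itself does) so a worker that fails after the scheduling loop has finished can never block on the send.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// uploadOne is a hypothetical stand-in for the plugin's uploadFile method.
func uploadOne(name string) error {
	time.Sleep(50 * time.Millisecond) // simulate network latency
	fmt.Println("uploaded", name)
	return nil
}

// uploadAll runs uploads with at most `parallel` concurrent workers.
func uploadAll(files []string, parallel int) error {
	if parallel < 1 {
		parallel = 1
	}
	sem := make(chan struct{}, parallel)    // counting semaphore
	errChan := make(chan error, len(files)) // buffered: a failing worker never blocks
	var wg sync.WaitGroup

	for _, f := range files {
		f := f // capture the loop variable (needed before Go 1.22)

		// Stop scheduling new uploads as soon as any worker has reported an error.
		select {
		case err := <-errChan:
			return err
		case sem <- struct{}{}: // acquire a slot
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			if err := uploadOne(f); err != nil {
				errChan <- err
			}
		}()
	}
	wg.Wait()

	// Surface an error from the final batch, if a worker failed after the loop ended.
	select {
	case err := <-errChan:
		return err
	default:
		return nil
	}
}

func main() {
	files := []string{"a.txt", "b.txt", "c.txt", "d.txt", "e.txt"}
	if err := uploadAll(files, 2); err != nil {
		fmt.Println("upload failed:", err)
	}
}
```

Buffering the error channel (or using golang.org/x/sync/errgroup) is one way to ensure an error raised by one of the last scheduled workers still has somewhere to go once the loop has stopped selecting on errChan and wg.Wait() is the only thing left running.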