diff --git a/CONFIGURE.md b/CONFIGURE.md index a8559c0..33feaab 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -103,16 +103,24 @@ src: useSSL: false accessKey: xxx secretKey: yyy + googleCredentials: | + { + "type": "service_account", + "project_id": "xxx", + "private_key_id": "123" + ... + } ``` -| Flag | Description | -|-------------|----------------------------------------| -| `name` | label name for the file source | -| `endpoint` | endpoint (default: localhost:9000) | -| `useSSL` | enable ssl connection (default: false) | -| `bucket` | bucket name | -| `accessKey` | access key | -| `secretKey` | secret access key | +| Flag | Description | +|---------------------|---------------------------------------------------| +| `name` | label name for the file source | +| `endpoint` | endpoint (default: localhost:9000) | +| `useSSL` | enable ssl connection (default: false) | +| `bucket` | bucket name | +| `accessKey` | aws access key | +| `secretKey` | aws secret access key | +| `googleCredentials` | service account or refresh token JSON credentials | ### Transfer Destination Options @@ -127,19 +135,26 @@ dest: partSize: 16 accessKey: xxx secretKey: yyy + googleCredentials: | + { + "type": "service_account", + "project_id": "xxx", + "private_key_id": "123" + ... + } ``` -| Flag | Description | -|-------------|------------------------------------------------| -| `name` | label name for the file source | -| `endpoint` | endpoint (default: localhost:9000) | -| `useSSL` | enable ssl connection (default: false) | -| `bucket` | bucket name | -| `accessKey` | access key | -| `secretKey` | secret access key | -| `threads` | number of transfer threads (default: 4) | -| `partSize` | size of parts for uploads in MiB (default: 16) | - +| Flag | Description | +|---------------------|---------------------------------------------------| +| `name` | label name for the file source | +| `endpoint` | endpoint (default: localhost:9000) | +| `useSSL` | enable ssl connection (default: false) | +| `bucket` | bucket name | +| `accessKey` | aws access key | +| `secretKey` | aws secret access key | +| `threads` | number of transfer threads (default: 4) | +| `partSize` | size of parts for uploads in MiB (default: 16) | +| `googleCredentials` | service account or refresh token JSON credentials | ### Health Check Server Options diff --git a/archie/archiver.go b/archie/archiver.go index 6aede40..6b77f42 100644 --- a/archie/archiver.go +++ b/archie/archiver.go @@ -1,14 +1,14 @@ package archie import ( - "github.com/minio/minio-go/v7" + "archie/client" "go.arsenm.dev/pcre" "sync" ) type Archiver struct { DestBucket string - DestClient *minio.Client + DestClient client.Client DestName string DestPartSize uint64 DestThreads uint @@ -18,7 +18,7 @@ type Archiver struct { MsgTimeout string SkipLifecycleExpired bool SrcBucket string - SrcClient *minio.Client + SrcClient client.Client SrcName string WaitGroup *sync.WaitGroup ExcludePaths struct { diff --git a/archie/copyObject.go b/archie/copyObject.go index d0d444a..f390570 100644 --- a/archie/copyObject.go +++ b/archie/copyObject.go @@ -1,8 +1,8 @@ package archie import ( + "archie/client" "context" - "github.com/minio/minio-go/v7" "github.com/nats-io/nats.go" "github.com/rs/zerolog" "time" @@ -28,15 +28,19 @@ func (a *Archiver) copyObject(ctx context.Context, mLog zerolog.Logger, eventObj // get src object start := time.Now() - srcObject, err := a.SrcClient.GetObject(ctx, a.SrcBucket, eventObjKey, minio.GetObjectOptions{}) + srcObject, err := a.SrcClient.GetObject(ctx, a.SrcBucket, eventObjKey) if err != nil { return err, "Failed to GetObject from the source bucket", Nak } // get source size, the event's object size wasn't good enough - srcStat, err := srcObject.Stat() + srcStat, err := srcObject.Stat(ctx) if err != nil { if err.Error() == "The specified key does not exist." { + // minio error + return err, "Failed to Stat the source object", FiveNakThenTerm + } else if err.Error() == "storage: object doesn't exist" { + // gcs error return err, "Failed to Stat the source object", FiveNakThenTerm } else { return err, "Failed to Stat the source object", Nak @@ -50,14 +54,14 @@ func (a *Archiver) copyObject(ctx context.Context, mLog zerolog.Logger, eventObj // put dest object destPartSizeBytes := 1024 * 1024 * a.DestPartSize - putOpts := minio.PutObjectOptions{ + putOpts := client.PutOptions{ ContentType: srcStat.ContentType, NumThreads: a.DestThreads, PartSize: destPartSizeBytes, } start = time.Now() - _, err = a.DestClient.PutObject(ctx, a.DestBucket, eventObjKey, srcObject, srcStat.Size, putOpts) + _, err = a.DestClient.PutObject(ctx, a.DestBucket, eventObjKey, srcObject.GetReader(), srcStat.Size, putOpts) if err != nil { return err, "Failed to PutObject to the destination bucket", Nak } diff --git a/archie/healthCheck.go b/archie/healthCheck.go index 210212f..eaea455 100644 --- a/archie/healthCheck.go +++ b/archie/healthCheck.go @@ -1,11 +1,11 @@ package archie import ( + "archie/client" "encoding/json" "fmt" "github.com/InVisionApp/go-health/v2" "github.com/InVisionApp/go-health/v2/handlers" - "github.com/minio/minio-go/v7" "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" @@ -17,8 +17,8 @@ import ( type HealthCheckStatusListener struct{} type readinessCheck struct { - DestClient *minio.Client - SrcClient *minio.Client + DestClient client.Client + SrcClient client.Client jetStreamConn *nats.Conn } diff --git a/archie/metrics.go b/archie/metrics.go index 6a5fe64..d5fadc1 100644 --- a/archie/metrics.go +++ b/archie/metrics.go @@ -32,7 +32,7 @@ var ( Subsystem: subSystem, Name: "messages_transfer_rate", Help: "a histogram of file transfer speed in kbytes/second", - Buckets: []float64{500, 1_000, 5_000, 10_000, 12_000, 15_000, 20_000, 25_000, 30_000, 50_000}, + Buckets: []float64{500, 1_000, 5_000, 10_000, 12_000, 15_000, 20_000, 25_000, 30_000, 50_000, 70_000}, }) messagesTransferSizeMetric = promauto.NewHistogram(prometheus.HistogramOpts{ Subsystem: subSystem, diff --git a/archie/removeObject.go b/archie/removeObject.go index 3166bbf..93e0192 100644 --- a/archie/removeObject.go +++ b/archie/removeObject.go @@ -3,19 +3,12 @@ package archie import ( "archie/event" "context" - "github.com/minio/minio-go/v7" "github.com/nats-io/nats.go" "github.com/rs/zerolog" "time" ) -func (a *Archiver) removeObject( - ctx context.Context, - mLog zerolog.Logger, - eventObjKey string, - msg *nats.Msg, - record event.Record, -) (error, string, AckType) { +func (a *Archiver) removeObject(ctx context.Context, mLog zerolog.Logger, eventObjKey string, msg *nats.Msg, record event.Record) (error, string, AckType) { metadata, _ := msg.Metadata() if a.SkipLifecycleExpired && record.Source.Host == "Internal: [ILM-EXPIRY]" { @@ -47,9 +40,13 @@ func (a *Archiver) removeObject( start := time.Now() - err := a.DestClient.RemoveObject(ctx, a.DestBucket, eventObjKey, minio.RemoveObjectOptions{}) + err := a.DestClient.RemoveObject(ctx, a.DestBucket, eventObjKey) if err != nil { if err.Error() == "The specified key does not exist." { + // minio error + return err, "Failed to RemoveObject from destination bucket", FiveNakThenTerm + } else if err.Error() == "storage: object doesn't exist" { + // gcs error return err, "Failed to RemoveObject from destination bucket", FiveNakThenTerm } else { return err, "Failed to RemoveObject from destination bucket", Nak diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..333f166 --- /dev/null +++ b/client/client.go @@ -0,0 +1,45 @@ +package client + +import ( + "context" + "github.com/rs/zerolog" + "io" +) + +type Client interface { + EndpointURL() string + GetObject(ctx context.Context, bucket string, key string) (Object, error) + IsOffline() bool + New(ctx context.Context, name, bucket, endpoint string, creds Credentials, useSSL bool, p Params, logLevel zerolog.Level) context.CancelFunc + PutObject(ctx context.Context, bucket string, key string, reader io.Reader, objectSize int64, opts PutOptions) (UploadInfo, error) + RemoveObject(ctx context.Context, bucket string, key string) error +} + +type Object interface { + GetReader() io.Reader + Stat(ctx context.Context) (*ObjectInfo, error) +} + +type PutOptions struct { + ContentType string + NumThreads uint + PartSize uint64 +} + +type ObjectInfo struct { + ContentType string + Size int64 +} + +type UploadInfo struct{} + +type Credentials struct { + MinioAccessKey string + MinioSecretAccessKey string + GoogleCredentials string +} + +type Params struct { + PartSize uint64 + Threads uint +} diff --git a/client/gcs.go b/client/gcs.go new file mode 100644 index 0000000..c9a03fb --- /dev/null +++ b/client/gcs.go @@ -0,0 +1,129 @@ +package client + +import ( + "cloud.google.com/go/storage" + _ "cloud.google.com/go/storage" + "context" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "google.golang.org/api/option" + "io" +) + +type GCS struct { + client *storage.Client + buffer *[]byte + endpoint string +} + +type GCSObject struct { + Bucket string + Path string + Reader io.Reader + Client *storage.Client +} + +func (g *GCS) New(ctx context.Context, name, bucket, endpoint string, creds Credentials, useSSL bool, p Params, logLevel zerolog.Level) context.CancelFunc { + var clientOptions []option.ClientOption + + if endpoint != "" { + clientOptions = append(clientOptions, option.WithEndpoint(endpoint)) + g.endpoint = endpoint + } + + if creds.GoogleCredentials != "" { + clientOptions = append(clientOptions, option.WithCredentialsJSON([]byte(creds.GoogleCredentials))) + } + + client, err := storage.NewClient(ctx, clientOptions...) + if err != nil { + log.Fatal().Err(err).Msgf("Failed to setup %s client", name) + } + + // TODO: turn on debug in gcs client + //if logLevel == zerolog.TraceLevel + + if p == (Params{}) { + log.Info().Msgf("Setup %s client to %s", name, "GCS") + } else { + log.Info().Msgf("Setup %s client to %s with %d threads and %dMB part size", name, "GCS", p.Threads, p.PartSize) + } + + b := client.Bucket(bucket) + _, err = b.Attrs(ctx) + if err != nil { + if err.Error() == "storage: bucket doesn't exist" { + log.Fatal().Msgf("%s bucket %s does not exist or access is missing", name, bucket) + } else { + log.Fatal().Err(err).Msgf("Failed to check if %s bucket %s exists", name, bucket) + } + } + + // the gcs client doesn't offer a health-check + _, healthCheckCancel := context.WithCancel(ctx) + + g.client = client + + buffer := make([]byte, 100*1024*1024) // 100 MB + g.buffer = &buffer + + return healthCheckCancel +} + +func (g *GCS) GetObject(ctx context.Context, bucket string, key string) (Object, error) { + obj, err := g.client.Bucket(bucket).Object(key).NewReader(ctx) + if err != nil { + return nil, err + } + var mo Object = &GCSObject{Bucket: bucket, Path: key, Reader: obj, Client: g.client} + return mo, nil +} + +func (g *GCS) PutObject(ctx context.Context, bucket string, key string, reader io.Reader, objectSize int64, opts PutOptions) (UploadInfo, error) { + writer := g.client.Bucket(bucket).Object(key).NewWriter(ctx) + writer.ChunkSize = int(opts.PartSize) + writer.ContentType = opts.ContentType + writer.Size = objectSize + + _, err := io.CopyBuffer(writer, reader, *g.buffer) + if err != nil { + return UploadInfo{}, err + } + + err = writer.Close() + if err != nil { + return UploadInfo{}, err + } + + return UploadInfo{}, nil +} + +func (g *GCS) RemoveObject(ctx context.Context, bucket string, key string) error { + if err := g.client.Bucket(bucket).Object(key).Delete(ctx); err != nil { + if err != nil { + return err + } + } + return nil +} + +func (g *GCS) IsOffline() bool { + // the gcs library doesn't offer a health-check + return false +} + +func (g *GCS) EndpointURL() string { + return g.endpoint +} + +func (o *GCSObject) Stat(ctx context.Context) (*ObjectInfo, error) { + obj, err := o.Client.Bucket(o.Bucket).Object(o.Path).Attrs(ctx) + if err != nil { + return nil, err + } + return &ObjectInfo{Size: obj.Size, ContentType: obj.ContentType}, nil +} + +func (o *GCSObject) GetReader() io.Reader { + return o.Reader +} diff --git a/client/jetstream.go b/client/jetstream.go index 9b0516a..d7aebc0 100644 --- a/client/jetstream.go +++ b/client/jetstream.go @@ -13,7 +13,9 @@ func JetStream( msgTimeout, streamRetention, streamRepublishSubject string, provisioningDisabled bool, ) (*nats.Subscription, *nats.Conn) { - var connectOptions []nats.Option + // reconnect forever + connectOptions := []nats.Option{nats.MaxReconnects(-1)} + if rootCA != "" { connectOptions = append(connectOptions, nats.RootCAs(rootCA)) } @@ -34,14 +36,15 @@ func JetStream( } accountInfo, err := jetStream.AccountInfo() + if err != nil { + log.Fatal().Err(err).Msg("Failed to get JetStream account info") + } + log.Info().Uint64("memory", accountInfo.Tier.Memory). Uint64("storage", accountInfo.Tier.Store). Int("streams", accountInfo.Tier.Streams). Int("consumers", accountInfo.Tier.Consumers). Msg("JetStream account info") - if err != nil { - log.Fatal().Err(err).Msg("Failed to get JetStream account info") - } if !provisioningDisabled { // build the stream diff --git a/client/minio.go b/client/minio.go index a3b8f22..acc426f 100644 --- a/client/minio.go +++ b/client/minio.go @@ -6,27 +6,27 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "io" "os" "time" ) -type Params struct { - Threads uint - PartSize uint64 +type Minio struct { + client *minio.Client } -func MinIO( - ctx context.Context, - name, bucket, endpoint, accessKey, secretAccessKey string, - useSSL bool, - p Params, - logLevel zerolog.Level, -) (*minio.Client, context.CancelFunc) { +type MinioObject struct { + Bucket string + Path string + Reader io.Reader +} + +func (m *Minio) New(ctx context.Context, name, bucket, endpoint string, creds Credentials, useSSL bool, p Params, logLevel zerolog.Level) context.CancelFunc { //minio.MaxRetry = 0 client, err := minio.New(endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(accessKey, secretAccessKey, ""), + Creds: credentials.NewStaticV4(creds.MinioAccessKey, creds.MinioSecretAccessKey, ""), Secure: useSSL, }) if err != nil { @@ -58,5 +58,58 @@ func MinIO( log.Fatal().Msgf("Failed to start %s client health check", name) } - return client, healthCheckCancel + m.client = client + + return healthCheckCancel +} + +func (m *Minio) GetObject(ctx context.Context, bucket string, key string) (Object, error) { + obj, err := m.client.GetObject(ctx, bucket, key, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + var mo Object = &MinioObject{Bucket: bucket, Path: key, Reader: obj} + return mo, nil +} + +func (m *Minio) PutObject(ctx context.Context, bucket string, key string, reader io.Reader, objectSize int64, opts PutOptions) (UploadInfo, error) { + putOpts := minio.PutObjectOptions{ + ContentType: opts.ContentType, + NumThreads: opts.NumThreads, + PartSize: opts.PartSize, + } + + _, err := m.client.PutObject(ctx, bucket, key, reader, objectSize, putOpts) + if err != nil { + return UploadInfo{}, err + } + return UploadInfo{}, nil +} + +func (m *Minio) RemoveObject(ctx context.Context, bucket string, key string) error { + err := m.client.RemoveObject(ctx, bucket, key, minio.RemoveObjectOptions{}) + if err != nil { + return err + } + return nil +} + +func (m *Minio) IsOffline() bool { + return m.client.IsOffline() +} + +func (m *Minio) EndpointURL() string { + return m.client.EndpointURL().String() +} + +func (o *MinioObject) Stat(ctx context.Context) (*ObjectInfo, error) { + srcStat, err := o.Reader.(*minio.Object).Stat() + if err != nil { + return nil, err + } + return &ObjectInfo{Size: srcStat.Size, ContentType: srcStat.ContentType}, nil +} + +func (o *MinioObject) GetReader() io.Reader { + return o.Reader } diff --git a/config.go b/config.go index 900e073..8375bcc 100644 --- a/config.go +++ b/config.go @@ -9,23 +9,25 @@ type Config struct { SkipLifecycleExpired bool `fig:"skipLifecycleExpired"` Src struct { - AccessKey string `fig:"accessKey"` - Bucket string `fig:"bucket"` - Endpoint string `fig:"endpoint" default:"localhost:9000"` - Name string `fig:"name" default:"destination"` - SecretKey string `fig:"secretKey"` - UseSSL bool `fig:"useSSL"` + AccessKey string `fig:"accessKey"` + Bucket string `fig:"bucket"` + Endpoint string `fig:"endpoint" default:"localhost:9000"` + GoogleCredentials string `fig:"googleCredentials"` + Name string `fig:"name" default:"destination"` + SecretKey string `fig:"secretKey"` + UseSSL bool `fig:"useSSL"` } Dest struct { - AccessKey string `fig:"accessKey"` - Bucket string `fig:"bucket"` - Endpoint string `fig:"endpoint" default:"localhost:9000"` - Name string `fig:"name" default:"source"` - PartSize uint64 `fig:"partSize" default:"16"` - SecretKey string `fig:"secretKey"` - Threads uint `fig:"threads" default:"4"` - UseSSL bool `fig:"useSSL"` + AccessKey string `fig:"accessKey"` + Bucket string `fig:"bucket"` + Endpoint string `fig:"endpoint" default:"localhost:9000"` + GoogleCredentials string `fig:"googleCredentials"` + Name string `fig:"name" default:"source"` + PartSize uint64 `fig:"partSize" default:"16"` + SecretKey string `fig:"secretKey"` + Threads uint `fig:"threads" default:"4"` + UseSSL bool `fig:"useSSL"` } ExcludePaths struct { diff --git a/go.mod b/go.mod index e7b4197..2582951 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module archie go 1.19 require ( + cloud.google.com/go/storage v1.10.0 github.com/InVisionApp/go-health/v2 v2.1.2 github.com/kkyr/fig v0.3.0 github.com/minio/minio-go/v7 v7.0.39 @@ -11,16 +12,21 @@ require ( github.com/rs/zerolog v1.28.0 go.arsenm.dev/pcre v0.0.0-20220530205550-74594f6c8b0e golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 + google.golang.org/api v0.30.0 ) require ( + cloud.google.com/go v0.65.0 // indirect github.com/InVisionApp/go-logger v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dustin/go-humanize v1.0.0 // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/googleapis/gax-go/v2 v2.0.5 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/jstemmer/go-junit-report v0.9.1 // indirect github.com/klauspost/compress v1.15.11 // indirect github.com/klauspost/cpuid/v2 v2.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect @@ -41,10 +47,18 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rs/xid v1.4.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect + go.opencensus.io v0.22.4 // indirect golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b // indirect + golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20221004154528-8021a29435af // indirect + golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect golang.org/x/text v0.3.7 // indirect + golang.org/x/tools v0.1.12 // indirect + google.golang.org/appengine v1.6.6 // indirect + google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect + google.golang.org/grpc v1.31.0 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 78480f4..f807ca3 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bP cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk= cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= +cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= @@ -29,6 +30,7 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= +cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -92,6 +94,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -132,7 +135,9 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/google/martian/v3 v3.0.0 h1:pMen7vLs8nvgEYhywH3KDWJIJTeEr2ULsVWHWYHQyBs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -145,6 +150,7 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= +github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -158,6 +164,7 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= +github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -284,6 +291,7 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -317,6 +325,7 @@ golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= @@ -326,6 +335,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -368,6 +379,7 @@ golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b h1:clP8eMhB30EHdc0bd2Twtq6kgU7yl5ub2cQLSdrv1Dg= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -484,6 +496,8 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -503,12 +517,14 @@ google.golang.org/api v0.22.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= +google.golang.org/api v0.30.0 h1:yfrXXP61wVuLb0vBcG6qaOoIoqYEzOQS8jum51jkv2w= google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -538,6 +554,7 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 h1:PDIOdWxZ8eRizhKa1AAvY53xsvLB1cWorMjslvY3VA8= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -550,6 +567,7 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.31.0 h1:T7P4R73V3SSDPhH7WW7ATbfViLtmamH0DKrP3f9AuDI= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/helm/archie/Chart.yaml b/helm/archie/Chart.yaml index a130177..06a0fd3 100644 --- a/helm/archie/Chart.yaml +++ b/helm/archie/Chart.yaml @@ -7,10 +7,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.2.1 +version: 0.3.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "0.2.0" +appVersion: "0.3.0" diff --git a/helm/archie/templates/prometheusrules.yaml b/helm/archie/templates/prometheusrules.yaml index 5af890e..72773f5 100644 --- a/helm/archie/templates/prometheusrules.yaml +++ b/helm/archie/templates/prometheusrules.yaml @@ -27,7 +27,7 @@ spec: - name: minio.rules rules: - alert: ArchieMinioNotifyQueueFilesTooHigh - expr: sum(minio_notify_nats_queue_count) by (job, directory) > {{ .Values.archie.prometheusRules.rules.minioNotifyQueueFilesThreshold }} + expr: sum(minio_notify_nats_queue_count{id="{{ .Values.archie.env }}"}) by (job, id) > {{ .Values.archie.prometheusRules.rules.minioNotifyQueueFilesThreshold }} for: 1m labels: severity: warning @@ -40,7 +40,7 @@ spec: rules: - alert: ArchieNatsConsumerPendingMessagesTooHigh expr: | - nats_consumer_num_pending{stream_name="archie-stream",is_consumer_leader="true",consumer_name="b2",namespace="archie"} > + nats_consumer_num_pending{stream_name="archie-stream",is_consumer_leader="true",consumer_name="{{ .Values.jetstream.consumer.name }}",namespace="archie"} > {{ .Values.archie.prometheusRules.rules.natsMessagesPendingThreshold }} for: 1m labels: @@ -52,7 +52,7 @@ spec: - alert: ArchieNatsConsumerAckPendingMessagesTooHigh expr: | - nats_consumer_num_ack_pending{stream_name="archie-stream",is_consumer_leader="true",consumer_name="b2",namespace="archie"} > + nats_consumer_num_ack_pending{stream_name="archie-stream",is_consumer_leader="true",consumer_name="{{ .Values.jetstream.consumer.name }}",namespace="archie"} > {{ .Values.archie.prometheusRules.rules.natsMessagesAckPendingThreshold }} for: 1m labels: @@ -64,8 +64,8 @@ spec: - alert: ArchieNatsConsumerRedeliveredMessagePercentageTooHigh expr: | - rate(nats_consumer_num_redelivered{stream_name="archie-stream",is_consumer_leader="true",consumer_name="b2",namespace="archie"}[1m]) / - rate(nats_consumer_delivered_stream_seq{stream_name="archie-stream",is_consumer_leader="true",consumer_name="b2",namespace="archie"}[1m]) > + rate(nats_consumer_num_redelivered{stream_name="archie-stream",is_consumer_leader="true",consumer_name="{{ .Values.jetstream.consumer.name }}",namespace="archie"}[1m]) / + rate(nats_consumer_delivered_stream_seq{stream_name="archie-stream",is_consumer_leader="true",consumer_name="{{ .Values.jetstream.consumer.name }}",namespace="archie"}[1m]) > {{ .Values.archie.prometheusRules.rules.natsMessagesRedeliveredPercentageThreshold }} for: 1m labels: @@ -77,7 +77,7 @@ spec: - alert: ArchieNatsConsumerDeliveredMessagesTooLow expr: | - sum(increase(nats_consumer_delivered_consumer_seq{stream_name="archie-stream",is_consumer_leader="true",consumer_name="b2",namespace="archie"}[1m])) + sum(increase(nats_consumer_delivered_consumer_seq{stream_name="archie-stream",is_consumer_leader="true",consumer_name="{{ .Values.jetstream.consumer.name }}",namespace="archie"}[1m])) by (account, cluster, consumer_name, is_consumer_leader, is_stream_leader, stream_name) < {{ .Values.archie.prometheusRules.rules.natsMessagesDeliveredLowThreshold }} for: 10m diff --git a/helm/archie/templates/secret.yaml b/helm/archie/templates/secret.yaml index 146a366..474d108 100644 --- a/helm/archie/templates/secret.yaml +++ b/helm/archie/templates/secret.yaml @@ -42,6 +42,11 @@ stringData: secretKey: {{ .Values.source.secretKey }} {{- end }} + {{- if .Values.source.googleCredentials }} + googleCredentials: | + {{ .Values.source.googleCredentials | indent 8 }} + {{- end }} + dest: name: {{ .Values.destination.name }} bucket: {{ .Values.destination.bucket }} @@ -58,6 +63,11 @@ stringData: secretKey: {{ .Values.destination.secretKey }} {{- end }} + {{- if .Values.destination.googleCredentials }} + googleCredentials: | + {{ .Values.destination.googleCredentials | nindent 8 }} + {{- end }} + jetstream: provisioningDisabled: {{ .Values.jetstream.provisioningDisabled }} url: {{ .Values.jetstream.url }} diff --git a/helm/archie/values.yaml b/helm/archie/values.yaml index 5bcc05e..a0f05e8 100644 --- a/helm/archie/values.yaml +++ b/helm/archie/values.yaml @@ -21,6 +21,7 @@ archie: # disable the deployment deployment: create: true + env: dev healthCheck: enabled: true port: 8080 @@ -110,20 +111,22 @@ keda: source: name: src # just a label bucket: src-test - endpoint: localhost + endpoint: "" useSSL: true # accessKey: # secretKey: +# googleCredentials: destination: name: dest # just a label bucket: dest-test - endpoint: localhost + endpoint: "" useSSL: true threads: 4 partSize: 16 # MiB # accessKey: # secretKey: +# googleCredentials: imagePullSecrets: [] nameOverride: "" diff --git a/main.go b/main.go index bf8b5ab..b7582d8 100644 --- a/main.go +++ b/main.go @@ -113,31 +113,53 @@ func main() { var srcHealthCheckCancel, destHealthCheckCancel context.CancelFunc // source - a.SrcClient, srcHealthCheckCancel = client.MinIO( + var c client.Client + if cfg.Src.GoogleCredentials != "" { + c = &client.GCS{} + } else { + c = &client.Minio{} + } + + srcHealthCheckCancel = c.New( baseCtx, cfg.Src.Name, cfg.Src.Bucket, cfg.Src.Endpoint, - cfg.Src.AccessKey, - cfg.Src.SecretKey, + client.Credentials{ + MinioSecretAccessKey: cfg.Src.SecretKey, + MinioAccessKey: cfg.Src.AccessKey, + GoogleCredentials: cfg.Src.GoogleCredentials, + }, cfg.Src.UseSSL, client.Params{}, zerolog.GlobalLevel(), ) + a.SrcClient = c + defer func() { log.Trace().Msg("Deferred source health check context canceled") srcHealthCheckCancel() }() // destination - a.DestClient, destHealthCheckCancel = client.MinIO( + var d client.Client + if cfg.Dest.GoogleCredentials != "" { + d = &client.GCS{} + } else { + d = &client.Minio{} + } + + destHealthCheckCancel = d.New( baseCtx, cfg.Dest.Name, cfg.Dest.Bucket, cfg.Dest.Endpoint, - cfg.Dest.AccessKey, - cfg.Dest.SecretKey, + client.Credentials{ + MinioSecretAccessKey: cfg.Dest.SecretKey, + MinioAccessKey: cfg.Dest.AccessKey, + GoogleCredentials: cfg.Dest.GoogleCredentials, + }, cfg.Dest.UseSSL, client.Params{ Threads: cfg.Dest.Threads, @@ -146,6 +168,8 @@ func main() { zerolog.GlobalLevel(), ) + a.DestClient = d + defer func() { log.Trace().Msg("Deferred destination health check context canceled") destHealthCheckCancel()