Skip to content

Commit

Permalink
adding google cloud storage support (#3)
Browse files Browse the repository at this point in the history
* adding google cloud storage support

* updating consumer_name and minio folder metric
  • Loading branch information
rayjanoka authored Dec 16, 2022
1 parent a982b47 commit 6835e49
Show file tree
Hide file tree
Showing 18 changed files with 403 additions and 86 deletions.
53 changes: 34 additions & 19 deletions CONFIGURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,24 @@ src:
useSSL: false
accessKey: xxx
secretKey: yyy
googleCredentials: |
{
"type": "service_account",
"project_id": "xxx",
"private_key_id": "123"
...
}
```

| Flag | Description |
|-------------|----------------------------------------|
| `name` | label name for the file source |
| `endpoint` | endpoint (default: localhost:9000) |
| `useSSL` | enable ssl connection (default: false) |
| `bucket` | bucket name |
| `accessKey` | access key |
| `secretKey` | secret access key |
| Flag | Description |
|---------------------|---------------------------------------------------|
| `name` | label name for the file source |
| `endpoint` | endpoint (default: localhost:9000) |
| `useSSL` | enable ssl connection (default: false) |
| `bucket` | bucket name |
| `accessKey`         | AWS access key                                    |
| `secretKey`         | AWS secret access key                             |
| `googleCredentials` | service account or refresh token JSON credentials |


### Transfer Destination Options
Expand All @@ -127,19 +135,26 @@ dest:
partSize: 16
accessKey: xxx
secretKey: yyy
googleCredentials: |
{
"type": "service_account",
"project_id": "xxx",
"private_key_id": "123"
...
}
```

| Flag | Description |
|-------------|------------------------------------------------|
| `name` | label name for the file source |
| `endpoint` | endpoint (default: localhost:9000) |
| `useSSL` | enable ssl connection (default: false) |
| `bucket` | bucket name |
| `accessKey` | access key |
| `secretKey` | secret access key |
| `threads` | number of transfer threads (default: 4) |
| `partSize` | size of parts for uploads in MiB (default: 16) |

| Flag | Description |
|---------------------|---------------------------------------------------|
| `name`              | label name for the file destination               |
| `endpoint`          | endpoint (default: localhost:9000)                |
| `useSSL`            | enable ssl connection (default: false)            |
| `bucket`            | bucket name                                       |
| `accessKey`         | AWS access key                                    |
| `secretKey`         | AWS secret access key                             |
| `threads` | number of transfer threads (default: 4) |
| `partSize` | size of parts for uploads in MiB (default: 16) |
| `googleCredentials` | service account or refresh token JSON credentials |

### Health Check Server Options

Expand Down
6 changes: 3 additions & 3 deletions archie/archiver.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package archie

import (
"github.com/minio/minio-go/v7"
"archie/client"
"go.arsenm.dev/pcre"
"sync"
)

type Archiver struct {
DestBucket string
DestClient *minio.Client
DestClient client.Client
DestName string
DestPartSize uint64
DestThreads uint
Expand All @@ -18,7 +18,7 @@ type Archiver struct {
MsgTimeout string
SkipLifecycleExpired bool
SrcBucket string
SrcClient *minio.Client
SrcClient client.Client
SrcName string
WaitGroup *sync.WaitGroup
ExcludePaths struct {
Expand Down
14 changes: 9 additions & 5 deletions archie/copyObject.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package archie

import (
"archie/client"
"context"
"github.com/minio/minio-go/v7"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog"
"time"
Expand All @@ -28,15 +28,19 @@ func (a *Archiver) copyObject(ctx context.Context, mLog zerolog.Logger, eventObj

// get src object
start := time.Now()
srcObject, err := a.SrcClient.GetObject(ctx, a.SrcBucket, eventObjKey, minio.GetObjectOptions{})
srcObject, err := a.SrcClient.GetObject(ctx, a.SrcBucket, eventObjKey)
if err != nil {
return err, "Failed to GetObject from the source bucket", Nak
}

// get source size, the event's object size wasn't good enough
srcStat, err := srcObject.Stat()
srcStat, err := srcObject.Stat(ctx)
if err != nil {
if err.Error() == "The specified key does not exist." {
// minio error
return err, "Failed to Stat the source object", FiveNakThenTerm
} else if err.Error() == "storage: object doesn't exist" {
// gcs error
return err, "Failed to Stat the source object", FiveNakThenTerm
} else {
return err, "Failed to Stat the source object", Nak
Expand All @@ -50,14 +54,14 @@ func (a *Archiver) copyObject(ctx context.Context, mLog zerolog.Logger, eventObj

// put dest object
destPartSizeBytes := 1024 * 1024 * a.DestPartSize
putOpts := minio.PutObjectOptions{
putOpts := client.PutOptions{
ContentType: srcStat.ContentType,
NumThreads: a.DestThreads,
PartSize: destPartSizeBytes,
}

start = time.Now()
_, err = a.DestClient.PutObject(ctx, a.DestBucket, eventObjKey, srcObject, srcStat.Size, putOpts)
_, err = a.DestClient.PutObject(ctx, a.DestBucket, eventObjKey, srcObject.GetReader(), srcStat.Size, putOpts)
if err != nil {
return err, "Failed to PutObject to the destination bucket", Nak
}
Expand Down
6 changes: 3 additions & 3 deletions archie/healthCheck.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package archie

import (
"archie/client"
"encoding/json"
"fmt"
"github.com/InVisionApp/go-health/v2"
"github.com/InVisionApp/go-health/v2/handlers"
"github.com/minio/minio-go/v7"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"

Expand All @@ -17,8 +17,8 @@ import (
type HealthCheckStatusListener struct{}

type readinessCheck struct {
DestClient *minio.Client
SrcClient *minio.Client
DestClient client.Client
SrcClient client.Client
jetStreamConn *nats.Conn
}

Expand Down
2 changes: 1 addition & 1 deletion archie/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
Subsystem: subSystem,
Name: "messages_transfer_rate",
Help: "a histogram of file transfer speed in kbytes/second",
Buckets: []float64{500, 1_000, 5_000, 10_000, 12_000, 15_000, 20_000, 25_000, 30_000, 50_000},
Buckets: []float64{500, 1_000, 5_000, 10_000, 12_000, 15_000, 20_000, 25_000, 30_000, 50_000, 70_000},
})
messagesTransferSizeMetric = promauto.NewHistogram(prometheus.HistogramOpts{
Subsystem: subSystem,
Expand Down
15 changes: 6 additions & 9 deletions archie/removeObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,12 @@ package archie
import (
"archie/event"
"context"
"github.com/minio/minio-go/v7"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog"
"time"
)

func (a *Archiver) removeObject(
ctx context.Context,
mLog zerolog.Logger,
eventObjKey string,
msg *nats.Msg,
record event.Record,
) (error, string, AckType) {
func (a *Archiver) removeObject(ctx context.Context, mLog zerolog.Logger, eventObjKey string, msg *nats.Msg, record event.Record) (error, string, AckType) {
metadata, _ := msg.Metadata()

if a.SkipLifecycleExpired && record.Source.Host == "Internal: [ILM-EXPIRY]" {
Expand Down Expand Up @@ -47,9 +40,13 @@ func (a *Archiver) removeObject(

start := time.Now()

err := a.DestClient.RemoveObject(ctx, a.DestBucket, eventObjKey, minio.RemoveObjectOptions{})
err := a.DestClient.RemoveObject(ctx, a.DestBucket, eventObjKey)
if err != nil {
if err.Error() == "The specified key does not exist." {
// minio error
return err, "Failed to RemoveObject from destination bucket", FiveNakThenTerm
} else if err.Error() == "storage: object doesn't exist" {
// gcs error
return err, "Failed to RemoveObject from destination bucket", FiveNakThenTerm
} else {
return err, "Failed to RemoveObject from destination bucket", Nak
Expand Down
45 changes: 45 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package client

import (
"context"
"github.com/rs/zerolog"
"io"
)

// Client abstracts an object-storage backend (MinIO/S3 or GCS) so the
// archiver can transfer objects without knowing which provider it talks to.
type Client interface {
	// EndpointURL returns the configured endpoint, used for logging.
	EndpointURL() string
	// GetObject opens the object at bucket/key for reading.
	GetObject(ctx context.Context, bucket string, key string) (Object, error)
	// IsOffline reports whether the backend is currently unreachable
	// (backends without a health check always report false).
	IsOffline() bool
	// New initializes the client and returns a cancel function that stops
	// any background health checking started during setup.
	New(ctx context.Context, name, bucket, endpoint string, creds Credentials, useSSL bool, p Params, logLevel zerolog.Level) context.CancelFunc
	// PutObject uploads objectSize bytes from reader to bucket/key.
	PutObject(ctx context.Context, bucket string, key string, reader io.Reader, objectSize int64, opts PutOptions) (UploadInfo, error)
	// RemoveObject deletes the object at bucket/key.
	RemoveObject(ctx context.Context, bucket string, key string) error
}

// Object is a handle to a stored object returned by Client.GetObject.
type Object interface {
	// GetReader returns a reader over the object's contents.
	GetReader() io.Reader
	// Stat fetches the object's size and content type from the backend.
	Stat(ctx context.Context) (*ObjectInfo, error)
}

// PutOptions carries per-upload tuning passed to Client.PutObject.
type PutOptions struct {
	ContentType string // MIME type to record on the destination object
	NumThreads  uint   // parallel upload threads (backend-dependent)
	PartSize    uint64 // multipart/chunk size in bytes
}

// ObjectInfo is the subset of object metadata the archiver needs.
type ObjectInfo struct {
	ContentType string
	Size        int64 // object size in bytes
}

// UploadInfo is a placeholder result for Client.PutObject; no fields are
// currently consumed by callers.
type UploadInfo struct{}

// Credentials holds authentication material for either backend; only the
// fields relevant to the selected backend are used.
type Credentials struct {
	MinioAccessKey       string
	MinioSecretAccessKey string
	GoogleCredentials    string // service-account or refresh-token JSON blob
}

// Params carries transfer tuning supplied at client setup time.
type Params struct {
	PartSize uint64 // part size in MiB (converted to bytes by callers)
	Threads  uint
}
129 changes: 129 additions & 0 deletions client/gcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package client

import (
"cloud.google.com/go/storage"
_ "cloud.google.com/go/storage"
"context"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/api/option"
"io"
)

// GCS implements the Client interface on top of the Google Cloud Storage
// SDK.
type GCS struct {
	client *storage.Client
	// buffer is a single scratch buffer shared by PutObject's io.CopyBuffer
	// calls. NOTE(review): sharing one buffer is only safe if PutObject is
	// never invoked concurrently on this client — confirm with callers.
	buffer   *[]byte
	endpoint string
}

// GCSObject implements the Object interface for a single GCS object; the
// Reader is opened by GetObject and Stat re-queries attributes via Client.
type GCSObject struct {
	Bucket string
	Path   string
	Reader io.Reader
	Client *storage.Client
}

// New configures the GCS client, verifies the target bucket is reachable,
// and returns a cancel function for interface parity with other backends.
// Any setup failure is fatal (process exits via log.Fatal). The useSSL and
// logLevel parameters are currently unused by this backend; they exist to
// satisfy the Client interface.
func (g *GCS) New(ctx context.Context, name, bucket, endpoint string, creds Credentials, useSSL bool, p Params, logLevel zerolog.Level) context.CancelFunc {
	var clientOptions []option.ClientOption

	// A custom endpoint is only set when provided (e.g. an emulator or
	// proxy); otherwise the SDK default is used.
	if endpoint != "" {
		clientOptions = append(clientOptions, option.WithEndpoint(endpoint))
		g.endpoint = endpoint
	}

	// Inline JSON credentials take precedence; with none supplied the SDK
	// falls back to Application Default Credentials.
	if creds.GoogleCredentials != "" {
		clientOptions = append(clientOptions, option.WithCredentialsJSON([]byte(creds.GoogleCredentials)))
	}

	client, err := storage.NewClient(ctx, clientOptions...)
	if err != nil {
		log.Fatal().Err(err).Msgf("Failed to setup %s client", name)
	}

	// TODO: turn on debug in gcs client
	//if logLevel == zerolog.TraceLevel

	// An all-zero Params means this is a source client (no transfer tuning).
	if p == (Params{}) {
		log.Info().Msgf("Setup %s client to %s", name, "GCS")
	} else {
		log.Info().Msgf("Setup %s client to %s with %d threads and %dMB part size", name, "GCS", p.Threads, p.PartSize)
	}

	// Fail fast if the bucket is missing or inaccessible rather than
	// discovering it on the first transfer.
	b := client.Bucket(bucket)
	_, err = b.Attrs(ctx)
	if err != nil {
		if err.Error() == "storage: bucket doesn't exist" {
			log.Fatal().Msgf("%s bucket %s does not exist or access is missing", name, bucket)
		} else {
			log.Fatal().Err(err).Msgf("Failed to check if %s bucket %s exists", name, bucket)
		}
	}

	// the gcs client doesn't offer a health-check
	// (the cancel func is returned purely for interface parity)
	_, healthCheckCancel := context.WithCancel(ctx)

	g.client = client

	// Pre-allocate the copy buffer once per client to avoid re-allocating
	// it on every transfer.
	buffer := make([]byte, 100*1024*1024) // 100 MB
	g.buffer = &buffer

	return healthCheckCancel
}

// GetObject opens a reader on bucket/key and wraps it in a GCSObject
// handle so callers can stream the contents and re-stat the object.
func (g *GCS) GetObject(ctx context.Context, bucket string, key string) (Object, error) {
	reader, err := g.client.Bucket(bucket).Object(key).NewReader(ctx)
	if err != nil {
		return nil, err
	}
	return &GCSObject{
		Bucket: bucket,
		Path:   key,
		Reader: reader,
		Client: g.client,
	}, nil
}

// PutObject uploads objectSize bytes from reader to bucket/key, using
// opts for chunk size and content type. On a copy failure the upload is
// aborted (by canceling the writer's context) so no partial object is
// committed; the returned UploadInfo carries no data.
func (g *GCS) PutObject(ctx context.Context, bucket string, key string, reader io.Reader, objectSize int64, opts PutOptions) (UploadInfo, error) {
	// Derive a cancellable context so a failed upload can be aborted:
	// the storage docs say canceling the writer's context is the way to
	// abandon an in-progress write.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	writer := g.client.Bucket(bucket).Object(key).NewWriter(ctx)
	writer.ChunkSize = int(opts.PartSize)
	writer.ContentType = opts.ContentType
	// NOTE(review): ObjectAttrs.Size is populated by the service on
	// completion; setting it here is likely a no-op — confirm and remove.
	writer.Size = objectSize

	// Use io.Copy with its own per-call scratch buffer instead of
	// io.CopyBuffer(*g.buffer): a single buffer shared across calls is a
	// data race if PutObject ever runs from multiple goroutines.
	if _, err := io.Copy(writer, reader); err != nil {
		cancel()           // abort the upload rather than committing partial data
		_ = writer.Close() // release writer resources; the copy error is the one to report
		return UploadInfo{}, err
	}

	// Close finalizes the upload; write RPC errors surface here.
	if err := writer.Close(); err != nil {
		return UploadInfo{}, err
	}

	return UploadInfo{}, nil
}

// RemoveObject deletes bucket/key. The original wrapped Delete's error in
// a redundant nested nil check (`if err != nil` inside the `if err :=`
// guard); the error can simply be returned. Callers distinguish "object
// doesn't exist" from transient failures by inspecting the error text.
func (g *GCS) RemoveObject(ctx context.Context, bucket string, key string) error {
	return g.client.Bucket(bucket).Object(key).Delete(ctx)
}

// IsOffline always reports false for GCS: the SDK exposes no health-check,
// so this backend is assumed reachable.
func (g *GCS) IsOffline() bool {
	// the gcs library doesn't offer a health-check
	return false
}

// EndpointURL returns the custom endpoint set at New time, or the empty
// string when the SDK default endpoint is in use.
func (g *GCS) EndpointURL() string {
	return g.endpoint
}

// Stat re-queries the object's attributes from GCS and reports the size
// and content type needed for the transfer.
func (o *GCSObject) Stat(ctx context.Context) (*ObjectInfo, error) {
	attrs, err := o.Client.Bucket(o.Bucket).Object(o.Path).Attrs(ctx)
	if err != nil {
		return nil, err
	}
	info := &ObjectInfo{
		ContentType: attrs.ContentType,
		Size:        attrs.Size,
	}
	return info, nil
}

// GetReader returns the reader opened by GetObject; it streams the
// object's contents and is consumed by the destination's PutObject.
func (o *GCSObject) GetReader() io.Reader {
	return o.Reader
}
Loading

0 comments on commit 6835e49

Please sign in to comment.