Skip to content

Commit

Permalink
converting to config.yaml and adding excluded paths feature
Browse files Browse the repository at this point in the history
  • Loading branch information
rayjanoka committed Oct 24, 2022
1 parent 137cf1c commit 5f2dc83
Show file tree
Hide file tree
Showing 15 changed files with 473 additions and 241 deletions.
2 changes: 1 addition & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ builds:
goos:
- linux
- darwin
- windows
# - windows

goarch:
- amd64
Expand Down
7 changes: 6 additions & 1 deletion archie/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package archie

import (
"github.com/minio/minio-go/v7"
"go.arsenm.dev/pcre"
"sync"
)

Expand All @@ -12,14 +13,18 @@ type Archiver struct {
DestPartSize uint64
DestThreads uint
FetchDone chan string
HealthCheckEnabled bool
HealthCheckDisabled bool
IsOffline bool
MsgTimeout string
SkipLifecycleExpired bool
SrcBucket string
SrcClient *minio.Client
SrcName string
WaitGroup *sync.WaitGroup
ExcludePaths struct {
CopyObject []*pcre.Regexp
RemoveObject []*pcre.Regexp
}
}

type AckType int
Expand Down
15 changes: 15 additions & 0 deletions archie/copyObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ import (
func (a *Archiver) copyObject(ctx context.Context, mLog zerolog.Logger, eventObjKey string, msg *nats.Msg) (error, string, AckType) {
metadata, _ := msg.Metadata()

for _, excludedPathRegexp := range a.ExcludePaths.CopyObject {
if excludedPathRegexp.MatchString(eventObjKey) {
mLog.Info().
Uint64("numDelivered", metadata.NumDelivered).
Str("queueDuration", time.Now().Sub(metadata.Timestamp).String()).
Str("pattern", excludedPathRegexp.String()).
Msg("Excluded path match, copy event skipped")

a.observeMessagesTransferNumDeliveredMetric(float64(metadata.NumDelivered))
a.observeMessagesTransferQueueDurationMetric(time.Now().Sub(metadata.Timestamp).Seconds())

return nil, "EXCLUDED_PATH", SkipAck
}
}

// get src object
start := time.Now()
srcObject, err := a.SrcClient.GetObject(ctx, a.SrcBucket, eventObjKey, minio.GetObjectOptions{})
Expand Down
2 changes: 1 addition & 1 deletion archie/healthCheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type readinessCheck struct {
type livenessCheck struct{}

func (a *Archiver) StartHealthCheckServer(healthCheckPort int, jetStreamConn *nats.Conn) *http.Server {
if !a.HealthCheckEnabled {
if a.HealthCheckDisabled {
return nil
}

Expand Down
14 changes: 6 additions & 8 deletions archie/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,19 @@ func (a *Archiver) message(ctx context.Context, msg *nats.Msg) {
Msg("Message received")

var ack AckType
var s3ErrMsg, s3ErrCode string
var s3ErrMsg, s3ErrCode, execContext string

// message type router
switch eventType {
case "s3:ObjectCreated":
var errContext string
err, errContext, ack = a.copyObject(ctx, mLog, eventObjKey, msg)
err, execContext, ack = a.copyObject(ctx, mLog, eventObjKey, msg)
if err != nil {
s3ErrMsg, s3ErrCode = logS3Error(err, errContext, &mLog)
s3ErrMsg, s3ErrCode = logS3Error(err, execContext, &mLog)
}
case "s3:ObjectRemoved":
var errContext string
err, errContext, ack = a.removeObject(ctx, mLog, eventObjKey, msg, eventRecord)
err, execContext, ack = a.removeObject(ctx, mLog, eventObjKey, msg, eventRecord)
if err != nil {
s3ErrMsg, s3ErrCode = logS3Error(err, errContext, &mLog)
s3ErrMsg, s3ErrCode = logS3Error(err, execContext, &mLog)
}
default:
mLog.Error().Msgf("Unable to process the %s event type", event.EventName, eventType)
Expand All @@ -131,7 +129,7 @@ func (a *Archiver) message(ctx context.Context, msg *nats.Msg) {
// logging already happened
continue
}
a.cleanupAndCountMessagesProcessedMetric("skipped", "", "", event.EventName, eventType)
a.cleanupAndCountMessagesProcessedMetric("skipped", "", execContext, event.EventName, eventType)
case Nak:
sendNakSignal(msg, &mLog)
a.cleanupAndCountMessagesProcessedMetric("failed", s3ErrMsg, s3ErrCode, event.EventName, eventType)
Expand Down
17 changes: 16 additions & 1 deletion archie/removeObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,22 @@ func (a *Archiver) removeObject(
a.observeMessagesDeleteNumDeliveredMetric(float64(metadata.NumDelivered))
a.observeMessagesDeleteQueueDurationMetric(time.Now().Sub(metadata.Timestamp).Seconds())

return nil, "", SkipAck
return nil, "ILM_EXPIRY", SkipAck
}

for _, excludedPathRegexp := range a.ExcludePaths.RemoveObject {
if excludedPathRegexp.MatchString(eventObjKey) {
mLog.Info().
Uint64("numDelivered", metadata.NumDelivered).
Str("queueDuration", time.Now().Sub(metadata.Timestamp).String()).
Str("pattern", excludedPathRegexp.String()).
Msg("Excluded path match, remove event skipped")

a.observeMessagesDeleteNumDeliveredMetric(float64(metadata.NumDelivered))
a.observeMessagesDeleteQueueDurationMetric(time.Now().Sub(metadata.Timestamp).Seconds())

return nil, "EXCLUDED_PATH", SkipAck
}
}

start := time.Now()
Expand Down
33 changes: 17 additions & 16 deletions client/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"os"
"time"
Expand All @@ -16,46 +17,46 @@ type Params struct {

func MinIO(
ctx context.Context,
name, bucket, endpoint, accessKey, secretAccessKey *string,
useSSL *bool,
name, bucket, endpoint, accessKey, secretAccessKey string,
useSSL bool,
p Params,
trace *bool,
logLevel zerolog.Level,
) (*minio.Client, context.CancelFunc) {

//minio.MaxRetry = 0

client, err := minio.New(*endpoint, &minio.Options{
Creds: credentials.NewStaticV4(*accessKey, *secretAccessKey, ""),
Secure: *useSSL,
client, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKey, secretAccessKey, ""),
Secure: useSSL,
})
if err != nil {
log.Fatal().Err(err).Msgf("Failed to setup %s client", *name)
log.Fatal().Err(err).Msgf("Failed to setup %s client", name)
}

if *trace {
if logLevel == zerolog.TraceLevel {
client.TraceOn(os.Stdout)
}

if p == (Params{}) {
log.Info().Msgf("Setup %s client to %s", *name, client.EndpointURL())
log.Info().Msgf("Setup %s client to %s", name, client.EndpointURL())
} else {
log.Info().Msgf("Setup %s client to %s with %d threads and %dMB part size", *name, client.EndpointURL(), p.Threads, p.PartSize)
log.Info().Msgf("Setup %s client to %s with %d threads and %dMB part size", name, client.EndpointURL(), p.Threads, p.PartSize)
}

bucketExists, err := client.BucketExists(ctx, *bucket)
bucketExists, err := client.BucketExists(ctx, bucket)
if err != nil {
log.Fatal().Err(err).Msgf("Failed to check if %s bucket %s exists", *name, *bucket)
log.Fatal().Err(err).Msgf("Failed to check if %s bucket %s exists", name, bucket)
}

if !bucketExists {
log.Fatal().Msgf("%s bucket %s does not exist or access is missing", *name, *bucket)
log.Fatal().Msgf("%s bucket %s does not exist or access is missing", name, bucket)
}

// enable health checking
destHealthCheckCancel, err := client.HealthCheck(5 * time.Second)
healthCheckCancel, err := client.HealthCheck(5 * time.Second)
if err != nil {
log.Fatal().Msgf("Failed to start %s client health check", *name)
log.Fatal().Msgf("Failed to start %s client health check", name)
}

return client, destHealthCheckCancel
return client, healthCheckCancel
}
104 changes: 104 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package main

type Config struct {
ApiVersion string `fig:"apiVersion" validate:"required"`

LogLevel string `fig:"logLevel" default:"info"`
MsgTimeout string `fig:"msgTimeout" default:"30m"`
ShutdownWait string `fig:"shutdownWait" default:"0s"`
SkipLifecycleExpired bool `fig:"skipLifecycleExpired"`

Src struct {
AccessKey string `fig:"accessKey"`
Bucket string `fig:"bucket"`
Endpoint string `fig:"endpoint" default:"localhost:9000"`
Name string `fig:"name" default:"destination"`
SecretKey string `fig:"secretKey"`
UseSSL bool `fig:"useSSL"`
}

Dest struct {
AccessKey string `fig:"accessKey"`
Bucket string `fig:"bucket"`
Endpoint string `fig:"endpoint" default:"localhost:9000"`
Name string `fig:"name" default:"source"`
PartSize uint64 `fig:"partSize" default:"16"`
SecretKey string `fig:"secretKey"`
Threads uint `fig:"threads" default:"4"`
UseSSL bool `fig:"useSSL"`
}

ExcludePaths struct {
CopyObject []string `fig:"copyObject"`
RemoveObject []string `fig:"removeObject"`
}

HealthCheck struct {
Disabled bool
Port int `default:"8080"`
}

Metrics struct {
Port int `default:"9999"`
}

Jetstream struct {
BatchSize int `fig:"batchSize" default:"1"`
Password string `fig:"password"`
ProvisioningDisabled bool `fig:"provisioningDisabled"`
RootCA string `fig:"rootCA"`
Subject string `fig:"subject" default:"archie-minio-events"`
URL string `fig:"url" default:"nats://localhost:4222"`
Username string `fig:"username"`

Stream struct {
MaxAge string `fig:"maxAge"`
MaxSize int64 `fig:"maxSize" default:"-1"`
Name string `fig:"name" default:"archie-stream"`
Replicas int `fig:"replicas" default:"1"`
RepublishSubject string `fig:"republishSubject"`
Retention string `fig:"retention" default:"limits"`
}

Consumer struct {
Name string `fig:"name" default:"archie-consumer"`
MaxAckPending int `fig:"maxAckPending" default:"1000"`
}
}
}

//jetStreamProvisioningDisabled := flag.Bool("jetstream-provisioning-disabled", LookupEnvOrBool("JETSTREAM_PROVISIONING_DISABLED", false), "disable the creation and configuration of the stream and consumer")
//jetStreamSubject := flag.String("jetstream-subject", LookupEnvOrString("JETSTREAM_SUBJECT", "archie-minio-events"), "nats jetstream subject to subscribe to")
//jetStreamURL := flag.String("jetstream-url", LookupEnvOrString("JETSTREAM_URL", "nats://localhost:4222"), "jetstream client url")
//jetStreamUsername := flag.String("jetstream-username", LookupEnvOrString("JETSTREAM_USERNAME", ""), "jetstream client username")
//metricsPort := flag.Int("metrics-port", LookupEnvOrInt("METRICS_PORT", 9999), "metrics tcp port number")
//msgTimeout := flag.String("msg-timeout", LookupEnvOrString("MSG_TIMEOUT", "30m"), "the max duration for a transfer, jetstream stream message ack timeout and internal transfer context timeout (set to -5s of this setting)")
//shutdownWait := flag.String("shutdown-wait", LookupEnvOrString("SHUTDOWN_WAIT", "0s"), "time to wait for running transfers to complete before exiting")
//skipLifecycleExpired := flag.Bool("skip-lifecycle-expired", LookupEnvOrBool("SKIP_LIFECYCLE_EXPIRED", false), "don't propagate deletes initiated by the lifecycle expiration")
//srcAccessKey := flag.String("src-access-key", LookupEnvOrString("SRC_ACCESS_KEY", ""), "source bucket access key")
//srcBucket := flag.String("src-bucket", LookupEnvOrString("SRC_BUCKET", "test"), "source bucket name")
//srcEndpoint := flag.String("src-endpoint", LookupEnvOrString("SRC_ENDPOINT", "localhost:9000"), "source endpoint")
//srcName := flag.String("src-name", LookupEnvOrString("SRC_NAME", "minio"), "source display name")
//srcSecretAccessKey := flag.String("src-secret-access-key", LookupEnvOrString("SRC_SECRET_ACCESS_KEY", ""), "source secret access key")
//srcUseSSL := flag.Bool("src-use-ssl", LookupEnvOrBool("SRC_USE_SSL", true), "use ssl for the source bucket")
//destAccessKey := flag.String("dest-access-key", LookupEnvOrString("DEST_ACCESS_KEY", ""), "destination bucket access key")
//destBucket := flag.String("dest-bucket", LookupEnvOrString("DEST_BUCKET", ""), "destination bucket name")
//destEndpoint := flag.String("dest-endpoint", LookupEnvOrString("DEST_ENDPOINT", "localhost:9000"), "destination endpoint")
//destName := flag.String("dest-name", LookupEnvOrString("DEST_NAME", "b2"), "destination display name")
//destPartSize := flag.Uint64("dest-part-size", LookupEnvOrUint64("DEST_PART_SIZE", 16), "upload part size in mebibytes")
//destSecretAccessKey := flag.String("dest-secret-access-key", LookupEnvOrString("DEST_SECRET_ACCESS_KEY", ""), "destination secret access key")
//destThreads := flag.Uint("dest-threads", LookupEnvOrUint("DEST_THREADS", 4), "number of upload threads")
//destUseSSL := flag.Bool("dest-use-ssl", LookupEnvOrBool("DEST_USE_SSL", true), "use ssl connection for the destination bucket")
//healthCheckEnabled := flag.Bool("health-check-enabled", LookupEnvOrBool("HEALTH_CHECK_ENABLED", true), "enable health-check server for k9s")
//healthCheckPort := flag.Int("health-check-port", LookupEnvOrInt("HEALTH_CHECK_PORT", 8080), "health check tcp port number")
//jetStreamBatchSize := flag.Int("jetstream-batch-size", LookupEnvOrInt("JETSTREAM_BATCH_SIZE", 1), "number of JetStream messages to pull per batch")
//jetStreamDurableConsumer := flag.String("jetstream-durable-consumer", LookupEnvOrString("JETSTREAM_DURABLE_CONSUMER", "archie-consumer"), "name of the durable stream consumer (queue group)")
//jetStreamMaxAckPending := flag.Int("jetstream-max-ack-pending", LookupEnvOrInt("JETSTREAM_MAX_ACK_PENDING", 1_000), "jetstream server will stop offering msgs for processing once it is waiting on too many un-ack'd msgs")
//jetStreamPassword := flag.String("jetstream-password", LookupEnvOrString("JETSTREAM_PASSWORD", ""), "jetstream client password")
//jetStreamRootCA := flag.String("jetstream-root-ca", LookupEnvOrString("JETSTREAM_ROOT_CA", ""), "path to the root CA cert file")
//jetStreamStream := flag.String("jetstream-stream", LookupEnvOrString("JETSTREAM_STREAM", "archie-stream"), "jetstream stream name")
//jetStreamStreamMaxAge := flag.String("jetstream-stream-max-age", LookupEnvOrString("JETSTREAM_STREAM_MAX_AGE", ""), "max duration to persist JetStream messages in the stream")
//jetStreamStreamMaxSize := flag.Int64("jetstream-stream-max-size", LookupEnvOrInt64("JETSTREAM_STREAM_MAX_SIZE", -1), "max size of stream in megabytes")
//jetStreamStreamReplicas := flag.Int("jetstream-stream-replicas", LookupEnvOrInt("JETSTREAM_STREAM_REPLICAS", 1), "number of replicas for the stream data")
//jetStreamStreamRetention := flag.String("jetstream-stream-retention", LookupEnvOrString("JETSTREAM_STREAM_RETENTION", "limits"), "stream retention policy: 'limits', 'interest', or 'work_queue'")
//jetStreamStreamRepublishSubject := flag.String("jetstream-stream-republish-subject", LookupEnvOrString("JETSTREAM_STREAM_REPUBLISH_SUBJECT", ""), "re-publish messages from the main subject to a separate subject")
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ go 1.19

require (
github.com/InVisionApp/go-health/v2 v2.1.2
github.com/kkyr/fig v0.3.0
github.com/minio/minio-go/v7 v7.0.39
github.com/nats-io/nats.go v1.17.0
github.com/prometheus/client_golang v1.13.0
github.com/rs/zerolog v1.28.0
go.arsenm.dev/pcre v0.0.0-20220530205550-74594f6c8b0e
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741
)

Expand All @@ -26,14 +28,17 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nats-server/v2 v2.8.4 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b // indirect
Expand All @@ -42,4 +47,8 @@ require (
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
modernc.org/libc v1.16.8 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/memory v1.1.1 // indirect
)
Loading

0 comments on commit 5f2dc83

Please sign in to comment.