diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 5f3c900..3cf105a 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -19,7 +19,7 @@ builds: goos: - linux - darwin - - windows +# - windows goarch: - amd64 diff --git a/archie/archiver.go b/archie/archiver.go index e17a4fd..6aede40 100644 --- a/archie/archiver.go +++ b/archie/archiver.go @@ -2,6 +2,7 @@ package archie import ( "github.com/minio/minio-go/v7" + "go.arsenm.dev/pcre" "sync" ) @@ -12,7 +13,7 @@ type Archiver struct { DestPartSize uint64 DestThreads uint FetchDone chan string - HealthCheckEnabled bool + HealthCheckDisabled bool IsOffline bool MsgTimeout string SkipLifecycleExpired bool @@ -20,6 +21,10 @@ type Archiver struct { SrcClient *minio.Client SrcName string WaitGroup *sync.WaitGroup + ExcludePaths struct { + CopyObject []*pcre.Regexp + RemoveObject []*pcre.Regexp + } } type AckType int diff --git a/archie/copyObject.go b/archie/copyObject.go index 3d87f17..d0d444a 100644 --- a/archie/copyObject.go +++ b/archie/copyObject.go @@ -11,6 +11,21 @@ import ( func (a *Archiver) copyObject(ctx context.Context, mLog zerolog.Logger, eventObjKey string, msg *nats.Msg) (error, string, AckType) { metadata, _ := msg.Metadata() + for _, excludedPathRegexp := range a.ExcludePaths.CopyObject { + if excludedPathRegexp.MatchString(eventObjKey) { + mLog.Info(). + Uint64("numDelivered", metadata.NumDelivered). + Str("queueDuration", time.Now().Sub(metadata.Timestamp).String()). + Str("pattern", excludedPathRegexp.String()). + Msg("Excluded path match, copy event skipped") + + a.observeMessagesTransferNumDeliveredMetric(float64(metadata.NumDelivered)) + a.observeMessagesTransferQueueDurationMetric(time.Now().Sub(metadata.Timestamp).Seconds()) + + return nil, "EXCLUDED_PATH", SkipAck + } + } + // get src object start := time.Now() srcObject, err := a.SrcClient.GetObject(ctx, a.SrcBucket, eventObjKey, minio.GetObjectOptions{}) diff --git a/archie/healthCheck.go b/archie/healthCheck.go index 48cc44a..210212f 100644 --- a/archie/healthCheck.go +++ b/archie/healthCheck.go @@ -25,7 +25,7 @@ type readinessCheck struct { type livenessCheck struct{} func (a *Archiver) StartHealthCheckServer(healthCheckPort int, jetStreamConn *nats.Conn) *http.Server { - if !a.HealthCheckEnabled { + if a.HealthCheckDisabled { return nil } diff --git a/archie/message.go b/archie/message.go index bee4a02..4013104 100644 --- a/archie/message.go +++ b/archie/message.go @@ -95,21 +95,19 @@ func (a *Archiver) message(ctx context.Context, msg *nats.Msg) { Msg("Message received") var ack AckType - var s3ErrMsg, s3ErrCode string + var s3ErrMsg, s3ErrCode, execContext string // message type router switch eventType { case "s3:ObjectCreated": - var errContext string - err, errContext, ack = a.copyObject(ctx, mLog, eventObjKey, msg) + err, execContext, ack = a.copyObject(ctx, mLog, eventObjKey, msg) if err != nil { - s3ErrMsg, s3ErrCode = logS3Error(err, errContext, &mLog) + s3ErrMsg, s3ErrCode = logS3Error(err, execContext, &mLog) } case "s3:ObjectRemoved": - var errContext string - err, errContext, ack = a.removeObject(ctx, mLog, eventObjKey, msg, eventRecord) + err, execContext, ack = a.removeObject(ctx, mLog, eventObjKey, msg, eventRecord) if err != nil { - s3ErrMsg, s3ErrCode = logS3Error(err, errContext, &mLog) + s3ErrMsg, s3ErrCode = logS3Error(err, execContext, &mLog) } default: mLog.Error().Msgf("Unable to process the %s event type", event.EventName, eventType) @@ -131,7 +129,7 @@ func (a *Archiver) message(ctx context.Context, msg *nats.Msg) { // logging already happened continue } - a.cleanupAndCountMessagesProcessedMetric("skipped", "", "", event.EventName, eventType) + a.cleanupAndCountMessagesProcessedMetric("skipped", "", execContext, event.EventName, eventType) case Nak: sendNakSignal(msg, &mLog) a.cleanupAndCountMessagesProcessedMetric("failed", s3ErrMsg, s3ErrCode, event.EventName, eventType) diff --git a/archie/removeObject.go b/archie/removeObject.go index a29921d..3166bbf 100644 --- a/archie/removeObject.go +++ b/archie/removeObject.go @@ -27,7 +27,22 @@ func (a *Archiver) removeObject( a.observeMessagesDeleteNumDeliveredMetric(float64(metadata.NumDelivered)) a.observeMessagesDeleteQueueDurationMetric(time.Now().Sub(metadata.Timestamp).Seconds()) - return nil, "", SkipAck + return nil, "ILM_EXPIRY", SkipAck + } + + for _, excludedPathRegexp := range a.ExcludePaths.RemoveObject { + if excludedPathRegexp.MatchString(eventObjKey) { + mLog.Info(). + Uint64("numDelivered", metadata.NumDelivered). + Str("queueDuration", time.Now().Sub(metadata.Timestamp).String()). + Str("pattern", excludedPathRegexp.String()). + Msg("Excluded path match, remove event skipped") + + a.observeMessagesDeleteNumDeliveredMetric(float64(metadata.NumDelivered)) + a.observeMessagesDeleteQueueDurationMetric(time.Now().Sub(metadata.Timestamp).Seconds()) + + return nil, "EXCLUDED_PATH", SkipAck + } } start := time.Now() diff --git a/client/minio.go b/client/minio.go index e4cf9e2..a3b8f22 100644 --- a/client/minio.go +++ b/client/minio.go @@ -4,6 +4,7 @@ import ( "context" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "os" "time" @@ -16,46 +17,46 @@ type Params struct { func MinIO( ctx context.Context, - name, bucket, endpoint, accessKey, secretAccessKey *string, - useSSL *bool, + name, bucket, endpoint, accessKey, secretAccessKey string, + useSSL bool, p Params, - trace *bool, + logLevel zerolog.Level, ) (*minio.Client, context.CancelFunc) { //minio.MaxRetry = 0 - client, err := minio.New(*endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(*accessKey, *secretAccessKey, ""), - Secure: *useSSL, + client, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(accessKey, secretAccessKey, ""), + Secure: useSSL, }) if err != nil { - log.Fatal().Err(err).Msgf("Failed to setup %s client", *name) + log.Fatal().Err(err).Msgf("Failed to setup %s client", name) } - if *trace { + if logLevel == zerolog.TraceLevel { client.TraceOn(os.Stdout) } if p == (Params{}) { - log.Info().Msgf("Setup %s client to %s", *name, client.EndpointURL()) + log.Info().Msgf("Setup %s client to %s", name, client.EndpointURL()) } else { - log.Info().Msgf("Setup %s client to %s with %d threads and %dMB part size", *name, client.EndpointURL(), p.Threads, p.PartSize) + log.Info().Msgf("Setup %s client to %s with %d threads and %dMB part size", name, client.EndpointURL(), p.Threads, p.PartSize) } - bucketExists, err := client.BucketExists(ctx, *bucket) + bucketExists, err := client.BucketExists(ctx, bucket) if err != nil { - log.Fatal().Err(err).Msgf("Failed to check if %s bucket %s exists", *name, *bucket) + log.Fatal().Err(err).Msgf("Failed to check if %s bucket %s exists", name, bucket) } if !bucketExists { - log.Fatal().Msgf("%s bucket %s does not exist or access is missing", *name, *bucket) + log.Fatal().Msgf("%s bucket %s does not exist or access is missing", name, bucket) } // enable health checking - destHealthCheckCancel, err := client.HealthCheck(5 * time.Second) + healthCheckCancel, err := client.HealthCheck(5 * time.Second) if err != nil { - log.Fatal().Msgf("Failed to start %s client health check", *name) + log.Fatal().Msgf("Failed to start %s client health check", name) } - return client, destHealthCheckCancel + return client, healthCheckCancel } diff --git a/config.go b/config.go new file mode 100644 index 0000000..1a98158 --- /dev/null +++ b/config.go @@ -0,0 +1,104 @@ +package main + +type Config struct { + ApiVersion string `fig:"apiVersion" validate:"required"` + + LogLevel string `fig:"logLevel" default:"info"` + MsgTimeout string `fig:"msgTimeout" default:"30m"` + ShutdownWait string `fig:"shutdownWait" default:"0s"` + SkipLifecycleExpired bool `fig:"skipLifecycleExpired"` + + Src struct { + AccessKey string `fig:"accessKey"` + Bucket string `fig:"bucket"` + Endpoint string `fig:"endpoint" default:"localhost:9000"` + Name string `fig:"name" default:"destination"` + SecretKey string `fig:"secretKey"` + UseSSL bool `fig:"useSSL"` + } + + Dest struct { + AccessKey string `fig:"accessKey"` + Bucket string `fig:"bucket"` + Endpoint string `fig:"endpoint" default:"localhost:9000"` + Name string `fig:"name" default:"source"` + PartSize uint64 `fig:"partSize" default:"16"` + SecretKey string `fig:"secretKey"` + Threads uint `fig:"threads" default:"4"` + UseSSL bool `fig:"useSSL"` + } + + ExcludePaths struct { + CopyObject []string `fig:"copyObject"` + RemoveObject []string `fig:"removeObject"` + } + + HealthCheck struct { + Disabled bool + Port int `default:"8080"` + } + + Metrics struct { + Port int `default:"9999"` + } + + Jetstream struct { + BatchSize int `fig:"batchSize" default:"1"` + Password string `fig:"password"` + ProvisioningDisabled bool `fig:"provisioningDisabled"` + RootCA string `fig:"rootCA"` + Subject string `fig:"subject" default:"archie-minio-events"` + URL string `fig:"url" default:"nats://localhost:4222"` + Username string `fig:"username"` + + Stream struct { + MaxAge string `fig:"maxAge"` + MaxSize int64 `fig:"maxSize" default:"-1"` + Name string `fig:"name" default:"archie-stream"` + Replicas int `fig:"replicas" default:"1"` + RepublishSubject string `fig:"republishSubject"` + Retention string `fig:"retention" default:"limits"` + } + + Consumer struct { + Name string `fig:"name" default:"archie-consumer"` + MaxAckPending int `fig:"maxAckPending" default:"1000"` + } + } +} + +//jetStreamProvisioningDisabled := flag.Bool("jetstream-provisioning-disabled", LookupEnvOrBool("JETSTREAM_PROVISIONING_DISABLED", false), "disable the creation and configuration of the stream and consumer") +//jetStreamSubject := flag.String("jetstream-subject", LookupEnvOrString("JETSTREAM_SUBJECT", "archie-minio-events"), "nats jetstream subject to subscribe to") +//jetStreamURL := flag.String("jetstream-url", LookupEnvOrString("JETSTREAM_URL", "nats://localhost:4222"), "jetstream client url") +//jetStreamUsername := flag.String("jetstream-username", LookupEnvOrString("JETSTREAM_USERNAME", ""), "jetstream client username") +//metricsPort := flag.Int("metrics-port", LookupEnvOrInt("METRICS_PORT", 9999), "metrics tcp port number") +//msgTimeout := flag.String("msg-timeout", LookupEnvOrString("MSG_TIMEOUT", "30m"), "the max duration for a transfer, jetstream stream message ack timeout and internal transfer context timeout (set to -5s of this setting)") +//shutdownWait := flag.String("shutdown-wait", LookupEnvOrString("SHUTDOWN_WAIT", "0s"), "time to wait for running transfers to complete before exiting") +//skipLifecycleExpired := flag.Bool("skip-lifecycle-expired", LookupEnvOrBool("SKIP_LIFECYCLE_EXPIRED", false), "don't propagate deletes initiated by the lifecycle expiration") +//srcAccessKey := flag.String("src-access-key", LookupEnvOrString("SRC_ACCESS_KEY", ""), "source bucket access key") +//srcBucket := flag.String("src-bucket", LookupEnvOrString("SRC_BUCKET", "test"), "source bucket name") +//srcEndpoint := flag.String("src-endpoint", LookupEnvOrString("SRC_ENDPOINT", "localhost:9000"), "source endpoint") +//srcName := flag.String("src-name", LookupEnvOrString("SRC_NAME", "minio"), "source display name") +//srcSecretAccessKey := flag.String("src-secret-access-key", LookupEnvOrString("SRC_SECRET_ACCESS_KEY", ""), "source secret access key") +//srcUseSSL := flag.Bool("src-use-ssl", LookupEnvOrBool("SRC_USE_SSL", true), "use ssl for the source bucket") +//destAccessKey := flag.String("dest-access-key", LookupEnvOrString("DEST_ACCESS_KEY", ""), "destination bucket access key") +//destBucket := flag.String("dest-bucket", LookupEnvOrString("DEST_BUCKET", ""), "destination bucket name") +//destEndpoint := flag.String("dest-endpoint", LookupEnvOrString("DEST_ENDPOINT", "localhost:9000"), "destination endpoint") +//destName := flag.String("dest-name", LookupEnvOrString("DEST_NAME", "b2"), "destination display name") +//destPartSize := flag.Uint64("dest-part-size", LookupEnvOrUint64("DEST_PART_SIZE", 16), "upload part size in mebibytes") +//destSecretAccessKey := flag.String("dest-secret-access-key", LookupEnvOrString("DEST_SECRET_ACCESS_KEY", ""), "destination secret access key") +//destThreads := flag.Uint("dest-threads", LookupEnvOrUint("DEST_THREADS", 4), "number of upload threads") +//destUseSSL := flag.Bool("dest-use-ssl", LookupEnvOrBool("DEST_USE_SSL", true), "use ssl connection for the destination bucket") +//healthCheckEnabled := flag.Bool("health-check-enabled", LookupEnvOrBool("HEALTH_CHECK_ENABLED", true), "enable health-check server for k9s") +//healthCheckPort := flag.Int("health-check-port", LookupEnvOrInt("HEALTH_CHECK_PORT", 8080), "health check tcp port number") +//jetStreamBatchSize := flag.Int("jetstream-batch-size", LookupEnvOrInt("JETSTREAM_BATCH_SIZE", 1), "number of JetStream messages to pull per batch") +//jetStreamDurableConsumer := flag.String("jetstream-durable-consumer", LookupEnvOrString("JETSTREAM_DURABLE_CONSUMER", "archie-consumer"), "name of the durable stream consumer (queue group)") +//jetStreamMaxAckPending := flag.Int("jetstream-max-ack-pending", LookupEnvOrInt("JETSTREAM_MAX_ACK_PENDING", 1_000), "jetstream server will stop offering msgs for processing once it is waiting on too many un-ack'd msgs") +//jetStreamPassword := flag.String("jetstream-password", LookupEnvOrString("JETSTREAM_PASSWORD", ""), "jetstream client password") +//jetStreamRootCA := flag.String("jetstream-root-ca", LookupEnvOrString("JETSTREAM_ROOT_CA", ""), "path to the root CA cert file") +//jetStreamStream := flag.String("jetstream-stream", LookupEnvOrString("JETSTREAM_STREAM", "archie-stream"), "jetstream stream name") +//jetStreamStreamMaxAge := flag.String("jetstream-stream-max-age", LookupEnvOrString("JETSTREAM_STREAM_MAX_AGE", ""), "max duration to persist JetStream messages in the stream") +//jetStreamStreamMaxSize := flag.Int64("jetstream-stream-max-size", LookupEnvOrInt64("JETSTREAM_STREAM_MAX_SIZE", -1), "max size of stream in megabytes") +//jetStreamStreamReplicas := flag.Int("jetstream-stream-replicas", LookupEnvOrInt("JETSTREAM_STREAM_REPLICAS", 1), "number of replicas for the stream data") +//jetStreamStreamRetention := flag.String("jetstream-stream-retention", LookupEnvOrString("JETSTREAM_STREAM_RETENTION", "limits"), "stream retention policy: 'limits', 'interest', or 'work_queue'") +//jetStreamStreamRepublishSubject := flag.String("jetstream-stream-republish-subject", LookupEnvOrString("JETSTREAM_STREAM_REPUBLISH_SUBJECT", ""), "re-publish messages from the main subject to a separate subject") diff --git a/go.mod b/go.mod index 1f01d40..e7b4197 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,12 @@ go 1.19 require ( github.com/InVisionApp/go-health/v2 v2.1.2 + github.com/kkyr/fig v0.3.0 github.com/minio/minio-go/v7 v7.0.39 github.com/nats-io/nats.go v1.17.0 github.com/prometheus/client_golang v1.13.0 github.com/rs/zerolog v1.28.0 + go.arsenm.dev/pcre v0.0.0-20220530205550-74594f6c8b0e golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 ) @@ -26,14 +28,17 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/sha256-simd v1.0.0 // indirect + github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/nats-server/v2 v2.8.4 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/pelletier/go-toml v1.9.3 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rs/xid v1.4.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b // indirect @@ -42,4 +47,8 @@ require ( golang.org/x/text v0.3.7 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + modernc.org/libc v1.16.8 // indirect + modernc.org/mathutil v1.4.1 // indirect + modernc.org/memory v1.1.1 // indirect ) diff --git a/go.sum b/go.sum index e85061c..78480f4 100644 --- a/go.sum +++ b/go.sum @@ -127,6 +127,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= @@ -160,7 +161,10 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kkyr/fig v0.3.0 h1:5bd1amYKp/gsK2bGEUJYzcCrQPKOZp6HZD9K21v9Guo= +github.com/kkyr/fig v0.3.0/go.mod h1:fEnrLjwg/iwSr8ksJF4DxrDmCUir5CaVMLORGYMcz30= github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -170,12 +174,15 @@ github.com/klauspost/cpuid/v2 v2.1.1/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8t github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -189,6 +196,8 @@ github.com/minio/minio-go/v7 v7.0.39 h1:upnbu1jCGOqEvrGSpRauSN9ZG7RCHK7VHxXS8Vmg github.com/minio/minio-go/v7 v7.0.39/go.mod h1:nCrRzjoSUQh8hgKKtu3Y708OLvRLtuASMg2/nvmbarw= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -211,6 +220,8 @@ github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= +github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -241,6 +252,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -262,8 +275,11 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= github.com/zaffka/mongodb-boltdb-mock v0.0.0-20180816124423-49954d88fa3e/go.mod h1:GsDD1qsG+86MeeCG7ndi6Ei3iGthKL3wQ7PTFigDfNY= +go.arsenm.dev/pcre v0.0.0-20220530205550-74594f6c8b0e h1:4XwLmFDvAKt7ZvS3E3hD2R++0wr75fBUEvXkK9dLXzk= +go.arsenm.dev/pcre v0.0.0-20220530205550-74594f6c8b0e/go.mod h1:c/E0D60A6rRLoDLh6mLUdFV9gxyth+CnXnqGHos2CAQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -339,6 +355,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -360,6 +377,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -380,6 +398,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -394,6 +413,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -401,6 +421,7 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -462,6 +483,7 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -546,6 +568,7 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= @@ -570,6 +593,26 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= +modernc.org/cc/v3 v3.36.0/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= +modernc.org/ccgo/v3 v3.0.0-20220428102840-41399a37e894/go.mod h1:eI31LL8EwEBKPpNpA4bU1/i+sKOwOrQy8D87zWUcRZc= +modernc.org/ccgo/v3 v3.0.0-20220430103911-bc99d88307be/go.mod h1:bwdAnOoaIt8Ax9YdWGjxWsdkPcZyRPHqrOvJxaKAKGw= +modernc.org/ccgo/v3 v3.16.6/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWsQ= +modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ= +modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM= +modernc.org/libc v0.0.0-20220428101251-2d5f3daf273b/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA= +modernc.org/libc v1.16.0/go.mod h1:N4LD6DBE9cf+Dzf9buBlzVJndKr/iJHG97vGLHYnb5A= +modernc.org/libc v1.16.1/go.mod h1:JjJE0eu4yeK7tab2n4S1w8tlWd9MxXLRzheaRnAKymU= +modernc.org/libc v1.16.8 h1:Ux98PaOMvolgoFX/YwusFOHBnanXdGRmWgI8ciI2z4o= +modernc.org/libc v1.16.8/go.mod h1:hYIV5VZczAmGZAnG15Vdngn5HSF5cSkbvfz2B7GRuVU= +modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8= +modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/memory v1.1.1 h1:bDOL0DIDLQv7bWhP3gMvIrnoFw+Eo6F7a2QK9HPDiFU= +modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw= +modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw= +modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/helm/archie/templates/deployment.yaml b/helm/archie/templates/deployment.yaml index ff3af18..72e6734 100644 --- a/helm/archie/templates/deployment.yaml +++ b/helm/archie/templates/deployment.yaml @@ -41,9 +41,6 @@ spec: {{- with .Values.image.args }} {{- toYaml . | nindent 12 }} {{- end }} - envFrom: - - secretRef: - name: {{ include "archie.fullname" . }} imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - name: metrics @@ -63,10 +60,14 @@ spec: port: http {{- end }} volumeMounts: + - name: {{ include "archie.fullname" . }}-config-volume + mountPath: /app/config.yaml + subPath: config.yaml + readOnly: true {{- with .Values.jetstream.rootCA }} - - name: {{ .secretName }}-clients-volume - mountPath: /etc/nats-cert/{{ .secretName }} - subPath: {{ .fileName }} + - name: {{ .secretName }}-clients-volume + mountPath: /etc/nats-cert/{{ .secretName }} + subPath: {{ .fileName }} {{- end }} resources: {{- toYaml .Values.archie.resources | nindent 12 }} @@ -84,6 +85,10 @@ spec: {{- toYaml . | nindent 8 }} {{- end }} volumes: + - name: {{ include "archie.fullname" . }}-config-volume + secret: + secretName: {{ include "archie.fullname" . }} + optional: false {{- with .Values.jetstream.rootCA }} - name: {{ .secretName }}-clients-volume secret: diff --git a/helm/archie/templates/scaledobject.yaml b/helm/archie/templates/scaledobject.yaml index 618330c..c3512f5 100644 --- a/helm/archie/templates/scaledobject.yaml +++ b/helm/archie/templates/scaledobject.yaml @@ -40,7 +40,7 @@ spec: metadata: natsServerMonitoringEndpoint: {{ .Values.jetstream.metricsURL }} account: "$G" - stream: {{ .Values.jetstream.stream }} - consumer: {{ .Values.jetstream.durableConsumer }} + stream: {{ .Values.jetstream.stream.name }} + consumer: {{ .Values.jetstream.consumer.name }} lagThreshold: "{{ .Values.keda.trigger.lagThreshold }}" {{- end }} diff --git a/helm/archie/templates/secret.yaml b/helm/archie/templates/secret.yaml index cd06636..5bd70de 100644 --- a/helm/archie/templates/secret.yaml +++ b/helm/archie/templates/secret.yaml @@ -6,106 +6,121 @@ metadata: labels: {{- include "archie.labels" . | nindent 4 }} type: Opaque -data: - DEBUG: {{ .Values.archie.debug | toString | b64enc | quote }} - SHUTDOWN_WAIT: {{ $shutdownWaitDuration | b64enc | quote }} - JETSTREAM_PROVISIONING_DISABLED: {{ .Values.jetstream.provisioningDisabled | toString | b64enc | quote }} - JETSTREAM_URL: {{ .Values.jetstream.url | b64enc | quote }} - SKIP_LIFECYCLE_EXPIRED: {{ .Values.archie.skipLifecycleExpired | toString | b64enc | quote }} - - {{- if .Values.archie.msgTimeout }} - MSG_TIMEOUT: {{ .Values.archie.msgTimeout | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.rootCA }} - {{ $caPath := print "/etc/nats-cert/" .Values.jetstream.rootCA.secretName }} - JETSTREAM_ROOT_CA: {{ $caPath | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.username }} - JETSTREAM_USERNAME: {{ .Values.jetstream.username | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.password }} - JETSTREAM_PASSWORD: {{ .Values.jetstream.password | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.stream }} - JETSTREAM_STREAM: {{ .Values.jetstream.stream | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.streamReplicas }} - JETSTREAM_STREAM_REPLICAS: {{ .Values.jetstream.streamReplicas | toString | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.streamRetention }} - JETSTREAM_STREAM_RETENTION: {{ .Values.jetstream.streamRetention | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.streamMaxSize }} - JETSTREAM_STREAM_MAX_SIZE: {{ .Values.jetstream.streamMaxSize | toString | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.durableConsumer }} - JETSTREAM_DURABLE_CONSUMER: {{ .Values.jetstream.durableConsumer | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.subject }} - JETSTREAM_SUBJECT: {{ .Values.jetstream.subject | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.maxAckPending }} - JETSTREAM_MAX_ACK_PENDING: {{ .Values.jetstream.maxAckPending | toString | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.streamMaxAge }} - JETSTREAM_STREAM_MAX_AGE: {{ .Values.jetstream.streamMaxAge | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.streamRepublishSubject }} - JETSTREAM_STREAM_REPUBLISH_SUBJECT: {{ .Values.jetstream.streamRepublishSubject | b64enc | quote }} - {{- end }} - - {{- if .Values.jetstream.batchSize }} - JETSTREAM_BATCH_SIZE: {{ .Values.jetstream.batchSize | b64enc | quote }} - {{- end }} - - SRC_NAME: {{ .Values.source.name | b64enc | quote }} - SRC_BUCKET: {{ .Values.source.bucket | b64enc | quote }} - SRC_ENDPOINT: {{ .Values.source.endpoint | b64enc | quote }} - SRC_USE_SSL: {{ .Values.source.useSSL | toString | b64enc | quote }} - - {{- if .Values.source.accessKey }} - SRC_ACCESS_KEY: {{ .Values.source.accessKey | b64enc | quote }} - {{- end }} - - {{- if .Values.source.secretAccessKey }} - SRC_SECRET_ACCESS_KEY: {{ .Values.source.secretAccessKey | b64enc | quote }} - {{- end }} - - DEST_NAME: {{ .Values.destination.name | b64enc | quote }} - DEST_BUCKET: {{ .Values.destination.bucket | b64enc | quote }} - DEST_ENDPOINT: {{ .Values.destination.endpoint | b64enc | quote }} - DEST_USE_SSL: {{ .Values.destination.useSSL | toString | b64enc | quote }} - DEST_THREADS: {{ .Values.destination.threads | toString | b64enc | quote }} - DEST_PART_SIZE: {{ .Values.destination.partSize | toString | b64enc | quote }} - - {{- if .Values.destination.accessKey }} - DEST_ACCESS_KEY: {{ .Values.destination.accessKey | b64enc | quote }} - {{- end }} - - {{- if .Values.destination.secretAccessKey }} - DEST_SECRET_ACCESS_KEY: {{ .Values.destination.secretAccessKey | b64enc | quote }} - {{- end }} - - {{- if .Values.archie.healthCheck.enabled }} - HEALTH_CHECK_ENABLED: {{ .Values.archie.healthCheck.enabled | toString | b64enc | quote }} - {{- end }} - - {{- if .Values.archie.healthCheck.port }} - HEALTH_CHECK_PORT: {{ .Values.archie.healthCheck.port | toString | b64enc | quote }} - {{- end }} - - {{- if .Values.archie.metrics.port }} - METRICS_PORT: {{ .Values.archie.metrics.port | toString | b64enc | quote }} - {{- end }} +stringData: + config.yaml: | + apiVersion: v1 + + logLevel: {{ .Values.archie.logLevel | quote }} + shutdownWait: {{ $shutdownWaitDuration | quote }} + skipLifecycleExpired: {{ .Values.archie.skipLifecycleExpired | toString | quote }} + + {{- if .Values.archie.msgTimeout }} + msgTimeout: {{ .Values.archie.msgTimeout | quote }} + {{- end }} + + excludePaths: + {{- with .Values.archie.excludePaths.copyObject }} + copyObject: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.archie.excludePaths.removeObject }} + removeObject: + {{- toYaml . | nindent 8 }} + {{- end }} + + src: + name: {{ .Values.source.name | quote }} + bucket: {{ .Values.source.bucket | quote }} + endpoint: {{ .Values.source.endpoint | quote }} + useSSL: {{ .Values.source.useSSL | toString | quote }} + + {{- if .Values.source.accessKey }} + accessKey: {{ .Values.source.accessKey | quote }} + {{- end }} + + {{- if .Values.source.secretKey }} + secretKey: {{ .Values.source.secretKey | quote }} + {{- end }} + + + dest: + name: {{ .Values.destination.name | quote }} + bucket: {{ .Values.destination.bucket | quote }} + endpoint: {{ .Values.destination.endpoint | quote }} + useSSL: {{ .Values.destination.useSSL | toString | quote }} + threads: {{ .Values.destination.threads | toString | quote }} + partSize: {{ .Values.destination.partSize | toString | quote }} + + {{- if .Values.destination.accessKey }} + accessKey: {{ .Values.destination.accessKey | quote }} + {{- end }} + + {{- if .Values.destination.secretKey }} + secretKey: {{ .Values.destination.secretKey | quote }} + {{- end }} + + jetstream: + provisioningDisabled: {{ .Values.jetstream.provisioningDisabled | toString | quote }} + url: {{ .Values.jetstream.url | quote }} + + {{- if .Values.jetstream.rootCA }} + {{ $caPath := print "/etc/nats-cert/" .Values.jetstream.rootCA.secretName }} + rootCA: {{ $caPath | quote }} + {{- end }} + + {{- if .Values.jetstream.username }} + username: {{ .Values.jetstream.username | quote }} + {{- end }} + + {{- if .Values.jetstream.password }} + password: {{ .Values.jetstream.password | quote }} + {{- end }} + + {{- if .Values.jetstream.subject }} + subject: {{ .Values.jetstream.subject | quote }} + {{- end }} + + {{- if .Values.jetstream.batchSize }} + batchSize: {{ .Values.jetstream.batchSize | quote }} + {{- end }} + + stream: + {{- if .Values.jetstream.stream.name }} + name: {{ .Values.jetstream.stream.name | quote }} + {{- end }} + + {{- if .Values.jetstream.stream.replicas }} + replicas: {{ .Values.jetstream.stream.replicas | toString | quote }} + {{- end }} + + {{- if .Values.jetstream.stream.retention }} + retention: {{ .Values.jetstream.stream.retention | quote }} + {{- end }} + + {{- if .Values.jetstream.stream.maxSize }} + maxSize: {{ .Values.jetstream.stream.maxSize | toString | quote }} + {{- end }} + + {{- if .Values.jetstream.stream.maxAge }} + maxAge: {{ .Values.jetstream.stream.maxAge | quote }} + {{- end }} + + {{- if .Values.jetstream.stream.republishSubject }} + republishSubject: {{ .Values.jetstream.streamRepublishSubject | quote }} + {{- end }} + + consumer: + {{- if .Values.jetstream.consumer.name }} + name: {{ .Values.jetstream.consumer.name | quote }} + {{- end }} + + {{- if .Values.jetstream.consumer.maxAckPending }} + maxAckPending: {{ .Values.jetstream.consumer.maxAckPending | toString | quote }} + {{- end }} + + healthCheck: + disabled: {{ eq .Values.archie.healthCheck.enabled false | toString | quote }} + port: {{ .Values.archie.healthCheck.port | toString | quote }} + + metrics: + port: {{ .Values.archie.metrics.port | toString | quote }} diff --git a/helm/archie/values.yaml b/helm/archie/values.yaml index b4ac67f..5bcc05e 100644 --- a/helm/archie/values.yaml +++ b/helm/archie/values.yaml @@ -8,9 +8,19 @@ image: # args: [] archie: - debug: false + logLevel: info # or "debug" or "trace" + msgTimeout: 30m shutdownWait: 30 # seconds -# msgTimeout: 30m + skipLifecycleExpired: false + # pcre regex matching + #excludePaths: + # copyObject: + # - '.*' + # removeObject: + # - '.*' + # disable the deployment + deployment: + create: true healthCheck: enabled: true port: 8080 @@ -39,27 +49,25 @@ archie: natsMessagesDeliveredLowThreshold: 30 natsMessagesPendingThreshold: 20000 natsMessagesRedeliveredPercentageThreshold: 2 - skipLifecycleExpired: false - # disable the deployment - deployment: - create: true jetstream: url: nats://localhost:4222 metricsURL: localhost:8222 provisioningDisabled: false -# stream: archie-stream -# streamRetention: "interest" # or "limits" or "work_queue" # subject: minio-archie-events -# maxAckPending: 1000 -# durableConsumer: durable -# streamMaxAge: 72h -# streamReplicas: 1 -# streamMaxSize: -1 # MB -# streamRepublishSubject: minio-archie-events-archive # batchSize: 1 # username: # password: +# stream: +# name: archie-stream +# retention: interest # or "limits" or "work_queue" +# maxAge: 72h +# replicas: 1 +# maxSize: -1 # MB +# republishSubject: minio-archie-events-archive +# consumer: +# name: durable +# maxAckPending: 1000 # rootCA: # fileName: ca.crt # secretName: nats-ca @@ -100,22 +108,22 @@ keda: replicas: 0 source: - name: src + name: src # just a label bucket: src-test endpoint: localhost useSSL: true # accessKey: -# secretAccessKey: +# secretKey: destination: - name: dest + name: dest # just a label bucket: dest-test endpoint: localhost useSSL: true threads: 4 partSize: 16 # MiB # accessKey: -# secretAccessKey: +# secretKey: imagePullSecrets: [] nameOverride: "" diff --git a/main.go b/main.go index 002d2b6..bf8b5ab 100644 --- a/main.go +++ b/main.go @@ -5,55 +5,28 @@ import ( "archie/client" "context" "flag" + "github.com/kkyr/fig" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "go.arsenm.dev/pcre" + "path/filepath" "sync" "time" ) func main() { - //TODO: verify all settings are output somewhere - debug := flag.Bool("debug", LookupEnvOrBool("DEBUG", false), "set log level to debug") - destAccessKey := flag.String("dest-access-key", LookupEnvOrString("DEST_ACCESS_KEY", ""), "destination bucket access key") - destBucket := flag.String("dest-bucket", LookupEnvOrString("DEST_BUCKET", ""), "destination bucket name") - destEndpoint := flag.String("dest-endpoint", LookupEnvOrString("DEST_ENDPOINT", "localhost:9000"), "destination endpoint") - destName := flag.String("dest-name", LookupEnvOrString("DEST_NAME", "b2"), "destination display name") - destPartSize := flag.Uint64("dest-part-size", LookupEnvOrUint64("DEST_PART_SIZE", 16), "upload part size in mebibytes") - destSecretAccessKey := flag.String("dest-secret-access-key", LookupEnvOrString("DEST_SECRET_ACCESS_KEY", ""), "destination secret access key") - destThreads := flag.Uint("dest-threads", LookupEnvOrUint("DEST_THREADS", 4), "number of upload threads") - destUseSSL := flag.Bool("dest-use-ssl", LookupEnvOrBool("DEST_USE_SSL", true), "use ssl connection for the destination bucket") - healthCheckEnabled := flag.Bool("health-check-enabled", LookupEnvOrBool("HEALTH_CHECK_ENABLED", true), "enable health-check server for k9s") - healthCheckPort := flag.Int("health-check-port", LookupEnvOrInt("HEALTH_CHECK_PORT", 8080), "health check tcp port number") - jetStreamBatchSize := flag.Int("jetstream-batch-size", LookupEnvOrInt("JETSTREAM_BATCH_SIZE", 1), "number of JetStream messages to pull per batch") - jetStreamDurableConsumer := flag.String("jetstream-durable-consumer", LookupEnvOrString("JETSTREAM_DURABLE_CONSUMER", "archie-consumer"), "name of the durable stream consumer (queue group)") - jetStreamMaxAckPending := flag.Int("jetstream-max-ack-pending", LookupEnvOrInt("JETSTREAM_MAX_ACK_PENDING", 1_000), "jetstream server will stop offering msgs for processing once it is waiting on too many un-ack'd msgs") - jetStreamPassword := flag.String("jetstream-password", LookupEnvOrString("JETSTREAM_PASSWORD", ""), "jetstream client password") - jetStreamRootCA := flag.String("jetstream-root-ca", LookupEnvOrString("JETSTREAM_ROOT_CA", ""), "path to the root CA cert file") - jetStreamStream := flag.String("jetstream-stream", LookupEnvOrString("JETSTREAM_STREAM", "archie-stream"), "jetstream stream name") - jetStreamStreamMaxAge := flag.String("jetstream-stream-max-age", LookupEnvOrString("JETSTREAM_STREAM_MAX_AGE", ""), "max duration to persist JetStream messages in the stream") - jetStreamStreamMaxSize := flag.Int64("jetstream-stream-max-size", LookupEnvOrInt64("JETSTREAM_STREAM_MAX_SIZE", -1), "max size of stream in megabytes") - jetStreamStreamReplicas := flag.Int("jetstream-stream-replicas", LookupEnvOrInt("JETSTREAM_STREAM_REPLICAS", 1), "number of replicas for the stream data") - jetStreamStreamRetention := flag.String("jetstream-stream-retention", LookupEnvOrString("JETSTREAM_STREAM_RETENTION", "limits"), "stream retention policy: 'limits', 'interest', or 'work_queue'") - jetStreamStreamRepublishSubject := flag.String("jetstream-stream-republish-subject", LookupEnvOrString("JETSTREAM_STREAM_REPUBLISH_SUBJECT", ""), "re-publish messages from the main subject to a separate subject") - jetStreamProvisioningDisabled := flag.Bool("jetstream-provisioning-disabled", LookupEnvOrBool("JETSTREAM_PROVISIONING_DISABLED", false), "disable the creation and configuration of the stream and consumer") - jetStreamSubject := flag.String("jetstream-subject", LookupEnvOrString("JETSTREAM_SUBJECT", "archie-minio-events"), "nats jetstream subject to subscribe to") - jetStreamURL := flag.String("jetstream-url", LookupEnvOrString("JETSTREAM_URL", "nats://localhost:4222"), "jetstream client url") - jetStreamUsername := flag.String("jetstream-username", LookupEnvOrString("JETSTREAM_USERNAME", ""), "jetstream client username") - metricsPort := flag.Int("metrics-port", LookupEnvOrInt("METRICS_PORT", 9999), "metrics tcp port number") - msgTimeout := flag.String("msg-timeout", LookupEnvOrString("MSG_TIMEOUT", "30m"), "the max duration for a transfer, jetstream stream message ack timeout and internal transfer context timeout (set to -5s of this setting)") - shutdownWait := flag.String("shutdown-wait", LookupEnvOrString("SHUTDOWN_WAIT", "0s"), "time to wait for running transfers to complete before exiting") - skipLifecycleExpired := flag.Bool("skip-lifecycle-expired", LookupEnvOrBool("SKIP_LIFECYCLE_EXPIRED", false), "don't propagate deletes initiated by the lifecycle expiration") - srcAccessKey := flag.String("src-access-key", LookupEnvOrString("SRC_ACCESS_KEY", ""), "source bucket access key") - srcBucket := flag.String("src-bucket", LookupEnvOrString("SRC_BUCKET", "test"), "source bucket name") - srcEndpoint := flag.String("src-endpoint", LookupEnvOrString("SRC_ENDPOINT", "localhost:9000"), "source endpoint") - srcName := flag.String("src-name", LookupEnvOrString("SRC_NAME", "minio"), "source display name") - srcSecretAccessKey := flag.String("src-secret-access-key", LookupEnvOrString("SRC_SECRET_ACCESS_KEY", ""), "source secret access key") - srcUseSSL := flag.Bool("src-use-ssl", LookupEnvOrBool("SRC_USE_SSL", true), "use ssl for the source bucket") - trace := flag.Bool("trace", LookupEnvOrBool("TRACE", false), "set log level to trace") + zerolog.TimeFieldFormat = time.RFC3339Nano + configFile := flag.String("config", "config.yaml", "config file path") + logLevelFlag := flag.String("log-level", LookupEnvOrString("LOG_LEVEL", ""), "set the log level (default: info)") flag.Parse() - zerolog.TimeFieldFormat = time.RFC3339Nano + var cfg Config + err := fig.Load(&cfg, fig.File(filepath.Base(*configFile)), fig.Dirs(".", filepath.Dir(*configFile))) + if err != nil { + log.Fatal().Err(err).Msg("Failed to load config file") + } + log.Info(). Str("version", archie.Version). Str("releaseTag", archie.ReleaseTag). @@ -61,11 +34,42 @@ func main() { Str("buildDate", archie.BuildDate). Msg("Starting archie") + // compile and validate pcre regex exclude patterns + var excludedPathCopyObject, excludedPathRemoveObject []*pcre.Regexp + + if len(cfg.ExcludePaths.CopyObject) > 0 || len(cfg.ExcludePaths.RemoveObject) > 0 { + for _, excludedPathPattern := range cfg.ExcludePaths.CopyObject { + excludedPathRegexp, err := pcre.Compile(excludedPathPattern) + if err != nil { + log.Fatal().Err(err).Str("pattern", excludedPathPattern).Msg("Failed to compile CopyObject pcre regex") + } + excludedPathCopyObject = append(excludedPathCopyObject, excludedPathRegexp) + } + + for _, excludedPathPattern := range cfg.ExcludePaths.RemoveObject { + excludedPathRegexp, err := pcre.Compile(excludedPathPattern) + if err != nil { + log.Fatal().Err(err).Str("pattern", excludedPathPattern).Msg("Failed to compile RemoveObject pcre regex") + } + excludedPathRemoveObject = append(excludedPathRemoveObject, excludedPathRegexp) + } + + log.Info().Msgf("Regex patterns compiled with pcre v%s", pcre.Version()) + } + + var logLevel string + // prefer cli arg over config + if *logLevelFlag != "" { + logLevel = *logLevelFlag + } else { + logLevel = cfg.LogLevel + } + zerolog.SetGlobalLevel(zerolog.InfoLevel) - if *trace { + if logLevel == "trace" { log.Info().Msg("Trace logging enabled") zerolog.SetGlobalLevel(zerolog.TraceLevel) - } else if *debug { + } else if logLevel == "debug" { log.Info().Msg("Debug logging enabled") zerolog.SetGlobalLevel(zerolog.DebugLevel) } @@ -86,17 +90,24 @@ func main() { // archiver a := archie.Archiver{ - DestBucket: *destBucket, - DestName: *destName, - DestPartSize: *destPartSize, - DestThreads: *destThreads, + DestBucket: cfg.Dest.Bucket, + DestName: cfg.Dest.Name, + DestPartSize: cfg.Dest.PartSize, + DestThreads: cfg.Dest.Threads, FetchDone: make(chan string, 1), - HealthCheckEnabled: *healthCheckEnabled, - MsgTimeout: *msgTimeout, - SkipLifecycleExpired: *skipLifecycleExpired, - SrcBucket: *srcBucket, - SrcName: *srcName, + HealthCheckDisabled: cfg.HealthCheck.Disabled, + MsgTimeout: cfg.MsgTimeout, + SkipLifecycleExpired: cfg.SkipLifecycleExpired, + SrcBucket: cfg.Src.Bucket, + SrcName: cfg.Src.Name, WaitGroup: &sync.WaitGroup{}, + ExcludePaths: struct { + CopyObject []*pcre.Regexp + RemoveObject []*pcre.Regexp + }{ + CopyObject: excludedPathCopyObject, + RemoveObject: excludedPathRemoveObject, + }, } var srcHealthCheckCancel, destHealthCheckCancel context.CancelFunc @@ -104,14 +115,14 @@ func main() { // source a.SrcClient, srcHealthCheckCancel = client.MinIO( baseCtx, - srcName, - srcBucket, - srcEndpoint, - srcAccessKey, - srcSecretAccessKey, - srcUseSSL, + cfg.Src.Name, + cfg.Src.Bucket, + cfg.Src.Endpoint, + cfg.Src.AccessKey, + cfg.Src.SecretKey, + cfg.Src.UseSSL, client.Params{}, - trace, + zerolog.GlobalLevel(), ) defer func() { @@ -122,14 +133,17 @@ func main() { // destination a.DestClient, destHealthCheckCancel = client.MinIO( baseCtx, - destName, - destBucket, - destEndpoint, - destAccessKey, - destSecretAccessKey, - destUseSSL, - client.Params{Threads: *destThreads, PartSize: *destPartSize}, - trace, + cfg.Dest.Name, + cfg.Dest.Bucket, + cfg.Dest.Endpoint, + cfg.Dest.AccessKey, + cfg.Dest.SecretKey, + cfg.Dest.UseSSL, + client.Params{ + Threads: cfg.Dest.Threads, + PartSize: cfg.Dest.PartSize, + }, + zerolog.GlobalLevel(), ) defer func() { @@ -139,34 +153,34 @@ func main() { // queue jetStreamSub, jetStreamConn := client.JetStream( - *jetStreamURL, - *jetStreamSubject, - *jetStreamStream, - *jetStreamDurableConsumer, - *jetStreamStreamMaxAge, - *jetStreamRootCA, - *jetStreamUsername, - *jetStreamPassword, - *jetStreamStreamReplicas, - *jetStreamMaxAckPending, - *jetStreamStreamMaxSize, - *msgTimeout, - *jetStreamStreamRetention, - *jetStreamStreamRepublishSubject, - *jetStreamProvisioningDisabled, + cfg.Jetstream.URL, + cfg.Jetstream.Subject, + cfg.Jetstream.Stream.Name, + cfg.Jetstream.Consumer.Name, + cfg.Jetstream.Stream.MaxAge, + cfg.Jetstream.RootCA, + cfg.Jetstream.Username, + cfg.Jetstream.Password, + cfg.Jetstream.Stream.Replicas, + cfg.Jetstream.Consumer.MaxAckPending, + cfg.Jetstream.Stream.MaxSize, + cfg.MsgTimeout, + cfg.Jetstream.Stream.Retention, + cfg.Jetstream.Stream.RepublishSubject, + cfg.Jetstream.ProvisioningDisabled, ) // health check server - healthCheckSrv := a.StartHealthCheckServer(*healthCheckPort, jetStreamConn) + healthCheckSrv := a.StartHealthCheckServer(cfg.HealthCheck.Port, jetStreamConn) // metrics server - metricsSrv := a.StartMetricsServer(*metricsPort) + metricsSrv := a.StartMetricsServer(cfg.Metrics.Port) // single-thread message processor - go a.MessageProcessor(baseCtx, msgCtx, jetStreamSub, *jetStreamBatchSize) + go a.MessageProcessor(baseCtx, msgCtx, jetStreamSub, cfg.Jetstream.BatchSize) // shutdown manager - a.WaitForSignal(*shutdownWait, baseCancel, msgCancel, healthCheckSrv, metricsSrv) + a.WaitForSignal(cfg.ShutdownWait, baseCancel, msgCancel, healthCheckSrv, metricsSrv) log.Info().Msg("Shutdown complete") }