Skip to content

Commit

Permalink
adding arg to disable jetstream provisioning
Browse files Browse the repository at this point in the history
  • Loading branch information
rayjanoka committed Oct 18, 2022
1 parent 1d11f66 commit 86f3107
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 79 deletions.
8 changes: 7 additions & 1 deletion archie/removeObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import (
"time"
)

func (a *Archiver) removeObject(ctx context.Context, mLog zerolog.Logger, eventObjKey string, msg *nats.Msg, record event.Record) (error, string, AckType) {
func (a *Archiver) removeObject(
ctx context.Context,
mLog zerolog.Logger,
eventObjKey string,
msg *nats.Msg,
record event.Record,
) (error, string, AckType) {
metadata, _ := msg.Metadata()

if a.SkipLifecycleExpired && record.Source.Host == "Internal: [ILM-EXPIRY]" {
Expand Down
17 changes: 15 additions & 2 deletions archie/shutdownSignal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ import (
"time"
)

func (a *Archiver) WaitForSignal(shutdownWait string, baseCancel context.CancelFunc, msgCancel context.CancelFunc, healthCheckSrv *http.Server, metricsSrv *http.Server) {
func (a *Archiver) WaitForSignal(
shutdownWait string,
baseCancel context.CancelFunc,
msgCancel context.CancelFunc,
healthCheckSrv *http.Server,
metricsSrv *http.Server,
) {
shutdownWaitDuration, err := time.ParseDuration(shutdownWait)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse shutdown-wait duration argument")
Expand Down Expand Up @@ -50,7 +56,14 @@ func (a *Archiver) WaitForSignal(shutdownWait string, baseCancel context.CancelF
}
}

func (a *Archiver) shutdown(sig os.Signal, shutdownWaitDuration time.Duration, baseCancel context.CancelFunc, msgCancel context.CancelFunc, healthCheckSrv *http.Server, metricsSrv *http.Server) {
func (a *Archiver) shutdown(
sig os.Signal,
shutdownWaitDuration time.Duration,
baseCancel context.CancelFunc,
msgCancel context.CancelFunc,
healthCheckSrv *http.Server,
metricsSrv *http.Server,
) {
log.Info().Msgf("Received %s signal, starting %s shutdown wait", sig, shutdownWaitDuration)

// stop fetching new records
Expand Down
154 changes: 81 additions & 73 deletions client/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ import (
"time"
)

func JetStream(url, subject, stream, durableConsumer, streamMaxAgeDur, rootCA, username, password string, streamReplicas, maxAckPending int, streamMaxSize int64, msgTimeout string, jetreamStreamRePublishEnabled bool) (*nats.Subscription, *nats.Conn) {
func JetStream(
url, subject, stream, durableConsumer, streamMaxAgeDur, rootCA, username, password string,
streamReplicas, maxAckPending int,
streamMaxSize int64,
msgTimeout string,
jetStreamStreamRePublishEnabled, jetStreamProvisioningDisabled bool,
) (*nats.Subscription, *nats.Conn) {
var connectOptions []nats.Option
if rootCA != "" {
connectOptions = append(connectOptions, nats.RootCAs(rootCA))
Expand All @@ -30,97 +36,99 @@ func JetStream(url, subject, stream, durableConsumer, streamMaxAgeDur, rootCA, u

// TODO: output some information about the JetStream server

// build the stream
streamMaxAge, err := time.ParseDuration(streamMaxAgeDur)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse jetstream-max-age duration argument")
}

streamMaxBytes := int64(-1)
if streamMaxSize != -1 {
streamMaxBytes = 1_000_000 * streamMaxSize // Megabytes
}

streamConfig := &nats.StreamConfig{
Name: stream,
Subjects: []string{subject},
MaxAge: streamMaxAge,
MaxBytes: -1, // TODO: fix this
Replicas: streamReplicas, // TODO: test this
Retention: nats.LimitsPolicy,
}

if jetreamStreamRePublishEnabled {
// set the main stream to delete msgs on ACK
// then use the archive stream as a backup
streamConfig.Retention = nats.InterestPolicy
if !jetStreamProvisioningDisabled {
// build the stream
streamMaxAge, err := time.ParseDuration(streamMaxAgeDur)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse jetstream-max-age duration argument")
}

// setup the msg forwarding ot the archive stream
streamConfig.RePublish = &nats.RePublish{Source: subject, Destination: fmt.Sprintf("%s-archive", subject)}
streamMaxBytes := int64(-1)
if streamMaxSize != -1 {
streamMaxBytes = 1_000_000 * streamMaxSize // Megabytes
}

// build an archive stream that will not be consumed
archiveStreamConfig := &nats.StreamConfig{
Name: fmt.Sprintf("%s-archive", stream),
Subjects: []string{fmt.Sprintf("%s-archive", subject)},
streamConfig := &nats.StreamConfig{
Name: stream,
Subjects: []string{subject},
MaxAge: streamMaxAge,
MaxBytes: streamMaxBytes, // TODO: test this
MaxBytes: -1, // TODO: fix this
Replicas: streamReplicas, // TODO: test this
Retention: nats.LimitsPolicy,
}

archiveStreamInfo := createOrUpdateStream(jetStream, fmt.Sprintf("%s-archive", stream), archiveStreamConfig)
if jetStreamStreamRePublishEnabled {
// set the main stream to delete msgs on ACK
// then use the archive stream as a backup
streamConfig.Retention = nats.InterestPolicy

// setup the msg forwarding to the archive stream
streamConfig.RePublish = &nats.RePublish{Source: subject, Destination: fmt.Sprintf("%s-archive", subject)}

// build an archive stream that will not be consumed
archiveStreamConfig := &nats.StreamConfig{
Name: fmt.Sprintf("%s-archive", stream),
Subjects: []string{fmt.Sprintf("%s-archive", subject)},
MaxAge: streamMaxAge,
MaxBytes: streamMaxBytes, // TODO: test this
Replicas: streamReplicas, // TODO: test this
Retention: nats.LimitsPolicy,
}

log.Info().Msgf("JetStream archive stream %s configured with %d replicas and limited by %s max age, and %d max bytes",
archiveStreamInfo.Config.Name, archiveStreamInfo.Config.Replicas, archiveStreamInfo.Config.MaxAge, archiveStreamInfo.Config.MaxBytes)
}
archiveStreamInfo := createOrUpdateStream(jetStream, fmt.Sprintf("%s-archive", stream), archiveStreamConfig)

streamInfo := createOrUpdateStream(jetStream, stream, streamConfig)
log.Info().Msgf("JetStream archive stream %s configured with %d replicas and limited by %s max age, and %d max bytes",
archiveStreamInfo.Config.Name, archiveStreamInfo.Config.Replicas, archiveStreamInfo.Config.MaxAge, archiveStreamInfo.Config.MaxBytes)
}

log.Info().Msgf("JetStream stream %s configured with %d replicas and limited by %s max age, and %d max bytes",
streamInfo.Config.Name, streamInfo.Config.Replicas, streamInfo.Config.MaxAge, streamInfo.Config.MaxBytes)
streamInfo := createOrUpdateStream(jetStream, stream, streamConfig)

// build the stream consumer
ackWait, err := time.ParseDuration(msgTimeout)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse jetstream-msg-timeout duration argument")
}
log.Info().Msgf("JetStream stream %s configured with %d replicas and limited by %s max age, and %d max bytes",
streamInfo.Config.Name, streamInfo.Config.Replicas, streamInfo.Config.MaxAge, streamInfo.Config.MaxBytes)

desiredConsumerConfig := &nats.ConsumerConfig{
//MaxWaiting: jetStreamMaxWaiting, // must match fetch() batch parameter
//MaxRequestExpires: 60 * time.Second, // limit max fetch() expires (pull)
//MaxRequestBatch: 10, // limit max fetch() batch size (pull)
AckPolicy: nats.AckExplicitPolicy, // always ack (default)
AckWait: ackWait, // wait before retry
DeliverPolicy: nats.DeliverNewPolicy, // deliver since consumer creation
Durable: durableConsumer, // consumer name
FilterSubject: subject, // nats subject for stream
MaxAckPending: maxAckPending, // stop offering msgs once we're waiting on too many acks
MaxDeliver: -1, // try to redeliver forever
}
// build the stream consumer
ackWait, err := time.ParseDuration(msgTimeout)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse jetstream-msg-timeout duration argument")
}

consumerInfo, err := jetStream.ConsumerInfo(stream, durableConsumer)
if err != nil {
if err.Error() == "nats: consumer not found" {
consumerInfo, err = jetStream.AddConsumer(stream, desiredConsumerConfig)
if err != nil {
log.Fatal().Err(err).Msg("Failed to add JetStream consumer")
desiredConsumerConfig := &nats.ConsumerConfig{
//MaxWaiting: jetStreamMaxWaiting, // must match fetch() batch parameter
//MaxRequestExpires: 60 * time.Second, // limit max fetch() expires (pull)
//MaxRequestBatch: 10, // limit max fetch() batch size (pull)
AckPolicy: nats.AckExplicitPolicy, // always ack (default)
AckWait: ackWait, // wait before retry
DeliverPolicy: nats.DeliverNewPolicy, // deliver since consumer creation
Durable: durableConsumer, // consumer name
FilterSubject: subject, // nats subject for stream
MaxAckPending: maxAckPending, // stop offering msgs once we're waiting on too many acks
MaxDeliver: -1, // try to redeliver forever
}

consumerInfo, err := jetStream.ConsumerInfo(stream, durableConsumer)
if err != nil {
if err.Error() == "nats: consumer not found" {
consumerInfo, err = jetStream.AddConsumer(stream, desiredConsumerConfig)
if err != nil {
log.Fatal().Err(err).Msg("Failed to add JetStream consumer")
}
} else {
log.Fatal().Err(err).Msg("Failed to get JetStream consumer info")
}
} else {
log.Fatal().Err(err).Msg("Failed to get JetStream consumer info")
}
} else {
activeConsumerConfig := consumerInfo.Config
activeConsumerConfig := consumerInfo.Config

if desiredConsumerConfig != &activeConsumerConfig {
consumerInfo, err = jetStream.UpdateConsumer(stream, desiredConsumerConfig)
if err != nil {
log.Fatal().Err(err).Msg("Failed to update JetStream consumer")
if desiredConsumerConfig != &activeConsumerConfig {
consumerInfo, err = jetStream.UpdateConsumer(stream, desiredConsumerConfig)
if err != nil {
log.Fatal().Err(err).Msg("Failed to update JetStream consumer")
}
}
}
}

log.Info().Msgf("JetStream Consumer %s configured with %s message timeout and %d max ack pending",
consumerInfo.Config.Durable, consumerInfo.Config.AckWait, consumerInfo.Config.MaxAckPending)
log.Info().Msgf("JetStream Consumer %s configured with %s message timeout and %d max ack pending",
consumerInfo.Config.Durable, consumerInfo.Config.AckWait, consumerInfo.Config.MaxAckPending)
}

// pull mode consumer
sub, err := jetStream.PullSubscribe(subject, durableConsumer)
Expand Down
8 changes: 7 additions & 1 deletion client/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ type Params struct {
PartSize uint64
}

func MinIO(ctx context.Context, name, bucket, endpoint, accessKey, secretAccessKey *string, useSSL *bool, p Params, trace *bool) (*minio.Client, context.CancelFunc) {
func MinIO(
ctx context.Context,
name, bucket, endpoint, accessKey, secretAccessKey *string,
useSSL *bool,
p Params,
trace *bool,
) (*minio.Client, context.CancelFunc) {

//minio.MaxRetry = 0

Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func main() {
jetStreamStreamMaxAge := flag.String("jetstream-stream-max-age", LookupEnvOrString("JETSTREAM_STREAM_MAX_AGE", "72h"), "max duration to persist JetStream messages in the stream")
jetStreamStreamMaxSize := flag.Int64("jetstream-stream-max", LookupEnvOrInt64("JETSTREAM_STREAM_MAX_SIZE", -1), "max size of stream in megabytes")
jetStreamStreamReplicas := flag.Int("jetstream-stream-replicas", LookupEnvOrInt("JETSTREAM_STREAM_REPLICAS", 1), "number of replicas for the stream data")
jetreamStreamRePublishEnabled := flag.Bool("jetstream-stream-republish-enabled", LookupEnvOrBool("JETSTREAM_STREAM_REPUBLISH_ENABLED", false), "re-publish messages from the main stream to a separate archive stream")
jetStreamStreamRePublishEnabled := flag.Bool("jetstream-stream-republish-enabled", LookupEnvOrBool("JETSTREAM_STREAM_REPUBLISH_ENABLED", false), "re-publish messages from the main stream to a separate archive stream")
jetStreamProvisioningDisabled := flag.Bool("jetstream-provisioning-disabled", LookupEnvOrBool("JETSTREAM_PROVISIONING_DISABLED", false), "disable the creation and configuration of the stream and consumer")
jetStreamSubject := flag.String("jetstream-subject", LookupEnvOrString("JETSTREAM_SUBJECT", "minioevents"), "nats jetstream subject to subscribe to")
jetStreamURL := flag.String("jetstream-url", LookupEnvOrString("JETSTREAM_URL", "nats://localhost:4222"), "jetstream client url")
jetStreamUsername := flag.String("jetstream-username", LookupEnvOrString("JETSTREAM_USERNAME", ""), "jetstream client username")
Expand Down Expand Up @@ -149,7 +150,8 @@ func main() {
*jetStreamMaxAckPending,
*jetStreamStreamMaxSize,
*msgTimeout,
*jetreamStreamRePublishEnabled,
*jetStreamStreamRePublishEnabled,
*jetStreamProvisioningDisabled,
)

// health check server
Expand Down

0 comments on commit 86f3107

Please sign in to comment.