From f6da7617a4836ffc106ad26758315832a3023f15 Mon Sep 17 00:00:00 2001 From: drev74 Date: Mon, 25 Mar 2024 13:12:12 +0300 Subject: [PATCH] refactor: started migration to jetstream --- ajc/util.go | 7 +++--- asyncjobs.go | 5 ++-- client_options.go | 4 ---- election/election.go | 10 ++++---- election/election_test.go | 15 ++++++------ election/options.go | 4 ++-- storage.go | 48 ++++++++++++++++++++------------------- storage_test.go | 8 +++---- 8 files changed, 50 insertions(+), 51 deletions(-) diff --git a/ajc/util.go b/ajc/util.go index 83e25dd..1c3a37a 100644 --- a/ajc/util.go +++ b/ajc/util.go @@ -17,6 +17,7 @@ import ( "github.com/dustin/go-humanize" "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/sirupsen/logrus" "github.com/xlab/tablewriter" "golang.org/x/term" @@ -167,8 +168,8 @@ func showTasks(tasks *asyncjobs.TasksInfo) { } } -func showElectionStatus(kv nats.KeyValue) { - status, err := kv.Status() +func showElectionStatus(kv jetstream.KeyValue) { + status, err := kv.Status(context.TODO()) if err != nil { return } @@ -195,7 +196,7 @@ func showElectionStatus(kv nats.KeyValue) { } for _, k := range keys { - entry, err := kv.Get(k) + entry, err := kv.Get(context.TODO(), k) if err != nil { fmt.Printf(" Could not get value for %v: %v", k, err) } diff --git a/asyncjobs.go b/asyncjobs.go index 9c5dbe6..fb14073 100644 --- a/asyncjobs.go +++ b/asyncjobs.go @@ -11,6 +11,7 @@ import ( "github.com/nats-io/jsm.go" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) const ( @@ -40,7 +41,7 @@ type StorageAdmin interface { TasksInfo() (*TasksInfo, error) Tasks(ctx context.Context, limit int32) (chan *Task, error) TasksStore() (*jsm.Manager, *jsm.Stream, error) - ElectionStorage() (nats.KeyValue, error) + ElectionStorage() (jetstream.KeyValue, error) } type ScheduledTaskStorage interface { @@ -50,7 +51,7 @@ type ScheduledTaskStorage interface { ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error) ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error) EnqueueTask(ctx context.Context, queue *Queue, task *Task) error - ElectionStorage() (nats.KeyValue, error) + ElectionStorage() (jetstream.KeyValue, error) PublishLeaderElectedEvent(ctx context.Context, name string, component string) error } diff --git a/client_options.go b/client_options.go index a8ab621..fbad6c9 100644 --- a/client_options.go +++ b/client_options.go @@ -105,10 +105,6 @@ func CustomLogger(log Logger) ClientOpt { // NatsConn sets an already connected NATS connection as communications channel func NatsConn(nc *nats.Conn) ClientOpt { return func(opts *ClientOpts) error { - if !nc.Opts.UseOldRequestStyle { - return fmt.Errorf("connection with UseOldRequestStyle() is required") - } - opts.nc = nc return nil } diff --git a/election/election.go b/election/election.go index ac662ba..361a568 100644 --- a/election/election.go +++ b/election/election.go @@ -12,7 +12,7 @@ import ( "sync" "time" - "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // Backoff controls the interval of campaigns @@ -68,7 +68,7 @@ type election struct { var skipValidate bool -func NewElection(name string, key string, bucket nats.KeyValue, opts ...Option) (Election, error) { +func NewElection(name string, key string, bucket jetstream.KeyValue, opts ...Option) (Election, error) { e := &election{ state: UnknownState, lastSeq: math.MaxUint64, @@ -79,7 +79,7 @@ func NewElection(name string, key string, bucket nats.KeyValue, opts ...Option) }, } - status, err := bucket.Status() + status, err := bucket.Status(context.TODO()) if err != nil { return nil, err } @@ -121,7 +121,7 @@ func (e *election) debugf(format string, a ...any) { func (e *election) campaignForLeadership() error { campaignsCounter.WithLabelValues(e.opts.key, e.opts.name, stateNames[CandidateState]).Inc() - seq, err := e.opts.bucket.Create(e.opts.key, []byte(e.opts.name)) + seq, err := e.opts.bucket.Create(context.TODO(), e.opts.key, []byte(e.opts.name)) if err != nil { e.tries++ return nil @@ -139,7 +139,7 @@ func (e *election) campaignForLeadership() error { func (e *election) maintainLeadership() error { campaignsCounter.WithLabelValues(e.opts.key, e.opts.name, stateNames[LeaderState]).Inc() - seq, err := e.opts.bucket.Update(e.opts.key, []byte(e.opts.name), e.lastSeq) + seq, err := e.opts.bucket.Update(context.TODO(), e.opts.key, []byte(e.opts.name), e.lastSeq) if err != nil { e.debugf("key update failed, moving to candidate state: %v", err) e.state = CandidateState diff --git a/election/election_test.go b/election/election_test.go index f5d254e..a99dad0 100644 --- a/election/election_test.go +++ b/election/election_test.go @@ -14,6 +14,7 @@ import ( "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -27,8 +28,8 @@ var _ = Describe("Leader Election", func() { var ( srv *server.Server nc *nats.Conn - js nats.KeyValueManager - kv nats.KeyValue + js jetstream.KeyValueManager + kv jetstream.KeyValue err error debugger func(f string, a ...any) ) @@ -36,10 +37,8 @@ var _ = Describe("Leader Election", func() { BeforeEach(func() { skipValidate = false srv, nc = startJSServer(GinkgoT()) - js, err = nc.JetStream() - Expect(err).ToNot(HaveOccurred()) - kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + kv, err = js.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{ Bucket: "LEADER_ELECTION", TTL: 750 * time.Millisecond, }) @@ -60,7 +59,7 @@ var _ = Describe("Leader Election", func() { Describe("Election", func() { It("Should validate the TTL", func() { - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + kv, err := js.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{ Bucket: "LE", TTL: 24 * time.Hour, }) @@ -174,10 +173,10 @@ var _ = Describe("Leader Election", func() { kills++ if kills%3 == 0 { debugger("deleting key") - Expect(kv.Delete("election")).ToNot(HaveOccurred()) + Expect(kv.Delete(context.TODO(), "election")).ToNot(HaveOccurred()) } else { debugger("corrupting key") - _, err := kv.Put("election", nil) + _, err := kv.Put(context.TODO(), "election", nil) Expect(err).ToNot(HaveOccurred()) } } diff --git a/election/options.go b/election/options.go index 34fdb34..861afdd 100644 --- a/election/options.go +++ b/election/options.go @@ -7,7 +7,7 @@ package election import ( "time" - "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // Option configures the election system @@ -16,7 +16,7 @@ type Option func(o *options) type options struct { name string key string - bucket nats.KeyValue + bucket jetstream.KeyValue ttl time.Duration cInterval time.Duration wonCb func() diff --git a/storage.go b/storage.go index 4d2ed6b..1aea642 100644 --- a/storage.go +++ b/storage.go @@ -20,6 +20,7 @@ import ( "github.com/nats-io/jsm.go" "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) const ( @@ -68,8 +69,8 @@ type jetStreamStorage struct { mgr *jsm.Manager tasks *taskStorage - configBucket nats.KeyValue - leaderElections nats.KeyValue + configBucket jetstream.KeyValue + leaderElections jetstream.KeyValue retry RetryPolicyProvider qStreams map[string]*jsm.Stream @@ -502,7 +503,7 @@ func (s *jetStreamStorage) ConfigurationInfo() (*nats.KeyValueBucketStatus, erro return nil, fmt.Errorf("%w: configuration bucket not configured", ErrStorageNotReady) } - st, err := s.configBucket.Status() + st, err := s.configBucket.Status(context.TODO()) if err != nil { return nil, err } @@ -575,7 +576,7 @@ func (s *jetStreamStorage) DeleteScheduledTaskByName(name string) error { return fmt.Errorf("%w: scheduled storage not prepared", ErrStorageNotReady) } - return s.configBucket.Delete(fmt.Sprintf("scheduled_tasks.%s", name)) + return s.configBucket.Delete(context.TODO(), fmt.Sprintf("scheduled_tasks.%s", name)) } func (s *jetStreamStorage) ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error) { @@ -586,7 +587,7 @@ func (s *jetStreamStorage) ScheduledTasks(ctx context.Context) ([]*ScheduledTask wctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - watch, err := s.configBucket.Watch("scheduled_tasks.*", nats.Context(wctx)) + watch, err := s.configBucket.Watch(wctx, "scheduled_tasks.*") if err != nil { return nil, err } @@ -601,7 +602,7 @@ func (s *jetStreamStorage) ScheduledTasks(ctx context.Context) ([]*ScheduledTask return tasks, nil } - if entry.Operation() != nats.KeyValuePut { + if entry.Operation() != jetstream.KeyValuePut { continue } @@ -625,7 +626,7 @@ func (s *jetStreamStorage) ScheduledTasksWatch(ctx context.Context) (chan *Sched return nil, fmt.Errorf("%w: scheduled storage not prepared", ErrStorageNotReady) } - watch, err := s.configBucket.Watch("scheduled_tasks.*", nats.Context(ctx)) + watch, err := s.configBucket.Watch(ctx, "scheduled_tasks.*") if err != nil { return nil, err } @@ -641,7 +642,7 @@ func (s *jetStreamStorage) ScheduledTasksWatch(ctx context.Context) (chan *Sched continue } - if entry.Operation() == nats.KeyValueDelete || entry.Operation() == nats.KeyValuePurge { + if entry.Operation() == jetstream.KeyValueDelete || entry.Operation() == jetstream.KeyValuePurge { parts := strings.Split(entry.Key(), ".") tasks <- &ScheduleWatchEntry{ Name: parts[len(parts)-1], @@ -678,7 +679,7 @@ func (s *jetStreamStorage) LoadScheduledTaskByName(name string) (*ScheduledTask, return nil, fmt.Errorf("%w: scheduled storage not prepared", ErrStorageNotReady) } - e, err := s.configBucket.Get(fmt.Sprintf("scheduled_tasks.%s", name)) + e, err := s.configBucket.Get(context.TODO(), fmt.Sprintf("scheduled_tasks.%s", name)) if err != nil { if err == nats.ErrKeyNotFound { return nil, ErrScheduledTaskNotFound @@ -710,9 +711,9 @@ func (s *jetStreamStorage) SaveScheduledTask(st *ScheduledTask, update bool) err var rev uint64 if update { - rev, err = s.configBucket.Put(key, stj) + rev, err = s.configBucket.Put(context.TODO(), key, stj) } else { - rev, err = s.configBucket.Create(key, stj) + rev, err = s.configBucket.Create(context.TODO(), key, stj) } if err != nil { if strings.Contains(err.Error(), "wrong last sequence") { @@ -727,25 +728,26 @@ func (s *jetStreamStorage) SaveScheduledTask(st *ScheduledTask, update bool) err } func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) error { + ctx := context.TODO() var err error - if replicas == 0 { - replicas = 1 - } - - js, err := s.nc.JetStream() + js, err := jetstream.New(s.nc) if err != nil { return err } - storage := nats.FileStorage + if replicas == 0 { + replicas = 1 + } + + storage := jetstream.FileStorage if memory { - storage = nats.MemoryStorage + storage = jetstream.MemoryStorage } - kv, err := js.KeyValue(ConfigBucketName) + kv, err := js.KeyValue(ctx, ConfigBucketName) if err == nats.ErrBucketNotFound { - kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ Bucket: ConfigBucketName, Description: "Choria Async Jobs Configuration", Storage: storage, @@ -758,9 +760,9 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) s.configBucket = kv - kv, err = js.KeyValue(LeaderElectionBucketName) + kv, err = js.KeyValue(ctx, LeaderElectionBucketName) if err == nats.ErrBucketNotFound { - kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ Bucket: LeaderElectionBucketName, Description: "Choria Async Jobs Leader Elections", Storage: storage, @@ -908,7 +910,7 @@ func (s *jetStreamStorage) TasksStore() (*jsm.Manager, *jsm.Stream, error) { } // ElectionStorage gives access to the key-value store used for elections -func (s *jetStreamStorage) ElectionStorage() (nats.KeyValue, error) { +func (s *jetStreamStorage) ElectionStorage() (jetstream.KeyValue, error) { if s.leaderElections == nil { return nil, fmt.Errorf("%s: election bucket not configured", ErrStorageNotReady) } diff --git a/storage_test.go b/storage_test.go index 6a23086..55f86f5 100644 --- a/storage_test.go +++ b/storage_test.go @@ -157,7 +157,7 @@ var _ = Describe("Storage", func() { err = storage.PrepareConfigurationStore(true, 1) Expect(err).ToNot(HaveOccurred()) - _, err = storage.configBucket.Put("scheduled_tasks.test", []byte("{invalid")) + _, err = storage.configBucket.Put(context.TODO(), "scheduled_tasks.test", []byte("{invalid")) Expect(err).ToNot(HaveOccurred()) _, err = storage.LoadScheduledTaskByName("test") @@ -215,7 +215,7 @@ var _ = Describe("Storage", func() { err = storage.SaveScheduledTask(st, true) Expect(err).ToNot(HaveOccurred()) - e, err := storage.configBucket.Get(fmt.Sprintf("scheduled_tasks.%s", st.Name)) + e, err := storage.configBucket.Get(context.TODO(), fmt.Sprintf("scheduled_tasks.%s", st.Name)) Expect(err).ToNot(HaveOccurred()) st = &ScheduledTask{} err = json.Unmarshal(e.Value(), st) @@ -253,7 +253,7 @@ var _ = Describe("Storage", func() { err = storage.PrepareConfigurationStore(true, 1) Expect(err).ToNot(HaveOccurred()) - kvs, err := storage.configBucket.Status() + kvs, err := storage.configBucket.Status(context.TODO()) Expect(err).ToNot(HaveOccurred()) Expect(kvs.(*nats.KeyValueBucketStatus).StreamInfo().Config.Storage).To(Equal(nats.MemoryStorage)) @@ -268,7 +268,7 @@ var _ = Describe("Storage", func() { err = storage.PrepareConfigurationStore(false, 1) Expect(err).ToNot(HaveOccurred()) - kvs, err := storage.configBucket.Status() + kvs, err := storage.configBucket.Status(context.TODO()) Expect(err).ToNot(HaveOccurred()) stream := kvs.(*nats.KeyValueBucketStatus).StreamInfo().Config Expect(stream.MaxMsgsPerSubject).To(Equal(int64(1)))