Skip to content

Commit

Permalink
refactor: started migration to jetstream
Browse files Browse the repository at this point in the history
  • Loading branch information
drev74 committed Mar 25, 2024
1 parent 0d07838 commit f6da761
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 51 deletions.
7 changes: 4 additions & 3 deletions ajc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/sirupsen/logrus"
"github.com/xlab/tablewriter"
"golang.org/x/term"
Expand Down Expand Up @@ -167,8 +168,8 @@ func showTasks(tasks *asyncjobs.TasksInfo) {
}
}

func showElectionStatus(kv nats.KeyValue) {
status, err := kv.Status()
func showElectionStatus(kv jetstream.KeyValue) {
status, err := kv.Status(context.TODO())
if err != nil {
return
}
Expand All @@ -195,7 +196,7 @@ func showElectionStatus(kv nats.KeyValue) {
}

for _, k := range keys {
entry, err := kv.Get(k)
entry, err := kv.Get(context.TODO(), k)
if err != nil {
fmt.Printf(" Could not get value for %v: %v", k, err)
}
Expand Down
5 changes: 3 additions & 2 deletions asyncjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/nats-io/jsm.go"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

const (
Expand Down Expand Up @@ -40,7 +41,7 @@ type StorageAdmin interface {
TasksInfo() (*TasksInfo, error)
Tasks(ctx context.Context, limit int32) (chan *Task, error)
TasksStore() (*jsm.Manager, *jsm.Stream, error)
ElectionStorage() (nats.KeyValue, error)
ElectionStorage() (jetstream.KeyValue, error)
}

type ScheduledTaskStorage interface {
Expand All @@ -50,7 +51,7 @@ type ScheduledTaskStorage interface {
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
ElectionStorage() (nats.KeyValue, error)
ElectionStorage() (jetstream.KeyValue, error)
PublishLeaderElectedEvent(ctx context.Context, name string, component string) error
}

Expand Down
4 changes: 0 additions & 4 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ func CustomLogger(log Logger) ClientOpt {
// NatsConn sets an already connected NATS connection as communications channel
func NatsConn(nc *nats.Conn) ClientOpt {
return func(opts *ClientOpts) error {
if !nc.Opts.UseOldRequestStyle {
return fmt.Errorf("connection with UseOldRequestStyle() is required")
}

opts.nc = nc
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"sync"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

// Backoff controls the interval of campaigns
Expand Down Expand Up @@ -68,7 +68,7 @@ type election struct {

var skipValidate bool

func NewElection(name string, key string, bucket nats.KeyValue, opts ...Option) (Election, error) {
func NewElection(name string, key string, bucket jetstream.KeyValue, opts ...Option) (Election, error) {
e := &election{
state: UnknownState,
lastSeq: math.MaxUint64,
Expand All @@ -79,7 +79,7 @@ func NewElection(name string, key string, bucket nats.KeyValue, opts ...Option)
},
}

status, err := bucket.Status()
status, err := bucket.Status(context.TODO())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (e *election) debugf(format string, a ...any) {
func (e *election) campaignForLeadership() error {
campaignsCounter.WithLabelValues(e.opts.key, e.opts.name, stateNames[CandidateState]).Inc()

seq, err := e.opts.bucket.Create(e.opts.key, []byte(e.opts.name))
seq, err := e.opts.bucket.Create(context.TODO(), e.opts.key, []byte(e.opts.name))
if err != nil {
e.tries++
return nil
Expand All @@ -139,7 +139,7 @@ func (e *election) campaignForLeadership() error {
func (e *election) maintainLeadership() error {
campaignsCounter.WithLabelValues(e.opts.key, e.opts.name, stateNames[LeaderState]).Inc()

seq, err := e.opts.bucket.Update(e.opts.key, []byte(e.opts.name), e.lastSeq)
seq, err := e.opts.bucket.Update(context.TODO(), e.opts.key, []byte(e.opts.name), e.lastSeq)
if err != nil {
e.debugf("key update failed, moving to candidate state: %v", err)
e.state = CandidateState
Expand Down
15 changes: 7 additions & 8 deletions election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand All @@ -27,19 +28,17 @@ var _ = Describe("Leader Election", func() {
var (
srv *server.Server
nc *nats.Conn
js nats.KeyValueManager
kv nats.KeyValue
js jetstream.KeyValueManager
kv jetstream.KeyValue
err error
debugger func(f string, a ...any)
)

BeforeEach(func() {
skipValidate = false
srv, nc = startJSServer(GinkgoT())
js, err = nc.JetStream()
Expect(err).ToNot(HaveOccurred())

kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
kv, err = js.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
Bucket: "LEADER_ELECTION",
TTL: 750 * time.Millisecond,
})
Expand All @@ -60,7 +59,7 @@ var _ = Describe("Leader Election", func() {

Describe("Election", func() {
It("Should validate the TTL", func() {
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
kv, err := js.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
Bucket: "LE",
TTL: 24 * time.Hour,
})
Expand Down Expand Up @@ -174,10 +173,10 @@ var _ = Describe("Leader Election", func() {
kills++
if kills%3 == 0 {
debugger("deleting key")
Expect(kv.Delete("election")).ToNot(HaveOccurred())
Expect(kv.Delete(context.TODO(), "election")).ToNot(HaveOccurred())
} else {
debugger("corrupting key")
_, err := kv.Put("election", nil)
_, err := kv.Put(context.TODO(), "election", nil)
Expect(err).ToNot(HaveOccurred())
}
}
Expand Down
4 changes: 2 additions & 2 deletions election/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package election
import (
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

// Option configures the election system
Expand All @@ -16,7 +16,7 @@ type Option func(o *options)
type options struct {
name string
key string
bucket nats.KeyValue
bucket jetstream.KeyValue
ttl time.Duration
cInterval time.Duration
wonCb func()
Expand Down
48 changes: 25 additions & 23 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

const (
Expand Down Expand Up @@ -68,8 +69,8 @@ type jetStreamStorage struct {
mgr *jsm.Manager

tasks *taskStorage
configBucket nats.KeyValue
leaderElections nats.KeyValue
configBucket jetstream.KeyValue
leaderElections jetstream.KeyValue
retry RetryPolicyProvider

qStreams map[string]*jsm.Stream
Expand Down Expand Up @@ -502,7 +503,7 @@ func (s *jetStreamStorage) ConfigurationInfo() (*nats.KeyValueBucketStatus, erro
return nil, fmt.Errorf("%w: configuration bucket not configured", ErrStorageNotReady)
}

st, err := s.configBucket.Status()
st, err := s.configBucket.Status(context.TODO())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -575,7 +576,7 @@ func (s *jetStreamStorage) DeleteScheduledTaskByName(name string) error {
return fmt.Errorf("%w: scheduled storage not prepared", ErrStorageNotReady)
}

return s.configBucket.Delete(fmt.Sprintf("scheduled_tasks.%s", name))
return s.configBucket.Delete(context.TODO(), fmt.Sprintf("scheduled_tasks.%s", name))
}

func (s *jetStreamStorage) ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error) {
Expand All @@ -586,7 +587,7 @@ func (s *jetStreamStorage) ScheduledTasks(ctx context.Context) ([]*ScheduledTask
wctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

watch, err := s.configBucket.Watch("scheduled_tasks.*", nats.Context(wctx))
watch, err := s.configBucket.Watch(wctx, "scheduled_tasks.*")
if err != nil {
return nil, err
}
Expand All @@ -601,7 +602,7 @@ func (s *jetStreamStorage) ScheduledTasks(ctx context.Context) ([]*ScheduledTask
return tasks, nil
}

if entry.Operation() != nats.KeyValuePut {
if entry.Operation() != jetstream.KeyValuePut {
continue
}

Expand All @@ -625,7 +626,7 @@ func (s *jetStreamStorage) ScheduledTasksWatch(ctx context.Context) (chan *Sched
return nil, fmt.Errorf("%w: scheduled storage not prepared", ErrStorageNotReady)
}

watch, err := s.configBucket.Watch("scheduled_tasks.*", nats.Context(ctx))
watch, err := s.configBucket.Watch(ctx, "scheduled_tasks.*")
if err != nil {
return nil, err
}
Expand All @@ -641,7 +642,7 @@ func (s *jetStreamStorage) ScheduledTasksWatch(ctx context.Context) (chan *Sched
continue
}

if entry.Operation() == nats.KeyValueDelete || entry.Operation() == nats.KeyValuePurge {
if entry.Operation() == jetstream.KeyValueDelete || entry.Operation() == jetstream.KeyValuePurge {
parts := strings.Split(entry.Key(), ".")
tasks <- &ScheduleWatchEntry{
Name: parts[len(parts)-1],
Expand Down Expand Up @@ -678,7 +679,7 @@ func (s *jetStreamStorage) LoadScheduledTaskByName(name string) (*ScheduledTask,
return nil, fmt.Errorf("%w: scheduled storage not prepared", ErrStorageNotReady)
}

e, err := s.configBucket.Get(fmt.Sprintf("scheduled_tasks.%s", name))
e, err := s.configBucket.Get(context.TODO(), fmt.Sprintf("scheduled_tasks.%s", name))
if err != nil {
if err == nats.ErrKeyNotFound {
return nil, ErrScheduledTaskNotFound
Expand Down Expand Up @@ -710,9 +711,9 @@ func (s *jetStreamStorage) SaveScheduledTask(st *ScheduledTask, update bool) err
var rev uint64

if update {
rev, err = s.configBucket.Put(key, stj)
rev, err = s.configBucket.Put(context.TODO(), key, stj)
} else {
rev, err = s.configBucket.Create(key, stj)
rev, err = s.configBucket.Create(context.TODO(), key, stj)
}
if err != nil {
if strings.Contains(err.Error(), "wrong last sequence") {
Expand All @@ -727,25 +728,26 @@ func (s *jetStreamStorage) SaveScheduledTask(st *ScheduledTask, update bool) err
}

func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) error {
ctx := context.TODO()
var err error

if replicas == 0 {
replicas = 1
}

js, err := s.nc.JetStream()
js, err := jetstream.New(s.nc)
if err != nil {
return err
}

storage := nats.FileStorage
if replicas == 0 {
replicas = 1
}

storage := jetstream.FileStorage
if memory {
storage = nats.MemoryStorage
storage = jetstream.MemoryStorage
}

kv, err := js.KeyValue(ConfigBucketName)
kv, err := js.KeyValue(ctx, ConfigBucketName)
if err == nats.ErrBucketNotFound {
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: ConfigBucketName,
Description: "Choria Async Jobs Configuration",
Storage: storage,
Expand All @@ -758,9 +760,9 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int)

s.configBucket = kv

kv, err = js.KeyValue(LeaderElectionBucketName)
kv, err = js.KeyValue(ctx, LeaderElectionBucketName)
if err == nats.ErrBucketNotFound {
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: LeaderElectionBucketName,
Description: "Choria Async Jobs Leader Elections",
Storage: storage,
Expand Down Expand Up @@ -908,7 +910,7 @@ func (s *jetStreamStorage) TasksStore() (*jsm.Manager, *jsm.Stream, error) {
}

// ElectionStorage gives access to the key-value store used for elections
func (s *jetStreamStorage) ElectionStorage() (nats.KeyValue, error) {
func (s *jetStreamStorage) ElectionStorage() (jetstream.KeyValue, error) {
if s.leaderElections == nil {
return nil, fmt.Errorf("%s: election bucket not configured", ErrStorageNotReady)
}
Expand Down
8 changes: 4 additions & 4 deletions storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ var _ = Describe("Storage", func() {
err = storage.PrepareConfigurationStore(true, 1)
Expect(err).ToNot(HaveOccurred())

_, err = storage.configBucket.Put("scheduled_tasks.test", []byte("{invalid"))
_, err = storage.configBucket.Put(context.TODO(), "scheduled_tasks.test", []byte("{invalid"))
Expect(err).ToNot(HaveOccurred())

_, err = storage.LoadScheduledTaskByName("test")
Expand Down Expand Up @@ -215,7 +215,7 @@ var _ = Describe("Storage", func() {
err = storage.SaveScheduledTask(st, true)
Expect(err).ToNot(HaveOccurred())

e, err := storage.configBucket.Get(fmt.Sprintf("scheduled_tasks.%s", st.Name))
e, err := storage.configBucket.Get(context.TODO(), fmt.Sprintf("scheduled_tasks.%s", st.Name))
Expect(err).ToNot(HaveOccurred())
st = &ScheduledTask{}
err = json.Unmarshal(e.Value(), st)
Expand Down Expand Up @@ -253,7 +253,7 @@ var _ = Describe("Storage", func() {
err = storage.PrepareConfigurationStore(true, 1)
Expect(err).ToNot(HaveOccurred())

kvs, err := storage.configBucket.Status()
kvs, err := storage.configBucket.Status(context.TODO())
Expect(err).ToNot(HaveOccurred())

Expect(kvs.(*nats.KeyValueBucketStatus).StreamInfo().Config.Storage).To(Equal(nats.MemoryStorage))
Expand All @@ -268,7 +268,7 @@ var _ = Describe("Storage", func() {
err = storage.PrepareConfigurationStore(false, 1)
Expect(err).ToNot(HaveOccurred())

kvs, err := storage.configBucket.Status()
kvs, err := storage.configBucket.Status(context.TODO())
Expect(err).ToNot(HaveOccurred())
stream := kvs.(*nats.KeyValueBucketStatus).StreamInfo().Config
Expect(stream.MaxMsgsPerSubject).To(Equal(int64(1)))
Expand Down

0 comments on commit f6da761

Please sign in to comment.