diff --git a/go.mod b/go.mod index 1848b7b..fba5007 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/edgedelta/leader-election -go 1.16 +go 1.21 require ( github.com/cenkalti/backoff v2.2.1+incompatible @@ -8,3 +8,34 @@ require ( k8s.io/apimachinery v0.21.0 k8s.io/client-go v0.21.0 ) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/evanphx/json-patch v4.9.0+incompatible // indirect + github.com/go-logr/logr v0.4.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/golang/protobuf v1.4.3 // indirect + github.com/google/go-cmp v0.5.2 // indirect + github.com/google/gofuzz v1.1.0 // indirect + github.com/googleapis/gnostic v0.4.1 // indirect + github.com/json-iterator/go v1.1.10 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect + golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect + golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 // indirect + golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect + golang.org/x/text v0.3.4 // indirect + golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect + google.golang.org/appengine v1.6.5 // indirect + google.golang.org/protobuf v1.25.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + k8s.io/klog/v2 v2.8.0 // indirect + k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect + k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect + sigs.k8s.io/yaml v1.2.0 // indirect +) diff --git a/leader_election.go b/leader_election.go index 9481e09..a2ef23e 100644 --- a/leader_election.go +++ b/leader_election.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "path/filepath" - "sync/atomic" "time" "github.com/cenkalti/backoff" @@ -110,6 +109,7 @@ func New(opts ...K8sLeaderEngineOption) (*K8sLeaderEngine, error) { logger: &defaultLogger{}, errorLogger: &defaultLogger{}, stopped: make(chan struct{}), + subscribers: make(map[string]LeaderEngineSubscriber), } for _, o := range opts { @@ -133,7 +133,7 @@ func New(opts ...K8sLeaderEngineOption) (*K8sLeaderEngine, error) { // Start the leader engine and block until a leader is elected. func (le *K8sLeaderEngine) Start() error { - if !atomic.CompareAndSwapInt32(&le.running, 0, 1) { + if !le.isRunning.CompareAndSwap(false, true) { return fmt.Errorf("leader engine is already started") } @@ -157,17 +157,45 @@ func (le *K8sLeaderEngine) Start() error { // Stop the engine and wait until the goroutines have stopped func (le *K8sLeaderEngine) Stop() error { - if !atomic.CompareAndSwapInt32(&le.running, 1, 0) { + if !le.isRunning.CompareAndSwap(true, false) { return fmt.Errorf("leader engine is not started or already stopped") } le.ctxCancel() - le.logger.Log("Leader election engine cancelled internal context, will wait until stopped signal is recieved") + le.logger.Log("Leader election engine cancelled internal context, will wait until stopped signal is received") <-le.stopped le.logger.Log("Leader election engine stopped") return nil } +func (le *K8sLeaderEngine) Subscribe(subscriber LeaderEngineSubscriber) { + le.subscribersMu.Lock() + defer le.subscribersMu.Unlock() + + subscriberName := subscriber.Name() + _, ok := le.subscribers[subscriberName] + if ok { + le.logger.Log("Already have a subscriber with name: %q, skipping", subscriber) + return + } + + le.subscribers[subscriberName] = subscriber +} + +func (le *K8sLeaderEngine) Unsubscribe(subscriber LeaderEngineSubscriber) error { + le.subscribersMu.Lock() + defer le.subscribersMu.Unlock() + + subscriberName := subscriber.Name() + _, ok := le.subscribers[subscriberName] + if ok { + return fmt.Errorf("there is no subscriber with name: %q", subscriberName) + } + + delete(le.subscribers, subscriberName) + return nil +} + func (le *K8sLeaderEngine) GetLeader() string { le.leaderIdentityMutex.Lock() defer le.leaderIdentityMutex.Unlock() @@ -187,7 +215,7 @@ func (le *K8sLeaderEngine) initializeLeaderEngine() error { } } - le.logger.Log("Initializing leader engine with holder identity %q", le.holderIdentity) + le.logger.Log("Initializing leader engine with holder identity: %q", le.holderIdentity) apiClient, err := GetAPIClient() if err != nil { @@ -198,7 +226,7 @@ func (le *K8sLeaderEngine) initializeLeaderEngine() error { le.coordinationClient = apiClient.CoordinationV1() _, err = le.coordinationClient.Leases(le.leaseNamespace).Get(le.parentCtx, le.leaseName, metav1.GetOptions{}) if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("failed to obtain leases from namespace %q, err: %v", le.leaseNamespace, err) + return fmt.Errorf("failed to obtain leases from namespace: %q, err: %v", le.leaseNamespace, err) } le.leaderElector, err = le.newLeaderElector(le.parentCtx) @@ -211,7 +239,7 @@ func (le *K8sLeaderEngine) initializeLeaderEngine() error { func (le *K8sLeaderEngine) runLeaderEngine() { for { - le.logger.Log("Starting leader election process for %q under namespace %q", le.holderIdentity, le.leaseNamespace) + le.logger.Log("Starting leader election process for %q under namespace: %q", le.holderIdentity, le.leaseNamespace) le.leaderElector.Run(le.ctx) select { case <-le.ctx.Done(): @@ -241,7 +269,7 @@ func (le *K8sLeaderEngine) getLocalResourceNamespace() string { } ns, err := os.ReadFile(namespaceFilePath) if err != nil { - le.logger.Log("Failed to access namespace file %q, err: %v. Will use 'default' namespace.", err) + le.logger.Log("Failed to access namespace file: %q, err: %v. Will use 'default' namespace.", err) return "default" } return string(ns) diff --git a/leader_election_engine.go b/leader_election_engine.go index 8acb0c6..95f0b33 100644 --- a/leader_election_engine.go +++ b/leader_election_engine.go @@ -32,7 +32,7 @@ func (le *K8sLeaderEngine) newLeaderElector(ctx context.Context) (*ld.LeaderElec callbacks := ld.LeaderCallbacks{ OnNewLeader: func(identity string) { le.updateLeaderIdentity(identity) - le.logger.Log("New leader selected for lease name %q and namespace %q, new leader is %q", le.leaseName, le.leaseNamespace, identity) + le.logger.Log("New leader selected for lease name: %q and namespace: %q, new leader is %q", le.leaseName, le.leaseNamespace, identity) }, OnStartedLeading: func(ctx context.Context) { le.updateLeaderIdentity(le.holderIdentity) @@ -107,7 +107,7 @@ func (le *K8sLeaderEngine) ensureLeaseCreated(ctx context.Context) error { } if !errors.IsConflict(err) || errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create lease %s, err: %v", le.leaseName, err) + return fmt.Errorf("failed to create lease: %s, err: %v", le.leaseName, err) } return nil @@ -121,7 +121,7 @@ func (le *K8sLeaderEngine) getCurrentLeader(ctx context.Context) (string, *coord value, ok := lease.Annotations[rl.LeaderElectionRecordAnnotationKey] if !ok { - le.logger.Log("The lease has no annotation with respect to leader selection, no one is leading at the namespace %q with respect to key %q", le.leaseNamespace, rl.LeaderElectionRecordAnnotationKey) + le.logger.Log("The lease has no annotation with respect to leader selection, no one is leading at the namespace: %q with respect to key: %q", le.leaseNamespace, rl.LeaderElectionRecordAnnotationKey) return "", lease, nil } @@ -135,7 +135,25 @@ func (le *K8sLeaderEngine) getCurrentLeader(ctx context.Context) (string, *coord func (le *K8sLeaderEngine) updateLeaderIdentity(leaderIdentity string) { le.leaderIdentityMutex.Lock() - defer le.leaderIdentityMutex.Unlock() - + previousLeader := le.currentLeaderIdentity le.currentLeaderIdentity = leaderIdentity + le.leaderIdentityMutex.Unlock() + + payload := &NewLeaderPayload{ + CurrentlyLeading: le.IsLeader(), + OldLeader: previousLeader, + NewLeader: leaderIdentity, + } + + le.subscribersMu.Lock() + defer le.subscribersMu.Unlock() + for name, subscriber := range le.subscribers { + name := name + subscriber := subscriber + go func() { + if err := subscriber.NewLeader(payload); err != nil { + le.errorLogger.Log("Failed to send new leader event to subscriber: %q, err: %v", name, err) + } + }() + } } diff --git a/types.go b/types.go index 1ce6bd2..4274322 100644 --- a/types.go +++ b/types.go @@ -3,6 +3,7 @@ package leaderelection import ( "context" "sync" + "sync/atomic" "time" "k8s.io/client-go/kubernetes" @@ -12,9 +13,20 @@ import ( leaderelection "k8s.io/client-go/tools/leaderelection" ) +type NewLeaderPayload struct { + CurrentlyLeading bool + OldLeader string + NewLeader string +} + +type LeaderEngineSubscriber interface { + Name() string + NewLeader(payload *NewLeaderPayload) error +} + type K8sLeaderEngine struct { - running int32 - stopped chan struct{} + isRunning atomic.Bool + stopped chan struct{} parentCtx context.Context ctx context.Context @@ -34,6 +46,9 @@ type K8sLeaderEngine struct { leaderIdentityMutex sync.Mutex currentLeaderIdentity string + subscribersMu sync.Mutex + subscribers map[string]LeaderEngineSubscriber + logger Logger errorLogger Logger }