Skip to content

Commit

Permalink
Bump Go version, some format changes and add subscription based notif…
Browse files Browse the repository at this point in the history
…ication
  • Loading branch information
orhanasan committed Nov 22, 2023
1 parent 9794d1a commit f01f527
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 16 deletions.
33 changes: 32 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,41 @@
module github.com/edgedelta/leader-election

go 1.16
go 1.21

require (
github.com/cenkalti/backoff v2.2.1+incompatible
k8s.io/api v0.21.0
k8s.io/apimachinery v0.21.0
k8s.io/client-go v0.21.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/google/go-cmp v0.5.2 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 // indirect
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
golang.org/x/text v0.3.4 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/klog/v2 v2.8.0 // indirect
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
44 changes: 36 additions & 8 deletions leader_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
Expand Down Expand Up @@ -110,6 +109,7 @@ func New(opts ...K8sLeaderEngineOption) (*K8sLeaderEngine, error) {
logger: &defaultLogger{},
errorLogger: &defaultLogger{},
stopped: make(chan struct{}),
subscribers: make(map[string]LeaderEngineSubscriber),
}

for _, o := range opts {
Expand All @@ -133,7 +133,7 @@ func New(opts ...K8sLeaderEngineOption) (*K8sLeaderEngine, error) {

// Start the leader engine and block until a leader is elected.
func (le *K8sLeaderEngine) Start() error {
if !atomic.CompareAndSwapInt32(&le.running, 0, 1) {
if !le.isRunning.CompareAndSwap(false, true) {
return fmt.Errorf("leader engine is already started")
}

Expand All @@ -157,17 +157,45 @@ func (le *K8sLeaderEngine) Start() error {

// Stop the engine and wait until the goroutines have stopped
func (le *K8sLeaderEngine) Stop() error {
if !atomic.CompareAndSwapInt32(&le.running, 1, 0) {
if !le.isRunning.CompareAndSwap(true, false) {
return fmt.Errorf("leader engine is not started or already stopped")
}

le.ctxCancel()
le.logger.Log("Leader election engine cancelled internal context, will wait until stopped signal is recieved")
le.logger.Log("Leader election engine cancelled internal context, will wait until stopped signal is received")
<-le.stopped
le.logger.Log("Leader election engine stopped")
return nil
}

func (le *K8sLeaderEngine) Subscribe(subscriber LeaderEngineSubscriber) {
le.subscribersMu.Lock()
defer le.subscribersMu.Unlock()

subscriberName := subscriber.Name()
_, ok := le.subscribers[subscriberName]
if ok {
le.logger.Log("Already have a subscriber with name: %q, skipping", subscriber)
return
}

le.subscribers[subscriberName] = subscriber
}

func (le *K8sLeaderEngine) Unsubscribe(subscriber LeaderEngineSubscriber) error {
le.subscribersMu.Lock()
defer le.subscribersMu.Unlock()

subscriberName := subscriber.Name()
_, ok := le.subscribers[subscriberName]
if ok {
return fmt.Errorf("there is no subscriber with name: %q", subscriberName)
}

delete(le.subscribers, subscriberName)
return nil
}

func (le *K8sLeaderEngine) GetLeader() string {
le.leaderIdentityMutex.Lock()
defer le.leaderIdentityMutex.Unlock()
Expand All @@ -187,7 +215,7 @@ func (le *K8sLeaderEngine) initializeLeaderEngine() error {
}
}

le.logger.Log("Initializing leader engine with holder identity %q", le.holderIdentity)
le.logger.Log("Initializing leader engine with holder identity: %q", le.holderIdentity)

apiClient, err := GetAPIClient()
if err != nil {
Expand All @@ -198,7 +226,7 @@ func (le *K8sLeaderEngine) initializeLeaderEngine() error {
le.coordinationClient = apiClient.CoordinationV1()
_, err = le.coordinationClient.Leases(le.leaseNamespace).Get(le.parentCtx, le.leaseName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to obtain leases from namespace %q, err: %v", le.leaseNamespace, err)
return fmt.Errorf("failed to obtain leases from namespace: %q, err: %v", le.leaseNamespace, err)
}

le.leaderElector, err = le.newLeaderElector(le.parentCtx)
Expand All @@ -211,7 +239,7 @@ func (le *K8sLeaderEngine) initializeLeaderEngine() error {

func (le *K8sLeaderEngine) runLeaderEngine() {
for {
le.logger.Log("Starting leader election process for %q under namespace %q", le.holderIdentity, le.leaseNamespace)
le.logger.Log("Starting leader election process for %q under namespace: %q", le.holderIdentity, le.leaseNamespace)
le.leaderElector.Run(le.ctx)
select {
case <-le.ctx.Done():
Expand Down Expand Up @@ -241,7 +269,7 @@ func (le *K8sLeaderEngine) getLocalResourceNamespace() string {
}
ns, err := os.ReadFile(namespaceFilePath)
if err != nil {
le.logger.Log("Failed to access namespace file %q, err: %v. Will use 'default' namespace.", err)
le.logger.Log("Failed to access namespace file: %q, err: %v. Will use 'default' namespace.", err)
return "default"
}
return string(ns)
Expand Down
28 changes: 23 additions & 5 deletions leader_election_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (le *K8sLeaderEngine) newLeaderElector(ctx context.Context) (*ld.LeaderElec
callbacks := ld.LeaderCallbacks{
OnNewLeader: func(identity string) {
le.updateLeaderIdentity(identity)
le.logger.Log("New leader selected for lease name %q and namespace %q, new leader is %q", le.leaseName, le.leaseNamespace, identity)
le.logger.Log("New leader selected for lease name: %q and namespace: %q, new leader is %q", le.leaseName, le.leaseNamespace, identity)
},
OnStartedLeading: func(ctx context.Context) {
le.updateLeaderIdentity(le.holderIdentity)
Expand Down Expand Up @@ -107,7 +107,7 @@ func (le *K8sLeaderEngine) ensureLeaseCreated(ctx context.Context) error {
}

if !errors.IsConflict(err) || errors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create lease %s, err: %v", le.leaseName, err)
return fmt.Errorf("failed to create lease: %s, err: %v", le.leaseName, err)
}

return nil
Expand All @@ -121,7 +121,7 @@ func (le *K8sLeaderEngine) getCurrentLeader(ctx context.Context) (string, *coord

value, ok := lease.Annotations[rl.LeaderElectionRecordAnnotationKey]
if !ok {
le.logger.Log("The lease has no annotation with respect to leader selection, no one is leading at the namespace %q with respect to key %q", le.leaseNamespace, rl.LeaderElectionRecordAnnotationKey)
le.logger.Log("The lease has no annotation with respect to leader selection, no one is leading at the namespace: %q with respect to key: %q", le.leaseNamespace, rl.LeaderElectionRecordAnnotationKey)
return "", lease, nil
}

Expand All @@ -135,7 +135,25 @@ func (le *K8sLeaderEngine) getCurrentLeader(ctx context.Context) (string, *coord

func (le *K8sLeaderEngine) updateLeaderIdentity(leaderIdentity string) {
le.leaderIdentityMutex.Lock()
defer le.leaderIdentityMutex.Unlock()

previousLeader := le.currentLeaderIdentity
le.currentLeaderIdentity = leaderIdentity
le.leaderIdentityMutex.Unlock()

payload := &NewLeaderPayload{
CurrentlyLeading: le.IsLeader(),
OldLeader: previousLeader,
NewLeader: leaderIdentity,
}

le.subscribersMu.Lock()
defer le.subscribersMu.Unlock()
for name, subscriber := range le.subscribers {
name := name
subscriber := subscriber
go func() {
if err := subscriber.NewLeader(payload); err != nil {
le.errorLogger.Log("Failed to send new leader event to subscriber: %q, err: %v", name, err)
}
}()
}
}
19 changes: 17 additions & 2 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package leaderelection
import (
"context"
"sync"
"sync/atomic"
"time"

"k8s.io/client-go/kubernetes"
Expand All @@ -12,9 +13,20 @@ import (
leaderelection "k8s.io/client-go/tools/leaderelection"
)

type NewLeaderPayload struct {
CurrentlyLeading bool
OldLeader string
NewLeader string
}

type LeaderEngineSubscriber interface {
Name() string
NewLeader(payload *NewLeaderPayload) error
}

type K8sLeaderEngine struct {
running int32
stopped chan struct{}
isRunning atomic.Bool

Check failure on line 28 in types.go

View workflow job for this annotation

GitHub Actions / build

undefined: atomic.Bool
stopped chan struct{}

parentCtx context.Context
ctx context.Context
Expand All @@ -34,6 +46,9 @@ type K8sLeaderEngine struct {
leaderIdentityMutex sync.Mutex
currentLeaderIdentity string

subscribersMu sync.Mutex
subscribers map[string]LeaderEngineSubscriber

logger Logger
errorLogger Logger
}
Expand Down

0 comments on commit f01f527

Please sign in to comment.