Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add watchdog mechanism for automatic lock renewal #167

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/base64"
"log"
"time"

"github.com/go-redsync/redsync/v4/redis"
Expand Down Expand Up @@ -34,6 +35,9 @@ type Mutex struct {
setNXOnExtend bool

pools []redis.Pool

watchdogTimeout time.Duration
stopWatchdog chan struct{}
}

// Name returns mutex name (i.e. the Redis key).
Expand Down Expand Up @@ -63,7 +67,15 @@ func (m *Mutex) TryLockContext(ctx context.Context) error {

// Lock locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) Lock() error {
return m.LockContext(context.Background())
err := m.LockContext(context.Background())
if err != nil {
return err
}
if m.watchdogTimeout > 0 {
m.stopWatchdog = make(chan struct{})
go m.startWatchdog()
}
return nil
}

// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
Expand Down Expand Up @@ -135,6 +147,9 @@ func (m *Mutex) lockContext(ctx context.Context, tries int) error {

// Unlock unlocks m and returns the status of unlock.
func (m *Mutex) Unlock() (bool, error) {
if m.stopWatchdog != nil {
close(m.stopWatchdog)
}
return m.UnlockContext(context.Background())
}

Expand Down Expand Up @@ -348,3 +363,22 @@ func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, erro
}
return n, err
}

func (m *Mutex) startWatchdog() {
ticker := time.NewTicker(m.watchdogTimeout / 2)
defer ticker.Stop()

for {
select {
case <-ticker.C:
ok, err := m.ExtendContext(context.Background())
if !ok || err != nil {
log.Printf("Watchdog failed to extend lock: %v", err)
return
}
case <-m.stopWatchdog:
log.Println("Watchdog stopped")
return
}
}
}
26 changes: 26 additions & 0 deletions mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,29 @@ func assertAcquired(ctx context.Context, t *testing.T, pools []redis.Pool, mutex
t.Fatalf("Expected n >= %d, got %d", mutex.quorum, n)
}
}

func TestMutexWatchdog(t *testing.T) {
for k, v := range makeCases(4) {
t.Run(k, func(t *testing.T) {
rs := New(v.pools...)
key := k + "-test-watchdog"
mutex := rs.NewMutex(key, WithExpiry(2*time.Second), WithWatchdogTimeout(2*time.Second))
err := mutex.Lock()
if err != nil {
t.Fatalf("mutex lock failed: %s", err)
}
defer mutex.Unlock()
time.Sleep(5 * time.Second)
if time.Now().Before(mutex.Until()) {
fmt.Println("Lock is still valid")
} else {
t.Fatalf("expected the mutex to still be valid, but it is not")
}
mutex.Unlock()
time.Sleep(4 * time.Second)
if time.Now().Before(mutex.Until()) {
t.Fatalf("expected the mutex to be invalid after expiration, but it is still valid")
}
})
}
}
6 changes: 6 additions & 0 deletions redsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func WithExpiry(expiry time.Duration) Option {
})
}

func WithWatchdogTimeout(timeout time.Duration) Option {
return OptionFunc(func(m *Mutex) {
m.watchdogTimeout = timeout
})
}

// WithTries can be used to set the number of times lock acquire is attempted.
// The default value is 32.
func WithTries(tries int) Option {
Expand Down