feat: add leadership package #661
base: main
Conversation
Walkthrough: The pull request introduces a new `internal/leadership` package.
Actionable comments posted: 6
🧹 Nitpick comments (6)
internal/leadership/leadership.go (2)
Lines 46-52: Implement a backoff strategy to prevent tight looping on errors

In the `Run` method, if `acquire` continually returns errors (other than `context.Canceled`), the loop could consume excessive CPU due to tight looping. Introduce a backoff or delay before retrying to acquire leadership after an error:

```diff
 func (l *Leadership) Run(ctx context.Context) {
 	for {
 		if err := l.acquire(ctx); err != nil {
 			if errors.Is(err, context.Canceled) {
 				return
 			}
 			l.logger.Errorf("error acquiring leadership: %s", err)
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(l.retryPeriod):
+			}
 		}
 	}
 }
```
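For comparison, here is a self-contained sketch of the same loop with exponential backoff instead of a fixed retry period; the function, its parameters, and the caps are illustrative and not part of the PR:

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// retryWithBackoff calls attempt in a loop, doubling the wait after each
// failure (capped at maxDelay) so persistent errors cannot busy-loop the CPU.
// It returns once attempt reports context.Canceled or ctx is done.
func retryWithBackoff(ctx context.Context, initial, maxDelay time.Duration, attempt func(context.Context) error) {
	delay := initial
	for {
		err := attempt(ctx)
		if err == nil {
			delay = initial // reset the backoff after a successful attempt
			continue
		}
		if errors.Is(err, context.Canceled) {
			return
		}
		log.Printf("attempt failed: %s; retrying in %s", err, delay)
		select {
		case <-ctx.Done():
			return
		case <-time.After(delay):
		}
		if delay *= 2; delay > maxDelay {
			delay = maxDelay
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// Always fails in this demo; the loop waits 100ms, 200ms, 400ms, ...
	retryWithBackoff(ctx, 100*time.Millisecond, time.Second, func(context.Context) error {
		return errors.New("lock held elsewhere")
	})
}
```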
Lines 76-80: Validate input in `WithRetryPeriod` to prevent invalid configurations

The `WithRetryPeriod` function does not validate the `duration` input. A non-positive duration could lead to unintended behavior, such as immediate retries without delay. Add validation to ensure `duration` is positive:

```diff
 func WithRetryPeriod(duration time.Duration) Option {
+	if duration <= 0 {
+		panic("retryPeriod must be positive")
+	}
 	return func(leadership *Leadership) {
 		leadership.retryPeriod = duration
 	}
 }
```

Alternatively, set a default value instead of panicking:

```diff
 func WithRetryPeriod(duration time.Duration) Option {
 	return func(leadership *Leadership) {
+		if duration <= 0 {
+			leadership.logger.Warn("Invalid retryPeriod; using default of 2s")
+			duration = 2 * time.Second
+		}
 		leadership.retryPeriod = duration
 	}
 }
```

internal/leadership/locker.go (2)
Lines 43-45: Handle errors when closing the connection in the release function

Errors returned by `conn.Close()` are currently ignored. While closing a database connection typically doesn't fail, it's good practice to handle any potential errors. Modify the release function to handle them:

```diff
 return true, func() {
-	_ = conn.Close()
+	if err := conn.Close(); err != nil {
+		// Handle or log the error appropriately
+		fmt.Printf("Error closing DB connection: %v\n", err)
+	}
 }, nil
```
Lines 48-50: Consider returning an error if `db` is nil in `NewDefaultLocker`

If `NewDefaultLocker` is called with a nil `db`, it could lead to a runtime error later when `Take` is called. Add a check for nil; note that returning an error requires widening the return type to `(Locker, error)`:

```diff
-func NewDefaultLocker(db *bun.DB) Locker {
-	return &defaultLocker{db: db}
+func NewDefaultLocker(db *bun.DB) (Locker, error) {
+	if db == nil {
+		return nil, fmt.Errorf("db cannot be nil")
+	}
+	return &defaultLocker{db: db}, nil
 }
```

internal/leadership/signal.go (1)
Lines 74-83: Consider non-blocking signal dispatch to prevent slow listeners from blocking others

The current implementation might block if a listener's channel is full. Consider using a non-blocking send or a separate goroutine for each listener. Apply this diff to implement non-blocking dispatch:

```diff
 func (h *Signal) Signal(t bool) {
 	h.mu.Lock()
 	defer h.mu.Unlock()

 	h.t = &t
 	for _, inner := range h.inner {
-		inner.channel <- t
+		select {
+		case inner.channel <- t:
+		default:
+			// Skip slow listeners
+		}
 	}
 }
```

internal/leadership/leadership_test.go (1)
Lines 60-61: Consider making the timeout configurable to prevent flaky tests

The hard-coded 100ms timeout might be too short on slower systems or under high load, leading to flaky tests. Apply this diff to make the timeout configurable:

```diff
+const testTimeout = 500 * time.Millisecond
+
 func TestLeaderShip(t *testing.T) {
 	// ... existing code ...
-	case <-time.After(100 * time.Millisecond):
+	case <-time.After(testTimeout):
 		t.Fatal("signal should have been received")
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- cmd/serve.go (3 hunks)
- internal/README.md (99 hunks)
- internal/leadership/leadership.go (1 hunks)
- internal/leadership/leadership_test.go (1 hunks)
- internal/leadership/locker.go (1 hunks)
- internal/leadership/locker_generated_test.go (1 hunks)
- internal/leadership/module.go (1 hunks)
- internal/leadership/signal.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- internal/README.md
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Tests
🔇 Additional comments (5)
internal/leadership/leadership.go (1)
Lines 19-20: ⚠️ Potential issue — Ensure `locker.Take(ctx)` handles context cancellation promptly

In the `acquire` method, if `locker.Take(ctx)` does not respect context cancellation, it may block indefinitely even if `ctx` is canceled. Verify that the `Locker` implementation properly handles cancellation and returns immediately when `ctx` is done; if not, consider modifying the `Locker` interface or its implementations to support this behavior.
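To make the requirement concrete, here is a minimal sketch of a cancellation-aware lock acquisition using plain `database/sql` — the function name is hypothetical, the `(acquired, release, error)` shape mirrors this PR's `Take`, and `$1` is the plain-Postgres placeholder (the PR's bun code uses `?`):

```go
package locksketch

import (
	"context"
	"database/sql"
	"fmt"
)

// takeWithCancellation honors ctx at every blocking step: db.Conn aborts
// while waiting for a pooled connection if ctx is done, and QueryRowContext
// cancels the in-flight query on ctx cancellation, so the caller is never
// stuck past ctx.Done().
func takeWithCancellation(ctx context.Context, db *sql.DB, key int64) (bool, func(), error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return false, nil, fmt.Errorf("checking out connection: %w", err)
	}

	var acquired bool
	err = conn.QueryRowContext(ctx, "select pg_try_advisory_lock($1)", key).Scan(&acquired)
	if err != nil {
		_ = conn.Close()
		return false, nil, fmt.Errorf("acquiring advisory lock: %w", err)
	}
	if !acquired {
		_ = conn.Close()
		return false, nil, nil
	}

	// The advisory lock is session-scoped: releasing it is simply a matter
	// of closing the connection that holds it.
	return true, func() { _ = conn.Close() }, nil
}
```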
internal/leadership/module.go (1)

Lines 21-23: Avoid potential goroutine leaks by ensuring `Run` exits on context cancellation

The goroutine running `runner.Run(ctx)` should exit when the context is canceled. Verify that `Leadership.Run` returns when `ctx` is canceled; if not, adjust it to respect cancellation and prevent goroutine leaks.
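As an illustration of the pattern being asked for, here is a sketch of an fx hook that detaches the run loop from the short-lived start context while still guaranteeing the goroutine has exited before shutdown completes (assumes `go.uber.org/fx` and Go 1.21+ for `context.WithoutCancel`; names are illustrative):

```go
package leadershipsketch

import (
	"context"

	"go.uber.org/fx"
)

// runner abstracts Leadership for this sketch; Run must return once its
// context is done, otherwise the goroutine below leaks.
type runner interface {
	Run(ctx context.Context)
}

func register(lc fx.Lifecycle, r runner) {
	var cancel context.CancelFunc
	stopped := make(chan struct{})
	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			// Detach from the short-lived start context, but keep a cancel
			// handle so OnStop can terminate the loop.
			runCtx, c := context.WithCancel(context.WithoutCancel(ctx))
			cancel = c
			go func() {
				defer close(stopped)
				r.Run(runCtx)
			}()
			return nil
		},
		OnStop: func(ctx context.Context) error {
			cancel()
			select {
			case <-stopped: // Run returned: no goroutine leak
				return nil
			case <-ctx.Done(): // shutdown deadline elapsed first
				return ctx.Err()
			}
		},
	})
}
```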
internal/leadership/locker_generated_test.go (1)

Lines 1-55: LGTM! Well-structured mock implementation

The mock implementation is correctly generated and provides the necessary testing capabilities for the `Locker` interface.
internal/leadership/leadership_test.go (1)
Lines 53-73: Add test cases for edge scenarios

The test covers basic leadership transfer but misses important edge cases (a sketch of one such test follows the list):
- Concurrent leadership attempts
- Network failures (error cases)
- Leadership loss during operation
Would you like me to generate additional test cases for these scenarios?
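For instance, the "network failure" case might look like the following sketch — `lockerFunc`, the `NewLeadership` constructor, and the `Changes()` accessor are assumptions for illustration; only the `Take` shape and `Listen` come from this PR:

```go
package leadership

import (
	"context"
	"errors"
	"testing"
	"time"
)

// lockerFunc adapts a function to the Locker interface assumed by this PR
// (Take returns acquired, a release func, and an error).
type lockerFunc func(ctx context.Context) (bool, func(), error)

func (f lockerFunc) Take(ctx context.Context) (bool, func(), error) { return f(ctx) }

func TestLeadershipRecoversFromTransientError(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	locker := lockerFunc(func(ctx context.Context) (bool, func(), error) {
		calls++
		if calls == 1 {
			return false, nil, errors.New("transient network failure")
		}
		return true, func() {}, nil
	})

	l := NewLeadership(locker, WithRetryPeriod(10*time.Millisecond)) // hypothetical constructor
	go l.Run(ctx)

	signals, release := l.Changes().Listen() // hypothetical accessor to the Signal
	defer release()

	select {
	case acquired := <-signals:
		if !acquired {
			t.Fatal("expected leadership to be acquired after the retry")
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for the leadership signal")
	}
}
```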
cmd/serve.go (1)
Line 112: LGTM! Clean integration of the leadership module

The leadership module is properly integrated into the service's dependency injection setup.
```go
if acquired {
	l.changes.Signal(true)
	l.logger.Info("leadership acquired")
	<-ctx.Done()
	l.logger.Info("leadership lost")
	release()
	l.changes.Signal(false)
	return ctx.Err()
} else {
```
🛠️ Refactor suggestion — Release lock immediately upon context cancellation

There is a potential delay in releasing the lock when the context is canceled. If `ctx.Done()` is signaled, the lock should be released promptly to avoid holding onto leadership longer than necessary. Consider restructuring the code so the lock is released immediately upon cancellation:
```diff
 if acquired {
 	l.changes.Signal(true)
 	l.logger.Info("leadership acquired")
-	<-ctx.Done()
+	select {
+	case <-ctx.Done():
+		l.logger.Info("leadership lost")
+		release()
+		l.changes.Signal(false)
+		return ctx.Err()
+	}
 } else {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-time.After(l.retryPeriod):
 	}
 }
```
```go
)
lc.Append(fx.Hook{
	OnStart: func(ctx context.Context) error {
		ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))
```
`context.WithoutCancel` requires Go 1.21+

At line 19, `context.WithoutCancel(ctx)` is used. Note that this function was added to the standard library in Go 1.21; on older toolchains it will result in a compilation error. If the module targets an earlier Go version, use an alternative. If the intention is to create a child context that isn't canceled when the parent is, consider `context.Background()`; however, be cautious, as this also drops the parent's values and may have implications for context propagation.
```diff
 func(ctx context.Context) error {
-	ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))
+	ctx, cancel = context.WithCancel(context.Background())
```

Alternatively, if you want the child context to inherit cancellation, simply use:

```diff
 func(ctx context.Context) error {
-	ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))
+	ctx, cancel = context.WithCancel(ctx)
```

Ensure that the choice aligns with the desired behavior of the application.
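A quick runnable check of `context.WithoutCancel` semantics on Go 1.21+, for anyone verifying the behavior locally:

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey string

func main() {
	parent, cancelParent := context.WithCancel(
		context.WithValue(context.Background(), ctxKey("id"), 42))

	// The derived context ignores the parent's cancellation but keeps its values.
	detached, cancelDetached := context.WithCancel(context.WithoutCancel(parent))
	defer cancelDetached()

	cancelParent()

	fmt.Println(parent.Err())                 // context.Canceled
	fmt.Println(detached.Err())               // <nil>: survived the parent's cancel
	fmt.Println(detached.Value(ctxKey("id"))) // 42: values still propagate
}
```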
```go
if err := ret.Scan(&acquired); err != nil {
	_ = conn.Close()
	panic(err)
```
Avoid using `panic` in library code

Using `panic` in a library function can cause the entire application to crash unexpectedly. It's better to return an error to the caller for proper handling. Replace the `panic` with an error return:
```diff
 if err := ret.Scan(&acquired); err != nil {
 	_ = conn.Close()
-	panic(err)
+	return false, nil, fmt.Errorf("error scanning result: %w", err)
 }
```
```go
if ret.Err() != nil {
	_ = conn.Close()
	return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
```
Redundant error handling after `QueryRowContext`

Checking `ret.Err()` after `QueryRowContext` is valid — `sql.Row` has had an `Err()` method since Go 1.15 — but it is redundant here, because any deferred query error is also returned by `Scan`. The real problem is the `panic` that follows; capture the error from `Scan` instead:
Modify the code to check for errors correctly:
ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
- if ret.Err() != nil {
- _ = conn.Close()
- return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
- }
var acquired bool
if err := ret.Scan(&acquired); err != nil {
_ = conn.Close()
- panic(err)
+ return false, nil, fmt.Errorf("error scanning result: %w", err)
}
internal/leadership/signal.go (outdated)
```go
func (h *Signal) copyValue() *bool {
	if h.t == nil {
		return nil
	}

	data, err := json.Marshal(*h.t)
	if err != nil {
		panic(err)
	}

	var ret bool
	if err := json.Unmarshal(data, &ret); err != nil {
		panic(err)
	}

	return &ret
}
```
🛠️ Refactor suggestion — Consider using a simpler value copying mechanism

The current implementation uses JSON marshaling/unmarshaling to copy a boolean value, which is unnecessarily complex and can panic. Apply this diff for a simpler solution:
```diff
 func (h *Signal) copyValue() *bool {
 	if h.t == nil {
 		return nil
 	}
-
-	data, err := json.Marshal(*h.t)
-	if err != nil {
-		panic(err)
-	}
-
-	var ret bool
-	if err := json.Unmarshal(data, &ret); err != nil {
-		panic(err)
-	}
-
+	ret := *h.t
 	return &ret
 }
```
internal/leadership/signal.go (outdated)
```go
func (h *Signal) Listen() (<-chan bool, func()) {
	h.mu.Lock()
	defer h.mu.Unlock()

	newChannel := make(chan bool, 1)
	index := len(h.inner)
	h.inner = append(h.inner, listener{
		channel: newChannel,
	})
	if h.t != nil {
		newChannel <- *h.copyValue()
	}

	return newChannel, func() {
		h.mu.Lock()
		defer h.mu.Unlock()

		if index < len(h.inner)-1 {
			h.inner = append(h.inner[:index], h.inner[index+1:]...)
		} else {
			h.inner = h.inner[:index]
		}
	}
}
```
Fix potential race condition in listener removal.
The current implementation might have a race condition if multiple goroutines try to remove listeners concurrently, as the index-based removal assumes the slice hasn't been modified.
Apply this diff to implement safer listener removal:
```diff
 func (h *Signal) Listen() (<-chan bool, func()) {
 	h.mu.Lock()
 	defer h.mu.Unlock()

 	newChannel := make(chan bool, 1)
-	index := len(h.inner)
 	h.inner = append(h.inner, listener{
 		channel: newChannel,
 	})
 	if h.t != nil {
 		newChannel <- *h.copyValue()
 	}

 	return newChannel, func() {
 		h.mu.Lock()
 		defer h.mu.Unlock()

-		if index < len(h.inner)-1 {
-			h.inner = append(h.inner[:index], h.inner[index+1:]...)
-		} else {
-			h.inner = h.inner[:index]
+		for i, l := range h.inner {
+			if l.channel == newChannel {
+				h.inner = append(h.inner[:i], h.inner[i+1:]...)
+				break
+			}
 		}
 	}
 }
```
Force-pushed from 61adcea to 6ef70af
Actionable comments posted: 3
♻️ Duplicate comments (1)
internal/leadership/signal.go (1)
Lines 44-53: ⚠️ Potential issue — Fix potential race condition in listener removal

Using the `index` variable to remove listeners assumes that the `inner` slice hasn't been modified between appending the listener and removing it. This can lead to incorrect listener removal if other operations modify the slice concurrently. Apply this diff to safely remove the listener based on the channel reference:

```diff
 return newChannel, func() {
 	h.mu.Lock()
 	defer h.mu.Unlock()

-	if index < len(h.inner)-1 {
-		h.inner = append(h.inner[:index], h.inner[index+1:]...)
-	} else {
-		h.inner = h.inner[:index]
-	}
+	for i, l := range h.inner {
+		if l.channel == newChannel {
+			h.inner = append(h.inner[:i], h.inner[i+1:]...)
+			break
+		}
+	}
 }
```

This modification searches for the listener by comparing channel references, ensuring the correct listener is removed even if the slice has been altered.
🧹 Nitpick comments (1)
internal/leadership/signal.go (1)
Line 13: Simplify the mutex field by embedding `sync.Mutex` directly

Currently, the `mu` field is a pointer to a `sync.Mutex`, which introduces unnecessary pointer indirection. Embedding `sync.Mutex` directly is the idiomatic Go approach and simplifies the code:

```diff
 type Signal struct {
-	mu *sync.Mutex
+	mu sync.Mutex
 	t  *bool

 	inner []listener
-	outer chan bool
 }
```

Update the mutex usage in your methods accordingly (the usage remains the same). Additionally, in the `NewSignal` function, you no longer need to initialize `mu`:

```diff
 func NewSignal() *Signal {
 	return &Signal{
-		mu: &sync.Mutex{},
 	}
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- cmd/serve.go (3 hunks)
- internal/README.md (99 hunks)
- internal/leadership/leadership.go (1 hunks)
- internal/leadership/leadership_test.go (1 hunks)
- internal/leadership/locker.go (1 hunks)
- internal/leadership/locker_generated_test.go (1 hunks)
- internal/leadership/module.go (1 hunks)
- internal/leadership/signal.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- internal/leadership/leadership_test.go
- internal/leadership/module.go
- cmd/serve.go
- internal/leadership/leadership.go
- internal/leadership/locker_generated_test.go
- internal/leadership/locker.go
- internal/README.md
```go
	t *bool

	inner []listener
	outer chan bool
```
🛠️ Refactor suggestion — Remove the unused `outer` channel field

The `outer` field in the `Signal` struct is declared but never used within the code. Removing it will clean up the struct and prevent any confusion about its purpose:
```diff
 type Signal struct {
 	mu    sync.Mutex
 	t     *bool

 	inner []listener
-	outer chan bool
 }
```
Also, remove the initialization of `outer` in the `NewSignal` function:

```diff
 func NewSignal() *Signal {
 	return &Signal{
-		outer: make(chan bool),
 	}
 }
```
```go
for _, inner := range h.inner {
	inner.channel <- t
}
```
Prevent potential blocking in `Signal()` when sending to listeners

If a listener's channel buffer is full (which can happen if the listener is slow to consume messages), the send operation `inner.channel <- t` will block indefinitely, possibly causing a deadlock. Consider changing the send to a non-blocking send using a `select` statement:
```diff
 for _, inner := range h.inner {
-	inner.channel <- t
+	select {
+	case inner.channel <- t:
+		// Sent successfully
+	default:
+		// Listener's channel is full; skip to prevent blocking
+	}
 }
```
Alternatively, you could increase the buffer size of the channels to accommodate more messages, but be cautious as this only mitigates the issue rather than fully resolving it.
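A tiny runnable illustration of the trade-off behind the non-blocking send: a slow listener misses a value instead of stalling the sender — which callers of `Listen` must then be prepared to tolerate:

```go
package main

import "fmt"

func main() {
	ch := make(chan bool, 1)

	send := func(v bool) {
		select {
		case ch <- v:
			fmt.Println("delivered", v)
		default:
			fmt.Println("listener busy, dropped", v)
		}
	}

	send(true)  // buffer empty: delivered
	send(false) // buffer still full (nothing consumed yet): dropped, no block
	fmt.Println("listener reads:", <-ch) // only ever sees the first value
}
```

Note the design consequence for a leadership signal: a dropped value means a listener can miss a transition, so consumers must treat the channel as "latest state may be stale" rather than a lossless event stream.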
```go
func (h *Signal) Close() {
	h.mu.Lock()
	defer h.mu.Unlock()

	for _, inner := range h.inner {
		close(inner.channel)
	}
}
```
Prevent sending on closed channels after `Close()` is called

After `Close()` is invoked, the listener channels are closed. If `Signal()` is subsequently called, it will attempt to send on these closed channels, resulting in a panic. Introduce an `isClosed` flag to track the closed state of the `Signal` and prevent signaling after closure:
```diff
 type Signal struct {
 	mu    sync.Mutex
 	t     *bool

 	inner []listener
-	outer chan bool
+	isClosed bool
 }

 func (h *Signal) Signal(t bool) {
 	h.mu.Lock()
 	defer h.mu.Unlock()

+	if h.isClosed {
+		// Signal has been closed; ignore further signals
+		return
+	}
 	h.t = &t
 	for _, inner := range h.inner {
 		select {
 		case inner.channel <- t:
 			// Sent successfully
 		default:
 			// Listener's channel is full; skip to prevent blocking
 		}
 	}
 }

 func (h *Signal) Close() {
 	h.mu.Lock()
 	defer h.mu.Unlock()

+	h.isClosed = true
 	for _, inner := range h.inner {
 		close(inner.channel)
 	}
 }
```
This ensures that no signals are sent after closure, preventing panics from sending on closed channels.
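The failure mode the flag guards against is easy to reproduce — sending on a closed channel always panics:

```go
package main

import "fmt"

func main() {
	ch := make(chan bool, 1)
	close(ch)

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // "send on closed channel"
		}
	}()
	ch <- true // panics: this is what an unguarded Signal-after-Close would do
}
```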
Force-pushed from 6ef70af to 5223fa4
Codecov Report

Attention: patch coverage on the new code is below the project average, lowering total coverage by 0.21%. Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##             main     #661      +/-   ##
==========================================
- Coverage   81.71%   81.50%   -0.21%
==========================================
  Files         131      135       +4
  Lines        7059     7207     +148
==========================================
+ Hits         5768     5874     +106
- Misses        990     1025      +35
- Partials      301      308       +7
```

☔ View full report in Codecov by Sentry.
Force-pushed from 5223fa4 to 96c9f3e
No description provided.