
feat: add leadership package #661

Open · wants to merge 1 commit into base: main

Conversation

@gfyrag (Contributor) commented Jan 23, 2025

No description provided.

@gfyrag requested a review from a team as a code owner on January 23, 2025 at 10:30

coderabbitai bot commented Jan 23, 2025

Warning

Rate limit exceeded

@gfyrag has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 6 minutes and 11 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 6ef70af and 96c9f3e.

📒 Files selected for processing (8)
  • cmd/serve.go (3 hunks)
  • internal/doc.go (1 hunks)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/leadership_test.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/locker_generated_test.go (1 hunks)
  • internal/leadership/module.go (1 hunks)
  • internal/leadership/signal.go (1 hunks)

Walkthrough

The pull request introduces a new leadership package under internal, implementing a leadership acquisition mechanism for distributed deployments. Leadership is managed through a locking abstraction, with Uber FX used for dependency injection. The changes add a Locker interface, a Leadership struct, a signal mechanism for broadcasting leadership changes, and test utilities. Leadership can be acquired and released with a configurable retry period, in a concurrency-safe way.
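For orientation, a minimal sketch of the shapes the walkthrough describes. The signatures are inferred from the snippets quoted later in this review (Take returning an acquired flag, a release function, and an error) and may differ from the actual PR code.

```go
package leadership

import (
	"context"
	"time"
)

// Signal (defined in signal.go in the PR) fans a boolean leadership state out
// to registered listeners; shown here only as an opaque placeholder.
type Signal struct{}

// Locker abstracts lock acquisition; the PR's default implementation takes a
// Postgres advisory lock. Take reports whether the lock was acquired and, when
// it was, returns a release function.
type Locker interface {
	Take(ctx context.Context) (acquired bool, release func(), err error)
}

// Leadership runs a loop that tries to take the lock, holds it until the
// context is canceled, and broadcasts changes through a Signal so other
// components can react to gaining or losing leadership.
type Leadership struct {
	locker      Locker
	changes     *Signal
	retryPeriod time.Duration
	// logger omitted for brevity
}
```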

Changes

  • cmd/serve.go: added the import and wiring of leadership.NewFXModule() into the service configuration
  • internal/README.md: removed GitHub links from function and type declarations, simplifying the documentation
  • internal/leadership/leadership.go: implemented the core leadership acquisition logic with signal handling and a retry mechanism
  • internal/leadership/leadership_test.go: added a test suite for the leadership functionality
  • internal/leadership/locker.go: defined the Locker interface and a default implementation for lock acquisition
  • internal/leadership/locker_generated_test.go: added a generated mock implementation of the Locker interface
  • internal/leadership/module.go: added an FX module for dependency injection of the leadership components
  • internal/leadership/signal.go: implemented the signal handling mechanism for leadership changes

Suggested Labels

build-images

Suggested Reviewers

  • flemzord
  • paul-nicolas

Poem

🐰 Leadership's dance, a rabbit's delight,
Locks acquired with algorithmic might
Signals bounce, goroutines take flight
Concurrency leaps with technical height
FX injects, the system burns bright! 🚀




@coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (6)
internal/leadership/leadership.go (2)

46-52: Implement backoff strategy to prevent tight looping on errors

In the Run method, if acquire continually returns errors (other than context.Canceled), the loop could consume excessive CPU resources due to tight looping.

Introduce a backoff or delay before retrying to acquire leadership after an error:

	func (l *Leadership) Run(ctx context.Context) {
		for {
			if err := l.acquire(ctx); err != nil {
				if errors.Is(err, context.Canceled) {
					return
				}
				l.logger.Errorf("error acquiring leadership: %s", err)
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(l.retryPeriod):
+				}
			}
		}
	}

76-80: Validate input in WithRetryPeriod to prevent invalid configurations

The WithRetryPeriod function does not validate the duration input. A non-positive duration could lead to unintended behavior, such as immediate retries without delay.

Add validation to ensure duration is positive:

	func WithRetryPeriod(duration time.Duration) Option {
+		if duration <= 0 {
+			panic("retryPeriod must be positive")
+		}
		return func(leadership *Leadership) {
			leadership.retryPeriod = duration
		}
	}

Alternatively, set a default value instead of panicking:

	func WithRetryPeriod(duration time.Duration) Option {
		return func(leadership *Leadership) {
+			if duration <= 0 {
+				leadership.logger.Warn("Invalid retryPeriod; using default of 2s")
+				duration = 2 * time.Second
+			}
			leadership.retryPeriod = duration
		}
	}
internal/leadership/locker.go (2)

43-45: Handle errors when closing the connection in the release function

Errors returned by conn.Close() are currently ignored. While closing a database connection typically doesn't fail, it's good practice to handle any potential errors.

Modify the release function to handle errors:

	return true, func() {
-		_ = conn.Close()
+		if err := conn.Close(); err != nil {
+			// Handle or log the error appropriately
+			fmt.Printf("Error closing DB connection: %v\n", err)
+		}
	}, nil

48-50: Consider returning an error if db is nil in NewDefaultLocker

If NewDefaultLocker is called with a nil db, it could lead to a runtime error later when Take is called.

Add a nil check; note that this also requires changing the constructor to return an error, so callers and the FX provider must be updated accordingly:

-	func NewDefaultLocker(db *bun.DB) Locker {
+	func NewDefaultLocker(db *bun.DB) (Locker, error) {
+		if db == nil {
+			return nil, fmt.Errorf("db cannot be nil")
+		}
-		return &defaultLocker{db: db}
+		return &defaultLocker{db: db}, nil
	}
internal/leadership/signal.go (1)

74-83: Consider non-blocking signal dispatch to prevent slow listeners from blocking others.

The current implementation might block if a listener's channel is full. Consider using a non-blocking send or a separate goroutine for each listener.

Apply this diff to implement non-blocking signal dispatch:

 func (h *Signal) Signal(t bool) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	h.t = &t
 
 	for _, inner := range h.inner {
-		inner.channel <- t
+		select {
+		case inner.channel <- t:
+		default:
+			// Skip slow listeners
+		}
 	}
 }
internal/leadership/leadership_test.go (1)

60-61: Consider making the timeout configurable to prevent flaky tests.

The hard-coded 100ms timeout might be too short on slower systems or under high load, leading to flaky tests.

Apply this diff to make the timeout configurable:

+const testTimeout = 500 * time.Millisecond
+
 func TestLeaderShip(t *testing.T) {
     // ... existing code ...
-    case <-time.After(100 * time.Millisecond):
+    case <-time.After(testTimeout):
         t.Fatal("signal should have been received")
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c10b32b and 61adcea.

📒 Files selected for processing (8)
  • cmd/serve.go (3 hunks)
  • internal/README.md (99 hunks)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/leadership_test.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/locker_generated_test.go (1 hunks)
  • internal/leadership/module.go (1 hunks)
  • internal/leadership/signal.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • internal/README.md
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Tests
🔇 Additional comments (5)
internal/leadership/leadership.go (1)

19-20: ⚠️ Potential issue

Ensure locker.Take(ctx) handles context cancellation promptly

In the acquire method, if locker.Take(ctx) does not respect context cancellation, it may block indefinitely even if ctx is canceled. To prevent this, ensure that the Locker implementation properly handles context cancellation and returns immediately when ctx is done.

Verify that locker.Take(ctx) is implemented to handle context cancellation appropriately. If not, consider modifying the Locker interface or its implementations to support this behavior.
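For reference, a hedged sketch of what a context-aware Take can look like, assembled from the snippets quoted in this review (defaultLocker, leadershipAdvisoryLockKey, the pg_try_advisory_lock query). It is illustrative only, not the PR's actual implementation; the lock key value below is a placeholder.

```go
package leadership

import (
	"context"
	"fmt"

	"github.com/uptrace/bun"
)

// Placeholder value; the real advisory lock key is defined in the PR.
const leadershipAdvisoryLockKey = 123456789

type defaultLocker struct{ db *bun.DB }

func (d *defaultLocker) Take(ctx context.Context) (bool, func(), error) {
	// Conn honors ctx while waiting for a connection from the pool.
	conn, err := d.db.Conn(ctx)
	if err != nil {
		return false, nil, fmt.Errorf("error acquiring connection: %w", err)
	}

	var acquired bool
	// QueryRowContext aborts the query when ctx is canceled, so Take cannot
	// block past the context's lifetime.
	if err := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey).
		Scan(&acquired); err != nil {
		_ = conn.Close()
		return false, nil, fmt.Errorf("error acquiring lock: %w", err)
	}
	if !acquired {
		_ = conn.Close()
		return false, nil, nil
	}
	// The release function mirrors the snippet quoted above: close the
	// connection that was used to take the lock.
	return true, func() { _ = conn.Close() }, nil
}
```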

internal/leadership/module.go (1)

21-23: Avoid potential goroutine leaks by ensuring Run exits on context cancellation

The goroutine running runner.Run(ctx) should exit when the context is canceled. Ensure that Leadership.Run properly handles context cancellation to prevent goroutine leaks.

Verify that Leadership.Run exits when ctx is canceled. If not, adjust Leadership.Run to respect context cancellation.
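As a reference point, a sketch of the lifecycle wiring this comment asks to verify: OnStart launches Run on a context that OnStop cancels, so the goroutine cannot outlive the application. Apart from Run and the WithoutCancel call quoted from module.go, the names here are stand-ins, not the PR's code.

```go
package leadership

import (
	"context"

	"go.uber.org/fx"
)

// runner stands in for *Leadership; only Run matters for the lifecycle wiring.
type runner interface {
	Run(ctx context.Context)
}

func registerLifecycle(lc fx.Lifecycle, r runner) {
	var cancel context.CancelFunc
	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			// Detach from the short-lived start-up context, then add our own
			// cancellation so OnStop can stop the loop.
			runCtx, c := context.WithCancel(context.WithoutCancel(ctx))
			cancel = c
			go r.Run(runCtx) // Run must return promptly once runCtx is canceled
			return nil
		},
		OnStop: func(context.Context) error {
			if cancel != nil {
				cancel() // ends the leadership loop and prevents a goroutine leak
			}
			return nil
		},
	})
}
```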

internal/leadership/locker_generated_test.go (1)

1-55: LGTM! Well-structured mock implementation.

The mock implementation is correctly generated and provides the necessary testing capabilities for the Locker interface.

internal/leadership/leadership_test.go (1)

53-73: Add test cases for edge scenarios.

The test covers basic leadership transfer but misses important edge cases:

  • Concurrent leadership attempts
  • Network failures (error cases)
  • Leadership loss during operation

Would you like me to generate additional test cases for these scenarios?
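As one illustration of the error-case gap, a sketch of a retry-on-failure test using a hand-written fake rather than the generated mock. NewLeadership is a hypothetical spelling of the package's constructor (WithRetryPeriod and Run are taken from the snippets in this review); adapt the wiring to the actual API.

```go
package leadership

import (
	"context"
	"errors"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// failingLocker simulates a persistent network failure on every acquisition attempt.
type failingLocker struct{ calls atomic.Int32 }

func (f *failingLocker) Take(ctx context.Context) (bool, func(), error) {
	f.calls.Add(1)
	return false, nil, errors.New("connection refused")
}

func TestLeadershipRetriesOnAcquireError(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	locker := &failingLocker{}
	// Hypothetical constructor; substitute the package's real constructor/options.
	l := NewLeadership(locker, WithRetryPeriod(10*time.Millisecond))
	go l.Run(ctx)

	// The loop should keep retrying (not exit or panic) while Take keeps failing.
	require.Eventually(t, func() bool { return locker.calls.Load() > 1 },
		time.Second, 10*time.Millisecond)
}
```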

cmd/serve.go (1)

112-112: LGTM! Clean integration of the leadership module.

The leadership module is properly integrated into the service's dependency injection setup.
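For context, a minimal sketch of what that wiring amounts to; the import path and the surrounding options are assumptions, and only leadership.NewFXModule() is taken from the walkthrough.

```go
package main

import (
	"go.uber.org/fx"

	// Assumed import path, for illustration only; internal packages are
	// importable from within the same module, as cmd/serve.go is.
	"github.com/formancehq/ledger/internal/leadership"
)

func main() {
	app := fx.New(
		// ... the service's existing modules ...
		leadership.NewFXModule(), // registers the Leadership runner and its lifecycle hooks
	)
	app.Run()
}
```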

Comment on lines +25 to +33
if acquired {
l.changes.Signal(true)
l.logger.Info("leadership acquired")
<-ctx.Done()
l.logger.Info("leadership lost")
release()
l.changes.Signal(false)
return ctx.Err()
} else {

🛠️ Refactor suggestion

Release lock immediately upon context cancellation

There is a potential delay in releasing the lock when the context is canceled. If ctx.Done() is signaled, the lock should be released promptly to avoid holding onto leadership longer than necessary.

Consider restructuring the code to ensure the lock is released immediately upon context cancellation:

	if acquired {
		l.changes.Signal(true)
		l.logger.Info("leadership acquired")
-		<-ctx.Done()
+		select {
+		case <-ctx.Done():
+			l.logger.Info("leadership lost")
+			release()
+			l.changes.Signal(false)
+			return ctx.Err()
+		}
	} else {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(l.retryPeriod):
		}
	}

Committable suggestion skipped: line range outside the PR's diff.

)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))

⚠️ Potential issue

Verify the Go version requirement and intent of context.WithoutCancel

At line 19, context.WithoutCancel(ctx) is used. This function was added to the standard library in Go 1.21, so confirm that the module targets Go 1.21 or later.

Also confirm that detaching from the parent context is intentional: WithoutCancel returns a child context that keeps the parent's values but is not canceled when the parent is, so the lifetime of the derived context is governed solely by the cancel function stored here.

If the child context should instead inherit the parent's cancellation, simply use:

	func(ctx context.Context) error {
-		ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))
+		ctx, cancel = context.WithCancel(ctx)

Ensure that the choice aligns with the desired behavior of the application.
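To make the trade-off concrete, a small standalone demonstration of context.WithoutCancel (Go 1.21+): the detached context keeps the parent's values but ignores the parent's cancellation.

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey string

func main() {
	parent, cancel := context.WithCancel(context.Background())
	parent = context.WithValue(parent, ctxKey("tenant"), "demo")

	detached := context.WithoutCancel(parent)

	cancel() // cancel the parent

	fmt.Println(parent.Err())                     // context canceled
	fmt.Println(detached.Err())                   // <nil>: detached from the parent's cancellation
	fmt.Println(detached.Value(ctxKey("tenant"))) // demo: values are still inherited
}
```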


Comment on lines +33 to +35
if err := ret.Scan(&acquired); err != nil {
_ = conn.Close()
panic(err)

⚠️ Potential issue

Avoid using panic in library code

Using panic in a library function can cause the entire application to crash unexpectedly. It's better to return an error to the caller for proper handling.

Replace the panic with an error return:

	if err := ret.Scan(&acquired); err != nil {
		_ = conn.Close()
-		panic(err)
+		return false, nil, fmt.Errorf("error scanning result: %w", err)
	}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +27 to +29
if ret.Err() != nil {
_ = conn.Close()
return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err())

🛠️ Refactor suggestion

Redundant error check after QueryRowContext

The explicit ret.Err() check is redundant: sql.Row defers any query error until Scan, which returns the same error (Row.Err() only exposes it earlier). The more important problem is the panic in the Scan error path, which should be replaced with an error return.

Simplify to a single error check on Scan:

	ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey)
-	if ret.Err() != nil {
-		_ = conn.Close()
-		return false, nil, fmt.Errorf("error acquiring lock: %w", ret.Err())
-	}
	
	var acquired bool
	if err := ret.Scan(&acquired); err != nil {
		_ = conn.Close()
-		panic(err)
+		return false, nil, fmt.Errorf("error scanning result: %w", err)
	}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 31 to 29
func (h *Signal) copyValue() *bool {
if h.t == nil {
return nil
}

data, err := json.Marshal(*h.t)
if err != nil {
panic(err)
}

var ret bool
if err := json.Unmarshal(data, &ret); err != nil {
panic(err)
}

return &ret
}

🛠️ Refactor suggestion

Consider using a simpler value copying mechanism.

The current implementation uses JSON marshaling/unmarshaling for copying a boolean value, which is unnecessarily complex and can panic.

Apply this diff for a simpler solution:

 func (h *Signal) copyValue() *bool {
 	if h.t == nil {
 		return nil
 	}
-
-	data, err := json.Marshal(*h.t)
-	if err != nil {
-		panic(err)
-	}
-
-	var ret bool
-	if err := json.Unmarshal(data, &ret); err != nil {
-		panic(err)
-	}
-
+	ret := *h.t
 	return &ret
 }

Comment on lines 49 to 54
func (h *Signal) Listen() (<-chan bool, func()) {
h.mu.Lock()
defer h.mu.Unlock()

newChannel := make(chan bool, 1)
index := len(h.inner)
h.inner = append(h.inner, listener{
channel: newChannel,
})
if h.t != nil {
newChannel <- *h.copyValue()
}

return newChannel, func() {
h.mu.Lock()
defer h.mu.Unlock()

if index < len(h.inner)-1 {
h.inner = append(h.inner[:index], h.inner[index+1:]...)
} else {
h.inner = h.inner[:index]
}
}
}

⚠️ Potential issue

Fix potential race condition in listener removal.

The index-based removal assumes the inner slice has not been modified between registering this listener and removing it; if other listeners are removed in the meantime, the captured index goes stale and the wrong listener may be removed.

Apply this diff to implement safer listener removal:

 func (h *Signal) Listen() (<-chan bool, func()) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
 	newChannel := make(chan bool, 1)
-	index := len(h.inner)
 	h.inner = append(h.inner, listener{
 		channel: newChannel,
 	})
 	if h.t != nil {
 		newChannel <- *h.copyValue()
 	}
 
 	return newChannel, func() {
 		h.mu.Lock()
 		defer h.mu.Unlock()
 
-		if index < len(h.inner)-1 {
-			h.inner = append(h.inner[:index], h.inner[index+1:]...)
-		} else {
-			h.inner = h.inner[:index]
+		for i, l := range h.inner {
+			if l.channel == newChannel {
+				h.inner = append(h.inner[:i], h.inner[i+1:]...)
+				break
+			}
 		}
 	}
 }


@coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
internal/leadership/signal.go (1)

44-53: ⚠️ Potential issue

Fix potential race condition in listener removal

Using the index variable to remove listeners assumes that the inner slice hasn't been modified between appending the listener and removing it. This can lead to incorrect listener removal if other operations modify the slice concurrently.

Apply this diff to safely remove the listener based on the channel reference:

 return newChannel, func() {
 	h.mu.Lock()
 	defer h.mu.Unlock()

-	if index < len(h.inner)-1 {
-		h.inner = append(h.inner[:index], h.inner[index+1:]...)
-	} else {
-		h.inner = h.inner[:index]
-	}
+	for i, l := range h.inner {
+		if l.channel == newChannel {
+			h.inner = append(h.inner[:i], h.inner[i+1:]...)
+			break
+		}
+	}
 }

This modification searches for the listener by comparing channel references, ensuring the correct listener is removed even if the slice has been altered.

🧹 Nitpick comments (1)
internal/leadership/signal.go (1)

13-13: Simplify the Mutex field by embedding sync.Mutex directly

Currently, the mu field is a pointer to a sync.Mutex, which introduces unnecessary pointer indirection. Embedding sync.Mutex directly is the idiomatic Go approach and simplifies the code.

Apply this diff to simplify the Mutex field:

 type Signal struct {
-	mu *sync.Mutex
+	mu sync.Mutex
 	t  *bool

 	inner []listener
-	outer chan bool
 }

The method bodies do not need to change: h.mu.Lock() and h.mu.Unlock() work the same on a value mutex.

Additionally, in the NewSignal function, you no longer need to initialize mu:

 func NewSignal() *Signal {
 	return &Signal{
-		mu:    &sync.Mutex{},
 	}
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 61adcea and 6ef70af.

📒 Files selected for processing (8)
  • cmd/serve.go (3 hunks)
  • internal/README.md (99 hunks)
  • internal/leadership/leadership.go (1 hunks)
  • internal/leadership/leadership_test.go (1 hunks)
  • internal/leadership/locker.go (1 hunks)
  • internal/leadership/locker_generated_test.go (1 hunks)
  • internal/leadership/module.go (1 hunks)
  • internal/leadership/signal.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • internal/leadership/leadership_test.go
  • internal/leadership/module.go
  • cmd/serve.go
  • internal/leadership/leadership.go
  • internal/leadership/locker_generated_test.go
  • internal/leadership/locker.go
  • internal/README.md

t *bool

inner []listener
outer chan bool

🛠️ Refactor suggestion

Remove the unused outer channel field

The outer field in the Signal struct is declared but never used within the code. Removing it will clean up the struct and prevent any confusion about its purpose.

Apply this diff to remove the unused field:

 type Signal struct {
 	mu sync.Mutex
 	t  *bool

 	inner []listener
-	outer chan bool
 }

Also, remove the initialization of outer in the NewSignal function:

 func NewSignal() *Signal {
 	return &Signal{
-		outer: make(chan bool),
 	}
 }

Comment on lines +62 to +64
for _, inner := range h.inner {
inner.channel <- t
}

⚠️ Potential issue

Prevent potential blocking in Signal() when sending to listeners

If a listener's channel buffer is full (which can happen if the listener is slow to consume messages), the send operation inner.channel <- t will block indefinitely, possibly causing a deadlock.

Consider modifying the send operation to a non-blocking send using a select statement:

 for _, inner := range h.inner {
-	inner.channel <- t
+	select {
+	case inner.channel <- t:
+		// Sent successfully
+	default:
+		// Listener's channel is full; skip to prevent blocking
+	}
 }

Alternatively, you could increase the buffer size of the channels to accommodate more messages, but be cautious as this only mitigates the issue rather than fully resolving it.


Comment on lines +67 to +74
func (h *Signal) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for _, inner := range h.inner {
close(inner.channel)
}
}

⚠️ Potential issue

Prevent sending on closed channels after Close() is called

After Close() is invoked, the listener channels are closed. If Signal() is subsequently called, it will attempt to send on these closed channels, resulting in a panic.

Introduce an isClosed flag to track the closed state of the Signal and prevent signaling after closure:

 type Signal struct {
 	mu sync.Mutex
 	t  *bool

 	inner []listener
-	outer chan bool
+	isClosed bool
 }

 func (h *Signal) Signal(t bool) {
 	h.mu.Lock()
 	defer h.mu.Unlock()

+	if h.isClosed {
+		// Signal has been closed; ignore further signals
+		return
+	}

 	h.t = &t

 	for _, inner := range h.inner {
 		select {
 		case inner.channel <- t:
 			// Sent successfully
 		default:
 			// Listener's channel is full; skip to prevent blocking
 		}
 	}
 }

 func (h *Signal) Close() {
 	h.mu.Lock()
 	defer h.mu.Unlock()

+	h.isClosed = true

 	for _, inner := range h.inner {
 		close(inner.channel)
 	}
 }

This ensures that no signals are sent after closure, preventing panics from sending on closed channels.



codecov bot commented Jan 23, 2025

Codecov Report

Attention: Patch coverage is 78.70968% with 33 lines in your changes missing coverage. Please review.

Project coverage is 81.50%. Comparing base (c10b32b) to head (96c9f3e).

Files with missing lines (patch coverage):
  • internal/leadership/signal.go: 58.18% (21 missing, 2 partials) ⚠️
  • internal/leadership/locker.go: 69.56% (5 missing, 2 partials) ⚠️
  • internal/leadership/module.go: 91.66% (2 missing) ⚠️
  • internal/leadership/leadership.go: 97.77% (1 missing) ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #661      +/-   ##
==========================================
- Coverage   81.71%   81.50%   -0.21%     
==========================================
  Files         131      135       +4     
  Lines        7059     7207     +148     
==========================================
+ Hits         5768     5874     +106     
- Misses        990     1025      +35     
- Partials      301      308       +7     

☔ View full report in Codecov by Sentry.
