
fix: etcd discovery mechanism on grpc with idle manager #4589

Merged
kevwan merged 3 commits into zeromicro:master from fix/etcd-discovery on Jan 22, 2025

Conversation

@kevwan (Contributor) commented Jan 19, 2025

No description provided.

codecov bot commented Jan 19, 2025

Codecov Report

Attention: Patch coverage is 85.78680% with 28 lines in your changes missing coverage. Please review.

Project coverage is 94.63%. Comparing base (8690859) to head (1475f7b).
Report is 220 commits behind head on master.

Files with missing lines                   Patch %   Lines
core/discov/internal/registry.go           89.53%    15 Missing and 3 partials ⚠️
zrpc/resolver/internal/discovbuilder.go    0.00%     7 Missing ⚠️
zrpc/resolver/internal/kubebuilder.go      0.00%     2 Missing ⚠️
core/discov/subscriber.go                  93.75%    1 Missing ⚠️
Additional details and impacted files
Files with missing lines                   Coverage Δ
zrpc/resolver/internal/resolver.go         100.00% <ø> (ø)
core/discov/subscriber.go                  84.87% <93.75%> (+0.05%) ⬆️
zrpc/resolver/internal/kubebuilder.go      16.17% <0.00%> (-4.58%) ⬇️
zrpc/resolver/internal/discovbuilder.go    30.30% <0.00%> (-8.16%) ⬇️
core/discov/internal/registry.go           86.96% <89.53%> (+8.06%) ⬆️

... and 7 files with indirect coverage changes

@kevwan (Contributor, Author) commented Jan 21, 2025

1. Code Structure and Readability

  • Overall Structure: The code is well-organized into logical components (e.g., Registry, cluster, watchKey, etc.). The use of interfaces (EtcdClient, UpdateListener) and encapsulation is good.
  • Readability: The code is generally readable, with meaningful variable names and comments where necessary. However, some areas could benefit from additional comments to explain complex logic (e.g., handleChanges, watchStream).

2. Concurrency and Thread Safety

  • Locks: The code uses sync.RWMutex extensively to protect shared resources (e.g., Registry.clusters, cluster.watchers). This is good practice for thread safety.
  • Potential Race Conditions:
    • In handleChanges, the watcher.values map is updated outside the lock after the add and remove slices are calculated. If another goroutine modifies watcher.values concurrently, this is a data race. Consider holding the lock until all updates are complete (see the example below).
    • In monitor, the watcher.listeners slice is updated without holding the lock in some cases. Ensure all modifications to shared state are protected by locks.
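  • Example (a minimal sketch, not a drop-in fix: it assumes each watchValue guards its values map with its own mutex, and KV is the discov key-value pair):
    // applySnapshot diffs the stored values against a new snapshot and
    // swaps the map while still holding the lock, so concurrent readers
    // never observe a half-applied update.
    func (w *watchValue) applySnapshot(newVals map[string]string) (add, remove []KV) {
        w.lock.Lock()
        defer w.lock.Unlock()
    
        for k, v := range newVals {
            if old, ok := w.values[k]; !ok || old != v {
                add = append(add, KV{Key: k, Val: v})
            }
        }
        for k, v := range w.values {
            if _, ok := newVals[k]; !ok {
                remove = append(remove, KV{Key: k, Val: v})
            }
        }
    
        w.values = newVals // swap under the same lock
        return
    }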

3. Error Handling

  • General Error Handling: The code handles errors appropriately in most places (e.g., logging errors and retrying operations like load and watchStream).
  • Improvements:
    • In watchStream, the error errClosed is returned when the watch channel is closed. However, this error is not logged, making it harder to debug. Consider logging this error.
    • In DialClient, errors from clientv3.New are not wrapped with additional context. Use fmt.Errorf with %w to record which endpoints failed (both suggestions are sketched below).
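  • Example (a hedged sketch of both suggestions; the watchStream fragment assumes the function returns the last revision and an error, and the DialClient config fields shown are illustrative rather than the full option set):
    // In watchStream: log before returning errClosed so the closure is visible in logs.
    if !ok {
        logx.Errorf("etcd: watch channel closed, key: %s", key.key)
        return rev, errClosed
    }
    
    // In DialClient: wrap the clientv3.New error with the endpoints.
    func DialClient(endpoints []string) (*clientv3.Client, error) {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   endpoints,
            DialTimeout: 5 * time.Second, // illustrative; keep the existing options
        })
        if err != nil {
            return nil, fmt.Errorf("etcd: dial %v: %w", endpoints, err)
        }
        return cli, nil
    }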

4. Resource Management

  • Connection Management: The use of syncx.NewResourceManager() to manage etcd client connections is a good practice. It ensures that connections are reused and properly closed.
  • Memory Leaks:
    • In Unmonitor, when a watcher is removed, its cancel function is called, but the associated resources (e.g., goroutines) are not explicitly cleaned up. Ensure all resources are released properly.
    • In reload, the old done channel is closed, but the new one is created without ensuring the goroutines watching the old one have fully drained. This could leak goroutines (a possible fix is sketched below).
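  • Example (a sketch assuming the cluster tracks its watch goroutines in a sync.WaitGroup; watchGroup is an illustrative field, and c.watch stands in for the existing per-key watch loop):
    func (c *cluster) reload(cli EtcdClient) {
        c.lock.Lock()
        close(c.done) // signal the old watch goroutines to exit
        c.lock.Unlock()
    
        // Drain: block until every old goroutine has returned, so none of
        // them can outlive the channel swap below.
        c.watchGroup.Wait()
    
        c.lock.Lock()
        c.done = make(chan struct{})
        keys := make([]watchKey, 0, len(c.watchers))
        for key := range c.watchers {
            keys = append(keys, key)
        }
        c.lock.Unlock()
    
        for _, key := range keys {
            go c.watch(cli, key)
        }
    }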

5. Performance

  • Efficiency: The code is generally efficient, with minimal allocations and efficient use of locks.
  • Improvements:
    • In handleChanges, the add and remove slices are created with append, which can lead to multiple allocations. Consider pre-allocating these slices with a reasonable capacity.
    • In load, the Get request is retried in a loop with a fixed sleep interval. Consider capped exponential backoff to reduce pressure on the etcd server during outages (see the sketch below).
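  • Example (a standalone sketch; getWithBackoff is a hypothetical helper, not an existing go-zero API):
    // getWithBackoff retries an etcd Get with capped exponential backoff
    // instead of a fixed sleep between attempts, and stops when ctx is done.
    func getWithBackoff(ctx context.Context, cli *clientv3.Client, key string) (*clientv3.GetResponse, error) {
        const (
            baseDelay = 100 * time.Millisecond
            maxDelay  = 10 * time.Second
        )
    
        delay := baseDelay
        for {
            resp, err := cli.Get(ctx, key, clientv3.WithPrefix())
            if err == nil {
                return resp, nil
            }
    
            logx.Errorf("etcd: Get %q failed: %v, retrying in %v", key, err, delay)
    
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            case <-time.After(delay):
            }
    
            if delay *= 2; delay > maxDelay {
                delay = maxDelay
            }
        }
    }
    Passing a context with a deadline also bounds retries during long-term etcd unavailability (see section 6).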

6. Testing and Edge Cases

  • Edge Cases:
    • The code handles etcd compaction errors (rpctypes.ErrCompacted) by reloading the state. This is good.
    • However, there is no handling for network partitions or long-term etcd unavailability. Consider adding retry logic with a maximum retry count or timeout.
  • Testing:
    • The code lacks unit tests. Adding tests for key functionality (e.g., Monitor, Unmonitor, handleChanges) would improve reliability; a sample table-driven test follows this list.
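  • Example (a table-driven test for the calculateChanges helper proposed in section 7a below, using the testify assert package):
    func TestCalculateChanges(t *testing.T) {
        tests := []struct {
            name    string
            oldVals map[string]string
            newVals map[string]string
            wantAdd []KV
            wantDel []KV
        }{
            {name: "new key", newVals: map[string]string{"a": "1"}, wantAdd: []KV{{Key: "a", Val: "1"}}},
            {name: "removed key", oldVals: map[string]string{"a": "1"}, wantDel: []KV{{Key: "a", Val: "1"}}},
            {name: "changed value", oldVals: map[string]string{"a": "1"}, newVals: map[string]string{"a": "2"}, wantAdd: []KV{{Key: "a", Val: "2"}}},
        }
    
        for _, tt := range tests {
            t.Run(tt.name, func(t *testing.T) {
                add, remove := calculateChanges(tt.oldVals, tt.newVals)
                assert.ElementsMatch(t, tt.wantAdd, add)
                assert.ElementsMatch(t, tt.wantDel, remove)
            })
        }
    }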

7. Specific Code Improvements

a. handleChanges

  • The logic for calculating add and remove slices is complex and could be simplified. Consider breaking it into helper functions.
  • Example:
    // calculateChanges diffs two snapshots of key-value pairs:
    // keys that are new or whose value changed go into add,
    // keys that disappeared go into remove.
    func calculateChanges(oldVals, newVals map[string]string) (add, remove []KV) {
        for k, v := range newVals {
            if oldVal, ok := oldVals[k]; !ok || oldVal != v {
                add = append(add, KV{Key: k, Val: v})
            }
        }
        for k, v := range oldVals {
            if _, ok := newVals[k]; !ok {
                remove = append(remove, KV{Key: k, Val: v})
            }
        }
        return
    }

b. watchStream

  • The watchStream function could be simplified by separating the watch setup logic from the event handling logic.
  • Example:
    func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (clientv3.WatchChan, context.CancelFunc, error) {
        ctx, cancel := context.WithCancel(cli.Ctx())
    
        // Hold the lock across the whole read-modify-write on c.watchers,
        // so the lookup and the cancel update are protected as well.
        c.lock.Lock()
        if watcher, ok := c.watchers[key]; ok {
            watcher.cancel = cancel
        } else {
            val := newWatchValue()
            val.cancel = cancel
            c.watchers[key] = val
        }
        c.lock.Unlock()
    
        wkey := key.key
        var ops []clientv3.OpOption
        if !key.exactMatch {
            wkey = makeKeyPrefix(key.key)
            ops = append(ops, clientv3.WithPrefix())
        }
        if rev != 0 {
            ops = append(ops, clientv3.WithRev(rev+1))
        }
    
        // Note: clientv3.WithRequireLeader wraps the context; it is not an OpOption.
        return cli.Watch(clientv3.WithRequireLeader(ctx), wkey, ops...), cancel, nil
    }

c. reload

  • The reload function could be optimized by reloading all keys concurrently instead of processing them sequentially (see the sketch below).
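  • Example (a sketch of concurrent reloading; c.load and c.watch stand in for the existing per-key load and watch routines, and their signatures here are assumptions):
    func (c *cluster) reloadKeys(cli EtcdClient, keys []watchKey) {
        var wg sync.WaitGroup
        for _, key := range keys {
            wg.Add(1)
            go func(key watchKey) {
                defer wg.Done()
                rev := c.load(cli, key)   // re-read the current state of this key
                go c.watch(cli, key, rev) // resume watching from that revision
            }(key)
        }
        wg.Wait() // every key reloaded before returning
    }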

8. Documentation

  • Public Functions: Add doc comments for public functions (e.g., GetRegistry, Monitor, Unmonitor) to describe their purpose, parameters, and return values.
  • Complex Logic: Add comments to explain complex logic (e.g., handleChanges, watchStream).

9. Logging

  • The code uses logx for logging, which is good. However, some critical errors (e.g., errClosed) are not logged. Ensure all errors are logged with sufficient context.

Summary of Recommendations

  1. Improve thread safety by ensuring all shared state modifications are protected by locks.
  2. Add error logging for critical errors (e.g., errClosed).
  3. Simplify complex logic (e.g., handleChanges, watchStream) by breaking it into helper functions.
  4. Add unit tests for key functionality.
  5. Document public functions and complex logic.
  6. Optimize resource management to prevent leaks.
  7. Handle edge cases like network partitions and long-term etcd unavailability.

@kevwan kevwan force-pushed the fix/etcd-discovery branch from 8e44511 to 15861b6 on January 21, 2025 16:26
@kevwan kevwan force-pushed the fix/etcd-discovery branch from 15861b6 to bf98035 on January 21, 2025 16:54
@kevwan kevwan merged commit bf88310 into zeromicro:master Jan 22, 2025
5 of 6 checks passed
@kevwan kevwan deleted the fix/etcd-discovery branch January 22, 2025 06:01