Skip to content

Commit

Permalink
optimize connection pool and test
Browse files Browse the repository at this point in the history
* fixes data-race between Get() and Close() of channelPool
* using sync.RWMutex instead of sync.Mutex in boundedPool
* refactor idle timeout in pool
* add connection pool test
* replace fatih/pool.v2 with custom pool
  • Loading branch information
chengshiwen committed Sep 6, 2024
1 parent 1805395 commit f18eec0
Show file tree
Hide file tree
Showing 7 changed files with 630 additions and 75 deletions.
1 change: 0 additions & 1 deletion DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
- golang.org/x/sys [BSD LICENSE](https://github.com/golang/sys/blob/master/LICENSE)
- golang.org/x/text [BSD LICENSE](https://github.com/golang/text/blob/master/LICENSE)
- golang.org/x/time [BSD LICENSE](https://github.com/golang/time/blob/master/LICENSE)
- gopkg.in/fatih/pool.v2 [MIT LICENSE](https://github.com/fatih/pool/blob/v2.0.0/LICENSE)
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)
- github.com/xlab/treeprint [MIT LICENSE](https://github.com/xlab/treeprint/blob/master/LICENSE)

Expand Down
12 changes: 5 additions & 7 deletions coordinator/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,26 @@ package coordinator
import (
"net"
"sync"

"gopkg.in/fatih/pool.v2"
)

type clientPool struct {
mu sync.RWMutex
pool map[uint64]pool.Pool
pool map[uint64]Pool
}

func newClientPool() *clientPool {
return &clientPool{
pool: make(map[uint64]pool.Pool),
pool: make(map[uint64]Pool),
}
}

func (c *clientPool) setPool(nodeID uint64, p pool.Pool) {
func (c *clientPool) setPool(nodeID uint64, p Pool) {
c.mu.Lock()
c.pool[nodeID] = p
c.mu.Unlock()
}

func (c *clientPool) getPool(nodeID uint64) (pool.Pool, bool) {
func (c *clientPool) getPool(nodeID uint64) (Pool, bool) {
c.mu.RLock()
p, ok := c.pool[nodeID]
c.mu.RUnlock()
Expand All @@ -35,7 +33,7 @@ func (c *clientPool) size() int {
c.mu.RLock()
var size int
for _, p := range c.pool {
size += p.Len()
size += p.Size()
}
c.mu.RUnlock()
return size
Expand Down
3 changes: 0 additions & 3 deletions coordinator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ const (
// remains idle in the connection pool.
DefaultPoolMaxIdleTime = time.Minute

// DefaultPoolWaitTimeout is the default timeout waiting for free connection.
DefaultPoolWaitTimeout = 5 * time.Second

// DefaultWriteTimeout is the default timeout for a complete write to succeed.
DefaultWriteTimeout = 10 * time.Second

Expand Down
211 changes: 150 additions & 61 deletions coordinator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,36 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
)

var (
// ErrClosed is the error resulting if the pool is closed via pool.Close().
ErrClosed = errors.New("pool is closed")

"gopkg.in/fatih/pool.v2"
// PoolWaitTimeout is the timeout waiting for free connection.
PoolWaitTimeout = 5 * time.Second
)

// Pool interface describes a pool implementation. A pool should have maximum
// capacity. An ideal pool is thread-safe and easy to use.
type Pool interface {
// Get returns a new connection from the pool. Closing the connections puts
// it back to the Pool. Closing it when the pool is destroyed or full will
// be counted as an error.
Get() (net.Conn, error)

// Close closes the pool and all its connections. After Close() the pool is
// no longer usable.
Close()

// Len returns the current number of idle connections of the pool.
Len() int

// Size returns the total number of alive connections of the pool.
Size() int
}

// idleConn implements idle connection.
type idleConn struct {
c net.Conn
Expand All @@ -20,13 +44,11 @@ type idleConn struct {
// boundedPool implements the Pool interface based on buffered channels.
type boundedPool struct {
// storage for our net.Conn connections
mu sync.Mutex
mu sync.RWMutex
conns chan *idleConn

idleTimeout time.Duration
waitTimeout time.Duration

total int32
total chan struct{}
done chan struct{}
// net.Conn generator
factory Factory
}
Expand All @@ -35,23 +57,23 @@ type boundedPool struct {
type Factory func() (net.Conn, error)

// NewBoundedPool returns a new pool based on buffered channels with an initial
// capacity, maximum capacity, idle timeout and timeout to wait for a connection
// from the pool. Factory is used when initial capacity is
// capacity, maximum capacity and maximum idle time that a connection remains
// idle in the connection pool. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool and total connections is less than the max, a new connection
// available in the pool and total connections is less than maxCap, a new connection
// will be created via the Factory() method. Otherwise, the call will block until
// a connection is available or the timeout is reached.
func NewBoundedPool(initialCap, maxCap int, idleTimeout, waitTimeout time.Duration, factory Factory) (pool.Pool, error) {
func NewBoundedPool(initialCap, maxCap int, idleTime time.Duration, factory Factory) (Pool, error) {
if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}

c := &boundedPool{
conns: make(chan *idleConn, maxCap),
factory: factory,
idleTimeout: idleTimeout,
waitTimeout: waitTimeout,
conns: make(chan *idleConn, maxCap),
total: make(chan struct{}, maxCap),
done: make(chan struct{}),
factory: factory,
}

// create initial connections, if something goes wrong,
Expand All @@ -63,67 +85,66 @@ func NewBoundedPool(initialCap, maxCap int, idleTimeout, waitTimeout time.Durati
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- &idleConn{c: conn, t: time.Now()}
atomic.AddInt32(&c.total, 1)
c.total <- struct{}{}
}

go c.pruneIdleConns(idleTime)
return c, nil
}

func (c *boundedPool) getConns() chan *idleConn {
c.mu.Lock()
func (c *boundedPool) getConnsAndFactory() (chan *idleConn, Factory) {
c.mu.RLock()
conns := c.conns
c.mu.Unlock()
return conns
factory := c.factory
c.mu.RUnlock()
return conns, factory
}

// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *boundedPool) Get() (net.Conn, error) {
conns := c.getConns()
conns, factory := c.getConnsAndFactory()
if conns == nil {
return nil, pool.ErrClosed
return nil, ErrClosed
}

// Try and grab a connection from the pool
for {
// Wrap our connections with our custom net.Conn implementation (wrapConn
// method) that puts the connection back to the pool if it's closed.
select {
case conn := <-conns:
if conn == nil {
return nil, ErrClosed
}
return c.wrapConn(conn.c), nil
default:
// Could not get connection, can we create a new one?
c.mu.RLock()
select {
case conn := <-conns:
if conn == nil {
return nil, pool.ErrClosed
case c.total <- struct{}{}:
c.mu.RUnlock()
conn, err := factory()
if err != nil {
<-c.total
return nil, err
}
if timeout := c.idleTimeout; timeout > 0 {
if conn.t.Add(timeout).Before(time.Now()) {
// Close the connection when idle longer than the specified duration
conn.c.Close()
atomic.AddInt32(&c.total, -1)
continue
}
}
return c.wrapConn(conn.c), nil
return c.wrapConn(conn), nil
default:
// Could not get connection, can we create a new one?
if atomic.LoadInt32(&c.total) < maxConnections {
conn, err := c.factory()
if err != nil {
return nil, err
}
atomic.AddInt32(&c.total, 1)
return c.wrapConn(conn), nil
}
c.mu.RUnlock()
}
}

// The pool was empty and we couldn't create a new one to
// retry until one is free or we timeout
select {
case conn := <-conns:
if conn == nil {
return nil, pool.ErrClosed
}
return c.wrapConn(conn.c), nil
case <-time.After(c.waitTimeout):
return nil, fmt.Errorf("timed out waiting for free connection")
// The pool was empty and we couldn't create a new one to
// retry until one is free or we timeout
select {
case conn := <-conns:
if conn == nil {
return nil, ErrClosed
}
return c.wrapConn(conn.c), nil
case <-time.After(PoolWaitTimeout):
return nil, errors.New("timed out waiting for free connection")
}
}

Expand All @@ -134,8 +155,8 @@ func (c *boundedPool) put(conn net.Conn) error {
return errors.New("connection is nil. rejecting")
}

c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()

if c.conns == nil {
// pool is closed, close passed connection
Expand All @@ -149,15 +170,17 @@ func (c *boundedPool) put(conn net.Conn) error {
return nil
default:
// pool is full, close passed connection
atomic.AddInt32(&c.total, -1)
<-c.total
return conn.Close()
}
}

func (c *boundedPool) Close() {
c.mu.Lock()
conns := c.conns
conns, total, done := c.conns, c.total, c.done
c.conns = nil
c.total = nil
c.done = nil
c.factory = nil
c.mu.Unlock()

Expand All @@ -169,11 +192,71 @@ func (c *boundedPool) Close() {
for conn := range conns {
conn.c.Close()
}
close(total)
close(done)
}

func (c *boundedPool) Len() int {
conns, _ := c.getConnsAndFactory()
return len(conns)
}

func (c *boundedPool) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.total)
}

func (c *boundedPool) Len() int { return len(c.getConns()) }
// pruneIdleConns prunes idle connections.
func (c *boundedPool) pruneIdleConns(idleTime time.Duration) {
if idleTime <= 0 {
return
}
ticker := time.NewTicker(idleTime)
defer ticker.Stop()
for {
c.mu.RLock()
done := c.done
c.mu.RUnlock()
select {
case <-done:
return
case <-ticker.C:
conns, _ := c.getConnsAndFactory()
if conns == nil {
return
}
if len(conns) == 0 {
continue
}
var newConns []*idleConn
for {
select {
case conn := <-conns:
if conn.t.Add(idleTime).Before(time.Now()) {
<-c.total
conn.c.Close()
} else {
newConns = append(newConns, conn)
}
default:
goto DONE
}
}
DONE:
if len(newConns) > 0 {
c.mu.RLock()
for _, conn := range newConns {
c.conns <- conn
}
c.mu.RUnlock()
newConns = nil
}
}
}
}

// newConn wraps a standard net.Conn to a poolConn net.Conn.
// wrapConn wraps a standard net.Conn to a poolConn net.Conn.
func (c *boundedPool) wrapConn(conn net.Conn) net.Conn {
p := &pooledConn{c: c}
p.Conn = conn
Expand All @@ -196,6 +279,7 @@ func (p *pooledConn) Close() error {

if p.unusable {
if p.Conn != nil {
<-p.c.total
return p.Conn.Close()
}
return nil
Expand All @@ -208,5 +292,10 @@ func (p *pooledConn) MarkUnusable() {
p.mu.Lock()
p.unusable = true
p.mu.Unlock()
atomic.AddInt32(&p.c.total, -1)
}

func MarkUnusable(conn net.Conn) {
if pc, ok := conn.(*pooledConn); ok {
pc.MarkUnusable()
}
}
Loading

0 comments on commit f18eec0

Please sign in to comment.