Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce pool unused limit #29

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
continue
}
err = cluster.isMaster(socket, &result)
socket.Release()
socket.Release(0)
if err != nil {
tryerr = err
logf("SYNC Command 'ismaster' to %s failed: %v", addr, err)
Expand Down Expand Up @@ -583,7 +583,7 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit, poolUnusedLimit int) (s *mongoSocket, err error) {
var started time.Time
var syncCount uint
warnedLimit := false
Expand Down Expand Up @@ -647,7 +647,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
err := cluster.isMaster(s, &result)
if err != nil || !result.IsMaster {
logf("Cannot confirm server %s as master (%v)", server.Addr, err)
s.Release()
s.Release(poolUnusedLimit)
cluster.syncServers()
time.Sleep(100 * time.Millisecond)
continue
Expand Down
24 changes: 18 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (
// closed in the meantime
if server.closed {
server.Unlock()
socket.Release()
socket.Release(0)
socket.Close()
return nil, abended, errServerClosed
}
Expand Down Expand Up @@ -205,12 +205,24 @@ func (server *mongoServer) Close() {
}

// RecycleSocket puts socket back into the unused cache.
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
func (server *mongoServer) RecycleSocket(socket *mongoSocket, poolUnusedLimit int) {
server.Lock()
if !server.closed {
server.unusedSockets = append(server.unusedSockets, socket)
defer server.Unlock()

if server.closed {
return
}
server.Unlock()

// If the number of unused sockets is too high, we just close this one.
// This won't close existing connections, and it's possible that we could
// still end up with a high watermark of unused sockets, but it should lead
// to a reduction if there is a sustained period of low usage.
if poolUnusedLimit > 0 && len(server.unusedSockets) > poolUnusedLimit {
socket.Close()
return
}

server.unusedSockets = append(server.unusedSockets, socket)
}

func removeSocket(sockets []*mongoSocket, socket *mongoSocket) []*mongoSocket {
Expand Down Expand Up @@ -319,7 +331,7 @@ func (server *mongoServer) pinger(loop bool) {
max = server.pingWindow[i]
}
}
socket.Release()
socket.Release(0)
server.Lock()
if server.closed {
loop = false
Expand Down
68 changes: 49 additions & 19 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Session struct {
dialCred *Credential
creds []Credential
poolLimit int
poolUnusedLimit int
bypassValidation bool
sessionId bson.Binary
nextTransactionNumber int64
Expand Down Expand Up @@ -415,6 +416,10 @@ type DialInfo struct {
// See Session.SetPoolLimit for details.
PoolLimit int

// PoolUnusedLimit defines the limit for the number of unused sockets
// in the pool. Defaults to 0, meaning no limit.
PoolUnusedLimit int

// DialServer optionally specifies the dial function for establishing
// connections with the MongoDB servers.
DialServer func(addr *ServerAddr) (net.Conn, error)
Expand Down Expand Up @@ -491,6 +496,9 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
if info.PoolLimit > 0 {
session.poolLimit = info.PoolLimit
}
if info.PoolUnusedLimit > 0 {
session.poolUnusedLimit = info.PoolUnusedLimit
}
cluster.Release()

// People get confused when we return a session that is not actually
Expand Down Expand Up @@ -674,22 +682,21 @@ func (db *Database) GridFS(prefix string) *GridFS {
// use an ordering-preserving document, such as a struct value or an
// instance of bson.D. For instance:
//
// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}})
//
// For privilleged commands typically run on the "admin" database, see
// the Run method in the Session type.
//
// Relevant documentation:
//
// http://www.mongodb.org/display/DOCS/Commands
// http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
//
// http://www.mongodb.org/display/DOCS/Commands
// http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips
func (db *Database) Run(cmd interface{}, result interface{}) error {
socket, err := db.Session.acquireSocket(true)
if err != nil {
return err
}
defer socket.Release()
defer socket.Release(0)

// This is an optimized form of db.C("$cmd").Find(cmd).One(result).
return db.run(socket, cmd, result)
Expand Down Expand Up @@ -738,7 +745,7 @@ func (s *Session) Login(cred *Credential) error {
if err != nil {
return err
}
defer socket.Release()
defer socket.Release(0)

credCopy := *cred
if cred.Source == "" {
Expand Down Expand Up @@ -1804,6 +1811,16 @@ func (s *Session) SetPoolLimit(limit int) {
s.m.Unlock()
}

// SetPoolUnusedLimit sets the maximum number of unused sockets in the pool
// before the pool starts closing the least recently used sockets.
// The default limit is 0, which means that sockets are never closed until
// the pool limit is reached.
func (s *Session) SetPoolUnusedLimit(limit int) {
s.m.Lock()
s.poolUnusedLimit = limit
s.m.Unlock()
}

// SetBypassValidation sets whether the server should bypass the registered
// validation expressions executed when documents are inserted or modified,
// in the interest of preserving invariants in the collection being modified.
Expand Down Expand Up @@ -3219,13 +3236,14 @@ func (q *Query) One(result interface{}) (err error) {
session.m.RLock()
txn := session.transaction
startTxn := txn != nil && !txn.started
poolUnusedLimit := session.poolUnusedLimit
session.m.RUnlock()

socket, err := session.acquireSocket(true)
if err != nil {
return err
}
defer socket.Release()
defer socket.Release(poolUnusedLimit)

op.limit = -1

Expand Down Expand Up @@ -3593,6 +3611,7 @@ func (q *Query) Iter() *Iter {
session.m.RLock()
txn := session.transaction
startTxn := txn != nil && !txn.started
poolUnusedLimit := session.poolUnusedLimit
session.m.RUnlock()

iter := &Iter{
Expand All @@ -3612,7 +3631,7 @@ func (q *Query) Iter() *Iter {
iter.err = err
return iter
}
defer socket.Release()
defer socket.Release(poolUnusedLimit)

session.prepareQuery(&op)
op.replyFunc = iter.op.replyFunc
Expand Down Expand Up @@ -3690,6 +3709,7 @@ func (q *Query) Tail(timeout time.Duration) *Iter {
session := q.session
op := q.op
prefetch := q.prefetch
poolUnusedLimit := session.poolUnusedLimit
q.m.Unlock()

iter := &Iter{session: session, prefetch: prefetch}
Expand All @@ -3715,7 +3735,7 @@ func (q *Query) Tail(timeout time.Duration) *Iter {
iter.err = err
iter.m.Unlock()
}
socket.Release()
socket.Release(poolUnusedLimit)
}
return iter
}
Expand Down Expand Up @@ -3777,7 +3797,7 @@ func (iter *Iter) Close() error {
if err == nil {
// TODO Batch kills.
err = socket.Query(&killCursorsOp{[]int64{cursorId}})
socket.Release()
socket.Release(iter.session.poolUnusedLimit)
}

iter.m.Lock()
Expand Down Expand Up @@ -4040,15 +4060,16 @@ func (iter *Iter) acquireSocket() (*mongoSocket, error) {
// to primary. Our cursor is in a specific server, though.
iter.session.m.Lock()
sockTimeout := iter.session.sockTimeout
poolUnusedLimit := iter.session.poolUnusedLimit
iter.session.m.Unlock()
socket.Release()
socket.Release(poolUnusedLimit)
socket, _, err = iter.server.AcquireSocket(0, sockTimeout)
if err != nil {
return nil, err
}
err := iter.session.socketLogin(socket)
if err != nil {
socket.Release()
socket.Release(poolUnusedLimit)
return nil, err
}
}
Expand All @@ -4066,7 +4087,10 @@ func (iter *Iter) getMore() {
iter.err = err
return
}
defer socket.Release()
iter.session.m.RLock()
poolUnusedLimit := iter.session.poolUnusedLimit
iter.session.m.RUnlock()
defer socket.Release(poolUnusedLimit)

debugf("Iter %p requesting more documents", iter)
if iter.limit > 0 {
Expand Down Expand Up @@ -4611,14 +4635,14 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
}

// Still not good. We need a new socket.
sock, err := cluster.AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
sock, err := cluster.AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolUnusedLimit)
if err != nil {
return nil, err
}

// Authenticate the new socket.
if err = s.socketLogin(sock); err != nil {
sock.Release()
sock.Release(s.poolUnusedLimit)
return nil, err
}

Expand Down Expand Up @@ -4657,7 +4681,7 @@ func (s *Session) handleNotPrimaryWrite(on *mongoSocket) {
s.m.RUnlock()
s.m.Lock()
if s.masterSocket == on {
s.masterSocket.Release()
s.masterSocket.Release(s.poolUnusedLimit)
s.masterSocket = nil
}
s.m.Unlock()
Expand Down Expand Up @@ -4687,11 +4711,14 @@ func (s *Session) setSocket(socket *mongoSocket) {

// unsetSocket releases any slave and/or master sockets reserved.
func (s *Session) unsetSocket() {
s.m.RLock()
poolUnusedLimit := s.poolUnusedLimit
s.m.RUnlock()
if s.masterSocket != nil {
s.masterSocket.Release()
s.masterSocket.Release(poolUnusedLimit)
}
if s.slaveSocket != nil {
s.slaveSocket.Release()
s.slaveSocket.Release(poolUnusedLimit)
}
s.masterSocket = nil
s.slaveSocket = nil
Expand Down Expand Up @@ -4808,7 +4835,10 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
if err != nil {
return nil, err
}
defer socket.Release()
s.m.RLock()
poolUnusedLimit := s.poolUnusedLimit
s.m.RUnlock()
defer socket.Release(poolUnusedLimit)
defer func() {
if IsNotPrimaryError(err) {
s.handleNotPrimaryWrite(socket)
Expand Down
4 changes: 2 additions & 2 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (socket *mongoSocket) Acquire() (info *mongoServerInfo) {

// Release decrements a socket reference. The socket will be
// recycled once its released as many times as it's been acquired.
func (socket *mongoSocket) Release() {
func (socket *mongoSocket) Release(poolUnusedLimit int) {
socket.Lock()
if socket.references == 0 {
panic("socket.Release() with references == 0")
Expand All @@ -269,7 +269,7 @@ func (socket *mongoSocket) Release() {
socket.LogoutAll()
// If the socket is dead server is nil.
if server != nil {
server.RecycleSocket(socket)
server.RecycleSocket(socket, poolUnusedLimit)
}
} else {
socket.Unlock()
Expand Down