diff --git a/cluster.go b/cluster.go index 9287c68f..43694e71 100644 --- a/cluster.go +++ b/cluster.go @@ -194,7 +194,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI continue } err = cluster.isMaster(socket, &result) - socket.Release() + socket.Release(0) if err != nil { tryerr = err logf("SYNC Command 'ismaster' to %s failed: %v", addr, err) @@ -583,7 +583,7 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) { // AcquireSocket returns a socket to a server in the cluster. If slaveOk is // true, it will attempt to return a socket to a slave server. If it is // false, the socket will necessarily be to a master server. -func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) { +func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit, poolUnusedLimit int) (s *mongoSocket, err error) { var started time.Time var syncCount uint warnedLimit := false @@ -647,7 +647,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout err := cluster.isMaster(s, &result) if err != nil || !result.IsMaster { logf("Cannot confirm server %s as master (%v)", server.Addr, err) - s.Release() + s.Release(poolUnusedLimit) cluster.syncServers() time.Sleep(100 * time.Millisecond) continue diff --git a/server.go b/server.go index 71919ce9..b38d6b37 100644 --- a/server.go +++ b/server.go @@ -133,7 +133,7 @@ func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) ( // closed in the meantime if server.closed { server.Unlock() - socket.Release() + socket.Release(0) socket.Close() return nil, abended, errServerClosed } @@ -205,12 +205,24 @@ func (server *mongoServer) Close() { } // RecycleSocket puts socket back into the unused cache. -func (server *mongoServer) RecycleSocket(socket *mongoSocket) { +func (server *mongoServer) RecycleSocket(socket *mongoSocket, poolUnusedLimit int) { server.Lock() - if !server.closed { - server.unusedSockets = append(server.unusedSockets, socket) + defer server.Unlock() + + if server.closed { + return } - server.Unlock() + + // If the number of unused sockets is too high, we just close this one. + // This won't close existing connections, and it's possible that we could + // still end up with a high watermark of unused sockets, but it should lead + // to a reduction if there is a sustained period of low usage. + if poolUnusedLimit > 0 && len(server.unusedSockets) > poolUnusedLimit { + socket.Close() + return + } + + server.unusedSockets = append(server.unusedSockets, socket) } func removeSocket(sockets []*mongoSocket, socket *mongoSocket) []*mongoSocket { @@ -319,7 +331,7 @@ func (server *mongoServer) pinger(loop bool) { max = server.pingWindow[i] } } - socket.Release() + socket.Release(0) server.Lock() if server.closed { loop = false diff --git a/session.go b/session.go index 9f326d93..e7ca1be7 100644 --- a/session.go +++ b/session.go @@ -90,6 +90,7 @@ type Session struct { dialCred *Credential creds []Credential poolLimit int + poolUnusedLimit int bypassValidation bool sessionId bson.Binary nextTransactionNumber int64 @@ -415,6 +416,10 @@ type DialInfo struct { // See Session.SetPoolLimit for details. PoolLimit int + // PoolUnusedLimit defines the limit for the number of unused sockets + // in the pool. Defaults to 0, meaning no limit. + PoolUnusedLimit int + // DialServer optionally specifies the dial function for establishing // connections with the MongoDB servers. DialServer func(addr *ServerAddr) (net.Conn, error) @@ -491,6 +496,9 @@ func DialWithInfo(info *DialInfo) (*Session, error) { if info.PoolLimit > 0 { session.poolLimit = info.PoolLimit } + if info.PoolUnusedLimit > 0 { + session.poolUnusedLimit = info.PoolUnusedLimit + } cluster.Release() // People get confused when we return a session that is not actually @@ -674,22 +682,21 @@ func (db *Database) GridFS(prefix string) *GridFS { // use an ordering-preserving document, such as a struct value or an // instance of bson.D. For instance: // -// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}}) +// db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}}) // // For privilleged commands typically run on the "admin" database, see // the Run method in the Session type. // // Relevant documentation: // -// http://www.mongodb.org/display/DOCS/Commands -// http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips -// +// http://www.mongodb.org/display/DOCS/Commands +// http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips func (db *Database) Run(cmd interface{}, result interface{}) error { socket, err := db.Session.acquireSocket(true) if err != nil { return err } - defer socket.Release() + defer socket.Release(0) // This is an optimized form of db.C("$cmd").Find(cmd).One(result). return db.run(socket, cmd, result) @@ -738,7 +745,7 @@ func (s *Session) Login(cred *Credential) error { if err != nil { return err } - defer socket.Release() + defer socket.Release(0) credCopy := *cred if cred.Source == "" { @@ -1804,6 +1811,16 @@ func (s *Session) SetPoolLimit(limit int) { s.m.Unlock() } +// SetPoolUnusedLimit sets the maximum number of unused sockets in the pool +// before the pool starts closing the least recently used sockets. +// The default limit is 0, which means that sockets are never closed until +// the pool limit is reached. +func (s *Session) SetPoolUnusedLimit(limit int) { + s.m.Lock() + s.poolUnusedLimit = limit + s.m.Unlock() +} + // SetBypassValidation sets whether the server should bypass the registered // validation expressions executed when documents are inserted or modified, // in the interest of preserving invariants in the collection being modified. @@ -3219,13 +3236,14 @@ func (q *Query) One(result interface{}) (err error) { session.m.RLock() txn := session.transaction startTxn := txn != nil && !txn.started + poolUnusedLimit := session.poolUnusedLimit session.m.RUnlock() socket, err := session.acquireSocket(true) if err != nil { return err } - defer socket.Release() + defer socket.Release(poolUnusedLimit) op.limit = -1 @@ -3593,6 +3611,7 @@ func (q *Query) Iter() *Iter { session.m.RLock() txn := session.transaction startTxn := txn != nil && !txn.started + poolUnusedLimit := session.poolUnusedLimit session.m.RUnlock() iter := &Iter{ @@ -3612,7 +3631,7 @@ func (q *Query) Iter() *Iter { iter.err = err return iter } - defer socket.Release() + defer socket.Release(poolUnusedLimit) session.prepareQuery(&op) op.replyFunc = iter.op.replyFunc @@ -3690,6 +3709,7 @@ func (q *Query) Tail(timeout time.Duration) *Iter { session := q.session op := q.op prefetch := q.prefetch + poolUnusedLimit := session.poolUnusedLimit q.m.Unlock() iter := &Iter{session: session, prefetch: prefetch} @@ -3715,7 +3735,7 @@ func (q *Query) Tail(timeout time.Duration) *Iter { iter.err = err iter.m.Unlock() } - socket.Release() + socket.Release(poolUnusedLimit) } return iter } @@ -3777,7 +3797,7 @@ func (iter *Iter) Close() error { if err == nil { // TODO Batch kills. err = socket.Query(&killCursorsOp{[]int64{cursorId}}) - socket.Release() + socket.Release(iter.session.poolUnusedLimit) } iter.m.Lock() @@ -4040,15 +4060,16 @@ func (iter *Iter) acquireSocket() (*mongoSocket, error) { // to primary. Our cursor is in a specific server, though. iter.session.m.Lock() sockTimeout := iter.session.sockTimeout + poolUnusedLimit := iter.session.poolUnusedLimit iter.session.m.Unlock() - socket.Release() + socket.Release(poolUnusedLimit) socket, _, err = iter.server.AcquireSocket(0, sockTimeout) if err != nil { return nil, err } err := iter.session.socketLogin(socket) if err != nil { - socket.Release() + socket.Release(poolUnusedLimit) return nil, err } } @@ -4066,7 +4087,10 @@ func (iter *Iter) getMore() { iter.err = err return } - defer socket.Release() + iter.session.m.RLock() + poolUnusedLimit := iter.session.poolUnusedLimit + iter.session.m.RUnlock() + defer socket.Release(poolUnusedLimit) debugf("Iter %p requesting more documents", iter) if iter.limit > 0 { @@ -4611,14 +4635,14 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) { } // Still not good. We need a new socket. - sock, err := cluster.AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit) + sock, err := cluster.AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolUnusedLimit) if err != nil { return nil, err } // Authenticate the new socket. if err = s.socketLogin(sock); err != nil { - sock.Release() + sock.Release(s.poolUnusedLimit) return nil, err } @@ -4657,7 +4681,7 @@ func (s *Session) handleNotPrimaryWrite(on *mongoSocket) { s.m.RUnlock() s.m.Lock() if s.masterSocket == on { - s.masterSocket.Release() + s.masterSocket.Release(s.poolUnusedLimit) s.masterSocket = nil } s.m.Unlock() @@ -4687,11 +4711,14 @@ func (s *Session) setSocket(socket *mongoSocket) { // unsetSocket releases any slave and/or master sockets reserved. func (s *Session) unsetSocket() { + s.m.RLock() + poolUnusedLimit := s.poolUnusedLimit + s.m.RUnlock() if s.masterSocket != nil { - s.masterSocket.Release() + s.masterSocket.Release(poolUnusedLimit) } if s.slaveSocket != nil { - s.slaveSocket.Release() + s.slaveSocket.Release(poolUnusedLimit) } s.masterSocket = nil s.slaveSocket = nil @@ -4808,7 +4835,10 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err if err != nil { return nil, err } - defer socket.Release() + s.m.RLock() + poolUnusedLimit := s.poolUnusedLimit + s.m.RUnlock() + defer socket.Release(poolUnusedLimit) defer func() { if IsNotPrimaryError(err) { s.handleNotPrimaryWrite(socket) diff --git a/socket.go b/socket.go index 0df7939a..d792f618 100644 --- a/socket.go +++ b/socket.go @@ -255,7 +255,7 @@ func (socket *mongoSocket) Acquire() (info *mongoServerInfo) { // Release decrements a socket reference. The socket will be // recycled once its released as many times as it's been acquired. -func (socket *mongoSocket) Release() { +func (socket *mongoSocket) Release(poolUnusedLimit int) { socket.Lock() if socket.references == 0 { panic("socket.Release() with references == 0") @@ -269,7 +269,7 @@ func (socket *mongoSocket) Release() { socket.LogoutAll() // If the socket is dead server is nil. if server != nil { - server.RecycleSocket(socket) + server.RecycleSocket(socket, poolUnusedLimit) } } else { socket.Unlock()