Golang mgo MongoDB connection pool settings: you must manually set maxPoolSize


Our company's gift system uses mgo, the Go MongoDB library. We hit a few pitfalls along the way; here is a summary so you can avoid them.

The mgo documentation does say that connections are reused, but observation and experiment show this does not actually bound the connection count: reuse only happens when an idle connection exists. Under high concurrency, whenever no connection is free, mgo keeps creating new ones, so in the end the programmer still has to cap the maximum number of connections manually.

Enough talk; on to the code.

GlobalMgoSession, err := mgo.Dial(host)

func (m *MongoBaseDao) Get(tablename string, id string, result interface{}) interface{} {
    // Clone the global session for this request and release it when done.
    session := GlobalMgoSession.Clone()
    defer session.Close()

    collection := session.DB(globalMgoDbName).C(tablename)
    err := collection.FindId(bson.ObjectIdHex(id)).One(result)

    if err != nil {
        logkit.Logger.Error("mongo_base method:Get " + err.Error())
    }
    return result
}

 

At startup in main, we create one global session; each request then clones that session's state and connection via Clone() for its own use, and calls session.Close() afterward to release the connection.
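For reference, a minimal sketch of that startup wiring (the host string and the error handling are illustrative assumptions, not the project's actual code):

var GlobalMgoSession *mgo.Session

func main() {
    var err error
    // Dial once at startup; this session owns the underlying connection pool.
    GlobalMgoSession, err = mgo.Dial("localhost:27017")
    if err != nil {
        panic(err) // illustrative; handle startup failure as appropriate
    }
    defer GlobalMgoSession.Close()

    // ... register handlers that Clone() the session per request, as in Get above ...
}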

// Clone works just like Copy, but also reuses the same socket as the original
// session, in case it had already reserved one due to its consistency
// guarantees.  This behavior ensures that writes performed in the old session
// are necessarily observed when using the new session, as long as it was a
// strong or monotonic session.  That said, it also means that long operations
// may cause other goroutines using the original session to wait.
func (s *Session) Clone() *Session {
    s.m.Lock()
    scopy := copySession(s, true)
    s.m.Unlock()
    return scopy
}
 
 
// Close terminates the session.  It's a runtime error to use a session
// after it has been closed.
func (s *Session) Close() {
    s.m.Lock()
    if s.cluster_ != nil {
        debugf("Closing session %p", s)
        s.unsetSocket() // release the socket held by this session (set to nil)
        s.cluster_.Release()
        s.cluster_ = nil
    }
    s.m.Unlock()
}

 

Clone's doc comment says it reuses the original session's socket. But when concurrent requests surge and other goroutines have not yet released their connections, what does the current goroutine do?

 

func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
    // Read-only lock to check for previously reserved socket.
    s.m.RLock()
    // If there is a slave socket reserved and its use is acceptable, take it as long
    // as there isn't a master socket which would be preferred by the read preference mode.
    if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        socket := s.slaveSocket
        socket.Acquire()
        s.m.RUnlock()
        logkit.Logger.Info("sgp_test 1 acquireSocket slave is ok!")
        return socket, nil
    }
    if s.masterSocket != nil {
        socket := s.masterSocket
        socket.Acquire()
        s.m.RUnlock()
        logkit.Logger.Info("sgp_test 1  acquireSocket master is ok!")
        return socket, nil
    }

    s.m.RUnlock()

    // No go.  We may have to request a new socket and change the session,
    // so try again but with an exclusive lock now.
    s.m.Lock()
    defer s.m.Unlock()
    if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        s.slaveSocket.Acquire()
        logkit.Logger.Info("sgp_test 2  acquireSocket slave is ok!")
        return s.slaveSocket, nil
    }
    if s.masterSocket != nil {
        s.masterSocket.Acquire()
        logkit.Logger.Info("sgp_test 2  acquireSocket master is ok!")
        return s.masterSocket, nil
    }

    // Still not good.  We need a new socket.
    sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)

......
    logkit.Logger.Info("sgp_test 3   acquireSocket cluster AcquireSocket is ok!")
    return sock, nil
}

With debug statements added to the source, the logs tell the whole story:

Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok!
Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1 acquireSocket slave is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!

It keeps creating new connections via AcquireSocket:

$ netstat -nat | grep -i 27017 | wc -l
400

If sessions are never closed, the count climbs to a terrifying 4096 and chokes off every other request, so whenever you Clone or Copy a session, always defer Close().
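A rough sketch of the kind of load that produces the numbers above (the collection name and the goroutine count of 500 are made up for illustration; it assumes the global session from the startup sketch plus the "sync" and "gopkg.in/mgo.v2/bson" imports):

func stress() {
    var wg sync.WaitGroup
    for i := 0; i < 500; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            session := GlobalMgoSession.Clone()
            // Drop this Close() and `netstat -nat | grep 27017` climbs toward
            // the 4096 default instead of settling back down.
            defer session.Close()
            var result bson.M
            _ = session.DB("test").C("items").Find(nil).One(&result)
        }()
    }
    wg.Wait()
}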

Setting the pool limit (the poolLimit field, exposed as maxPoolSize in the connection string) caps the total number of connections. Once the limit is hit, the current goroutine sleeps and retries until a connection can be created. Under high concurrency the locking here is imperfect, so a few extra connections may still be created.

src/gopkg.in/mgo.v2/cluster.go:

    s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
    if err == errPoolLimit {
        if !warnedLimit {
            warnedLimit = true
            logkit.Logger.Error("sgp_test WARNING: Per-server connection limit reached. " + err.Error())
            log("WARNING: Per-server connection limit reached.")
        }
        time.Sleep(100 * time.Millisecond)
        continue
    }
 
session.go:

// SetPoolLimit sets the maximum number of sockets in use in a single server
// before this session will block waiting for a socket to be available.
// The default limit is 4096.
//
// This limit must be set to cover more than any expected workload of the
// application. It is a bad practice and an unsupported use case to use the
// database driver to define the concurrency limit of an application. Prevent
// such concurrency "at the door" instead, by properly restricting the amount
// of used resources and number of goroutines before they are created.
func (s *Session) SetPoolLimit(limit int) {
    s.m.Lock()
    s.poolLimit = limit
    s.m.Unlock()
}

How to set the pool limit:

1. In the connection string, append:

[host]:[port]?maxPoolSize=10

2. Or in code (a combined sketch follows):

dao.GlobalMgoSession.SetPoolLimit(10)
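Both options in context, as a sketch (the host and the limit of 10 are example values; option 1 relies on mgo parsing maxPoolSize out of the connection string, as described above):

// Option 1: let mgo parse the limit from the connection string.
session, err := mgo.Dial("localhost:27017?maxPoolSize=10")
if err != nil {
    panic(err)
}

// Option 2: set it programmatically after dialing.
session.SetPoolLimit(10)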

Load-testing again:

$ netstat -nat | grep -i 27017 | wc -l
15

Conclusion:

Each time you Clone a session, calling session.Close() when the operation finishes unsets the socket and decrements its reference count. Without an upper limit, every goroutine that finds no idle connection creates a new socket, up to the default maximum of 4096. MongoDB's server-side connection cap is typically around 10,000, so one port can only sustain one or two such processes before connections are exhausted. An excess of connections is inefficient on the client and costs the server even more in memory and CPU, so you need to configure your own pool limit. Even with a pool limit in place, beware: if poolLimit goroutines run for too long, or dead-loop without releasing their sockets, you are still stuck.

mgo's underlying socket pool only implements reuse within the poolLimit bound; beyond that, optimization is up to you.
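One partial mitigation for the slow-goroutine case is to bound how long any single operation can hold a socket, using mgo's standard session timeout knobs (a sketch; the durations are illustrative, and this does not help a true dead loop that never issues I/O):

// Per-operation socket I/O deadline: a slow query cannot pin a pooled
// connection indefinitely.
GlobalMgoSession.SetSocketTimeout(10 * time.Second)
// Max wait to reach a usable server before the operation errors out.
GlobalMgoSession.SetSyncTimeout(7 * time.Second)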

 

 

