golang操作mongo使用的包是"gopkg.in/mgo.v2",coding過程中需要並發讀寫mongo數據庫,簡單觀摩了下源碼,記錄下自己的一些理解,如有錯誤,敬請斧正。
一般來說,我們直接這樣創建一個session:
Session, err = mgo.Dial(URL)
if err != nil {
log.Println(err)
}
來看看Dial這個函數做了什么:
func Dial(url string) (*Session, error) {
session, err := DialWithTimeout(url, 10*time.Second)
if err == nil {
session.SetSyncTimeout(1 * time.Minute)
session.SetSocketTimeout(1 * time.Minute)
}
return session, err
}
調用DialWithTimeout函數設置默認的超時時間是10秒。該函數中調用了DialWithInfo這個函數,而DialWithInfo函數中比較重要是是調用了newSession,看看這個函數做了什么操作:
func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
cluster.Acquire()
session = &Session{
cluster_: cluster,
syncTimeout: timeout,
sockTimeout: timeout,
poolLimit: 4096,
}
debugf("New session %p on cluster %p", session, cluster)
session.SetMode(consistency, true)
session.SetSafe(&Safe{})
session.queryConfig.prefetch = defaultPrefetch
return session
}
返回的session設置了一些默認的參數,暫時先忽略,直接看看Session的數據結構:
type Session struct {
m sync.RWMutex
cluster_ *mongoCluster
slaveSocket *mongoSocket
masterSocket *mongoSocket
slaveOk bool
consistency Mode
queryConfig query
safeOp *queryOp
syncTimeout time.Duration
sockTimeout time.Duration
defaultdb string
sourcedb string
dialCred *Credential
creds []Credential
poolLimit int
bypassValidation bool
}
m是mgo.Session的並發鎖,因此所有的Session實例都是線程安全的。slaveSocket,masterSocket代表了該Session到mongodb主節點和從節點的一個物理連接的緩存。而Session的策略總是優先使用緩存的連接。是否緩存連接,由consistency也就是該Session的模式決定。假設在並發程序中,使用同一個Session實例,不使用Copy,而該Session實例的模式又恰好會緩存連接,那么,所有的通過該Session實例的操作,都會通過同一條連接到達mongodb。雖然mongodb本身的網絡模型是非阻塞通信,請求可以通過一條鏈路,非阻塞地處理,但是會影響效率。
其次mgo.Session緩存的一主一從連接,實例本身不負責維護。也就是說,當slaveSocket,masterSocket任意其一,連接斷開,Session自己不會重置緩存,該Session的使用者如果不主動重置緩存,調用者得到的將永遠是EOF。這種情況在主從切換時就會發生,在網絡抖動時也會發生。
mgo的DB句柄需要你做一個copy操作:
// Copy works just like New, but preserves the exact authentication
// information from the original session.
func (s *Session) Copy() *Session {
s.m.Lock()
scopy := copySession(s, true)
s.m.Unlock()
scopy.Refresh()
return scopy
}
copySession將源Session淺拷貝到臨時Session中,這樣源Session的配置就拷貝到了臨時Session中。關鍵的Refresh,將源Session淺拷貝到臨時Session的連接緩存指針,也就是slaveSocket,masterSocket置為空,這樣臨時Session就不存在緩存連接,而轉為去嘗試獲取一個空閑的連接。
mgo自身維護了一套到mongodb集群的連接池。這套連接池機制以mongodb數據庫服務器為最小單位,每個mongodb都會在mgo內部,對應一個mongoServer結構體的實例,一個實例代表着mgo持有的到該數據庫的連接。看看這個連接池的定義:
type mongoServer struct {
sync.RWMutex
Addr string
ResolvedAddr string
tcpaddr *net.TCPAddr
unusedSockets []*mongoSocket
liveSockets []*mongoSocket
closed bool
abended bool
sync chan bool
dial dialer
pingValue time.Duration
pingIndex int
pingCount uint32
pingWindow [6]time.Duration
info *mongoServerInfo
}
info代表了該實例對應的數據庫服務器在集群中的信息——是否master,ReplicaSetName等。而兩個Slice,就是傳說中的連接池。unusedSockets存儲當前空閑的連接,liveSockets存儲當前活躍中的連接,Session緩存的連接就同時存放在liveSockets切片中,而臨時Session獲取到的連接就位於unusedSockets切片中。
每個mongoServer都會隸屬於一個mongoCluster結構,相當於mgo在內部,模擬出了mongo數據庫集群的模型。
type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
}
mongoCluster持有一系列mongoServer的實例,以主從結構分散到兩個數組中。 每個Session都會存儲自己對應的,要操作的mongoCluster的引用。
前面的描述可以總結成下面這張圖:
那么我們在使用的時候就可以創建一個Session,然后clone操作,用clone得到的copysession完成操作,結束后關閉這個copysession就可以了。