In the earlier post on CAP theory and MongoDB's consistency and availability, I mentioned that MongoDB provides several options, such as Read Preference, Read Concern, and Write Concern, that strongly affect its consistency, availability, durability, and performance. Unlike Read Concern and Write Concern, Read Preference is implemented almost entirely by the MongoDB driver, so this article walks through PyMongo to see how Read Preference is actually implemented.
The version analyzed here is PyMongo 3.6, which is compatible with MongoDB 3.6 and earlier.
Original article: https://www.cnblogs.com/xybaby/p/10256812.html
Read Preference
Read preference describes how MongoDB clients route read operations to the members of a replica set.
Read Preference determines which mongod node a read operation is routed to when a replica set is used. With a sharded cluster, routing is decided by mongos; when connecting directly to a replica set, it is decided by the driver, as the figure below shows:
MongoDB provides the following Read Preference modes:
- primary: the default mode; all reads are routed to the replica set's primary.
- primaryPreferred: reads normally go to the primary and are routed to a secondary only when the primary is unavailable (failover).
- secondary: all reads are routed to a secondary of the replica set.
- secondaryPreferred: reads normally go to a secondary and fall back to the primary only when no secondary is available.
- nearest: read from the member with the lowest latency, whether it is the primary or a secondary. For a distributed application with a multi-datacenter MongoDB deployment, nearest gives the best data locality.
All five modes are further constrained by maxStalenessSeconds and tag sets.
Different read preference modes suit different applications. If consistency matters, e.g. read-after-write consistency is required, reads must go to the primary, since secondaries lag behind by some amount. If some stale data is acceptable, reading from secondaries offloads the primary and keeps reads available during a primary failover, improving availability. If latency is the main concern, nearest is the right choice. Tag sets additionally allow richer custom routing policies, such as reading only from certain datacenters.
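The routing semantics of the five modes can be summarized in a small sketch. This is a simplified illustration, not PyMongo's actual selection code; the function and its arguments are made up for this example:

```python
# Simplified sketch of which members each mode may route a read to.
# The function and its arguments are illustrative, not PyMongo's API.

def candidates(mode, primary, secondaries):
    """Return the members a read may be routed to under each mode."""
    if mode == 'primary':
        return [primary] if primary else []
    if mode == 'primaryPreferred':
        return [primary] if primary else list(secondaries)
    if mode == 'secondary':
        return list(secondaries)
    if mode == 'secondaryPreferred':
        return list(secondaries) if secondaries else [primary]
    if mode == 'nearest':  # latency filtering happens in a later step
        return ([primary] if primary else []) + list(secondaries)
    raise ValueError(mode)

# During a primary failover (primary is None):
print(candidates('primaryPreferred', None, ['s1', 's2']))  # ['s1', 's2']
print(candidates('primary', None, ['s1', 's2']))           # []
```

Note how 'primary' yields no candidates at all during a failover, which is exactly why it trades availability for consistency.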
PyMongo
First, here are the PyMongo classes related to read preference, to make the following analysis easier to follow.
In the diagram above, solid arrows denote strong references (composition) and dashed arrows denote weak references (aggregation).
connect to replica set
The PyMongo documentation shows how to connect to a replica set: specify the replica set's name and one or more of its members. For example:
MongoClient('localhost', replicaset='foo')
This call is non-blocking and returns immediately; a background thread connects to the specified node. Once PyMongo reaches that node, it learns about the other members of the replica set from it and then connects to them as well.
from time import sleep
c = MongoClient('localhost', replicaset='foo'); print(c.nodes); sleep(0.1); print(c.nodes)
frozenset([])
frozenset([(u'localhost', 27019), (u'localhost', 27017), (u'localhost', 27018)])
As you can see, right after the MongoClient instance is initialized, it is not connected to any node (c.nodes is empty); checking again a moment later shows it has connected to all three members of the replica set.
This raises a question: after the MongoClient is created but before it has connected to any replica set member, can we operate on the database immediately?
If you need to do any operation with a MongoClient, such as a find() or an insert_one(), the client waits to discover a suitable member before it attempts the operation.
As the code analysis below will show, a condition variable (threading.Condition) is used to coordinate this.
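The waiting pattern can be sketched with a minimal producer/consumer pair: an operation thread blocks on a threading.Condition until a background discovery thread has found a usable node. The names and timings below are illustrative, not PyMongo's actual classes:

```python
# Minimal sketch of Condition-based coordination between discovery and
# server selection. All names and timings here are illustrative.
import threading
import time

cond = threading.Condition()
nodes = set()

def discover():                      # plays the role of the monitor thread
    time.sleep(0.05)                 # simulate network discovery latency
    with cond:
        nodes.add(('localhost', 27017))
        cond.notify_all()            # wake any thread blocked in select()

def select(timeout=1.0):             # plays the role of select_servers()
    end = time.monotonic() + timeout
    with cond:
        while not nodes:
            if time.monotonic() > end:
                raise TimeoutError('no suitable server')
            cond.wait(0.5)           # cf. common.MIN_HEARTBEAT_INTERVAL
        return set(nodes)

threading.Thread(target=discover).start()
print(select())                      # blocks briefly, then prints the node set
```

This is why a find() issued right after MongoClient() does not fail: it simply blocks inside the selection loop until discovery succeeds or a timeout elapses.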
PyMongo Monitor
As mentioned above, when the MongoClient object is initialized, it discovers the other members of the replica set through the specified mongod node. This is done by monitor.Monitor. As the class diagram above shows, each server (corresponding to one mongod node) has its own monitor. The Monitor serves three purposes:
- Health: detect when a member goes down or comes up, or if a different member becomes primary
- Configuration: detect when members are added or removed, and detect changes in members’ tags
- Latency: track a moving average of each member’s ping time
The Monitor starts a background thread (PeriodicExecutor) that periodically (every 10s by default) sends an ismaster command to its mongod node over a socket from the connection Pool. The core code (slightly rearranged) is:
def _run(self):
    self._server_description = self._check_with_retry()
    self._topology.on_change(self._server_description)

def _check_with_retry(self):
    address = self._server_description.address

    response, round_trip_time = self._check_with_socket(
        sock_info, metadata=metadata)
    self._avg_round_trip_time.add_sample(round_trip_time)  # update the moving-average RTT
    sd = ServerDescription(
        address=address,
        ismaster=response,
        round_trip_time=self._avg_round_trip_time.get())
    return sd
def _check_with_socket(self, sock_info, metadata=None):
    """Return (IsMaster, round_trip_time).

    Can raise ConnectionFailure or OperationFailure.
    """
    cmd = SON([('ismaster', 1)])
    if metadata is not None:
        cmd['client'] = metadata
    if self._server_description.max_wire_version >= 6:
        cluster_time = self._topology.max_cluster_time()
        if cluster_time is not None:
            cmd['$clusterTime'] = cluster_time

    start = _time()
    request_id, msg, max_doc_size = message.query(
        0, 'admin.$cmd', 0, -1, cmd,
        None, DEFAULT_CODEC_OPTIONS)

    # TODO: use sock_info.command()
    sock_info.send_message(msg, max_doc_size)
    reply = sock_info.receive_message(request_id)
    return IsMaster(reply.command_response()), _time() - start
The IsMaster class wraps the ismaster command response; its most important attributes are:
- replica_set_name: the replica set's name, as seen by this mongod node
- primary: which node is the primary, as seen by this mongod node
- all_hosts: all members of the replica set, as seen by this mongod node
- last_write_date: the time of this mongod node's last write, used to judge a secondary's staleness
- set_version: the replica set config version
- election_id: set only when the node is a primary; identifies the most recent primary election
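To make these attributes concrete, here is roughly what the underlying response document looks like and how its fields map to the attributes above. The field values are made up, and the parsing is a simplification of what the IsMaster wrapper does, not PyMongo's actual class:

```python
# Illustrative ismaster response from a primary (values are made up;
# electionId is really an ObjectId, simplified to an int here).
response = {
    'ismaster': True,
    'setName': 'foo',
    'primary': 'localhost:27017',
    'hosts': ['localhost:27017', 'localhost:27018', 'localhost:27019'],
    'lastWrite': {'lastWriteDate': 1546300800},
    'setVersion': 2,
    'electionId': 7,
}

# The wrapper essentially exposes these fields as attributes:
replica_set_name = response.get('setName')
all_hosts = set(response.get('hosts', []))
is_primary = bool(response.get('ismaster')) and replica_set_name is not None
election_tuple = (response.get('setVersion'), response.get('electionId'))

print(replica_set_name, len(all_hosts), is_primary, election_tuple)
```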
When a server's monitor has obtained the replica set information held by its mongod node, it calls Topology.on_change to update the replica set topology:
def on_change(self, server_description):
    """Process a new ServerDescription after an ismaster call completes."""
    if self._description.has_server(server_description.address):
        self._description = updated_topology_description(
            self._description, server_description)

        self._update_servers()  # connect to newly added members, disconnect removed ones
        self._receive_cluster_time_no_lock(
            server_description.cluster_time)

        # Wake waiters in select_servers().
        self._condition.notify_all()
The core is updated_topology_description, which reconciles the locally recorded topology with the incoming server_description (built from IsMaster, the wrapped ismaster command response). Take one case as an example: an ismaster response arrives in which the node claims to be primary. Whether or not the current topology already has a primary, the following function is called:
def _update_rs_from_primary(
        sds,
        replica_set_name,
        server_description,
        max_set_version,
        max_election_id):
    """Update topology description from a primary's ismaster response.

    Pass in a dict of ServerDescriptions, current replica set name, the
    ServerDescription we are processing, and the TopologyDescription's
    max_set_version and max_election_id if any.

    Returns (new topology type, new replica_set_name, new max_set_version,
    new max_election_id).
    """
    if replica_set_name is None:
        replica_set_name = server_description.replica_set_name
    elif replica_set_name != server_description.replica_set_name:  # not from the same replica set
        # We found a primary but it doesn't have the replica_set_name
        # provided by the user.
        sds.pop(server_description.address)
        return (_check_has_primary(sds),
                replica_set_name,
                max_set_version,
                max_election_id)

    max_election_tuple = max_set_version, max_election_id
    if None not in server_description.election_tuple:
        if (None not in max_election_tuple and
                max_election_tuple > server_description.election_tuple):  # claims primary, but older than what the topology records
            # Stale primary, set to type Unknown.
            address = server_description.address
            sds[address] = ServerDescription(address)  # an empty description makes the server type Unknown
            return (_check_has_primary(sds),
                    replica_set_name,
                    max_set_version,
                    max_election_id)

        max_election_id = server_description.election_id

    if (server_description.set_version is not None and  # the node has a newer config version
            (max_set_version is None or
             server_description.set_version > max_set_version)):
        max_set_version = server_description.set_version

    # We've heard from the primary. Is it the same primary as before?
    for server in sds.values():
        if (server.server_type is SERVER_TYPE.RSPrimary
                and server.address != server_description.address):
            # Reset old primary's type to Unknown.
            sds[server.address] = ServerDescription(server.address)

            # There can be only one prior primary.
            break

    # Discover new hosts from this primary's response.
    for new_address in server_description.all_hosts:
        if new_address not in sds:
            sds[new_address] = ServerDescription(new_address)

    # Remove hosts not in the response.
    for addr in set(sds) - server_description.all_hosts:
        sds.pop(addr)

    # If the host list differs from the seed list, we may not have a primary
    # after all.
    return (_check_has_primary(sds),
            replica_set_name,
            max_set_version,
            max_election_id)
Note the Returns section of the docstring: the function returns the updated replica set information.
The function checks, from top to bottom:
- whether the node belongs to the same replica set
- whether the new node (which claims to be primary) is newer than the primary recorded in the topology, by comparing (set_version, election_id)
- whether the node carries a newer max_set_version
- if the topology already records a now-stale primary, its server type is reset to Unknown
- new replica set members are discovered from the primary's all_hosts
- members no longer in the replica set are removed
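The stale-primary check is just Python's lexicographic tuple ordering on (set_version, election_id). A quick illustration, with integers standing in for the real ObjectId election ids:

```python
# (set_version, election_id) is compared lexicographically: a higher
# set_version wins outright; equal versions fall back to election_id.
# Integer election ids stand in for the real ObjectIds.
max_election_tuple = (2, 7)          # what the topology currently records

claimed = (2, 5)                     # ismaster from a node claiming primary
is_stale = (None not in claimed and None not in max_election_tuple
            and max_election_tuple > claimed)
print(is_stale)                      # True: this claimant lost a more recent election

claimed = (3, 1)                     # a higher config version wins outright
print(max_election_tuple > claimed)  # False: this claimant is the newer primary
```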
Everything PyMongo knows about the replica set comes from the ismaster responses of individual nodes, but the source of truth is the replica set itself, and that truth is established by a majority of its members. The information any single node returns to the driver may therefore be stale or simply wrong. The driver judges the replica set's state from this limited information; if it judges wrongly, e.g. sends a write to a stale primary, the replica set checks again on its side, guaranteeing correctness.
PyMongo read preference
The previous sections described in detail how PyMongo maintains the replica set information; this section looks at how, given that topology, a read is routed to a specific node according to the read preference.
Starting from Collection.find and following the call chain, we arrive at MongoClient._send_message_with_response:
def _send_message_with_response(self, operation, read_preference=None,
                                exhaust=False, address=None):
    topology = self._get_topology()
    if address:
        server = topology.select_server_by_address(address)
        if not server:
            raise AutoReconnect('server %s:%d no longer available'
                                % address)
    else:
        selector = read_preference or writable_server_selector
        server = topology.select_server(selector)

    return self._reset_on_error(
        server,
        server.send_message_with_response,
        operation,
        set_slave_ok,
        self.__all_credentials,
        self._event_listeners,
        exhaust)
The code is clear: a server is selected according to the given address or read_preference, then the request is sent through that server and the reply awaited. topology.select_server eventually reaches the following function:
def _select_servers_loop(self, selector, timeout, address):
    """select_servers() guts. Hold the lock when calling this."""
    now = _time()
    end_time = now + timeout
    server_descriptions = self._description.apply_selector(  # _description is a TopologyDescription
        selector, address)

    while not server_descriptions:
        # No suitable servers.
        if timeout == 0 or now > end_time:
            raise ServerSelectionTimeoutError(
                self._error_message(selector))

        self._ensure_opened()
        self._request_check_all()

        # Release the lock and wait for the topology description to
        # change, or for a timeout. We won't miss any changes that
        # came after our most recent apply_selector call, since we've
        # held the lock until now.
        self._condition.wait(common.MIN_HEARTBEAT_INTERVAL)  # threading.Condition.wait
        self._description.check_compatible()
        now = _time()
        server_descriptions = self._description.apply_selector(
            selector, address)

    self._description.check_compatible()
    return server_descriptions
As you can see, the selection does not necessarily succeed on the first attempt. If no server can be selected, the client has not yet connected to enough mongod nodes, so it waits for a while (_condition.wait) and retries. As shown earlier, Topology.on_change calls _condition.notify_all to wake it up.
def apply_selector(self, selector, address):

    def apply_local_threshold(selection):
        if not selection:
            return []

        settings = self._topology_settings
        # Round trip time in seconds.
        fastest = min(
            s.round_trip_time for s in selection.server_descriptions)
        threshold = settings.local_threshold_ms / 1000.0
        return [s for s in selection.server_descriptions
                if (s.round_trip_time - fastest) <= threshold]

    # ... unrelated code omitted ...
    return apply_local_threshold(
        selector(Selection.from_topology_description(self)))
The selector above is one of the subclasses of read_preference._ServerMode; take Nearest as an example:
class Nearest(_ServerMode):

    def __call__(self, selection):
        """Apply this read preference to Selection."""
        return member_with_tags_server_selector(
            self.tag_sets,
            max_staleness_selectors.select(
                self.max_staleness, selection))
The selection is first constrained by maxStalenessSeconds and then filtered by the tag sets; here we focus on the former.
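For completeness, tag set filtering works roughly as follows: the tag sets are tried in order, and the first one that matches at least one member selects those members. This is a simplified sketch, not PyMongo's member_with_tags_server_selector itself:

```python
# Simplified sketch of tag set filtering; member dicts are illustrative.

def tags_match(member_tags, tag_set):
    # A member matches a tag set if it carries every key/value pair in it.
    return all(member_tags.get(k) == v for k, v in tag_set.items())

def filter_by_tag_sets(members, tag_sets):
    # Try each tag set in order; the first one with any match wins.
    for tag_set in tag_sets:
        matched = [m for m in members if tags_match(m['tags'], tag_set)]
        if matched:
            return matched
    return []

members = [
    {'host': 'a', 'tags': {'dc': 'east', 'use': 'reporting'}},
    {'host': 'b', 'tags': {'dc': 'west'}},
]
# Prefer the east datacenter; {} matches every member, so it acts as a fallback.
print(filter_by_tag_sets(members, [{'dc': 'east'}, {}]))
```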
About maxStalenessSeconds:
The read preference maxStalenessSeconds option lets you specify a maximum replication lag, or “staleness”, for reads from secondaries. When a secondary’s estimated staleness exceeds maxStalenessSeconds, the client stops using it for read operations.
How is staleness computed? If the topology has a primary, the following function is used:
def _with_primary(max_staleness, selection):
    """Apply max_staleness, in seconds, to a Selection with a known primary."""
    primary = selection.primary
    sds = []

    for s in selection.server_descriptions:
        if s.server_type == SERVER_TYPE.RSSecondary:
            # See max-staleness.rst for explanation of this formula.
            staleness = (
                (s.last_update_time - s.last_write_date) -
                (primary.last_update_time - primary.last_write_date) +
                selection.heartbeat_frequency)

            if staleness <= max_staleness:
                sds.append(s)
        else:
            sds.append(s)

    return selection.with_server_descriptions(sds)
The code above uses the last_write_date attribute of IsMaster; it is exactly this attribute that staleness is judged from.
The formula is explained in max-staleness.rst.
My own way of understanding it: assuming uniform network latency, if both heartbeat replies arrived at the same moment, P.lastWriteDate - S.lastWriteDate alone would suffice; but the heartbeats arrive at different times, so the timing difference has to be accounted for. I would write it as (P.lastWriteDate - S.lastWriteDate) + (S.lastUpdateTime - P.lastUpdateTime). Adding the heartbeat interval is a pessimistic allowance: if the secondary stops replicating right after a heartbeat, its staleness can grow by up to one heartbeat interval before the next heartbeat reveals it.
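Plugging some concrete numbers into the formula makes the intuition checkable. All values are in seconds and purely illustrative:

```python
# Illustrative timestamps, in seconds since some epoch.
S_last_update, S_last_write = 1000.0, 990.0  # secondary heard at t=1000, last write at 990
P_last_update, P_last_write = 995.0, 994.0   # primary heard at t=995, last write at 994
heartbeat_frequency = 10.0

# The formula as written in _with_primary:
staleness = ((S_last_update - S_last_write)
             - (P_last_update - P_last_write)
             + heartbeat_frequency)
print(staleness)  # 19.0

# Regrouped as in the text: replication lag plus heartbeat-timing skew,
# plus the pessimistic heartbeat interval. Same expression, rearranged.
same = ((P_last_write - S_last_write)
        + (S_last_update - P_last_update)
        + heartbeat_frequency)
print(staleness == same)  # True
```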
As the code shows, Nearest first finds all readable members and then uses apply_local_threshold to filter them down to the nearest ones.
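The apply_local_threshold step boils down to keeping every member whose average round trip time is within localThresholdMS (15ms by default) of the fastest member. A self-contained sketch with made-up RTTs; the helper below is illustrative, not PyMongo's actual method:

```python
# Simplified version of apply_local_threshold over host -> RTT (ms) pairs.

def apply_local_threshold(rtts_ms, local_threshold_ms=15):
    # Keep every member within local_threshold_ms of the fastest one.
    if not rtts_ms:
        return []
    fastest = min(rtts_ms.values())
    return sorted(h for h, rtt in rtts_ms.items()
                  if rtt - fastest <= local_threshold_ms)

# Made-up round trip times in milliseconds.
rtts = {'a:27017': 3, 'b:27017': 12, 'c:27017': 40}
print(apply_local_threshold(rtts))  # ['a:27017', 'b:27017']: c is 37ms slower
```

So "nearest" does not mean a single lowest-latency node: any member within the threshold window of the fastest one remains a candidate, and the driver picks among them.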