1. ReplSet源碼結構
rs_config.h replSet間同步設置的工具類;rs_member.h 心跳檢測類和replSet成員狀態的定義;rs_sync.h 同步數據類;rs.h 定義了幾乎所有replSet相關的類(Member:replSet中的節點成員,GhostSync:備份同步類,ReplSet:管理所有Member的核心類)
2. ReplSet的結構分析
RSBase定義了線程鎖。
ReplSetImpl完成了大部分Replication Set的操作:心跳檢測,選舉,設置主節點,查詢節點信息。
Member記錄其他節點的信息,Manager查詢節點信息,ReplSetHealthPoolTask心跳檢測,ReplSetConfig同步設置,Consensus選舉主節點,StateBox設置主節點。
ReplSet封裝了很多方便的操作。
3. 創建並設置ReplSet
在mongod開始運行后(mongoDBMain),會在啟動監聽程序的同時,根據運行參數中是否標明了"--replSet",調用startReplication來創建ReplSet。
/* if we are going to be a replica set, we aren't doing other forms of replication. */
if( !cmdLine._replSet.empty() ) { // was --replSet given on the command line?
if( replSettings.slave || replSettings.master ) { // --replSet cannot be combined with --slave or --master
log() << " *** " << endl;
log() << " ERROR: can't use --slave or --master replication options with --replSet " << endl;
log() << " *** " << endl;
}
newRepl(); // mark this process as a new replication node
replSet = true;
ReplSetCmdline *replSetCmdline = new ReplSetCmdline(cmdLine._replSet);
boost::thread t( boost::bind( &startReplSets, replSetCmdline) ); // spawn the thread that performs the actual ReplSet creation
return;
}
// ……
}
// Thread entry point: creates the global ReplSet object and starts it running.
void startReplSets(ReplSetCmdline *replSetCmdline) { // function executed on the startup thread
Client::initThread( " rsStart ");
try {
verify( theReplSet == 0 );
if( replSetCmdline == 0 ) {
verify(!replSet);
return;
}
replLocalAuth();
(theReplSet = new ReplSet(*replSetCmdline))->go(); // create the ReplSet, publish it through the global pointer, then start it via go()
}
catch(std::exception& e) {
log() << " replSet caught exception in startReplSets thread: " << e.what() << rsLog;
if( theReplSet )
theReplSet->fatal();
}
cc().shutdown();
}
在構造ReplSetImpl時,會調用ReplSetImpl::loadConfig去設置ReplSet。
startupStatus = LOADINGCONFIG; // mark startup state: loading the config
startupStatusMsg. set( " loading " + rsConfigNs + " config (LOADINGCONFIG) ");
LOG( 1) << " loadConfig() " << rsConfigNs << endl;
while( 1 ) { // retry until a config is successfully loaded
try {
vector<ReplSetConfig> configs;
try {
configs.push_back( ReplSetConfig(HostAndPort::me()) ); // try to load the config stored on this host
}
catch(DBException& e) {
log() << " replSet exception loading our local replset configuration object : " << e.toString() << rsLog;
}
// these seeds were supplied by the user (--replSet command line)
for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) {
try {
configs.push_back( ReplSetConfig(*i) ); // try to fetch the config from each seed host
}
catch( DBException& e ) {
log() << " replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog;
}
}
{
scoped_lock lck( replSettings.discoveredSeeds_mx );
if( replSettings.discoveredSeeds.size() > 0 ) { // hosts discovered by other threads (nodes reported via heartbeats)
for ( set< string>::iterator i = replSettings.discoveredSeeds.begin();
i != replSettings.discoveredSeeds.end();
i++) {
try {
configs.push_back( ReplSetConfig(HostAndPort(*i)) ); // try to fetch the config from a discovered seed
}
catch( DBException& ) {
log( 1) << " replSet exception trying to load config from discovered seed " << *i << rsLog;
replSettings.discoveredSeeds.erase(*i);
}
}
}
}
if (!replSettings.reconfig.isEmpty()) {
try {
configs.push_back(ReplSetConfig(replSettings.reconfig, true)); // config supplied through the shell (reconfig)
}
catch( DBException& re) {
log() << " replSet couldn't load reconfig: " << re.what() << rsLog;
replSettings.reconfig = BSONObj();
}
}
int nok = 0;
int nempty = 0;
for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
if( i->ok() ) // count the usable configs
nok++;
if( i->empty() ) // count the empty configs
nempty++;
}
if( nok == 0 ) {
if( nempty == ( int) configs.size() ) { // no config was found anywhere
startupStatus = EMPTYCONFIG; // record the state
startupStatusMsg. set( " can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG) ");
log() << " replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG) " << rsLog;
static unsigned once;
if( ++once == 1 ) {
log() << " replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done " << rsLog;
}
if( _seeds->size() == 0 ) {
LOG( 1) << " replSet info no seed hosts were specified on the --replSet command line " << rsLog;
}
}
else {
startupStatus = EMPTYUNREACHABLE;
startupStatusMsg. set( " can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE) ");
log() << " replSet can't get " << rsConfigNs << " config from self or any seed (yet) " << rsLog;
}
sleepsecs( 10); // retry after 10 seconds
continue;
}
if( !_loadConfigFinish(configs) ) { // a config was found but applying it failed
log() << " replSet info Couldn't load config yet. Sleeping 20sec and will try again. " << rsLog;
sleepsecs( 20); // retry after 20 seconds
continue;
}
}
catch(DBException& e) {
startupStatus = BADCONFIG;
startupStatusMsg. set( " replSet error loading set config (BADCONFIG) ");
log() << " replSet error loading configurations " << e.toString() << rsLog;
log() << " replSet error replication will not start " << rsLog;
sethbmsg( " error loading set config ");
_fatal();
throw;
}
break; // config applied successfully: leave the retry loop
}
startupStatusMsg. set( " ? started ");
startupStatus = STARTED; // mark startup state: started
}
從代碼上看,設置過程為先去各個地方找到可用的配置。之后,統計其中可用的配置,如果沒有可用的配置也沒有不為空的配置,則輸出log提示用戶設置。當找到了可用的配置之后,將其數組發送到_loadConfigFinish函數進行設置。
ReplSet的設置工作:
// Picks the usable config with the highest version from cfgs, applies it, and
// persists it locally when it is newer than the first (our own) config.
bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
int v = - 1; // highest usable config version seen so far
ReplSetConfig *highest = 0;
int myVersion = - 2000;
int n = 0;
for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { // walk every candidate config
ReplSetConfig& cfg = *i;
DEV log( 1) << n+ 1 << " config shows version " << cfg.version << rsLog;
if( ++n == 1 ) myVersion = cfg.version; // remember the version of the first config (presumably our local one)
if( cfg.ok() && cfg.version > v ) { // keep the usable config with the highest version
highest = &cfg;
v = cfg.version;
}
}
verify( highest );
if( !initFromConfig(*highest) ) // initialize ourselves from the winning config
return false;
if( highest->version > myVersion && highest->version >= 0 ) {
// the winning config came from a remote node and is newer than ours: persist it locally
log() << " replSet got config version " << highest->version << " from a remote, saving locally " << rsLog;
highest->saveConfigLocally(BSONObj());
}
return true;
}
4. ReplSet間的心跳檢測
在配置ReplSet的過程中,通過調用initFromConfig來更新配置。這個函數中會先去找到新添加的節點,如果有新的節點,為新的節點添加心跳檢測的task,如果沒有新的節點,則認為是更新配置,這樣停掉之前所有的心跳檢測task開啟新的task for health。
心跳檢測過程的示意圖:
// Registers a repeating heartbeat-poll task for member m.
void ReplSetImpl::startHealthTaskFor(Member *m) {
DEV log() << " starting rsHealthPoll for " << m->fullName() << endl;
ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo()); // create the heartbeat-poll task
healthTasks.insert(task); // track it in the set of health tasks
task::repeat(task, 2000); // re-run every 2000 ms (waits 2s after each poll completes)
}
以發送請求方說明:
心跳檢測的目的是查看ReplSet中其他節點是不是存活。
存活的話調用up函數如下:
HeartbeatInfo::numPings++;
mem.authIssue = false;
if( mem.upSince == 0 ) {
// first successful heartbeat after the member was down: record when it came up
log() << " replSet member " << h.toString() << " is up " << rsLog;
mem.upSince = mem.lastHeartbeat;
}
mem.health = 1.0;
mem.lastHeartbeatMsg = info[ " hbmsg "].String();
if( info.hasElement( " opTime ") )
mem.opTime = info[ " opTime "].Date();
// see if this member is in the electable set
// handle the electability flag ("e") carried in the heartbeat reply
if( info[ " e "].eoo() ) {
// for backwards compatibility
const Member *member = theReplSet->findById(mem.id());
if (member && member->config().potentiallyHot()) { // can the remote member potentially become primary?
theReplSet->addToElectable(mem.id()); // mark the remote member electable
}
else {
theReplSet->rmFromElectable(mem.id()); // remove the remote member from the electable set
}
}
// add this server to the electable set if it is within 10
// seconds of the latest optime we know of
else if( info[ " e "].trueValue() &&
mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
if (lastOp > 0 && mem.opTime >= lastOp - 10) {
theReplSet->addToElectable(mem.id());
}
}
else {
theReplSet->rmFromElectable(mem.id());
}
// if the reply carries a config, hand it to the manager to apply
be cfg = info[ " config "];
if( cfg.ok() ) {
// received a new config
boost::function< void()> f =
boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
theReplSet->mgr->send(f);
}
}
};
連接失敗時,表示目標節點離線。
// reset health fields: the member is unreachable
mem.authIssue = false;
mem.health = 0.0;
mem.ping = 0;
// log the transition to down (only on the first failed heartbeat of an outage)
if( mem.upSince || mem.downSince == 0 ) {
mem.upSince = 0;
mem.downSince = jsTime();
mem.hbstate = MemberState::RS_DOWN;
log() << " replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog;
}
// the member can no longer be elected
mem.lastHeartbeatMsg = msg;
theReplSet->rmFromElectable(mem.id());
}
如果在配置階段接收到了不是自己列表中節點發出的心跳檢測請求,可以檢測新節點的配置信息。具體代碼在heartbeat.cpp中的CmdReplSetHeartbeat(line:104)中實現。
5. ReplSet間的投票過程
投票過程是伴隨在心跳檢測和消息傳遞的過程中的,下面的代碼閱讀本着只關心投票相關過程的原則,其他枝節部分就略去了。
投票過程的序列圖:
a) 消息傳遞
消息傳遞的過程是由心跳檢測來完成的,在心跳檢測時,單個節點既是發送方也是接收方。
作為發送方,向其他節點發出:
// Sends a replSetHeartbeat command to memberFullName; the reply lands in result.
bool requestHeartbeat( string setName, string from, string memberFullName, BSONObj& result,
int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
if( replSetBlind ) {
return false;
}
// the outgoing heartbeat carries our set name, our config version and our address
BSONObj cmd = BSON( " replSetHeartbeat " << setName <<
" v " << myCfgVersion <<
" pv " << 1 <<
" checkEmpty " << checkEmpty <<
" from " << from );
// generally not a great idea to do outbound waiting calls in a
// write lock. heartbeats can be slow (multisecond to respond), so
// generally we don't want to be locked, at least not without
// thinking carefully about it first.
massert( 15900, " can't heartbeat: too much lock ",
!Lock::somethingWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() );
ScopedConn conn(memberFullName);
return conn.runCommand( " admin ",
cmd,
result,
0,
&AuthenticationTable::getInternalSecurityAuthenticationTable());
}
// Sends one heartbeat to member mem and records ping time, clock skew and
// the remote member's reported state.
bool ReplSetHealthPollTask::_requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) {
if (tries++ % threshold == (threshold - 1)) {
// every `threshold` polls, force a reconnect so a stale connection cannot linger
ScopedConn conn(h.toString());
conn.reconnect();
}
Timer timer;
time_t before = curTimeMicros64() / 1000000;
// send the heartbeat command
bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
h.toString(), info, theReplSet->config().version, theirConfigVersion);
// info holds the heartbeat reply; derive round-trip time and clock skew from it
mem.ping = (unsigned int)timer.millis();
// we set this on any response - we don't get this far if
// couldn't connect because exception is thrown
time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
if ( info[ " time "].isNumber() ) {
long long t = info[ " time "].numberLong();
if( t > after )
mem.skew = ( int) (t - after);
else if( t < before )
mem.skew = ( int) (t - before); // negative
}
else {
// it won't be there if remote hasn't initialized yet
if( info.hasElement( " time ") )
warning() << " heatbeat.time isn't a number: " << info << endl;
mem.skew = INT_MIN;
}
{
// record the state the remote member reported
be state = info[ " state "];
if( state.ok() )
mem.hbstate = MemberState(state.Int());
}
return ok;
}
// One heartbeat-poll round: contact the member, smooth the ping time, and
// notify the manager when the member's state changed.
void ReplSetHealthPollTask::doWork() {
if ( !theReplSet ) {
LOG( 2) << " replSet not initialized yet, skipping health poll this round " << rsLog;
return;
}
HeartbeatInfo mem = m;
HeartbeatInfo old = mem;
try {
BSONObj info;
int theirConfigVersion = - 10000;
bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
// weight new ping with old pings
// on the first ping, just use the ping value
if (old.ping != 0) {
mem.ping = (unsigned int)((old.ping * . 8) + (mem.ping * . 2));
}
// ……
}
catch(...) {
// ……
}
m = mem;
// tell the manager to refresh this member's heartbeat info
theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
static time_t last = 0;
time_t now = time( 0);
// did anything about this member change since the last poll?
bool changed = mem.changed(old);
if( changed ) {
if( old.hbstate != mem.hbstate )
log() << " replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog;
}
// if the member changed, or more than 4 seconds passed since the last
// re-evaluation, re-check our own state (this may kick off an election)
if( changed || now-last> 4 ) {
last = now;
theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
}
}
作為接收方:
// Heartbeat receiver: replies with our state, electability and optime, and,
// when the sender's config version is older than ours, our full config.
virtual bool CmdReplSetHeartbeat::run( const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
// auth and validation steps elided
// ……
result.append( " set ", theReplSet->name()); // set name
result.append( " state ", theReplSet->state().s); // our member state
result.append( " e ", theReplSet->iAmElectable()); // whether we are electable
result.append( " hbmsg ", theReplSet->hbmsg()); // current heartbeat status message
result.append( " time ", ( long long) time( 0)); // current server time
result.appendDate( " opTime ", theReplSet->lastOpTimeWritten.asDate()); // optime of our last write
int v = theReplSet->config().version; // our config version
result.append( " v ", v);
if( v > cmdObj[ " v "].Int() ) // sender's config is older: include ours in the reply
result << " config " << theReplSet->config().asBson();
return true;
}
b) 檢測哪些節點可以投票
結合之前的心跳檢測和后面的消息傳遞,ReplSet會根據心跳檢測的結果調用addToElectable和rmFromElectable來添加和刪除投票節點。
值得一提的是,在Manager::msgCheckNewState()被調用時,會去判斷當前節點是否可以參與投票:
unsigned otherOp = rs->lastOtherOpTime().getSecs();
// make sure the electable set is up-to-date
if (rs->elect.aMajoritySeemsToBeUp() && // does a majority of the set appear to be up?
rs->iAmPotentiallyHot() && // are we a primary-capable (hot) node?
(otherOp == 0 || rs->lastOpTimeWritten.getSecs() >= otherOp - 10)) { // is our last write within 10s of the freshest known optime?
theReplSet->addToElectable(rs->selfId());
}
else {
theReplSet->rmFromElectable(rs->selfId());
}
// ……
}
c) 確定是不是要發起投票
當確定了哪些節點可以投票以后,就要判斷是不是要發起投票了。
// Re-evaluates the primary: clears a dead primary, defers to a remote
// primary, steps down when we lose a majority, or tries to elect ourself.
void Manager::msgCheckNewState() {
{
// re-check the primary we currently know about
// ……
const Member *p = rs->box.getPrimary(); //
if( p && p != rs->_self ) {
if( !p->hbinfo().up() ||
!p->hbinfo().hbstate.primary() ) {
// the known primary is unreachable or no longer primary: forget it
p = 0;
rs->box.setOtherPrimary( 0);
}
}
const Member *p2;
{
bool two;
// look for another member claiming to be primary
// if two other nodes both claim primaryship, wait for them to settle it themselves
// otherwise p2 is the confirmed remote primary
p2 = findOtherPrimary(two);
if( two ) {
/* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
log() << " replSet info two primaries (transiently) " << rsLog;
return;
}
}
if( p2 ) {
noteARemoteIsPrimary(p2);
return;
}
/* didn't find anyone who wants to be primary */
// a primary is already known
if( p ) {
/* we are already primary */
// if the primary is not us, someone else is in charge: nothing to do
if( p != rs->_self ) {
rs->sethbmsg( " error p != rs->self in checkNewState ");
log() << " replSet " << p->fullName() << rsLog;
log() << " replSet " << rs->_self->fullName() << rsLog;
return;
}
// we are primary: step down if we can no longer see a majority
if( rs->elect.shouldRelinquish() ) {
log() << " can't see a majority of the set, relinquishing primary " << rsLog;
rs->relinquish();
}
return;
}
if( !rs->iAmPotentiallyHot() ) { // if not we never try to be primary
OCCASIONALLY log() << " replSet I don't see a primary and I can't elect myself " << endl;
return;
}
// check whether we could become primary ourselves
/* no one seems to be primary. shall we try to elect ourself? */
if( !rs->elect.aMajoritySeemsToBeUp() ) {
static time_t last;
static int n;
int ll = 0;
if( ++n > 5 ) ll++;
if( last + 60 > time( 0 ) ) ll++;
log(ll) << " replSet can't see a majority, will not try to elect self " << rsLog;
last = time( 0);
return;
}
if( !rs->iAmElectable() ) {
return;
}
busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one.
}
try {
// start the election
rs->elect.electSelf();
}
catch(RetryAfterSleepException&) {
/* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */
requeue();
}
catch(...) {
log() << " replSet error unexpected assertion in rs manager " << rsLog;
}
busyWithElectSelf = false;
}
d) 投票
作為心跳檢測的接收方,當其他節點信息做了改變或者對某個節點前后連接時差大於4秒,就有可能調用Manager::msgCheckNewState更改自己狀態並且發出投票請求。
與其說是投票,不如說是一次詢問的過程:節點向其他節點詢問自己的狀態是否符合當主機的條件。
// Freshness probe: broadcasts replSetFresh to all targets to ask whether any
// node is fresher than us or would veto our candidacy.
bool Consensus::weAreFreshest( bool& allUp, int& nTies) {
const OpTime ord = theReplSet->lastOpTimeWritten;
nTies = 0;
verify( !ord.isNull() );
// build the replSetFresh command
BSONObj cmd = BSON(
" replSetFresh " << 1 <<
" set " << rs.name() << // our set name
" opTime " << Date_t(ord.asDate()) << // our last write optime
" who " << rs._self->fullName() << // our host:port
" cfgver " << rs._cfg->version << // our config version
" id " << rs._self->id()); // our member id
list<Target> L;
int ver;
/* the following queries arbiters, even though they are never fresh. wonder if that makes sense.
it doesn't, but it could, if they "know" what freshness it one day. so consider removing
arbiters from getTargets() here. although getTargets is used elsewhere for elections; there
arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make
not fetching them herein happen.
*/
// gather the members to query
rs.getTargets(L, ver);
// broadcast the command
multiCommand(cmd, L);
int nok = 0;
allUp = true;
// reply handling elided in this excerpt
// ……
}
// Decides whether we veto the sender's attempt to become primary; returns
// true (veto) when any of the checks below fails.
bool CmdReplSetFresh::shouldVeto( const BSONObj& cmdObj, string& errmsg) {
// don't veto older versions
if (cmdObj[ " id "].eoo()) {
// they won't be looking for the veto field
return false;
}
unsigned id = cmdObj[ " id "].Int();
const Member* primary = theReplSet->box.getPrimary(); // the primary as we currently see it
const Member* hopeful = theReplSet->findById(id); // the member hoping to become primary (the sender)
const Member *highestPriority = theReplSet->getMostElectable(); // the member we consider most electable
// any failed check below results in a veto
if( !hopeful ) { // unknown candidate
errmsg = str::stream() << " replSet couldn't find member with id " << id;
return true;
}
// we are primary ourselves and at least as up-to-date as the candidate
else if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
// hbinfo is not updated, so we have to check the primary's last optime separately
errmsg = str::stream() << " I am already primary, " << hopeful->fullName() <<
" can try again once I've stepped down ";
return true;
}
// the known primary is at least as up-to-date as the candidate
else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
// other members might be aware of more up-to-date nodes
errmsg = str::stream() << hopeful->fullName() << " is trying to elect itself but " <<
primary->fullName() << " is already primary and more up-to-date ";
return true;
}
// a member with a higher priority than the candidate exists
else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
return true;
}
// the candidate is not electable, or (priority check repeated here) a
// higher-priority member exists -- veto without setting an error message
if ( !theReplSet->isElectable(id) ||
(highestPriority && highestPriority->config().priority > hopeful->config().priority)) {
return true;
}
return false;
}
// Continuation of weAreFreshest: evaluates the replSetFresh replies.
bool Consensus::weAreFreshest( bool& allUp, int& nTies) {
// the sending part is elided in this excerpt
// the replies are stored in list<Target> L
int nok = 0;
allUp = true;
for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
// i holds one remote node's reply
if( i->ok ) {
nok++;
if( i->result[ " fresher "].trueValue() ) { // some node is fresher than we are: abort
log() << " not electing self, we are not freshest " << rsLog;
return false;
}
// count the nodes whose last optime ties with ours
// (tie handling appears to be a feature still under development)
OpTime remoteOrd( i->result[ " opTime "].Date() );
if( remoteOrd == ord )
nTies++;
verify( remoteOrd <= ord );
// did this node veto our candidacy?
if( i->result[ " veto "].trueValue() ) {
BSONElement msg = i->result[ " errmsg "];
if (!msg.eoo()) {
log() << " not electing self, " << i->toHost << " would veto with ' " <<
msg.String() << " ' " << rsLog;
}
else {
log() << " not electing self, " << i->toHost << " would veto " << rsLog;
}
return false;
}
}
else {
DEV log() << " replSet freshest returns " << i->result.toString() << rsLog;
allUp = false;
}
}
LOG( 1) << " replSet dev we are freshest of up nodes, nok: " << nok << " nTies: " << nTies << rsLog;
verify( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
return true;
}
如果沒有其他服務器投反對票,那么就向其他服務器發送設置自己為主節點的信息。之后,會向其他節點發送消息通知自己當選的消息,其他節點返回是否贊成,當贊成票過半的時候,自己當選。
// freshness-check part elided
// ……
// broadcast the replSetElect command to confirm the election result
BSONObj electCmd = BSON(
" replSetElect " << 1 <<
" set " << rs.name() << // our set name
" who " << me.fullName() << // our host:port
" whoid " << me.hbinfo().id() << // our member id
" cfgver " << rs._cfg->version << // our config version
" round " << OID::gen() /* this is just for diagnostics */
);
int configVersion;
list<Target> L;
rs.getTargets(L, configVersion);
multiCommand(electCmd, L); // send to all targets
// Handles an incoming replSetElect request and replies with our vote
// (positive weight for yea, -10000 for veto, 0 for abstain).
void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
BSONObjBuilder& b = *_b;
DEV log() << " replSet received elect msg " << cmd.toString() << rsLog;
else LOG( 2) << " replSet received elect msg " << cmd.toString() << rsLog;
string set = cmd[ " set "].String();
unsigned whoid = cmd[ " whoid "].Int();
int cfgver = cmd[ " cfgver "].Int();
OID round = cmd[ " round "].OID();
int myver = rs.config().version;
const Member* primary = rs.box.getPrimary();
const Member* hopeful = rs.findById(whoid);
const Member* highestPriority = rs.getMostElectable();
int vote = 0;
if( set != rs.name() ) {
log() << " replSet error received an elect request for ' " << set << " ' but our set name is ' " << rs.name() << " ' " << rsLog;
}
else if( myver < cfgver ) { // our config is older than the candidate's
// we are stale. don't vote (no further action)
}
else if( myver > cfgver ) { // the candidate's config is older than ours: veto
// they are stale!
log() << " replSet electCmdReceived info got stale version # during election " << rsLog;
vote = - 10000;
}
else if( !hopeful ) { // the candidate id is unknown to us: veto
log() << " replSet electCmdReceived couldn't find member with id " << whoid << rsLog;
vote = - 10000;
}
// we are primary ourselves and at least as up-to-date as the candidate: veto
else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
// hbinfo is not updated, so we have to check the primary's last optime separately
log() << " I am already primary, " << hopeful->fullName()
<< " can try again once I've stepped down " << rsLog;
vote = - 10000;
}
// the primary we know about is at least as up-to-date as the candidate: veto
else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
// other members might be aware of more up-to-date nodes
log() << hopeful->fullName() << " is trying to elect itself but " <<
primary->fullName() << " is already primary and more up-to-date " << rsLog;
vote = - 10000;
}
// a member with a higher priority than the candidate exists: veto
else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
vote = - 10000;
}
else {
// all checks passed: cast our yea vote
try {
vote = yea(whoid); // yea() yields our vote weight (throws VoteException if we already voted for someone else)
dassert( hopeful->id() == whoid );
rs.relinquish();
log() << " replSet info voting yea for " << hopeful->fullName() << " ( " << whoid << ' ) ' << rsLog;
}
catch(VoteException&) {
log() << " replSet voting no for " << hopeful->fullName() << " already voted for another " << rsLog;
}
}
// build the reply
b.append( " vote ", vote);
b.append( " round ", round);
}
{
// tally the replSetElect replies and decide whether the election succeeded
for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
DEV log() << " replSet elect res: " << i->result.toString() << rsLog;
if( i->ok ) {
int v = i->result[ " vote "].Int();
tally += v; // accumulate the votes
}
}
if( tally* 2 <= totalVotes() ) { // no more than half of the total votes: election failed
log() << " replSet couldn't elect self, only received " << tally << " votes " << rsLog;
}
else if( time( 0) - start > 30 ) { // election took more than 30 seconds: discard the result
// defensive; should never happen as we have timeouts on connection and operation for our conn
log() << " replSet too much time passed during our election, ignoring result " << rsLog;
}
else if( configVersion != rs.config().version ) { // config version changed mid-election: discard the result
log() << " replSet config version changed during our election, ignoring result " << rsLog;
}
else {
/* succeeded. */
log( 1) << " replSet election succeeded, assuming primary role " << rsLog;
success = true;
rs.assumePrimary(); // we won: become primary
}
}
e) 總結投票過程
在心跳檢測中,節點間互相傳遞着信息。通過這些信息,節點能了解到其他節點的情況(配置版本,是否能連接上等等)。單個節點統計着這些信息,當某個節點設置發生改變,或者網絡連接出現異常的時候,開始發送投票事件。
投票時,首先根據心跳檢測記錄的信息判斷哪些節點可以被連接到,即有投票權。之后向所有可以連接到的節點,發出自己能否成為主節點的請求。其他節點投是否否決票,如果一致通過,該節點進行下一步操作:向其他節點發出自己當選的請求,其他節點根據情況確定請求,如果返回的贊成數大於投票總數的一半,該節點當選。