MongoDB源碼閱讀之ReplSet源碼分析


1. ReplSet源碼結構

rs_config.h replSet間同步設置的工具類

rs_member.h 心跳檢測類和replSet成員狀態的定義

rs_sync.h 同步數據類

rs.h 定義了幾乎所有replSet相關的類(Member:replSet中的節點成員,

GhostSync:備份同步類,ReplSet:管理所有Member的核心類)

2. ReplSet的結構分析

60L`9Y@OK2F{ESE(L~V{D@P

RSBase定義了線程鎖。

ReplSetImpl完成了大部分Replication Set的操作:心跳檢測,選舉,設置主節點,查詢節點信息。

Member記錄其他節點的信息,Manager查詢節點信息,ReplSetHealthPoolTask心跳檢測,ReplSetConfig同步設置,Consensus選舉主節點,StateBox設置主節點。

ReplSet封裝了很多方便的操作。

3. 創建並設置ReplSet

在mongod開始運行后(mongoDBMain),會啟動監聽程序同時,根據運行參數中是否標明了”--replSet”,調用startReplication來創建ReplSet。

     void startReplication() {
         /*  if we are going to be a replica set, we aren't doing other forms of replication.  */
         if( !cmdLine._replSet.empty() ) {   // 看看參數里面有沒有--replSet
             if( replSettings.slave || replSettings.master ) {    // 這個參數不能與—slave與—master共存
                log() <<  " *** " << endl;
                log() <<  " ERROR: can't use --slave or --master replication options with --replSet " << endl;
                log() <<  " *** " << endl;
            }
            newRepl();  // 將自己標示成為一個新的Replication節點

            replSet =  true;
            ReplSetCmdline *replSetCmdline =  new ReplSetCmdline(cmdLine._replSet);
            boost::thread t( boost::bind( &startReplSets, replSetCmdline) );   // 啟動線程完成真正的ReplSet創建

             return;
        }
         // ……
   }

     void startReplSets(ReplSetCmdline *replSetCmdline) {   // 線程中執行的函數
        Client::initThread( " rsStart ");
         try {
            verify( theReplSet ==  0 );
             if( replSetCmdline ==  0 ) {
                verify(!replSet);
                 return;
            }
            replLocalAuth();
            (theReplSet =  new ReplSet(*replSetCmdline))->go();  // 創建一個ReplSet賦值到一個全局的指針中,隨即調用go啟動
        }
         catch(std::exception& e) {
            log() <<  " replSet caught exception in startReplSets thread:  " << e.what() << rsLog;
             if( theReplSet )
                theReplSet->fatal();
        }
        cc().shutdown();
    }

 

在構造ReplSetImpl時,會調用ReplSetImpl::loadConfig去設置ReplSet。

 

     void ReplSetImpl::loadConfig() {
        startupStatus = LOADINGCONFIG;  // 標注狀態為讀取config
        startupStatusMsg. set( " loading  " + rsConfigNs +  "  config (LOADINGCONFIG) ");
        LOG( 1) <<  " loadConfig()  " << rsConfigNs << endl;

         while1 ) {    // 循環獲取配置
             try {
                vector<ReplSetConfig> configs;
                 try {
                    configs.push_back( ReplSetConfig(HostAndPort::me()) );   // 嘗試從本機端口獲取配置
                }
                 catch(DBException& e) {
                    log() <<  " replSet exception loading our local replset configuration object :  " << e.toString() << rsLog;
                }

                 // 這里的種子是用戶配置的
                 for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) {   
                     try {
                        configs.push_back( ReplSetConfig(*i) );    // 嘗試從其他設備上獲取配置
                    }
                     catch( DBException& e ) {
                        log() <<  " replSet exception trying to load config from  " << *i <<  "  :  " << e.toString() << rsLog;
                    }
                }
                {
                    scoped_lock lck( replSettings.discoveredSeeds_mx );
                     if( replSettings.discoveredSeeds.size() >  0 ) {     // 其他線程搜索的其他設備(心跳檢測中提供的節點)
                         for ( set< string>::iterator i = replSettings.discoveredSeeds.begin(); 
                             i != replSettings.discoveredSeeds.end(); 
                             i++) {
                             try {
                                configs.push_back( ReplSetConfig(HostAndPort(*i)) );    // 從新搜索的設備中獲取配置
                            }
                             catch( DBException& ) {
                                log( 1) <<  " replSet exception trying to load config from discovered seed  " << *i << rsLog;
                                replSettings.discoveredSeeds.erase(*i);
                            }
                        }
                    }
                }

                 if (!replSettings.reconfig.isEmpty()) {
                     try {
                        configs.push_back(ReplSetConfig(replSettings.reconfig,  true));  // 從shell獲取配置
                    }
                     catch( DBException& re) {
                        log() <<  " replSet couldn't load reconfig:  " << re.what() << rsLog;
                        replSettings.reconfig = BSONObj();
                    }
                }

                 int nok =  0;
                 int nempty =  0;
                 for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
                     if( i->ok() )    // 判斷得到的設置是否可用
                        nok++;
                     if( i->empty() )    // 判斷得到配置是否為空
                        nempty++;    
                }
                 if( nok ==  0 ) {

                     if( nempty == ( int) configs.size() ) {    // 如果沒有獲取到配置信息
                        startupStatus = EMPTYCONFIG;    // 標注狀態
                        startupStatusMsg. set( " can't get  " + rsConfigNs +  "  config from self or any seed (EMPTYCONFIG) ");
                        log() <<  " replSet can't get  " << rsConfigNs <<  "  config from self or any seed (EMPTYCONFIG) " << rsLog;
                         static unsigned once;
                         if( ++once ==  1 ) {
                            log() <<  " replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done " << rsLog;
                        }
                         if( _seeds->size() ==  0 ) {
                            LOG( 1) <<  " replSet info no seed hosts were specified on the --replSet command line " << rsLog;
                        }
                    }
                     else {
                        startupStatus = EMPTYUNREACHABLE;
                        startupStatusMsg. set( " can't currently get  " + rsConfigNs +  "  config from self or any seed (EMPTYUNREACHABLE) ");
                        log() <<  " replSet can't get  " << rsConfigNs <<  "  config from self or any seed (yet) " << rsLog;
                    }

                    sleepsecs( 10);   // 十秒后重試
                     continue;
                }

                 if( !_loadConfigFinish(configs) ) {      // 找到了配置信息但是配置失敗
                    log() <<  " replSet info Couldn't load config yet. Sleeping 20sec and will try again. " << rsLog;
                    sleepsecs( 20);   // 二十秒后重試
                     continue;
                }
            }
             catch(DBException& e) {
                startupStatus = BADCONFIG;
                startupStatusMsg. set( " replSet error loading set config (BADCONFIG) ");
                log() <<  " replSet error loading configurations  " << e.toString() << rsLog;
                log() <<  " replSet error replication will not start " << rsLog;
                sethbmsg( " error loading set config ");
                _fatal();
                 throw;
            }
             break;    // 設置成功跳出循環
        }
        startupStatusMsg. set( " ? started ");
        startupStatus = STARTED;    // 標注狀態為開始運行
    }

從代碼上看,設置過程為先去各個地方找到可用的配置。之后,統計其中可用的配置,如果沒有可用的配置也沒有不為空的配置,則輸出log提示用戶設置。當找到了可用的配置之后,將其數組發送到_loadConfigFinish函數進行設置。

ReplSet的設置工作:

     //  Our own config must be the first one.
     bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
         int v = - 1;     // 當前版本號
        ReplSetConfig *highest =  0;    
         int myVersion = - 2000;    
         int n =  0;
         for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {    // 循環取出配置對象
            ReplSetConfig& cfg = *i;
            DEV log( 1) << n+ 1 <<  "  config shows version  " << cfg.version << rsLog; 
             if( ++n ==  1 ) myVersion = cfg.version;
             if( cfg.ok() && cfg.version > v ) {     // 如果可用,並且配置版本比當前高,則存儲起來
                highest = &cfg;
                v = cfg.version;
            }
        }
        verify( highest );

         if( !initFromConfig(*highest) )    // 將最高配置對象,進行初始化
             return  false;

         if( highest->version > myVersion && highest->version >=  0 ) {
            log() <<  " replSet got config version  " << highest->version <<  "  from a remote, saving locally " << rsLog;
            highest->saveConfigLocally(BSONObj());
        }
         return  true;
    }

 

4. ReplSet間的心跳檢測

在配置ReplSet的過程中,通過調用initFromConfig來更新配置。這個函數中會先去找到新添加的節點,如果有新的節點,為新的節點添加心跳檢測的task,如果沒有新的節點,則認為是更新配置,這樣停掉之前所有的心跳檢測task開啟新的task for health。

MWI[ZFN(_[}9}V3ZJ`PU)FA

心跳檢測過程的示意圖:

// 開啟一個心跳檢測task
     void ReplSetImpl::startHealthTaskFor(Member *m) {
        DEV log() <<  " starting rsHealthPoll for  " << m->fullName() << endl;
        ReplSetHealthPollTask *task =  new ReplSetHealthPollTask(m->h(), m->hbinfo());    // 建立心跳檢測task
        healthTasks.insert(task);      // 插入心跳檢測task集合
        task::repeat(task,  2000);      // 每隔兩秒心跳檢測一次 (在一次檢測完成以后等待兩秒后再次檢測)
    }

 

以發送請求方說明:

心跳檢測的目是查看其他Repl上的節點是不是存活。

存活的話調用up函數如下:

 

         void ReplSetHealthPollTask::up( const BSONObj& info, HeartbeatInfo& mem) {
            HeartbeatInfo::numPings++;
            mem.authIssue =  false;

             if( mem.upSince ==  0 ) {
                log() <<  " replSet member  " << h.toString() <<  "  is up " << rsLog;
                mem.upSince = mem.lastHeartbeat;
            }
            mem.health =  1.0;
            mem.lastHeartbeatMsg = info[ " hbmsg "].String();
             if( info.hasElement( " opTime ") )
                mem.opTime = info[ " opTime "].Date();

             //  see if this member is in the electable set
            
// 如果返回的心跳包含投票信息做下面的操作
             if( info[ " e "].eoo() ) {    
                 //  for backwards compatibility
                 const Member *member = theReplSet->findById(mem.id());
                 if (member && member->config().potentiallyHot()) {    // 對方是否為活着的節點
                    theReplSet->addToElectable(mem.id());      // 目標節點獲得投票權
                }
                 else {
                    theReplSet->rmFromElectable(mem.id());     // 目標節點失去投票權
                }
            }
             //  add this server to the electable set if it is within 10
            
//  seconds of the latest optime we know of
             else  if( info[ " e "].trueValue() &&
                     mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() -  10) {
                unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
                 if (lastOp >  0 && mem.opTime >= lastOp -  10) {
                    theReplSet->addToElectable(mem.id());
                }
            }
             else {
                theReplSet->rmFromElectable(mem.id());
            }

             // 如果返回的信息中包含設置信息,做同步設置的工作
            be cfg = info[ " config "];
             if( cfg.ok() ) {
                 //  received a new config
                boost::function< void()> f =
                    boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
                theReplSet->mgr->send(f);
            }
        }
    };

連接失敗時,表示目標節點離線。

 

         void ReplSetHealthPollTask::down(HeartbeatInfo& mem,  string msg) {
             // 設置各種健康狀態
            mem.authIssue =  false;
            mem.health =  0.0;
            mem.ping =  0;
             // 打印log信息
             if( mem.upSince || mem.downSince ==  0 ) {
                mem.upSince =  0;
                mem.downSince = jsTime();
                mem.hbstate = MemberState::RS_DOWN;
                log() <<  " replSet info  " << h.toString() <<  "  is down (or slow to respond):  " << msg << rsLog;
            }
             // 剝奪節點投票權
            mem.lastHeartbeatMsg = msg;
            theReplSet->rmFromElectable(mem.id());
        }

如果在配置階段接收到了不是自己列表中節點發出的心跳檢測請求,可以檢測新節點的配置信息。具體代碼在heartbeat.cpp中的CmdReplSetHeartbeat(line:104)中實現。

5. ReplSet間的投票過程

投票過程是伴隨在心跳檢測和消息傳遞的過程中的,下面的代碼閱讀都本着只關心頭片相關過程的代碼閱讀,其他枝節部分就略去了。

投票過程的序列圖:

0{DW)`W4H1ZRLTE2WGTAA0I

a) 消息傳遞

消息傳遞的過程是由心跳檢測來完成的,在心跳檢測時,單個節點既是發送方也是接收方。

作為發送方,向其他節點發出:

 

// heartbeat.cpp line 134
     bool requestHeartbeat( string setName,  string  fromstring memberFullName, BSONObj& result,
                           int myCfgVersion,  int& theirCfgVersion,  bool checkEmpty) {
         if( replSetBlind ) {
             return  false;
        }

         // 從這段代碼中看出,節點向外傳出的是節點名稱、節點的設置版本號和路徑
        BSONObj cmd = BSON(  " replSetHeartbeat " << setName <<
                             " v " << myCfgVersion <<
                             " pv " <<  1 <<
                             " checkEmpty " << checkEmpty <<
                             " from " <<  from );

         //  generally not a great idea to do outbound waiting calls in a
        
//  write lock. heartbeats can be slow (multisecond to respond), so
        
//  generally we don't want to be locked, at least not without
        
//  thinking acarefully about it first.
        massert( 15900" can't heartbeat: too much lock ",
                !Lock::somethingWriteLocked() || theReplSet ==  0 || !theReplSet->lockedByMe() );

        ScopedConn conn(memberFullName);
         return conn.runCommand( " admin ",
                               cmd,
                               result,
                                0,
                               &AuthenticationTable::getInternalSecurityAuthenticationTable());
    }


// 消息返回后
         bool ReplSetHealthPollTask::_requestHeartbeat(HeartbeatInfo& mem, BSONObj& info,  int& theirConfigVersion) {
             if (tries++ % threshold == (threshold -  1)) {
                ScopedConn conn(h.toString());
                conn.reconnect();
            }

            Timer timer;
            time_t before = curTimeMicros64() /  1000000;

             // 調用函數傳出消息
             bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
                                       h.toString(), info, theReplSet->config().version, theirConfigVersion);
         // info是傳出心跳檢測后的返回值
       
// 根據返回值同步記錄同步的偏差時間
            mem.ping = (unsigned  int)timer.millis();

             //  we set this on any response - we don't get this far if
            
//  couldn't connect because exception is thrown
            time_t after = mem.lastHeartbeat = before + (mem.ping /  1000);

             if ( info[ " time "].isNumber() ) {
                 long  long t = info[ " time "].numberLong();
                 if( t > after )
                    mem.skew = ( int) (t - after);
                 else  if( t < before )
                    mem.skew = ( int) (t - before);  //  negative
            }
             else {
                 //  it won't be there if remote hasn't initialized yet
                 if( info.hasElement( " time ") )
                    warning() <<  " heatbeat.time isn't a number:  " << info << endl;
                mem.skew = INT_MIN;
            }

            {
                 // 記錄下其他節點的狀態
                be state = info[ " state "];
                 if( state.ok() )
                    mem.hbstate = MemberState(state.Int());
            }

             return ok;
        }

 

 

         // 消息返回后的另一部分狀態處理
         void ReplSetHealthPollTask::doWork() {
             if ( !theReplSet ) {
                LOG( 2) <<  " replSet not initialized yet, skipping health poll this round " << rsLog;
                 return;
            }

            HeartbeatInfo mem = m;
            HeartbeatInfo old = mem;
             try {
                BSONObj info;
                 int theirConfigVersion = - 10000;

                 bool ok = _requestHeartbeat(mem, info, theirConfigVersion);

                 //  weight new ping with old pings
                
//  on the first ping, just use the ping value
                 if (old.ping !=  0) {
                    mem.ping = (unsigned  int)((old.ping * . 8) + (mem.ping * . 2));
                }

                 // ……
            }
             catch(...) {
                 // ……
            }
            m = mem;

             // 通知更新節點信息
            theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );

             static time_t last =  0;
            time_t now = time( 0);
             // 判斷節點信息是否被改變
             bool changed = mem.changed(old);
             if( changed ) {
                 if( old.hbstate != mem.hbstate )
                    log() <<  " replSet member  " << h.toString() <<  "  is now in state  " << mem.hbstate.toString() << rsLog;
            }
             // 當其他節點信息信息改變或者前后兩次連接服務器的時間大於4秒,則更新一下自己節點的狀態(在函數中發出投票請求)
             if( changed || now-last> 4 ) {
                last = now;
                theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
            }
        }

作為接收方:

         // 接收到數據后會調用這個函數
         virtual  bool CmdReplSetHeartbeat::run( const  string& , BSONObj& cmdObj,  intstring& errmsg, BSONObjBuilder& result,  bool fromRepl) {
             // 略過鑒權等工作
            
// ……
            result.append( " set ", theReplSet->name());        // 名稱
            result.append( " state ", theReplSet->state().s);      // 節點狀態
            result.append( " e ", theReplSet->iAmElectable());   // 是否可以參加投票
            result.append( " hbmsg ", theReplSet->hbmsg());    // 心跳的一些信息(具體還沒看太明白)
            result.append( " time ", ( long  long) time( 0));        // 當前的服務器時間
            result.appendDate( " opTime ", theReplSet->lastOpTimeWritten.asDate());  // 最后一次寫操作的時間
             int v = theReplSet->config().version;     // 設置的版本信息
            result.append( " v ", v);
             if( v > cmdObj[ " v "].Int() )
                result <<  " config " << theReplSet->config().asBson();

             return  true;
        }

 

b) 檢測那些節點可以投票

結合之前的心跳檢測和后面的消息傳遞,ReplSet會根據心跳檢測的結果調用addToElectable和rmFromElectable來添加和刪除投票節點。

值得一提的是,在Manager::msgCheckNewState()被調用時,會去判斷當前節點是否可以參與投票:

     void Manager::checkElectableSet() {
        unsigned otherOp = rs->lastOtherOpTime().getSecs();
        
         //  make sure the electable set is up-to-date
         if (rs->elect.aMajoritySeemsToBeUp() &&           // 看看是否有選上的可能
            rs->iAmPotentiallyHot() &&                   // 看看自己是不是活躍節點
            (otherOp ==  0 || rs->lastOpTimeWritten.getSecs() >= otherOp -  10)) {        // 上次寫操作是否在10秒以內
            theReplSet->addToElectable(rs->selfId());
        }
         else {
            theReplSet->rmFromElectable(rs->selfId());
        }

// ……
}

 c) 確定是不是要發起投票

當確定了那些節點可以投票以后,就要判斷是不是要發起投票了。

     /* * called as the health threads get new results  */
     void Manager::msgCheckNewState() {
        {
             // 判斷之前的節點狀態
            
// ……
             const Member *p = rs->box.getPrimary();          //
             if( p && p != rs->_self ) {
                 if( !p->hbinfo().up() ||
                        !p->hbinfo().hbstate.primary() ) {
                    p =  0;
                    rs->box.setOtherPrimary( 0);
                }
            }

             const Member *p2;
            {
                 bool two;
                 // 判斷一下是不是還有別的主節點
                
// 如果有兩個主節點說明就等待其他節點自己解決誰是主節點的問題
                
// 返回的節點是確定的主節點
                p2 = findOtherPrimary(two);
                 if( two ) {
                     /*  two other nodes think they are primary (asynchronously polled) -- wait for things to settle down.  */
                    log() <<  " replSet info two primaries (transiently) " << rsLog;
                     return;
                }
            }

             if( p2 ) {
                noteARemoteIsPrimary(p2);
                 return;
            }

             /*  didn't find anyone who wants to be primary  */
            // 如果包含一個主節點
             if( p ) {
                 /*  we are already primary  */
                 // 主節點不是自己,說明有人能做主
                 if( p != rs->_self ) {
                    rs->sethbmsg( " error p != rs->self in checkNewState ");
                    log() <<  " replSet  " << p->fullName() << rsLog;
                    log() <<  " replSet  " << rs->_self->fullName() << rsLog;
                     return;
                }
                
                 // 如果自己是主節點,需要將自己降級
                 if( rs->elect.shouldRelinquish() ) {
                    log() <<  " can't see a majority of the set, relinquishing primary " << rsLog;
                    rs->relinquish();
                }

                 return;
            }

             if( !rs->iAmPotentiallyHot() ) {  //  if not we never try to be primary
                OCCASIONALLY log() <<  " replSet I don't see a primary and I can't elect myself " << endl;
                 return;
            }

             // 看自己有沒有可能成為主節點 
             /*  no one seems to be primary.  shall we try to elect ourself?  */
             if( !rs->elect.aMajoritySeemsToBeUp() ) {
                 static time_t last;
                 static  int n;
                 int ll =  0;
                 if( ++n >  5 ) ll++;
                 if( last +  60 > time( 0 ) ) ll++;
                log(ll) <<  " replSet can't see a majority, will not try to elect self " << rsLog;
                last = time( 0);
                 return;
            }

             if( !rs->iAmElectable() ) {
                 return;
            }

            busyWithElectSelf =  true//  don't try to do further elections & such while we are already working on one.
        }
         try {
             // 開始投票
            rs->elect.electSelf();
        }
         catch(RetryAfterSleepException&) {
             /*  we want to process new inbounds before trying this again.  so we just put a checkNewstate in the queue for eval later.  */
            requeue();
        }
         catch(...) {
            log() <<  " replSet error unexpected assertion in rs manager " << rsLog;
        }
        busyWithElectSelf =  false;
    }

 

d) 投票

作為心跳檢測的接收方,當其他節點信息做了改變或者對某個節點前后連接時差大於4秒,就有可能調用Manager::msgCheckNewState更改自己狀態並且發出投票請求。

與其說是投票不如說是就是一次詢問的過程,就是節點向其他節點詢問自己的狀態是否符合當主機的條件。

     // 詢問其他節點
     bool Consensus::weAreFreshest( bool& allUp,  int& nTies) {
         const OpTime ord = theReplSet->lastOpTimeWritten;
        nTies =  0;
        verify( !ord.isNull() );
         // 組織請求數據
        BSONObj cmd = BSON(
                           " replSetFresh " <<  1 <<          
                           " set " << rs.name() <<                  // 當前節點名稱
                           " opTime " << Date_t(ord.asDate()) <<     // 最后一次寫入的時間
                           " who " << rs._self->fullName() <<        // 當前節點路徑
                           " cfgver " << rs._cfg->version <<          // 當前節點設置的版本號
                           " id " << rs._self->id());                  // 當前節點的id
        list<Target> L;
         int ver;
         /*  the following queries arbiters, even though they are never fresh.  wonder if that makes sense.
           it doesn't, but it could, if they "know" what freshness it one day.  so consider removing
           arbiters from getTargets() here.  although getTargets is used elsewhere for elections; there
           arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make
           not fetching them herein happen.
           
*/
         // 獲得投票列表
        rs.getTargets(L, ver);

         // 發出請求
        multiCommand(cmd, L);
         int nok =  0;
        allUp =  true;

         // 處理請求(稍后分析)
        
// ……
    }
// 收到請求后的處理,忽略了驗證處理和比較版本配置版本號的代碼
         bool CmdReplSetFresh::shouldVeto( const BSONObj& cmdObj,  string& errmsg) {
             //  don't veto older versions
             if (cmdObj[ " id "].eoo()) {
                 //  they won't be looking for the veto field
                 return  false;
            }

            unsigned id = cmdObj[ " id "].Int();
             const Member* primary = theReplSet->box.getPrimary();            // 當前的主服務器
             const Member* hopeful = theReplSet->findById(id);                // 希望成為主機的服務器(從上面的代碼看是發送請求方服務器)
             const Member *highestPriority = theReplSet->getMostElectable();     // 當前節點心目中的主機

            
// 以下判斷發現不符合條件的就否決投票
             if( !hopeful ) {   // 沒有目標服務器
                errmsg = str::stream() <<  " replSet couldn't find member with id  " << id;
                 return  true;
            }
             // 如果當前的服務器是主機,而自己剛剛進行級別的調整
             else  if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) {  
                 //  hbinfo is not updated, so we have to check the primary's last optime separately
                errmsg = str::stream() <<  " I am already primary,  " << hopeful->fullName() <<
                     "  can try again once I've stepped down ";
                 return  true;
            }
             // 如果主服務器,剛剛進行級別調整
             else  if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
                 //  other members might be aware of more up-to-date nodes
                errmsg = str::stream() << hopeful->fullName() <<  "  is trying to elect itself but  " <<
                    primary->fullName() <<  "  is already primary and more up-to-date ";
                 return  true;
            }
             // 當前節點記錄的最高優先級節點的優先級比目標節點高
             else  if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
                errmsg = str::stream() << hopeful->fullName() <<  "  has lower priority than  " << highestPriority->fullName();
                 return  true;
            }

             // 當前節點不允許投票或者當前節點記錄的最高優先級節點的優先級比目標節點高(不明白為什么又判斷一遍)
             if ( !theReplSet->isElectable(id) ||
                (highestPriority && highestPriority->config().priority > hopeful->config().priority)) {
                 return  true;
            }

             return  false;
        }
// 得到投票數據以后解析
bool Consensus::weAreFreshest( bool& allUp,  int& nTies) {
        // 省略發送請求的部分
       
// 請求返回后存儲在list<Target>中
         int nok =  0;
        allUp =  true;
         for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
         // i存儲這其他節點返回的結果
             if( i->ok ) {
                nok++;
                 if( i->result[ " fresher "].trueValue() ) {       // 當前服務器不是最新的
                    log() <<  " not electing self, we are not freshest " << rsLog;
                     return  false;
                }
                 // 根據返回的操作時間,統計與自己時間相同的節點(這部分從代碼上是這個意思,不過后面的解析沒有太懂,從其他地方的注釋看,是個正在開發的功能)
                OpTime remoteOrd( i->result[ " opTime "].Date() );
                 if( remoteOrd == ord )
                    nTies++;     
                verify( remoteOrd <= ord );

                 // 查看是當前節點是否否決自己成為主節點
                 if( i->result[ " veto "].trueValue() ) {
                    BSONElement msg = i->result[ " errmsg "];
                     if (!msg.eoo()) {
                        log() <<  " not electing self,  " << i->toHost <<  "  would veto with ' " <<
                            msg.String() <<  " ' " << rsLog;
                    }
                     else {
                        log() <<  " not electing self,  " << i->toHost <<  "  would veto " << rsLog;
                    }
                     return  false;
                }
            }
             else {
                DEV log() <<  " replSet freshest returns  " << i->result.toString() << rsLog;
                allUp =  false;
            }
        }
        LOG( 1) <<  " replSet dev we are freshest of up nodes, nok: " << nok <<  "  nTies: " << nTies << rsLog;
        verify( ord <= theReplSet->lastOpTimeWritten );  //  <= as this may change while we are working...
         return  true;
     }

 如果沒有其他服務器投反對票,那么就向其他服務器發送設置自己為主節點的信息。之后,會向其他節點發送消息通知自己當選的消息,其他節點返回是否贊成,當贊成票過半的時候,自己當選。

            void Consensus::_electSelf() {
            // 略去投票部分
            
// ……

             
// 分發投票結果的確認部分
            BSONObj electCmd = BSON(
                                    " replSetElect " <<  1 <<
                                    " set " << rs.name() <<               // 節點名稱
                                    " who " << me.fullName() <<         // 節點路徑
                                    " whoid " << me.hbinfo().id() <<       // 節點的id
                                    " cfgver " << rs._cfg->version <<       // 節點的配置信息
                                    " round " << OID::gen()  /*  this is just for diagnostics  */
                               );

             int configVersion;
            list<Target> L;
            rs.getTargets(L, configVersion);
            multiCommand(electCmd, L);     // 請求發送

 

       // 處理確認請求
     void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
        BSONObjBuilder& b = *_b;
        DEV log() <<  " replSet received elect msg  " << cmd.toString() << rsLog;
         else LOG( 2) <<  " replSet received elect msg  " << cmd.toString() << rsLog;
         string  set = cmd[ " set "].String();
        unsigned whoid = cmd[ " whoid "].Int();
         int cfgver = cmd[ " cfgver "].Int();
        OID round = cmd[ " round "].OID();
         int myver = rs.config().version;

         const Member* primary = rs.box.getPrimary();
         const Member* hopeful = rs.findById(whoid);
         const Member* highestPriority = rs.getMostElectable();

         int vote =  0;
         ifset != rs.name() ) {
            log() <<  " replSet error received an elect request for ' " <<  set <<  " ' but our set name is ' " << rs.name() <<  " ' " << rsLog;
        }
         else  if( myver < cfgver ) {     // 如果接收方的版本號小於目標節點的版本號
            
//  we are stale.  don't vote  不做其他處理
        }
         else  if( myver > cfgver ) {     // 如果接收方的版本號大於目標節點版本號,對方版本號過期,不贊成
            
//  they are stale!
            log() <<  " replSet electCmdReceived info got stale version # during election " << rsLog;
            vote = - 10000;
        }
         else  if( !hopeful ) {     // 目標節點id沒有在接收方節點中掛名,不贊成
            log() <<  " replSet electCmdReceived couldn't find member with id  " << whoid << rsLog;
            vote = - 10000;
        }
         // 如果主節點是自己,並且自己的最后寫入時間比目標節點新,不贊成
         else  if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
             //  hbinfo is not updated, so we have to check the primary's last optime separately
            log() <<  " I am already primary,  " << hopeful->fullName()
                  <<  "  can try again once I've stepped down " << rsLog;
            vote = - 10000;
        }
         // 如果接收方認為的主節點的寫入時間比目標節點的寫入時間新,不贊成
         else  if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
             //  other members might be aware of more up-to-date nodes
            log() << hopeful->fullName() <<  "  is trying to elect itself but  " <<
                  primary->fullName() <<  "  is already primary and more up-to-date " << rsLog;
            vote = - 10000;
        }
         // 接收方認為當選呼聲最高的節點比目標節點的當選呼聲高,不贊成
         else  if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
            log() << hopeful->fullName() <<  "  has lower priority than  " << highestPriority->fullName();
            vote = - 10000;
        }
         else {
             // 如果滿足上面的所有條件
             try {
                vote = yea(whoid);      // 算出贊成度
                dassert( hopeful->id() == whoid );
                rs.relinquish();
                log() <<  " replSet info voting yea for  " <<  hopeful->fullName() <<  "  ( " << whoid <<  ' ) ' << rsLog;
            }
             catch(VoteException&) {
                log() <<  " replSet voting no for  " << hopeful->fullName() <<  "  already voted for another " << rsLog;
            }
        }
         // 組織返回數據
        b.append( " vote ", vote);  
        b.append( " round ", round);
    }
            // 處理返回結果 
           {
                 for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
                    DEV log() <<  " replSet elect res:  " << i->result.toString() << rsLog;
                     if( i->ok ) {
                         int v = i->result[ " vote "].Int();
                        tally += v;    // 統計贊成結果
                    }
                }
                 if( tally* 2 <= totalVotes() ) {   // 贊成結果小於投票總數的一半,終止成為主節點
                    log() <<  " replSet couldn't elect self, only received  " << tally <<  "  votes " << rsLog;
                }
                 else  if( time( 0) - start >  30 ) {   // 投票時間大於三十秒,終止成為主節點
                    
//  defensive; should never happen as we have timeouts on connection and operation for our conn
                    log() <<  " replSet too much time passed during our election, ignoring result " << rsLog;
                }
                 else  if( configVersion != rs.config().version ) {    // 傳出的版本號與返回的版本號不一致(說明投票過程中有版本修改),終止成為主節點
                    log() <<  " replSet config version changed during our election, ignoring result " << rsLog;
                }
                 else {
                     /*  succeeded.  */
                    log( 1) <<  " replSet election succeeded, assuming primary role " << rsLog;
                    success =  true;
                    rs.assumePrimary();   // 當選為主節點
                }
            }
 e) 總結投票過程

在心跳檢測中,節點間互相傳遞着信息。通過這些信息,節點能了解到其他節點的情況(配置版本,是否能連接上等等)。單個節點統計着這些信息,當某個節點設置發生改變,或者網絡連接出現異常的時候,開始發送投票事件。

投票時,首先根據心跳檢測記錄的信息判斷哪些節點可以被連接到,即有投票權。之后向所有可以連接到的節點,發出自己能否成為主節點的請求。其他節點投是否否決票,如果一致通過,該節點進行下一步操作:向其他節點發出自己當選的請求,其他節點根據情況確定請求,如果返回的贊成數大於投票總數的一半,該節點當選。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM