MongoDB源碼閱讀之Shard源碼分析--CongfigServer啟動


1.      名詞解釋

Shards : 每一個shard包括一個或多個服務和存儲數據的mongod進程(mongod是MongoDB數據的核心進程)典型的每個shard開啟多個服務來提高服務的可用性。這些服務/mongod進程在shard中組成一個復制集

Chunks: Chunk是一個來自特殊集合中的一個數據范圍,(collection,minKey,maxKey)描敘一個chunk,它介於minKey和maxKey范圍之間。例如chunks 的maxsize大小是100M,如果一個文件達到或超過這個范圍時,會被切分到2個新的chunks中。當一個shard的數據過量時,chunks將會被遷移到其他的shards上。同樣,chunks也可以遷移到其他的shards上

Config Servers : Config服務器存儲着集群的metadata信息,包括每個服務器,每個shard的基本信息和chunk信息Config服務器主要存儲的是chunk信息。每一個config服務器都復制了完整的chunk信息。

 

一個完整的MongoDB集群應該包含多個shards,每個shard包含多個replSets互相做備份。單個數據有大小之分,被分配到不同的Chunk之中。當一個shard的存儲空間不夠時,會將Chunks分配到其他Shard上。這些信息被分開后都要有所記錄,這些記錄存儲在Congfig Servers上做查詢的依據。

 

從這里開始源碼研究的范疇變為mongos。

2.      代碼結構

shard.h

chunk.h

config.h

以上三個對應結構的類。

strategy.h        //分配調整策略基類

strategy_shard.cpp       //shard分配調整策略的實現

parallel.h        //並行傳遞消息,分片等操作的工具類

server.h         //啟動configserver的管理類

balance.h        //chunk分塊的策划者

connpool.h       //管理mongos中的各種連接

3.      各部分結構圖

a)      Chunk相關的類圖

 

Chunk類:存儲數據的起點、終點以及在哪一個shard上,實現了分割chunk,計算塊大小等方法。

ChunkRange類:記錄在同一個shard上的連續的chunk節點的起點和終點。

ChunkRangeManager類:存儲ChunkRange的集合。

ChunkManager類:存儲了當前shard上的所有的Chunk集合和一個ChunkRangeManager集合。

b)     Shrad相關的類

Shard類:記錄了shard的名稱、地址、大小等信息。

ShardStatus類:存儲了Shard對象,和服務器的一些信息。

StaticShradInfo類:內部類(單件),記錄了當前體系中的所有Shard 節點信息及狀態,接口供Shard對象調用。

c)      Config server相關的類

DBConfig:記錄各個Shard之間的連接狀態。

ConfigServer:繼承DBConfig

4.      Mongos server初始化

Mongos的入口在server.cpp文件中。

程序首先對各個參數進行分割存儲,之后執行函數runMongosServer。

static bool runMongosServer( bool doUpgrade ) {

 

//……

 

    // set some global state

 

    //添加連接回調

    pool.addHook( new ShardingConnectionHook( false ) );

    pool.setName( "mongos connectionpool" );

 

    shardConnectionPool.addHook( new ShardingConnectionHook( true ) );

    shardConnectionPool.setName( "mongos shardconnection connectionpool" );

 

    // Mongos shouldn't lazily kill cursors, otherwise we can end up with extras from migration

    DBClientConnection::setLazyKillCursor( false );

 

    ReplicaSetMonitor::setConfigChangeHook( boost::bind( &ConfigServer::replicaSetChange , &configServer , _1 ) );

 

    //初始化ConfigServer,將參數中的配置的config server添加到列表中

    if ( ! configServer.init( configdbs ) ) {

        log() << "couldn't resolve config db address" << endl;

        return false;

    }

 

    //檢查主shard地址,以及各個config server節點的連接狀況

    if ( ! configServer.ok( true ) ) {

        log() << "configServer connection startup check failed" << endl;

        return false;

    }

 

    //每60秒檢查檢查一回各節點連接情況

    {

        class CheckConfigServers : public task::Task {

            virtual string name() const { return "CheckConfigServers"; }

            virtual void doWork() { configServer.ok(true); }

        };

 

        task::repeat(new CheckConfigServers, 60*1000);

    }

 

    //檢查config sever的版本,下面詳細解釋

int configError = configServer.checkConfigVersion( doUpgrade );

 

//傳出的configError並不一定是代表錯誤,而是表示當前server的狀態

//總體說來,當configError為0的時候,需要將自己的作為configServer的管理者

//其他時候就直接退出了。一次mongos的工作到此結束。

    if ( configError ) {

        if ( configError > 0 ) {

            log() << "upgrade success!" << endl;

        }

        else {

            log() << "config server error: " << configError << endl;

        }

        return false;

}

    configServer.reloadSettings();

 

    //設置響應系統信號的函數

init();

 

#if !defined(_WIN32)

    CmdLine::launchOk();

#endif

 

    if ( !noHttpInterface )

        boost::thread web( boost::bind(&webServerThread, new NoAdminAccess() /* takes ownership */) );

 

    //啟動消息Server,監聽各端口socket連接

    MessageServer::Options opts;

    opts.port = cmdLine.port;

opts.ipList = cmdLine.bind_ip;

    start(opts);

 

    // listen() will return when exit code closes its socket.

    dbexit( EXIT_NET_ERROR );

    return true;

}

 

5.      檢查config server的版本(ConfigServer::checkConfigVersion)

int ConfigServer::checkConfigVersion( bool upgrade ) {

    //訪問shard服務器返回config version:其中0為初始化,1為已經連接上shard或者數據庫,2為自己需要更新,3表示環境中有主config server,且不需要更新

        int cur = dbConfigVersion();

        if ( cur == VERSION )

            return 0;

 

        //如正在初始化,即環境里沒有config server,則自己變為主config server,並通知shard

        if ( cur == 0 ) {

            scoped_ptr<ScopedDbConnection> conn(

                    ScopedDbConnection::getInternalScopedDbConnection( _primary.getConnString() ) );

 

            // If the cluster has not previously been initialized, we need to set the version before using so

            // subsequent mongoses use the config data the same way.  This requires all three config servers online

            // initially.

            try {

                conn->get()->insert( "config.version" , BSON( "_id" << 1 << "version" << VERSION ) );

            }

            catch( DBException& ){

                error() << "All config servers must initially be reachable for the cluster to be initialized." << endl;

                throw;

            }

 

            pool.flush();

            verify( VERSION == dbConfigVersion( conn->conn() ) );

            conn->done();

            return 0;

        }

 

        //需要更新

        if ( cur == 2 ) {

 

            // need to upgrade

            verify( VERSION == 3 );

            if ( ! upgrade ) {

                log() << "newer version of mongo meta data\n"

                      << "need to --upgrade after shutting all mongos down"

                      << endl;

                return -9;

            }

 

            scoped_ptr<ScopedDbConnection> connPtr(

                    ScopedDbConnection::getInternalScopedDbConnection( _primary.getConnString() ) );

            ScopedDbConnection& conn = *connPtr;

 

            // do a backup

            string backupName;

            {

                stringstream ss;

                ss << "config-backup-" << terseCurrentTime(false);

                backupName = ss.str();

            }

            log() << "backing up config to: " << backupName << endl;

            conn->copyDatabase( "config" , backupName );

 

            //......

            //更新shard,database和chunk

            conn->update( "config.version" , BSONObj() , BSON( "_id" << 1 << "version" << VERSION ) );

            conn.done();

            pool.flush();

            return 1;

        }

 

        log() << "don't know how to upgrade " << cur << " to " << VERSION << endl;

        return -8;

    }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM