1. 名詞解釋
Shards : 每一個shard包括一個或多個服務和存儲數據的mongod進程(mongod是MongoDB數據的核心進程)典型的每個shard開啟多個服務來提高服務的可用性。這些服務/mongod進程在shard中組成一個復制集
Chunks: Chunk是一個來自特殊集合中的一個數據范圍,(collection,minKey,maxKey)描敘一個chunk,它介於minKey和maxKey范圍之間。例如chunks 的maxsize大小是100M,如果一個文件達到或超過這個范圍時,會被切分到2個新的chunks中。當一個shard的數據過量時,chunks將會被遷移到其他的shards上。同樣,chunks也可以遷移到其他的shards上
Config Servers : Config服務器存儲着集群的metadata信息,包括每個服務器,每個shard的基本信息和chunk信息Config服務器主要存儲的是chunk信息。每一個config服務器都復制了完整的chunk信息。
一個完整的MongoDB集群應該包含多個shards,每個shard包含多個replSets互相做備份。單個數據有大小之分,被分配到不同的Chunk之中。當一個shard的存儲空間不夠時,會將Chunks分配到其他Shard上。這些信息被分開后都要有所記錄,這些記錄存儲在Congfig Servers上做查詢的依據。
從這里開始源碼研究的范疇變為mongos。
2. 代碼結構
shard.h chunk.h config.h 以上三個對應結構的類。 strategy.h //分配調整策略基類 strategy_shard.cpp //shard分配調整策略的實現 parallel.h //並行傳遞消息,分片等操作的工具類 server.h //啟動configserver的管理類 balance.h //chunk分塊的策划者 connpool.h //管理mongos中的各種連接
3. 各部分結構圖
a) Chunk相關的類圖
Chunk類:存儲數據的起點、終點以及在哪一個shard上,實現了分割chunk,計算塊大小等方法。
ChunkRange類:記錄在同一個shard上的連續的chunk節點的起點和終點。
ChunkRangeManager類:存儲ChunkRange的集合。
ChunkManager類:存儲了當前shard上的所有的Chunk集合和一個ChunkRangeManager集合。
b) Shrad相關的類
Shard類:記錄了shard的名稱、地址、大小等信息。
ShardStatus類:存儲了Shard對象,和服務器的一些信息。
StaticShradInfo類:內部類(單件),記錄了當前體系中的所有Shard 節點信息及狀態,接口供Shard對象調用。
c) Config server相關的類
DBConfig:記錄各個Shard之間的連接狀態。
ConfigServer:繼承DBConfig
4. Mongos server初始化
Mongos的入口在server.cpp文件中。
程序首先對各個參數進行分割存儲,之后執行函數runMongosServer。
static bool runMongosServer( bool doUpgrade ) { //…… // set some global state //添加連接回調 pool.addHook( new ShardingConnectionHook( false ) ); pool.setName( "mongos connectionpool" ); shardConnectionPool.addHook( new ShardingConnectionHook( true ) ); shardConnectionPool.setName( "mongos shardconnection connectionpool" ); // Mongos shouldn't lazily kill cursors, otherwise we can end up with extras from migration DBClientConnection::setLazyKillCursor( false ); ReplicaSetMonitor::setConfigChangeHook( boost::bind( &ConfigServer::replicaSetChange , &configServer , _1 ) ); //初始化ConfigServer,將參數中的配置的config server添加到列表中 if ( ! configServer.init( configdbs ) ) { log() << "couldn't resolve config db address" << endl; return false; } //檢查主shard地址,以及各個config server節點的連接狀況 if ( ! configServer.ok( true ) ) { log() << "configServer connection startup check failed" << endl; return false; } //每60秒檢查檢查一回各節點連接情況 { class CheckConfigServers : public task::Task { virtual string name() const { return "CheckConfigServers"; } virtual void doWork() { configServer.ok(true); } }; task::repeat(new CheckConfigServers, 60*1000); } //檢查config sever的版本,下面詳細解釋 int configError = configServer.checkConfigVersion( doUpgrade ); //傳出的configError並不一定是代表錯誤,而是表示當前server的狀態 //總體說來,當configError為0的時候,需要將自己的作為configServer的管理者 //其他時候就直接退出了。一次mongos的工作到此結束。 if ( configError ) { if ( configError > 0 ) { log() << "upgrade success!" << endl; } else { log() << "config server error: " << configError << endl; } return false; } configServer.reloadSettings(); //設置響應系統信號的函數 init(); #if !defined(_WIN32) CmdLine::launchOk(); #endif if ( !noHttpInterface ) boost::thread web( boost::bind(&webServerThread, new NoAdminAccess() /* takes ownership */) ); //啟動消息Server,監聽各端口socket連接 MessageServer::Options opts; opts.port = cmdLine.port; opts.ipList = cmdLine.bind_ip; start(opts); // listen() will return when exit code closes its socket. dbexit( EXIT_NET_ERROR ); return true; }
5. 檢查config server的版本(ConfigServer::checkConfigVersion)
int ConfigServer::checkConfigVersion( bool upgrade ) { //訪問shard服務器返回config version:其中0為初始化,1為已經連接上shard或者數據庫,2為自己需要更新,3表示環境中有主config server,且不需要更新 int cur = dbConfigVersion(); if ( cur == VERSION ) return 0; //如正在初始化,即環境里沒有config server,則自己變為主config server,並通知shard if ( cur == 0 ) { scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getInternalScopedDbConnection( _primary.getConnString() ) ); // If the cluster has not previously been initialized, we need to set the version before using so // subsequent mongoses use the config data the same way. This requires all three config servers online // initially. try { conn->get()->insert( "config.version" , BSON( "_id" << 1 << "version" << VERSION ) ); } catch( DBException& ){ error() << "All config servers must initially be reachable for the cluster to be initialized." << endl; throw; } pool.flush(); verify( VERSION == dbConfigVersion( conn->conn() ) ); conn->done(); return 0; } //需要更新 if ( cur == 2 ) { // need to upgrade verify( VERSION == 3 ); if ( ! upgrade ) { log() << "newer version of mongo meta data\n" << "need to --upgrade after shutting all mongos down" << endl; return -9; } scoped_ptr<ScopedDbConnection> connPtr( ScopedDbConnection::getInternalScopedDbConnection( _primary.getConnString() ) ); ScopedDbConnection& conn = *connPtr; // do a backup string backupName; { stringstream ss; ss << "config-backup-" << terseCurrentTime(false); backupName = ss.str(); } log() << "backing up config to: " << backupName << endl; conn->copyDatabase( "config" , backupName ); //...... //更新shard,database和chunk conn->update( "config.version" , BSONObj() , BSON( "_id" << 1 << "version" << VERSION ) ); conn.done(); pool.flush(); return 1; } log() << "don't know how to upgrade " << cur << " to " << VERSION << endl; return -8; }