ssdb is a high-performance NoSQL database supporting rich data structures, intended as a replacement for Redis. That is the self-introduction on its official website.
ssdb is built on top of the leveldb storage library, extending it with Redis-like operation interfaces and high availability for its data. In other words, ssdb is a NoSQL database that implements Redis functionality on top of leveldb, and it can be accessed directly with a Redis client.
Installation
```sh
wget --no-check-certificate https://github.com/ideawu/ssdb/archive/master.zip
unzip master
cd ssdb-master
make
# optional, install ssdb in /usr/local/ssdb
sudo make install
```
Startup
```sh
# start master
./ssdb-server ssdb.conf
# or start as daemon
./ssdb-server -d ssdb.conf
```
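Since ssdb understands the Redis protocol, a running server can be accessed with an ordinary Redis client. Below is a minimal sketch using hiredis, assuming ssdb-server is listening on its default port 8888 on localhost (adjust to your ssdb.conf):

```cpp
// Minimal smoke test against a local ssdb instance via a Redis client (hiredis).
// Assumption: ssdb-server is running on 127.0.0.1:8888 (the default in ssdb.conf).
#include <cstdio>
#include <hiredis/hiredis.h>

int main(){
    redisContext *c = redisConnect("127.0.0.1", 8888);
    if(c == NULL || c->err){
        fprintf(stderr, "connect error: %s\n", c ? c->errstr : "alloc failed");
        return 1;
    }
    redisReply *reply = (redisReply *)redisCommand(c, "SET foo bar");
    printf("SET: %s\n", reply->str); // status reply, "OK" on success
    freeReplyObject(reply);

    reply = (redisReply *)redisCommand(c, "GET foo");
    printf("GET: %s\n", reply->str); // "bar"
    freeReplyObject(reply);

    redisFree(c);
    return 0;
}
```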
This article gives a brief analysis of ssdb's high-availability feature against the source code (version 1.9.2). Let's get into it.
The entry point is in ssdb-server.cpp:
```cpp
int main(int argc, char **argv){
    MyApplication app;
    return app.main(argc, argv); // app.main() eventually calls MyApplication::run()
}

void MyApplication::run(){
    Options option;
    option.load(*conf);

    std::string data_db_dir = app_args.work_dir + "/data"; // this db stores user data
    std::string meta_db_dir = app_args.work_dir + "/meta"; // this db is used internally by ssdb; it stores metadata that supports data synchronization

    // logging omitted

    SSDB *data_db = NULL;
    SSDB *meta_db = NULL;
    data_db = SSDB::open(option, data_db_dir);
    if(!data_db){
        log_fatal("could not open data db: %s", data_db_dir.c_str());
        fprintf(stderr, "could not open data db: %s\n", data_db_dir.c_str());
        exit(1);
    }

    meta_db = SSDB::open(Options(), meta_db_dir);
    if(!meta_db){
        log_fatal("could not open meta db: %s", meta_db_dir.c_str());
        fprintf(stderr, "could not open meta db: %s\n", meta_db_dir.c_str());
        exit(1);
    }
    // so ssdb opens two dbs: one named data, the other named meta

    NetworkServer *net = NULL;
    SSDBServer *server;
    net = NetworkServer::init(*conf);
    server = new SSDBServer(data_db, meta_db, *conf, net);

    log_info("pidfile: %s, pid: %d", app_args.pidfile.c_str(), (int)getpid());
    log_info("ssdb server started.");
    net->serve();

    delete net;
    delete server;
    delete meta_db;
    delete data_db;

    log_info("%s exit.", APP_NAME);
}
```
Every write in ssdb is first wrapped as a Binlog; the Binlog then drives the leveldb engine, and Binlog operations are in turn managed by a BinlogQueue. First, the binlog data format:
A Binlog stores only the key, not the value. The record format is head + key, where head consists of (uint64_t)seq + (char)type + (char)cmd.
```cpp
// binlog.h
class Binlog{
private:
    // buffer holding the record; a Binlog stores only the key, no value.
    // Record format: head + key
    std::string buf;
    // A Binlog record consists of a header and a body. The header contains a
    // uint64_t seq, a one-char sync type and a one-char cmd:
    // head = (uint64_t)seq + (char)type + (char)cmd
    static const unsigned int HEADER_LEN = sizeof(uint64_t) + 2;
public:
    Binlog(){}
    Binlog(uint64_t seq, char type, char cmd, const leveldb::Slice &key); // note the constructor arguments
    int load(const Bytes &s);          // load a record from a Bytes
    int load(const leveldb::Slice &s); // load a record from a Slice
    int load(const std::string &s);    // load a record from a string
    uint64_t seq() const;    // extract seq from the Binlog byte stream
    char type() const;       // extract type from the Binlog byte stream
    char cmd() const;        // extract cmd from the Binlog byte stream
    const Bytes key() const; // extract key from the Binlog byte stream
    const char* data() const{
        return buf.data();
    }
    int size() const{
        return (int)buf.size();
    }
    const std::string repr() const{ // raw contents
        return this->buf;
    }
    std::string dumps() const; // format into a printable string
};

// binlog.cpp
Binlog::Binlog(uint64_t seq, char type, char cmd, const leveldb::Slice &key){
    buf.append((char *)(&seq), sizeof(uint64_t));
    buf.push_back(type);
    buf.push_back(cmd);
    buf.append(key.data(), key.size());
}

uint64_t Binlog::seq() const{
    return *((uint64_t *)(buf.data()));
}

char Binlog::type() const{
    return buf[sizeof(uint64_t)];
}

char Binlog::cmd() const{
    return buf[sizeof(uint64_t) + 1];
}

const Bytes Binlog::key() const{
    return Bytes(buf.data() + HEADER_LEN, buf.size() - HEADER_LEN);
}

int Binlog::load(const Bytes &s){
    if(s.size() < HEADER_LEN){
        return -1;
    }
    buf.assign(s.data(), s.size());
    return 0;
}

int Binlog::load(const leveldb::Slice &s){
    if(s.size() < HEADER_LEN){
        return -1;
    }
    buf.assign(s.data(), s.size());
    return 0;
}

int Binlog::load(const std::string &s){
    if(s.size() < HEADER_LEN){
        return -1;
    }
    buf.assign(s.data(), s.size());
    return 0;
}

std::string Binlog::dumps() const{
    std::string str;
    if(buf.size() < HEADER_LEN){
        return str;
    }
    char buf[20];
    snprintf(buf, sizeof(buf), "%" PRIu64 " ", this->seq());
    str.append(buf);

    switch(this->type()){
        case BinlogType::NOOP:
            str.append("noop ");
            break;
        case BinlogType::SYNC:
            str.append("sync ");
            break;
        case BinlogType::MIRROR:
            str.append("mirror ");
            break;
        case BinlogType::COPY:
            str.append("copy ");
            break;
    }
    switch(this->cmd()){
        case BinlogCommand::NONE:
            str.append("none ");
            break;
        case BinlogCommand::KSET:
            str.append("set ");
            break;
        case BinlogCommand::KDEL:
            str.append("del ");
            break;
        case BinlogCommand::HSET:
            str.append("hset ");
            break;
        case BinlogCommand::HDEL:
            str.append("hdel ");
            break;
        case BinlogCommand::ZSET:
            str.append("zset ");
            break;
        case BinlogCommand::ZDEL:
            str.append("zdel ");
            break;
        case BinlogCommand::BEGIN:
            str.append("begin ");
            break;
        case BinlogCommand::END:
            str.append("end ");
            break;
        case BinlogCommand::QPUSH_BACK:
            str.append("qpush_back ");
            break;
        case BinlogCommand::QPUSH_FRONT:
            str.append("qpush_front ");
            break;
        case BinlogCommand::QPOP_BACK:
            str.append("qpop_back ");
            break;
        case BinlogCommand::QPOP_FRONT:
            str.append("qpop_front ");
            break;
        case BinlogCommand::QSET:
            str.append("qset ");
            break;
    }
    Bytes b = this->key();
    str.append(hexmem(b.data(), b.size()));
    return str;
}
```
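A quick illustration of the format, assuming ssdb's headers are available (Binlog, BinlogType, BinlogCommand as above). Note that the seq is appended in host byte order, so a record is not portable across machines with different endianness:

```cpp
// Illustrative round trip of a Binlog record; not part of ssdb itself.
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>
#include "binlog.h" // ssdb's Binlog, BinlogType, BinlogCommand

int main(){
    // head (8 + 1 + 1 bytes) + key (5 bytes)
    Binlog log(42, BinlogType::SYNC, BinlogCommand::KSET, "mykey");
    assert(log.size() == (int)sizeof(uint64_t) + 2 + 5);
    assert(log.seq() == 42);
    assert(log.cmd() == BinlogCommand::KSET);

    // a receiver reverses the process with load()
    Binlog parsed;
    parsed.load(std::string(log.data(), log.size()));
    printf("%s\n", parsed.dumps().c_str()); // something like "42 sync set mykey"
    return 0;
}
```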
Next, how data is written to the db. The key class is BinlogQueue, a circular queue that wraps the data-manipulation interfaces; upper layers operate on the db through the interfaces BinlogQueue provides. Note that for data synchronization it manages the most recent span of records in the db and acts as a buffer: if the seq to synchronize falls within [min_seq, last_seq], SYNC is performed, otherwise COPY. The BinlogQueue code first:
```cpp
// circular queue
class BinlogQueue{
private:
#ifdef NDEBUG
    static const int LOG_QUEUE_SIZE = 20 * 1000 * 1000;
#else
    static const int LOG_QUEUE_SIZE = 10000;
#endif
    leveldb::DB *db;   // the db holding the data we operate on
    uint64_t min_seq;  // smallest seq in the queue, marking the start point for synchronization.
                       // Note: this is not the smallest seq in the db.
    uint64_t last_seq; // newest seq in the db, marking the largest seq of current data
    uint64_t tran_seq;
    int capacity;
    leveldb::WriteBatch batch; // leveldb batched write
    volatile bool thread_quit;
    static void* log_clean_thread_func(void *arg); // thread function that trims logs down to capacity, run in a dedicated thread
    int del(uint64_t seq); // delete a record by seq
    // [start, end] inclusive
    int del_range(uint64_t start, uint64_t end); // delete records by seq range
    void merge(); // merge Binlog records
    bool enabled;
public:
    Mutex mutex;

    BinlogQueue(leveldb::DB *db, bool enabled=true);
    ~BinlogQueue();
    void begin();
    void rollback();
    leveldb::Status commit(); // write the batched data to the db
    // leveldb put
    void Put(const leveldb::Slice& key, const leveldb::Slice& value); // stage a write into the batch
    // leveldb delete
    void Delete(const leveldb::Slice& key); // stage a delete into the batch
    void add_log(char type, char cmd, const leveldb::Slice &key); // append one Binlog record to the batch
    void add_log(char type, char cmd, const std::string &key);    // append one Binlog record to the batch

    int get(uint64_t seq, Binlog *log) const; // fetch one Binlog record from the db
    int update(uint64_t seq, char type, char cmd, const std::string &key); // write one Binlog record directly to the db

    void flush(); // remove the Binlog records currently managed in the db

    /** @returns
     1 : log.seq greater than or equal to seq
     0 : not found
     -1: error
     */
    int find_next(uint64_t seq, Binlog *log) const; // find the next Binlog record starting from seq
    int find_last(Binlog *log) const; // find the newest Binlog record

    std::string stats() const;
};

BinlogQueue::BinlogQueue(leveldb::DB *db, bool enabled){
    this->db = db;
    this->min_seq = 0;
    this->last_seq = 0;
    this->tran_seq = 0;
    this->capacity = LOG_QUEUE_SIZE;
    this->enabled = enabled;

    // the logic below shows that min_seq is not the smallest seq in the db,
    // only the smallest seq within the queue's window
    Binlog log;
    if(this->find_last(&log) == 1){
        this->last_seq = log.seq();
    }
    if(this->last_seq > LOG_QUEUE_SIZE){
        this->min_seq = this->last_seq - LOG_QUEUE_SIZE;
    }else{
        this->min_seq = 0;
    }
    // TODO: use binary search to find out min_seq
    if(this->find_next(this->min_seq, &log) == 1){
        this->min_seq = log.seq();
    }
    if(this->enabled){
        log_info("binlogs capacity: %d, min: %" PRIu64 ", max: %" PRIu64 ",",
            capacity, min_seq, last_seq);
    }

    // start the Binlog cleaning thread, which trims the logs down to capacity
    if(this->enabled){
        thread_quit = false;
        pthread_t tid;
        int err = pthread_create(&tid, NULL, &BinlogQueue::log_clean_thread_func, this);
        if(err != 0){
            log_fatal("can't create thread: %s", strerror(err));
            exit(0);
        }
    }
}
```
As shown above, BinlogQueue only keeps a window of LOG_QUEUE_SIZE records; if the seq to synchronize falls inside this window, SYNC is executed, otherwise COPY.
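Condensed into a minimal sketch (the names ReplMode/choose_mode are mine, not ssdb's; the real decision is spread across BackendSync::Client::init() and sync(), shown later):

```cpp
#include <cstdint>

enum class ReplMode { SYNC, COPY };

// Decide how to serve a slave, given the master's binlog window.
// A slave whose next expected record (slave_last_seq + 1) is still inside
// [min_seq, last_seq] can be fed binlogs incrementally (SYNC); one that has
// fallen behind the window must re-copy the ordered key space (COPY).
ReplMode choose_mode(uint64_t min_seq, uint64_t last_seq, uint64_t slave_last_seq){
    if(slave_last_seq == 0){
        return ReplMode::COPY; // brand-new slave: nothing applied yet, full copy
    }
    if(slave_last_seq + 1 >= min_seq && slave_last_seq <= last_seq){
        return ReplMode::SYNC; // the records it is missing are still buffered
    }
    return ReplMode::COPY;     // fell out of the window: must re-copy
}
```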
SSDB's master-slave synchronization strategy is very simple: take all the write operations (Binlogs) performed on the master and execute them again on the slave. MySQL's master-slave replication works the same way, and multi-master can be understood as two nodes that are each other's master and slave.

Replaying all of the master's operations (Binlogs) on the slave sounds simple, but there are still some hard problems. For example, Binlogs cannot be kept forever; SSDB retains only the most recent 10 million write operations. Those familiar with MySQL may know a similar case: before Binlogs existed, the database already contained some data. That is, this part of the data can never be obtained through the Binlog.

All data in an SSDB database is sorted, so you can think of the whole database as a linked list. SSDB copies from the head of the list, one node at a time, with a cursor moving steadily forward. If a new Binlog arrives during this process, SSDB first determines where the node touched by this Binlog sits in the list: before the cursor or after it? If it is before the cursor, the Binlog is forwarded to the slave for execution. If it is after the cursor, it is simply ignored, because the cursor will eventually reach that newer position. It also follows from this description that a slave in the Copy phase may not see the master's updates immediately. Once the cursor reaches the end of the list, the Copy phase is over and replication enters the Sync phase, i.e. real-time (millisecond-level) updates.

(Quoted from http://www.ideawu.net/blog/archives/849.html)
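The cursor rule in the quoted description condenses to a few lines. This is an illustrative sketch, not ssdb code; the real check lives in BackendSync::Client::sync() (shown later) and relies on keys being iterated in sorted order:

```cpp
#include <string>

// During the Copy phase, the cursor is the last key the copy iterator sent.
// A binlog whose key is at or before the cursor touches data the slave already
// holds, so it must be forwarded; a binlog for a key after the cursor can be
// dropped, because the copy iterator will reach that position anyway.
bool should_forward_during_copy(const std::string &binlog_key,
                                const std::string &cursor_key){
    return binlog_key <= cursor_key;
}
```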
When the master accepts a connection from a slave, it starts a thread to handle the synchronization logic. Here is the master-side BackendSync behavior:
```cpp
void* BackendSync::_run_thread(void *arg){
    pthread_detach(pthread_self());
    struct run_arg *p = (struct run_arg*)arg;
    BackendSync *backend = (BackendSync *)p->backend;
    Link *link = (Link *)p->link;
    delete p;

    // set the Link to blocking mode
    link->noblock(false);

    SSDBImpl *ssdb = (SSDBImpl *)backend->ssdb;
    BinlogQueue *logs = ssdb->binlogs;

    Client client(backend);
    client.link = link;
    client.init();

    {
        pthread_t tid = pthread_self();
        Locking l(&backend->mutex);
        backend->workers[tid] = &client;
    }

    // sleep longer to reduce logs.find
#define TICK_INTERVAL_MS 300
#define NOOP_IDLES (3000/TICK_INTERVAL_MS)

    int idle = 0;
    while(!backend->thread_quit){
        if(client.status == Client::OUT_OF_SYNC){
            client.reset();
            continue;
        }

        bool is_empty = true;
        // Call sync() first; it advances last_seq. sync() also checks: in the
        // COPY phase, if a new Binlog touches a key after the one currently
        // being copied, that Binlog is dropped and left for the sequential
        // key-value COPY to pick up.
        if(client.sync(logs)){
            is_empty = false;
        }
        // in the COPY phase, also run copy(), which copies key ranges in order
        if(client.status == Client::COPY){
            if(client.copy()){
                is_empty = false;
            }
        }
        if(is_empty){
            if(idle >= NOOP_IDLES){
                idle = 0;
                client.noop();
            }else{
                idle ++;
                usleep(TICK_INTERVAL_MS * 1000);
            }
        }else{
            idle = 0;
        }

        float data_size_mb = link->output->size() / 1024.0 / 1024.0;
        if(link->flush() == -1){
            log_info("%s:%d fd: %d, send error: %s", link->remote_ip, link->remote_port, link->fd(), strerror(errno));
            break;
        }
        if(backend->sync_speed > 0){
            usleep((data_size_mb / backend->sync_speed) * 1000 * 1000);
        }
    }

    log_info("Sync Client quit, %s:%d fd: %d, delete link", link->remote_ip, link->remote_port, link->fd());
    delete link;

    Locking l(&backend->mutex);
    backend->workers.erase(pthread_self());
    return (void *)NULL;
}

// init() decides the replication state from the seq and key sent by the slave:
// - a slave connecting for the first time sends neither last_key nor last_seq,
//   so it enters the COPY state;
// - if last_key is empty but last_seq is not, this is a reconnect after a
//   completed copy, so it enters the SYNC state;
// - otherwise it resumes the COPY state.
void BackendSync::Client::init(){
    const std::vector<Bytes> *req = this->link->last_recv();
    last_seq = 0;
    if(req->size() > 1){
        last_seq = req->at(1).Uint64();
    }
    last_key = "";
    if(req->size() > 2){
        last_key = req->at(2).String();
    }
    // is_mirror
    if(req->size() > 3){
        if(req->at(3).String() == "mirror"){
            is_mirror = true;
        }
    }
    const char *type = is_mirror? "mirror" : "sync";
    // a slave must reset its last_key when receiving 'copy_end' command
    if(last_key == "" && last_seq != 0){
        log_info("[%s] %s:%d fd: %d, sync recover, seq: %" PRIu64 ", key: '%s'",
            type,
            link->remote_ip, link->remote_port, link->fd(),
            last_seq, hexmem(last_key.data(), last_key.size()).c_str()
            );
        this->status = Client::SYNC;

        Binlog log(this->last_seq, BinlogType::COPY, BinlogCommand::END, "");
        log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
        link->send(log.repr(), "copy_end");
    }else if(last_key == "" && last_seq == 0){
        log_info("[%s] %s:%d fd: %d, copy begin, seq: %" PRIu64 ", key: '%s'",
            type,
            link->remote_ip, link->remote_port, link->fd(),
            last_seq, hexmem(last_key.data(), last_key.size()).c_str()
            );
        this->reset();
    }else{
        log_info("[%s] %s:%d fd: %d, copy recover, seq: %" PRIu64 ", key: '%s'",
            type,
            link->remote_ip, link->remote_port, link->fd(),
            last_seq, hexmem(last_key.data(), last_key.size()).c_str()
            );
        this->status = Client::COPY;
    }
    log_debug("==last seq:%lu, last_key:%s, type:%s, status:%u", last_seq,
        hexmem(last_key.data(), last_key.size()).c_str(), type, this->status);
}

// copy the current data to the slave via an iterator
int BackendSync::Client::copy(){
    if(this->iter == NULL){
        log_info("new iterator, last_key: '%s'", hexmem(last_key.data(), last_key.size()).c_str());
        std::string key = this->last_key;
        if(this->last_key.empty()){
            key.push_back(DataType::MIN_PREFIX);
        }
        this->iter = backend->ssdb->iterator(key, "", -1);
        log_info("iterator created, last_key: '%s'", hexmem(last_key.data(), last_key.size()).c_str());
    }
    int ret = 0;
    int iterate_count = 0;
    int64_t stime = time_ms();
    while(true){
        // Prevent copy() from blocking too long
        if(++iterate_count > 1000 || link->output->size() > 2 * 1024 * 1024){
            break;
        }
        if(time_ms() - stime > 3000){
            log_info("copy blocks too long, flush");
            break;
        }

        if(!iter->next()){
            goto copy_end;
        }
        Bytes key = iter->key();
        if(key.size() == 0){
            continue;
        }
        // finish copying all valid data types
        if(key.data()[0] > DataType::MAX_PREFIX){
            goto copy_end;
        }
        Bytes val = iter->val();
        this->last_key = key.String();

        char cmd = 0;
        char data_type = key.data()[0];
        if(data_type == DataType::KV){
            cmd = BinlogCommand::KSET;
        }else if(data_type == DataType::HASH){
            cmd = BinlogCommand::HSET;
        }else if(data_type == DataType::ZSET){
            cmd = BinlogCommand::ZSET;
        }else if(data_type == DataType::QUEUE){
            cmd = BinlogCommand::QPUSH_BACK;
        }else{
            continue;
        }

        ret = 1;

        Binlog log(this->last_seq, BinlogType::COPY, cmd, slice(key));
        log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
        link->send(log.repr(), val);
    }
    return ret;

copy_end:
    log_info("%s:%d fd: %d, copy end", link->remote_ip, link->remote_port, link->fd());
    this->status = Client::SYNC;
    delete this->iter;
    this->iter = NULL;

    Binlog log(this->last_seq, BinlogType::COPY, BinlogCommand::END, "");
    log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
    link->send(log.repr(), "copy_end");
    return 1;
}

int BackendSync::Client::sync(BinlogQueue *logs){
    Binlog log;
    // fetch one record from the BinlogQueue for synchronization
    while(1){
        int ret = 0;
        uint64_t expect_seq = this->last_seq + 1;
        if(this->status == Client::COPY && this->last_seq == 0){
            ret = logs->find_last(&log);
        }else{
            ret = logs->find_next(expect_seq, &log);
        }
        if(ret == 0){
            return 0;
        }
        if(this->status == Client::COPY && log.key() > this->last_key){
            log_debug("fd: %d, last_key: '%s', drop: %s",
                link->fd(),
                hexmem(this->last_key.data(), this->last_key.size()).c_str(),
                log.dumps().c_str());
            this->last_seq = log.seq();
            // WARN: When there are writes behind last_key, we MUST create
            // a new iterator, because iterator will not know this key.
            // Because iterator ONLY iterates through keys written before
            // iterator is created.
            if(this->iter){
                delete this->iter;
                this->iter = NULL;
            }
            continue;
        }
        if(this->last_seq != 0 && log.seq() != expect_seq){
            log_warn("%s:%d fd: %d, OUT_OF_SYNC! log.seq: %" PRIu64 ", expect_seq: %" PRIu64 "",
                link->remote_ip, link->remote_port, link->fd(),
                log.seq(),
                expect_seq
                );
            this->status = Client::OUT_OF_SYNC;
            return 1;
        }

        // update last_seq
        log_debug("==last seq:%lu. now seq:%lu", this->last_seq, log.seq());
        this->last_seq = log.seq();

        char type = log.type();
        if(type == BinlogType::MIRROR && this->is_mirror){
            if(this->last_seq - this->last_noop_seq >= 1000){
                this->noop();
                return 1;
            }else{
                continue;
            }
        }
        break;
    }

    // append the payload to the send buffer according to the command type
    int ret = 0;
    std::string val;
    switch(log.cmd()){
        case BinlogCommand::KSET:
        case BinlogCommand::HSET:
        case BinlogCommand::ZSET:
        case BinlogCommand::QSET:
        case BinlogCommand::QPUSH_BACK:
        case BinlogCommand::QPUSH_FRONT:
            ret = backend->ssdb->raw_get(log.key(), &val);
            if(ret == -1){
                log_error("fd: %d, raw_get error!", link->fd());
            }else if(ret == 0){
                //log_debug("%s", hexmem(log.key().data(), log.key().size()).c_str());
                log_trace("fd: %d, skip not found: %s", link->fd(), log.dumps().c_str());
            }else{
                log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
                link->send(log.repr(), val);
            }
            break;
        case BinlogCommand::KDEL:
        case BinlogCommand::HDEL:
        case BinlogCommand::ZDEL:
        case BinlogCommand::QPOP_BACK:
        case BinlogCommand::QPOP_FRONT:
            log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
            link->send(log.repr());
            break;
    }
    return 1;
}
```
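Stepping back from the code, each slave connection on the master side behaves as a small state machine. COPY, SYNC and OUT_OF_SYNC are the real Client states; the event names and transition function below are my summary, not ssdb code:

```cpp
enum Status { COPY, SYNC, OUT_OF_SYNC };

// Events observed in _run_thread/sync()/copy() above (names are mine).
enum Event { COPY_FINISHED, SEQ_GAP_DETECTED, RESET };

Status next_status(Status s, Event e){
    switch(e){
        case COPY_FINISHED:    return SYNC;        // copy() reached the end of the keyspace and sent copy_end
        case SEQ_GAP_DETECTED: return OUT_OF_SYNC; // sync() saw log.seq != expect_seq
        case RESET:            return COPY;        // reset() restarts a full copy
    }
    return s;
}
```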
When a slave sends the sync140 request, the master starts a dedicated BackendSync thread that runs the synchronization logic above; the thread stays alive for as long as the slave's connection does. When does the slave send sync140? At startup. The logic is as follows:
```cpp
// serv.cpp
SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, NetworkServer *net){
    this->ssdb = (SSDBImpl *)ssdb;
    this->meta = meta;

    net->data = this;
    this->reg_procs(net);

    int sync_speed = conf.get_num("replication.sync_speed");

    backend_dump = new BackendDump(this->ssdb);
    backend_sync = new BackendSync(this->ssdb, sync_speed);
    expiration = new ExpirationHandler(this->ssdb);

    cluster = new Cluster(this->ssdb);
    if(cluster->init() == -1){
        log_fatal("cluster init failed!");
        exit(1);
    }

    { // slaves
        const Config *repl_conf = conf.get("replication");
        if(repl_conf != NULL){
            std::vector<Config *> children = repl_conf->children;
            for(std::vector<Config *>::iterator it = children.begin(); it != children.end(); it++){
                Config *c = *it;
                if(c->key != "slaveof"){
                    continue;
                }
                std::string ip = c->get_str("ip");
                int port = c->get_num("port");
                if(ip == "" || port <= 0 || port > 65535){
                    continue;
                }
                bool is_mirror = false;
                std::string type = c->get_str("type");
                if(type == "mirror"){
                    is_mirror = true;
                }else{
                    type = "sync";
                    is_mirror = false;
                }
                std::string id = c->get_str("id");

                log_info("slaveof: %s:%d, type: %s", ip.c_str(), port, type.c_str());
                Slave *slave = new Slave(ssdb, meta, ip.c_str(), port, is_mirror);
                if(!id.empty()){
                    slave->set_id(id);
                }
                slave->auth = c->get_str("auth");
                // start() spawns a thread running Slave::_run_thread, which
                // connects to the master and then sends the sync140 request.
                // That thread also persists last_key and last_seq.
                slave->start();
                slaves.push_back(slave);
            }
        }
    }

    // load kv_range
    int ret = this->get_kv_range(&this->kv_range_s, &this->kv_range_e);
    if(ret == -1){
        log_fatal("load key_range failed!");
        exit(1);
    }
    log_info("key_range.kv: \"%s\", \"%s\"",
        str_escape(this->kv_range_s).c_str(),
        str_escape(this->kv_range_e).c_str()
        );
}

// slave.cpp
void* Slave::_run_thread(void *arg){
    Slave *slave = (Slave *)arg;
    const std::vector<Bytes> *req;
    Fdevents select;
    const Fdevents::events_t *events;
    int idle = 0;
    bool reconnect = false;

#define RECV_TIMEOUT 200
#define MAX_RECV_TIMEOUT 300 * 1000
#define MAX_RECV_IDLE MAX_RECV_TIMEOUT/RECV_TIMEOUT

    while(!slave->thread_quit){
        if(reconnect){
            slave->status = DISCONNECTED;
            reconnect = false;
            select.del(slave->link->fd());
            delete slave->link;
            slave->link = NULL;
            sleep(1);
        }
        if(!slave->connected()){
            if(slave->connect() != 1){
                usleep(100 * 1000);
            }else{
                select.set(slave->link->fd(), FDEVENT_IN, 0, NULL);
            }
            continue;
        }

        events = select.wait(RECV_TIMEOUT);
        if(events == NULL){
            log_error("events.wait error: %s", strerror(errno));
            sleep(1);
            continue;
        }else if(events->empty()){
            if(idle++ >= MAX_RECV_IDLE){
                log_error("the master hasn't responsed for awhile, reconnect...");
                idle = 0;
                reconnect = true;
            }
            continue;
        }
        idle = 0;

        if(slave->link->read() <= 0){
            log_error("link.read error: %s, reconnecting to master", strerror(errno));
            reconnect = true;
            continue;
        }

        while(1){
            req = slave->link->recv();
            if(req == NULL){
                log_error("link.recv error: %s, reconnecting to master", strerror(errno));
                reconnect = true;
                break;
            }else if(req->empty()){
                break;
            }else if(req->at(0) == "noauth"){
                log_error("authentication required");
                reconnect = true;
                sleep(1);
                break;
            }else{
                if(slave->proc(*req) == -1){
                    goto err;
                }
            }
        }
    } // end while
    log_info("Slave thread quit");
    return (void *)NULL;

err:
    log_fatal("Slave thread exit unexpectedly");
    exit(0);
    return (void *)NULL;
}
```
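For completeness, here is what the sync140 request must carry. Slave::connect() (not shown above) builds the actual request; the field layout below is inferred from what BackendSync::Client::init() parses, and make_sync140_req is a hypothetical helper for illustration only:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical illustration of the sync140 request fields, matching what
// BackendSync::Client::init() reads: req[1] = last_seq, req[2] = last_key,
// req[3] = "mirror" (optional).
std::vector<std::string> make_sync140_req(uint64_t last_seq,
                                          const std::string &last_key,
                                          bool is_mirror){
    std::vector<std::string> req;
    req.push_back("sync140");
    req.push_back(std::to_string(last_seq));
    req.push_back(last_key); // empty on first connect => master starts a full COPY
    if(is_mirror){
        req.push_back("mirror");
    }
    return req;
}
```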