FFLIB框架是為簡化分布式/多進程並發而生的。它起始於本人嘗試解決工作中經常遇到的問題如消息定義、異步、多線程、單元測試、性能優化等。基本介紹可以看這里:
http://www.cnblogs.com/zhiranok/archive/2012/07/30/fflib_framework.html
其中之所以特意采用了Broker模式,是吸收了MPI和Erlang的思想。
FFLIB 目前處於alpha階段,一些有用的功能還需繼續添加。但是FFLIB一開始就是為了解決實際問題而生。Broker 即可以以獨立進程運行,也可以集成到某個特定的進程中啟動。除了這些,FFLIB中使用epoll實現的網絡層也極具參考價值。網上有一些關於epoll ET 和 LT的討論,關於哪種方式更簡單,本人的答案是ET。ET模式下epoll 就是一個完全狀態機。開發者只需實現FD的read、write、error 三種狀態即可。
我進一步挖掘FFLIB的功能。寫一篇FFLIB的Tutorial。創建更多的FFLIB使用示例,以此來深入探討FFLIB的意義。在游戲開發中,或者一些分布式的環境中,有許多大家熟悉的模式。,本文挑選了如下作為FFLIB示例:
- Request/Reply
- 點對點通訊
- 阻塞通訊
- 多播通訊
- Map/Reduce
Request/Reply
異步的Request/Reply
在FFLIB中所有的消息都是Request和Reply一一對應的,默認情況下工作在異步模式。假設如下場景,Flash連入GatewayServer並發送Login消息包,GatewaServer 解析用戶名密碼,調用LoginServer 驗證。
首先定義msg:
struct user_login_t { struct in_t: public msg_i { in_t(): msg_i("user_login_t::in_t") {} string encode() { return (init_encoder() << uid << value).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> uid >> value; } long uid; string value; }; struct out_t: public msg_i { out_t(): msg_i("user_login_t::out_t") {} string encode() { return (init_encoder() << value).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> value; } bool value; }; };
LoginServer中如此定義接口:
class login_server_t { public: void verify(user_login_t::in_t& in_msg_, rpc_callcack_t<user_login_t::out_t>& cb_) { user_login_t::out_t out; out.value = true; cb_(out); } }; login_server_t login_server; singleton_t<msg_bus_t>::instance().create_service("login_server", 1) .bind_service(&login_server) .reg(&login_server_t::verify);
在GatewayServer中調用上面接口:
struct lambda_t { static void callback(user_login_t::out_t& msg_, socket_ptr_t socket_) { if (true == msg_.value) { //! socket_->send_msg("login ok"); } else { //! socket_->send_msg("login failed"); } } }; user_login_t::in_t in; in.uid = 520; in.value = "ILoveYou"; socket_ptr_t flash_socket = NULL;//! TODO singleton_t<msg_bus_t>::instance() .get_service_group("login_server_t") ->get_service(1) ->async_call(in, binder_t::callback(&lambda_t::callback, flash_socket));
如上所示, async_call 可以通過binder_t模板函數為回調函綁定參數。
同步的Request/Reply
大部分時候我們期望Reply被異步處理,但有時Reply 必須被首先處理后才能觸發后續操作,一般這種情況發生在程序初始化之時。假設如下場景,SceneServer啟動時必須從SuperServer中獲取配置,然后才能執行加載場景數據等后續初始化操作。
首先定義通信的msg:
struct config_t { struct in_t: public msg_i { in_t(): msg_i("config_t::in_t") {} string encode() { return (init_encoder() << server_type << server_id).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> server_type >> server_id; } int server_type; int server_id; }; struct out_t: public msg_i { out_t(): msg_i("config_t::out_t") {} string encode() { return (init_encoder() << value).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> value; } map<string, string> value; }; };
如上所示, msg 序列化自動支持map。
SuperServer 中定義返回配置的接口:
super_server_t super_server; singleton_t<msg_bus_t>::instance().create_service("super_server", 1) .bind_service(&super_server) .reg(&super_server_t::get_config);
SceneServer 可以如此實現同步Request/Reply:
rpc_future_t<config_t::out_t> rpc_future; config_t::in_t in; in.server_type = 1; in.server_id = 1; const config_t::out_t& out = rpc_future.call( singleton_t<msg_bus_t>::instance().get_service_group("super_server") ->get_service(1), in); cout << out.value.size() <<"\n"; //std::foreach(out.value.begin(), out.value.end(), fuctor_xx);
點對點通訊
異步Request/Reply 已經能夠解決大部分問題了,但是有時處理Push模式時稍顯吃了。我們知道消息推算有Push 和Poll兩種方式。了解二者:
http://blog.sina.com.cn/s/blog_6617106b0100hrm1.html
上面提到的Request/Reply 非常適合poll模式,以上一個獲取配置為例,SuperServer由於定義接口的時候只需知道callback,並不知道SceneServer的具體連接。,所以SuperServer不能向SceneServer Push消息。在FFLIB中並沒有限定某個節點必須是Client或只能是Service,實際上可以兼有二者的角色。SceneServer 也可以提供接口供SuperServer調用,這就符合了Push的語義。假設如下場景,GatewayServer需要在用戶登入時調用通知SessionServer,而某一時刻SessionServer也可能呢通知GatewayServer 強制某用戶下線。二者互為client和service。大家必須知道,在FFLIB中實現兩個節點的通信只需知道對方的服務名稱即可,Broker 在此時實現解耦的作用非常明顯,若要增加對其他節點的通信,只需通過服務名稱async_call即可。
定義通信的msg:
struct user_online_t { struct in_t: public msg_i { in_t(): msg_i("user_online_t::in_t") {} string encode() { return (init_encoder() << uid).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> uid; } long uid; }; struct out_t: public msg_i { out_t(): msg_i("user_online_t::out_t") {} string encode() { return (init_encoder() << value).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> value; } bool value; }; }; struct force_user_offline_t { struct in_t: public msg_i { in_t(): msg_i("force_user_offline_t::in_t") {} string encode() { return (init_encoder() << uid).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> uid; } long uid; }; struct out_t: public msg_i { out_t(): msg_i("force_user_offline_t::out_t") {} string encode() { return (init_encoder() << value).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> value; } bool value; }; };
GatewayServer 通知SessionServer 用戶上線,並提供強制用戶下線的接口:
class gateway_server_t { public: void force_user_offline(force_user_offline_t::in_t& in_msg_, rpc_callcack_t<force_user_offline_t::out_t>& cb_) { //! close user socket force_user_offline_t::out_t out; out.value = true; cb_(out); } }; gateway_server_t gateway_server; singleton_t<msg_bus_t>::instance().create_service("gateway_server", 1) .bind_service(&gateway_server) .reg(&gateway_server_t::force_user_offline); user_online_t::in_t in; in.uid = 520; singleton_t<msg_bus_t>::instance() .get_service_group("session_server") ->get_service(1) ->async_call(in, callback_TODO);
SessionServer 提供用戶上線接口,可能會調用GatewayServer 的接口強制用戶下線。
class session_server_t { public: void user_login(user_online_t::in_t& in_msg_, rpc_callcack_t<user_online_t::out_t>& cb_) { //! close user socket user_online_t::out_t out; out.value = true; cb_(out); } }; session_server_t session_server; singleton_t<msg_bus_t>::instance().create_service("session_server", 1) .bind_service(&session_server) .reg(&session_server_t::user_login); force_user_offline_t::in_t in; in.uid = 520; singleton_t<msg_bus_t>::instance() .get_service_group("gateway_server") ->get_service(1) ->async_call(in, callback_TODO);
多播通信
和點對點通信一樣,要實現多播,只需要知道目標的服務名稱。特別提一點的是,FFLIB中有服務組的概念。比如啟動了多個場景服務器SceneServer,除了數據不同,二者接口完全相同,有可能只是相同進程的不同實例。在FFLIB框架中把這些服務歸為一個服務組,然后再為每個實例分配索引id。
假設如下場景,SuperServer 中要實現一個GM接口,通知所有SceneServer 重新加載配置。
定義通信的msg:
struct reload_config_t { struct in_t: public msg_i { in_t(): msg_i("reload_config_t::in_t") {} string encode() { return (init_encoder()).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_); } }; struct out_t: public msg_i { out_t(): msg_i("reload_config_t::out_t") {} string encode() { return (init_encoder() << value).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> value; } bool value; }; };
SceneServer 提供重新載入配置接口:
class scene_server_t { public: void reload_config(reload_config_t::in_t& in_msg_, rpc_callcack_t<reload_config_t::out_t>& cb_) { //! close user socket reload_config_t::out_t out; out.value = true; cb_(out); } }; scene_server_t scene_server; singleton_t<msg_bus_t>::instance().create_service("scene_server", 1) .bind_service(&scene_server) .reg(&scene_server_t::reload_config);
在SuperServer 中如此實現多播(跟准確是廣播,大同小異):
struct lambda_t { static void reload_config(rpc_service_t* rs_) { reload_config_t::in_t in; rs_->async_call(in, callback_TODO); } }; singleton_t<msg_bus_t>::instance() .get_service_group("scene_server") ->foreach(&lambda_t::reload_config);
Map/Reduce
在游戲中使用Map/reduce 的情形並不多見,本人找到網上最常見的Map/reduce 實例 WordCount。情形如下:有一些文本字符串,統計每個字符出現的次數。
- Map操作,將文本分為多個子文本,分發給多個Worker 進程進行統計
- Reduce 操作,將多組worker 進程計算的結果匯總
- Worker:為文本統計各個字符出現的次數
定義通信消息:
struct word_count_t { struct in_t: public msg_i { in_t(): msg_i("word_count_t::in_t") {} string encode() { return (init_encoder() << str).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> str; } string str; }; struct out_t: public msg_i { out_t(): msg_i("word_count_t::out_t") {} string encode() { return (init_encoder() << value).get_buff(); } void decode(const string& src_buff_) { init_decoder(src_buff_) >> value; } map<char, int> value; }; };
定義woker的接口:
class worker_t { public: void word_count(word_count_t::in_t& in_msg_, rpc_callcack_t<word_count_t::out_t>& cb_) { //! close user socket word_count_t::out_t out; for (size_t i = 0; i < in_msg_.str.size(); ++i) { map<int, int>::iterator it = out.value.find(in_msg_.str[i]); if (it != out.value.end()) { it->second += 1; } else { out.value[in_msg_.str[i]] = 1; } } cb_(out); } }; worker_t worker; for (int i = 0; i < 5; ++i) { singleton_t<msg_bus_t>::instance().create_service("worker", 1) .bind_service(&worker) .reg(&worker_t::word_count); }
模擬Map/reduce 操作:
struct lambda_t { static void reduce(word_count_t::out_t& msg_, map<int, int>* result_, size_t* size_) { for (map<int, int>::iterator it = msg_.value.begin(); it != msg_.value.end(); ++it) { map<int, int>::iterator it2 = result_->find(it->first); if (it2 != result_->end()) { it2->second += it->second; } else { (*result_)[it->first] = it->second; } } if (-- size_ == 0) { //reduce end!!!!!!!!!!!!!!!! delete result_; delete size_; } } static void do_map(const char** p, size_t size_) { map<int, int>* result = new map<int, int>(); size_t* dest_size = new size_t(); *dest_size = size_; for (size_t i = 0; i < size_; ++i) { word_count_t::in_t in; in.str = p[i]; singleton_t<msg_bus_t>::instance() .get_service_group("worker") ->get_service(1 + i % singleton_t<msg_bus_t>::instance().get_service_group("worker")->size()) ->async_call(in, binder_t::callback(&lambda_t::reduce, result, dest_size)); } } }; const char* str_vec[] = {"oh nice", "oh fuck", "oh no", "oh dear", "oh wonderful", "oh bingo"}; lambda_t::do_map(str_vec, 6);
總結:
FFLIB 使進程間通信更容易
source code: https://ffown.googlecode.com/svn/trunk
示例代碼目錄:example/tutorial