什么是nmq
消息中間件是一個完備的、易於使用的消息隊列系統,替代現有cm/transfer所有的功能,力求解決當前社區提交系統難運維、不通用等弊病,提供一個全流程支持、功能完善、性能可擴展、運維方便、可靠的消息隊列及整套提交系統解決方案。 開發代號是NMQ。
背景
長期以來,社區幾大產品線(貼吧、空間、知道、ks等)都獨立維護着自己的提交系統。雖然產品邏輯和規模不同,但從實現上來講,面對的問題相近,解決的思 路相似,維護重復、運維分散、人力浪費。為避免重復工作和促進提交集群收斂,我們希望完成一個大社區范疇下消息中間件的統一解決方案。
和cm/transfer區別
-
nmq是一套包含服務、運維平台的整體解決方案。
-
模型上,nmq 推的模型和cm/transfer一樣。topic等同於cm,pusher等同於transfer。
-
除了推模式之外,nmq還支持拉的模型。
-
nmq自身支持提交消息的按產品線、按業務、按性能拆分。cm里用於標示命令號的cmd_no被擴展為product、topic、cmd。可以按product、topic將產品線和業務進行拆分。如果單機性能出現瓶頸,可以將同一個topic拆 分為不同的子topic。
-
nmq不分配自增id;nmq沒有和業務相關的字段填充和字段檢查的功能。
-
nmq自身支持發送消息時的並發、時序。
-
專門為nmq設計開發的mqp運維平台讓運維上更方便。
-
pusher 支持transfer的延時發送功能,只需簡單配置 sending_delay_time
名詞解釋
topic:按業務划分的消息序列,邏輯上的概念。產品線可以根據自己的業務特點划分,可大 可小。一般情況下,大產品線按各個子業務划分為不同的topic(例如:空間可以划分為博客、相冊、用戶等topic);小產品線可以整體作為一個 topic,如果業務復雜也可以划分為不同的topic。
子topic: 按性能划分的消息序列,實體上的概念。topic可以按partition-key的自定義規則,拆分為若干個子topic,每個子topic內的消息是嚴格保證時序的。
partition-key:消息划分或歸屬的依據,用於topic拆分子topic、競爭 消費時保證時序、多IDC時主IDC的歸屬。消息需自己指定partition-key,且只能有一個partition-key。如果不拆分子 topic或不關心時序,可以不指定partition-key。
競爭模式:同一個模塊部署很多機器,但所有機器競爭消費同一份消息,比如c模塊+mysql 主從模式下,c模塊多機部署,但由於不同機器上的c模塊都更新同一個主庫,因此每條消息只需要被一個機器上的模塊消費即可,不需要每條消息都分發給所有機 器;競爭模式,為以后的主流更新模式。
多主模式:同競爭模式不同,同一個模塊的各個機器,都需要消費全量消息;一般模塊內維護全量數據副本的場景下使用;大產品線的專有實現模塊,一般是這種模式。
架構圖和數據流圖
上游模塊向nmqproxy發送消息,消息經過消息中間件的nmqproxy轉發到對應的topic server模塊,topicserver將消息序列化到磁盤上,pusher掃描並讀取到最新的消息,然后發送給下游模塊。 除了主流的推模型外,nmq也支持“拉”模式,在拉模式下,模塊所有的信息都保存在模塊本地,模塊通過nmq提供的pulllib同nmq保持心跳。
如何提交消息
1.接入
1.1怎么接入nmq
產品線之前接入cm/transfer有2種方式:
c/c++通過nshead+mcpack、php通過ral等方式。 產品線接入nmq與接入cm/transfer一般沒有太大變化,一般來說只需要加入以下字段:
-
_product(產品線名稱,7個字節以內)
-
_topic(產品線業務名稱,7個字節以內)
-
_cmd(需要做的事,與之前cmd_no含義類似,只是_cmd為string,例如"addblog"等)
1.2注意項
nmq不再具有cm的id分配功能。解決方法,產品線調idalloc(arch或者space的idalloc)模塊進行id分配,然后再提交到nmq;因此接入前先判斷是否需要id分配功能。
下游轉發不支持mysql.so和comdb.so。因此如果之前的業務是transfer下游直接是mysql或者comdb的話,需要產品線自行開發下游c或者php模塊,實現更新db和comdb功能。
2.提交請求和響應
請求
向nmqproxy提交消息,使用nshead+mcpack2協議,同時兼任mcpack1格式
//請求包
{
_product: "space",
_topic: "album",
_cmd: "album_add",
_partition_key: "10001", //可選
_msg_unique_key:12333234, //可選
// 消息自定義的內容
}
響應
響應包中含_error_no和_error_msg字段,表明發送是否成功。注意,響應包中不含消息的原有字段和nmqproxy填充的字段。
//響應包
{
_error_no: (int32)0,
_error_msg: "OK",
}
3.字段和規范
字段名約束:一個下划線開頭的字段為保留字段。應用不能使用一個下划線開頭的字段名。
字段值約束:只能使用大小寫字母、數字、和下划線。
必須填的字段:_product、_topic、_cmd。
長度限制(含'\0'):product字段8個字符,topic字段8個字符,cmd字段16個字符。
-
product是產品線的唯一標示。
-
topic是業務划分的消息序列的表示,詳見"名詞解釋"。
-
cmd是用有意義的字符串表示的命令號。
-
可選填的字段:_idc。長度限制(含'\0').product字段8個字符
【二期才支持】idc是機房的標示,用於多IDC提交。
可選填的字段:_partition_key,_msg_unique_key。長度無限制。
partition_key用於拆分子topic、多IDC轉發,一般使用用戶id、吧id作為partition_key。
【暫不支持】msg_unique_key用於提交去重,建議使用隨機性和區分度非常好的uuid。
4.配置實例
1.nmqproxy的ip是用資源定位發布的,因此需要上游模塊配置上nmqproxy在zk的服務器和路徑。
注:整個集群公用一組nmqproxy,所以使用的nmqproxy的資源定位是一樣。
2.在nmqproxy中配置本應用的消息的路由信息。
修改nmqproxy/conf/nmqproxy.conf文件,在[topics]中增加:
#每個topic的配置 [.@topic] #產品線名稱 product : tieba #topic名稱 topic : post #是否啟用,1或0。可選,默認為1 enable : 1 #是否走多IDC提交流程,1或0。可選,默認為0 multi_idc : 0 #拆分子topic的個數。可選,默認為1,表示不拆分 sub_topic_num : 1 #topic server的url,名稱需要和ubclient配置里面的名稱的一致,拆分時,自動在后面補0/1… topic_server_name_prefix : topic-tieba-post-commit pusher_server_name_prefix : pusher-tieba-post
product和topic是和提交消息里的字段保持一致。
最后的topic_server_name_prefix是topic_group的名稱,需要和nmqproxy/conf/ubclient.conf保持一致的。
3.topic server的配置不需要修改
如何接收消息
一個后端模塊,想從nmq接受消息,對rd來說,只需要提供該模塊的2個配置文件即可。
建議該模塊的模塊命名為$(product)_$(module),那么需要下面2個配置文件:
-
module_$(product)_$(module).conf
-
machine_$(product)_$(module).conf
模塊文件命名規范: 以下游名字來命名,比如下游叫abc模塊,如果abc模塊只用了某個product的數據,建議名為module_$(product)_abc.conf.如果abc是個很大的系統,用到很多模塊的數據,允許module_abc.conf
module_$(product)_$(module).conf主要配置該模塊的模塊級配置,包括發送協議、接受的消息類型、並發時序控制、字段復制等等。
machine_$(product)_$(module).conf主要配置該模塊的后端機器具體訪問設置,包括ip、端口、超時等配置。
目前支持本地配置,也支持資源定位(zk/webfoot),該配置文件,同ubclient的配置文件基本一致,但在針對多主模塊的配置時,針對每個機器增加了identifer和flag字段。另外多主模式的模塊,不支持資源定位方式。
pusher配置范例
module_$(product)_$(module).conf + 競爭模式+ mcpack協議
下面是pusher的下游是c模塊,mcpack接收的范例配置, 參考space_follow點擊下載
[modules] [.@module] #模塊名,需要跟ubclient配置里面的名稱一致; name : space_follow #是否啟用 1啟用 0停用 默認0 flag : 1 #是競爭模式還是多主模式;0:競爭,1:多主 sending_type : 0 #轉發協議配置; sending_protocol : mcpack #發送窗口大小,需要比線程數大; sending_window_size : 10 #發送線程數。競爭模式為競爭線程數,多主模式下,為每個機器的並發線程數 sending_thread_num : 8 #出錯后重試的時間間隔(單位MS) sending_retry_time: 500 #命令過濾規則 ,接受的命令配置; # *表示通配,只能出現在字符串的最后,不能出現在中間 [..msg_filter] @filter:space.follow.* #時序控制規則,支持按照某個partition key的時序支持 [..sequence_control] #制定mutex key,必須是整形數類型的字段; mutex_key : qid #指定對於 沒有PK的命令,是否強制串行處理; #1:是:該命令必須等待所有靠前的命令都執行完成,再執行。該命令執行前,靠后的命令也不能執行。(主要用於存在批量命令時,保證時序)。 #0:否:該命令完全無序£??到了就執行。 force_sequence_when_no_mutex_key : 1 #支持任意模塊自定義配置、so自定義配置 [..ext_config] [..mcpack] #發送模式,0:nshead+mcpack, 1:ubrpc send_type : 0 #mcpack字段復制 #在消息中間件中,內部填充字段都是以"_"開頭,比如_trans_id, _log_id等,為了兼容,可以在so中將這些字段復制改名; [...@mcpack_key_copy] from : _transid to : trans_id [...@mcpack_key_copy] from : _log_id to : log_id
machine_$(product)_$(module).conf + mcpack協議
下面是pusher的下游是c模塊,mcpack接收的范例配置, 參考space_follow點擊下載
[UbClientConfig] [.UbClient] [..@Service] Name : space_follow ConnectAll : 0 DefaultConnectTimeOut : 300 DefaultReadTimeOut : 1000 DefaultWriteTimeOut : 1000 DefaultMaxConnect : 10 #DefaultRetry : 5 #LONG / SHORT DefaultConnectType : SHORT #DefaultLinger : 0 #ReqBuf : 100 #ResBuf : 100 #DefaultAsyncWaitingNum : 100 [...CurrStrategy] ClassName : UbClientStrategy [...CurrHealthy] ClassName : UbClientHealthyChecker [...@Server] IP : 10.32.52.30 Port : 29003 [...@Server] IP : 10.32.52.31 Port : 29003
module_$(product)_$(module).conf + 競爭模式 + 本地配置
下面是pusher的下游是php模塊,http接收的范例配置. 參考lbs_attent 點擊下載
[modules]
#下面是一個模塊的配置
[.@module]
#模塊名,需要跟ubclient配置里面的名稱一致;
name : lbs_attent
#是否啟用 1啟用 0停用
flag : 1
#是競爭模式還是多主模式;0:競爭,1:多主
sending_type : 0
#轉發協議配置;
sending_protocol : http
#發送窗口大小,需要比線程數大;
sending_window_size : 10
#發送線程數。競爭模式為競爭線程數,多主模式下,為每個機器的並發線程數
sending_thread_num : 8
#出錯后重試的時間間隔(單位MS)
sending_retry_time: 500
#命令過濾規則 ,接受的命令配置;
# *表示通配,只能出現在字符串的最后,不能出現在中間
[..msg_filter]
@filter : promo.atten.*
#@filter : promo.*
#時序控制規則,支持按照某個partition key的時序支持
[..sequence_control]
#制定mutex key,必須是整形數類型的字段;
mutex_key : qid
#指定對於 沒有PK的命令,是否強制串行處理;
#1:是:該命令必須等待所有靠前的命令都執行完成,再執行。該命令執行前,靠后的命令也不能執行。(主要用於存在批量命令時,保證時序)。
#0:否:該命令完全無序,到了就執行。
force_sequence_when_no_mutex_key : 1
#支持任意模塊自定義配置、so自定義配置
[..ext_config]
custom_key : sample
[..http]
#第二部分
#用戶自定義配置內容
[...default_conf]
#重試次數,默認為-1,一直重試; 0:不重試;
max_retry_times : -1
#是否將提交的mcpack作為post數據發送,1發送0不發送,缺省1,0的情況下只往后端發url
send_pack : 1
#發送mcpack的方式:0:二進制模式直接post, 1:轉化成text,使用post $key=$pack_str 的方式發送;
# 2: 轉換成json,使用post $key=$pack_str 的方式發送;默認為0模式;
send_pack_type : 0
#在send_pack_type=1/2的時候,配置post的key;默認"data”;
send_pack_key : data
#server 冗余均衡策略: 0:random,對多個ip進行輪詢;1:master-slave,只在出錯時切換到下一個ip;
#默認0;
server_redundancy_policy : 0
#uri的模版,其中{{}}里面的字段將從mcpack里面獲取替換; 支持req_download.uid這種多級的字段;
uri : /promov1/commit/attention?mod=commit&transid={{_transid}}
#http method, 0:get, 1:post, default: 1
#http_method : 0
#過濾器,會判斷mcpack中的uid字段的值%2后是否為1,只有為1時才發送
#目前只支持"=" 和 "%" 這兩種;
#filter : {{user_id}}%2=1
#http header, 多個header用\\r\\n隔開;
http_header : User-Agent: NuSOAP/0.6.6\r\ncharset=UTF-8
#命令號相關配置
#針對每個不同的命令號,可以重設上面的uri/http_method/filter/send_pack/http_header,覆蓋上面的默認配置;
#[...space_photo_add]
#uri : /so/test22?service=Commit&pid=test&tk=test&transid={{_transid}}
#filter : {{is_xxx}}=1
#[...space_post_add]
#send_pack : 1
#uri : /so/test33?service=Commit&pid=test&tk=test&transid={{_transid}}
#filter : {{uid}}%2=1
#mcpack字段復制
#在消息中間件中,內部填充字段都是以"_"開頭,比如_transid, _log_id等,為了兼容,可以在so中將這些字段復制改名;
#[...@mcpack_key_copy]
#from : _transid
#to : trans_id
machine_$(product)_$(module).conf + 競爭模式 + 本地配置
下面是pusher的下游是php模塊,http接收的范例配置. 參考lbs_attent 點擊下載
[UbClientConfig] [.UbClient] #競爭的例子(不用zk時) [..@Service] Name : lbs_attent ConnectAll : 0 DefaultConnectTimeOut : 300 DefaultReadTimeOut : 1000 DefaultWriteTimeOut : 1000 DefaultMaxConnect : 10 #DefaultRetry : 5 #LONG / SHORT DefaultConnectType : SHORT #DefaultLinger : 0 #ReqBuf : 100 #ResBuf : 100 #DefaultAsyncWaitingNum : 100 #聲明將要使用的策略類及屬性 [...CurrStrategy] ClassName : UbClientStrategy [...CurrHealthy] ClassName : UbClientHealthyChecker [...@Server] IP : 10.40.71.100 Port : 8016 [...@Server] IP : 10.40.71.101 Port : 8016
從cm/transfer遷移
每個產品線可能有自己特殊的情況,以下僅列出一些常見的問題,供參考。 cm/transfer到nmq不能做到透明的遷移,需要產品線做上下游的兼容。
1.遷移方式
雙寫或順切。一般考慮到平滑過渡和不丟數據的可回滾,會使用雙寫的方案。
2.上游兼容
上游將消息提交從cm/transfer遷移到nmq,需要做幾類事情的改變:
-
命令號從cmd_no遷移到product+topic+cmd
-
和id分配服務交互需要自行實現
-
其他在cm單點做的事情,比如字段檢查等,需要自行實現
可選的方案有兩種:升級上游模塊,或者增加單獨的adapter層。視產品線的情況而定。
3.下游兼容
下游兼容最主要的是發送協議的兼容。簡單的講,下游的兼容主要通過so來做。
- nmq不支持transfer的default協議(協議中帶transid,並且下游可以使 用transid反向控制)。最佳建議:升級下游支持裸的mcpack方式,自己無需保存transid,也不再使用下游反向控制transid的功能。 為兼容default協議使用偽造的方式很trick,無論是在so中實現,還是加一個中間層。
- 其他mysql、comdb的發送方式。 a) 建議不要再使用transfer直接更新mysql的方式,改用下游自己去更新。 b) comdb等so視情況,在pusher下實現新的so即可。
- 一些細節的兼容,可以在so中支持,比如transid等字段的兼容。
自定義轉發流程
如果協議使用mcpack和http,但默認so不能滿足的功能,請聯系nmq維護者。 如果使用其他的轉發協議如comdb、memcache,請聯系nmq維護者,如果不是通用的需求可能需要產品線自己來開發。 自定義so需要實現若干個回調函數,定義在talk_ext.h中。
/*
* @brief 初始化操作,成功返回0,失敗返回-1
*
* @param server 服務器配置
* @param [out] handle_out 自定義處理的句柄,如果有的話
* @param [in] module_conf 自定義處理的命令文件
*
* @return 成功返回0, 失敗返回-1
*/
typedef int (*np_init_f)(talk_svr_t * server, void **handle_out, comcfg::Configure *module_conf);
/*
* @brief 發送命令邏輯
*
* @param server 服務器配置
* @param handle 自定義處理的句柄,如果有的話
* @param logdi logid
* @param req_head 同步請求頭
* @param buf 數據
* @param buf_len 數據的長度
*
* @return 成功返回0,失敗返回-1, 命令格式錯誤返回1
*/
typedef int (*np_send_f)(
talk_svr_t * server,
void *handle,
unsigned logdi,
ub::UbClientManager *ubmgr,
const char *mod_name,
void *buf,
unsigned buf_len);
/*
* @brief 析構操作
*
* @param server server信息
* @param handle 自定義處理的句柄,如果有的話
*
* @return 成功返回0,失敗返回-1
*/
typedef int (*np_free_f)(talk_svr_t * server, void **handle);
/*
* @brief 重設連接
*
* @param server server信息
* @param handle 自定義句柄
*/
typedef void (*np_reset_f)(talk_svr_t * server, void *handle);

