消息隊列


消息中間件是一個完備的、易於使用的消息隊列系統,替代現有cm/transfer所有的功能,力求解決當前社區提交系統難運維、不通用等弊病,提供一個全流程支持、功能完善、性能可擴展、運維方便、可靠的消息隊列及整套提交系統解決方案。 開發代號是NMQ

長期以來,社區幾大產品線(貼吧、空間、知道、ks等)都獨立維護着自己的提交系統。雖然產品邏輯和規模不同,但從實現上來講,面對的問題相近,解決的思 路相似,維護重復、運維分散、人力浪費。為避免重復工作和促進提交集群收斂,我們希望完成一個大社區范疇下消息中間件的統一解決方案。

  • nmq是一套包含服務、運維平台的整體解決方案。

  • 模型上,nmq 推的模型和cm/transfer一樣。topic等同於cm,pusher等同於transfer。

  • 除了推模式之外,nmq還支持拉的模型。

  • nmq自身支持提交消息的按產品線、按業務、按性能拆分。cm里用於標示命令號的cmd_no被擴展為product、topic、cmd。可以按product、topic將產品線和業務進行拆分。如果單機性能出現瓶頸,可以將同一個topic拆 分為不同的子topic。

  • nmq不分配自增id;nmq沒有和業務相關的字段填充和字段檢查的功能。

  • nmq自身支持發送消息時的並發、時序。

  • 專門為nmq設計開發的mqp運維平台讓運維上更方便。

  • pusher 支持transfer的延時發送功能,只需簡單配置 sending_delay_time

 

topic:按業務划分的消息序列,邏輯上的概念。產品線可以根據自己的業務特點划分,可大 可小。一般情況下,大產品線按各個子業務划分為不同的topic(例如:空間可以划分為博客、相冊、用戶等topic);小產品線可以整體作為一個 topic,如果業務復雜也可以划分為不同的topic。
子topic: 按性能划分的消息序列,實體上的概念。topic可以按partition-key的自定義規則,拆分為若干個子topic,每個子topic內的消息是嚴格保證時序的。
partition-key:消息划分或歸屬的依據,用於topic拆分子topic、競爭 消費時保證時序、多IDC時主IDC的歸屬。消息需自己指定partition-key,且只能有一個partition-key。如果不拆分子 topic或不關心時序,可以不指定partition-key。
競爭模式:同一個模塊部署很多機器,但所有機器競爭消費同一份消息,比如c模塊+mysql 主從模式下,c模塊多機部署,但由於不同機器上的c模塊都更新同一個主庫,因此每條消息只需要被一個機器上的模塊消費即可,不需要每條消息都分發給所有機 器;競爭模式,為以后的主流更新模式。
多主模式:同競爭模式不同,同一個模塊的各個機器,都需要消費全量消息;一般模塊內維護全量數據副本的場景下使用;大產品線的專有實現模塊,一般是這種模式。

單擊放大查看

上游模塊向nmqproxy發送消息,消息經過消息中間件的nmqproxy轉發到對應的topic server模塊,topicserver將消息序列化到磁盤上,pusher掃描並讀取到最新的消息,然后發送給下游模塊。 除了主流的推模型外,nmq也支持“拉”模式,在拉模式下,模塊所有的信息都保存在模塊本地,模塊通過nmq提供的pulllib同nmq保持心跳。

1.接入

1.1怎么接入nmq

產品線之前接入cm/transfer有2種方式:
c/c++通過nshead+mcpack、php通過ral等方式。 產品線接入nmq與接入cm/transfer一般沒有太大變化,一般來說只需要加入以下字段:

  • _product(產品線名稱,7個字節以內)

  • _topic(產品線業務名稱,7個字節以內)

  • _cmd(需要做的事,與之前cmd_no含義類似,只是_cmd為string,例如"addblog"等)

1.2注意項

nmq不再具有cm的id分配功能。解決方法,產品線調idalloc(arch或者space的idalloc)模塊進行id分配,然后再提交到nmq;因此接入前先判斷是否需要id分配功能。
下游轉發不支持mysql.so和comdb.so。因此如果之前的業務是transfer下游直接是mysql或者comdb的話,需要產品線自行開發下游c或者php模塊,實現更新db和comdb功能。

2.提交請求和響應

請求

向nmqproxy提交消息,使用nshead+mcpack2協議,同時兼任mcpack1格式

	//請求包
	{
	_product: "space",
	_topic: "album",
	_cmd: "album_add",
	_partition_key: "10001", //可選
	_msg_unique_key:12333234, //可選
	//	消息自定義的內容
	}
									

響應

響應包中含_error_no和_error_msg字段,表明發送是否成功。注意,響應包中不含消息的原有字段和nmqproxy填充的字段。

	//響應包
	{
	_error_no: (int32)0,
	_error_msg: "OK",
	}
									

3.字段和規范

字段名約束:一個下划線開頭的字段為保留字段。應用不能使用一個下划線開頭的字段名。
字段值約束:只能使用大小寫字母、數字、和下划線。
必須填的字段:_product、_topic、_cmd。
長度限制(含'\0'):product字段8個字符,topic字段8個字符,cmd字段16個字符。

  • product是產品線的唯一標示。

  • topic是業務划分的消息序列的表示,詳見"名詞解釋"。

  • cmd是用有意義的字符串表示的命令號。

  • 可選填的字段:_idc。長度限制(含'\0').product字段8個字符

【二期才支持】idc是機房的標示,用於多IDC提交。 
可選填的字段:_partition_key,_msg_unique_key。長度無限制。 
partition_key用於拆分子topic、多IDC轉發,一般使用用戶id、吧id作為partition_key。 
【暫不支持】msg_unique_key用於提交去重,建議使用隨機性和區分度非常好的uuid。

4.配置實例

1.nmqproxy的ip是用資源定位發布的,因此需要上游模塊配置上nmqproxy在zk的服務器和路徑。
注:整個集群公用一組nmqproxy,所以使用的nmqproxy的資源定位是一樣。
2.在nmqproxy中配置本應用的消息的路由信息。
修改nmqproxy/conf/nmqproxy.conf文件,在[topics]中增加:

#每個topic的配置
[.@topic]
#產品線名稱
product : tieba
#topic名稱
topic : post
#是否啟用,1或0。可選,默認為1
enable : 1
#是否走多IDC提交流程,1或0。可選,默認為0
multi_idc : 0
#拆分子topic的個數。可選,默認為1,表示不拆分
sub_topic_num : 1
#topic server的url,名稱需要和ubclient配置里面的名稱的一致,拆分時,自動在后面補0/1…
topic_server_name_prefix : topic-tieba-post-commit
pusher_server_name_prefix : pusher-tieba-post
									

product和topic是和提交消息里的字段保持一致。 
最后的topic_server_name_prefix是topic_group的名稱,需要和nmqproxy/conf/ubclient.conf保持一致的。
3.topic server的配置不需要修改

一個后端模塊,想從nmq接受消息,對rd來說,只需要提供該模塊的2個配置文件即可。
建議該模塊的模塊命名為$(product)_$(module),那么需要下面2個配置文件: 

  • module_$(product)_$(module).conf

  • machine_$(product)_$(module).conf

模塊文件命名規范: 以下游名字來命名,比如下游叫abc模塊,如果abc模塊只用了某個product的數據,建議名為module_$(product)_abc.conf.如果abc是個很大的系統,用到很多模塊的數據,允許module_abc.conf

module_$(product)_$(module).conf主要配置該模塊的模塊級配置,包括發送協議、接受的消息類型、並發時序控制、字段復制等等。
machine_$(product)_$(module).conf主要配置該模塊的后端機器具體訪問設置,包括ip、端口、超時等配置。
目前支持本地配置,也支持資源定位(zk/webfoot),該配置文件,同ubclient的配置文件基本一致,但在針對多主模塊的配置時,針對每個機器增加了identifer和flag字段。另外多主模式的模塊,不支持資源定位方式。

module_$(product)_$(module).conf + 競爭模式+ mcpack協議

下面是pusher的下游是c模塊,mcpack接收的范例配置, 參考space_follow點擊下載

[modules]
[.@module]
#模塊名,需要跟ubclient配置里面的名稱一致;
name : space_follow
#是否啟用 1啟用 0停用 默認0
flag : 1
#是競爭模式還是多主模式;0:競爭,1:多主
sending_type : 0
#轉發協議配置;
sending_protocol : mcpack
#發送窗口大小,需要比線程數大;
sending_window_size : 10
#發送線程數。競爭模式為競爭線程數,多主模式下,為每個機器的並發線程數
sending_thread_num : 8
#出錯后重試的時間間隔(單位MS)
sending_retry_time: 500

#命令過濾規則 ,接受的命令配置;
# *表示通配,只能出現在字符串的最后,不能出現在中間
[..msg_filter]
@filter:space.follow.*

#時序控制規則,支持按照某個partition key的時序支持
[..sequence_control]
#制定mutex key,必須是整形數類型的字段;
mutex_key : qid

#指定對於 沒有PK的命令,是否強制串行處理;
#1:是:該命令必須等待所有靠前的命令都執行完成,再執行。該命令執行前,靠后的命令也不能執行。(主要用於存在批量命令時,保證時序)。
#0:否:該命令完全無序£??到了就執行。
force_sequence_when_no_mutex_key : 1



#支持任意模塊自定義配置、so自定義配置
[..ext_config]

[..mcpack]
#發送模式,0:nshead+mcpack, 1:ubrpc
send_type : 0

#mcpack字段復制
#在消息中間件中,內部填充字段都是以"_"開頭,比如_trans_id, _log_id等,為了兼容,可以在so中將這些字段復制改名;
[...@mcpack_key_copy]
from : _transid
to : trans_id

[...@mcpack_key_copy]
from : _log_id
to : log_id
									

machine_$(product)_$(module).conf + mcpack協議

下面是pusher的下游是c模塊,mcpack接收的范例配置, 參考space_follow點擊下載

[UbClientConfig]
[.UbClient]

[..@Service]
Name  : space_follow
ConnectAll :  0
DefaultConnectTimeOut  :  300
DefaultReadTimeOut  :  1000
DefaultWriteTimeOut  :  1000
DefaultMaxConnect  :  10
#DefaultRetry  :  5
#LONG / SHORT
DefaultConnectType  :  SHORT
#DefaultLinger  :  0
#ReqBuf  :  100
#ResBuf  :  100
#DefaultAsyncWaitingNum  :  100
[...CurrStrategy]
ClassName  :  UbClientStrategy
[...CurrHealthy]
ClassName  :  UbClientHealthyChecker

[...@Server]
IP : 10.32.52.30
Port : 29003 
[...@Server]
IP : 10.32.52.31
Port : 29003 
									

module_$(product)_$(module).conf + 競爭模式 + 本地配置

下面是pusher的下游是php模塊,http接收的范例配置. 參考lbs_attent 點擊下載

[modules]
#下面是一個模塊的配置

[.@module]
#模塊名,需要跟ubclient配置里面的名稱一致;
name : lbs_attent
#是否啟用 1啟用 0停用
flag : 1 
#是競爭模式還是多主模式;0:競爭,1:多主
sending_type : 0 
#轉發協議配置;
sending_protocol : http
#發送窗口大小,需要比線程數大;
sending_window_size : 10
#發送線程數。競爭模式為競爭線程數,多主模式下,為每個機器的並發線程數
sending_thread_num : 8
#出錯后重試的時間間隔(單位MS)
sending_retry_time: 500 

#命令過濾規則 ,接受的命令配置;
# *表示通配,只能出現在字符串的最后,不能出現在中間
[..msg_filter]
@filter : promo.atten.*
#@filter : promo.*

#時序控制規則,支持按照某個partition key的時序支持
[..sequence_control]
#制定mutex key,必須是整形數類型的字段;
mutex_key : qid

#指定對於 沒有PK的命令,是否強制串行處理;
#1:是:該命令必須等待所有靠前的命令都執行完成,再執行。該命令執行前,靠后的命令也不能執行。(主要用於存在批量命令時,保證時序)。
#0:否:該命令完全無序,到了就執行。
force_sequence_when_no_mutex_key : 1


#支持任意模塊自定義配置、so自定義配置                                                                                                      
[..ext_config]
custom_key : sample

[..http]
#第二部分
#用戶自定義配置內容
[...default_conf]

#重試次數,默認為-1,一直重試; 0:不重試;
max_retry_times : -1

#是否將提交的mcpack作為post數據發送,1發送0不發送,缺省1,0的情況下只往后端發url
send_pack : 1 

#發送mcpack的方式:0:二進制模式直接post, 1:轉化成text,使用post $key=$pack_str 的方式發送;
# 2: 轉換成json,使用post $key=$pack_str 的方式發送;默認為0模式; 
send_pack_type : 0 

#在send_pack_type=1/2的時候,配置post的key;默認"data”;
send_pack_key : data

#server 冗余均衡策略: 0:random,對多個ip進行輪詢;1:master-slave,只在出錯時切換到下一個ip;
#默認0;
server_redundancy_policy : 0 

#uri的模版,其中{{}}里面的字段將從mcpack里面獲取替換; 支持req_download.uid這種多級的字段;
uri : /promov1/commit/attention?mod=commit&transid={{_transid}}

#http method, 0:get, 1:post, default: 1
#http_method : 0

#過濾器,會判斷mcpack中的uid字段的值%2后是否為1,只有為1時才發送
#目前只支持"=" 和 "%" 這兩種;
#filter : {{user_id}}%2=1

#http header, 多個header用\\r\\n隔開;
http_header : User-Agent: NuSOAP/0.6.6\r\ncharset=UTF-8


#命令號相關配置
#針對每個不同的命令號,可以重設上面的uri/http_method/filter/send_pack/http_header,覆蓋上面的默認配置;
#[...space_photo_add]
#uri : /so/test22?service=Commit&pid=test&tk=test&transid={{_transid}}
#filter : {{is_xxx}}=1

#[...space_post_add]
#send_pack : 1
#uri : /so/test33?service=Commit&pid=test&tk=test&transid={{_transid}}
#filter : {{uid}}%2=1

#mcpack字段復制
#在消息中間件中,內部填充字段都是以"_"開頭,比如_transid, _log_id等,為了兼容,可以在so中將這些字段復制改名;
#[...@mcpack_key_copy]
#from : _transid
#to : trans_id
									

machine_$(product)_$(module).conf + 競爭模式 + 本地配置

下面是pusher的下游是php模塊,http接收的范例配置. 參考lbs_attent 點擊下載

[UbClientConfig]                                                                                                                           
[.UbClient]

#競爭的例子(不用zk時)

[..@Service]
Name  : lbs_attent
ConnectAll :  0
DefaultConnectTimeOut  :  300 
DefaultReadTimeOut  :  1000
DefaultWriteTimeOut  :  1000
DefaultMaxConnect  :  10  
#DefaultRetry  :  5
#LONG / SHORT
DefaultConnectType  :  SHORT
#DefaultLinger  :  0
#ReqBuf  :  100
#ResBuf  :  100
#DefaultAsyncWaitingNum  :  100
#聲明將要使用的策略類及屬性
[...CurrStrategy]
ClassName  :  UbClientStrategy
[...CurrHealthy]
ClassName  :  UbClientHealthyChecker
[...@Server]
IP : 10.40.71.100
Port : 8016
[...@Server]
IP : 10.40.71.101
Port : 8016
									

如果協議使用mcpack和http,但默認so不能滿足的功能,請聯系nmq維護者。 如果使用其他的轉發協議如comdb、memcache,請聯系nmq維護者,如果不是通用的需求可能需要產品線自己來開發。 自定義so需要實現若干個回調函數,定義在talk_ext.h中。

/*
 * @brief 初始化操作,成功返回0,失敗返回-1
 *
 * @param server 服務器配置
 * @param [out] handle_out 自定義處理的句柄,如果有的話
 * @param [in] module_conf 自定義處理的命令文件
 *
 * @return 成功返回0, 失敗返回-1
 */
typedef int (*np_init_f)(talk_svr_t * server, void **handle_out, comcfg::Configure *module_conf);
/*
 * @brief 發送命令邏輯
 *
 * @param server 服務器配置
 * @param handle 自定義處理的句柄,如果有的話
 * @param logdi  logid
 * @param req_head  同步請求頭
 * @param buf 數據
 * @param buf_len  數據的長度
 *
 * @return 成功返回0,失敗返回-1, 命令格式錯誤返回1
 */
typedef int (*np_send_f)(
    talk_svr_t * server,
    void *handle,
    unsigned logdi,
    ub::UbClientManager *ubmgr,
    const char *mod_name,
    void *buf,
    unsigned buf_len);
/*
 * @brief 析構操作
 *
 * @param server  server信息
 * @param handle 自定義處理的句柄,如果有的話
 *
 * @return 成功返回0,失敗返回-1
 */
typedef int (*np_free_f)(talk_svr_t * server, void **handle);

/*
 * @brief 重設連接
 *
 * @param server server信息
 * @param handle    自定義句柄
 */
typedef void (*np_reset_f)(talk_svr_t * server, void *handle);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM