1、Producer使用指南--發送消息注意事項
1、正常情況下一個業務系統盡可能用一個Topic,消息子類型用tags來標識,tags可以由業務系統自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用tags在broker做消息過濾。
MQCPMessage msg = new MQCPMessage(); // 初始化消息對象
message.setTags("TagA"); // 設置消息TAG
2、每個消息在業務層面的唯一標識碼,要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic,key來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。
MQCPMessage msg = new MQCPMessage(); // 初始化消息對象
message.setTags("TagA"); // 設置消息TAG
String orderId = "20034568923546"; // 訂單Id
message.setKeys(orderId);
3、消息發送成功或者失敗,要打印消息日志,務必要打印sendresult和key字段。
4、send消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態,在sendResult里定義
5、對於消息不可丟失應用,務必要有消息重發機制。例如如果消息發送失敗,存儲到數據庫,能有定時程序嘗試重發,或者人工觸發重發。
2、Consumer使用指南--消費端去重
RocketMQ無法避免消息重復,所以如果業務對消費重復非常敏感,務必要在業務層面去重,有以下幾種去重方式:
1. 將消息的唯一鍵,可以是msgID,也可以是消息內容中的唯一標識字段,例如訂單Id等,消費之前判斷是否在Db或Tair(全局KV存儲)中存在,如果不存在則插入,並消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)msgId一定是全局唯一標識符,但是可能會存在同樣的消息有兩個不同msgId的情況(有多種原因),這種情況可能會使業務上重復消費,建議最好使用消息內容中的唯一標識字段去重。
2. 使用業務層面的狀態機去重
5、如何判斷發送消息是否成功?
客戶端Producer調用send消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態,在sendResult里定義。
返回狀態 |
狀態釋義 |
SEND_OK |
消息發送成功 |
FLUSH_DISK_TIMEOUT |
消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,只有此時MASTER服務器宕機,消息才會丟失 |
FLUSH_SLAVE_TIMEOUT |
消息發送成功,但是服務器同步到Slave時超時,消息已經進入服務器隊列,只有此時SLAVE服務器宕機,消息才會丟失 |
SLAVE_NOT_AVAILABLE |
消息發送成功,但是此時SLAVE不可用,消息已經進入服務器隊列,只有此時SLAVE服務器宕機,消息才會丟失 |
目前MQCP測試和生產環境集群都采用兩主兩從共4台Broker機器,針對大部分業務系統來講,只要MQCP沒有拋出異常,可以默認消息已成功發送。建議業務系統針對發送消息后所有非SEND_OK狀態的消息,打印Warning日志,並在運營端設置對應的監控規則,及時發郵件提醒。
6、如何判斷消費消息是否成功?
客戶端Consumer在MQCPMessageListener中實現pushMessage(),遍歷並處理消息后會返回給MQCP端消費的狀態,狀態只有消費成功或者消費失敗兩種狀態。
消費狀態 |
狀態釋義 |
CONSUME_OK |
消費成功 |
CONSUME_FAIL |
消費失敗 |
7、消費端如何實現定時消費?
在某些業務場景下,消費端希望在業務低峰(例如半夜12點后)時開始從MQCP拉取消息,在業務高峰期前關閉掉消費功能,以此來降低系統負載。這種類似場景涉及到如何在不停業務服務的場景下,多次的開啟和關閉MQCP消費服務。
MQCP的消費者本身是可以多實例初始化的,每個實例的消費者服務開啟和關閉也是獨立的,所以可以很良好的支持定時消費的場景。
如果業務系統有類似的需求,我們建議:
1. 業務系統本身需要添加功能開關,支持配置化的方式來開啟或關閉消費服務。其實現本身比較簡單,就是調用MQCP消費者的start || shutdown方法。必要時需要對方法添加上層邏輯封裝,來實現定制化的需求。
2. 調用完consumer對象的shutdown方法后,不要立即初始化下一個consumer對象並啟用服務,建議至少延遲幾秒種,等相關的資源回收完畢。
3. 完善業務端開啟或關閉消息服務的日志,方便后續運維處理問題。
4. 具體實現方式,可以參考下面TestTimedConsumeImpl類。
import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; import java.util.Properties; import com.paic.mqcp.client.MQCPFactory; import com.paic.mqcp.client.common.MQCPException; import com.paic.mqcp.client.common.MQCPMessageListener; import com.paic.mqcp.client.consumer.MQCPConsumer; import com.paic.mqcp.client.dto.MQCPMessage; import com.paic.mqcp.client.dto.MQCPMessageFilter; import com.paic.mqcp.client.util.MQCPConstant; import com.paic.mqcp.client.util.MQCPConsumeStatus; import com.paic.mqcp.common.consumer.ConsumeFromWhere; /** * 模擬業務系統需要在不停應用的場景下,開啟、關閉接受消息服務 * * @author WUJING754 * */ public class TestTimedConsumeImpl { /** 屬性對象 */ private static Properties p = null; /** * 獲取屬性值對象 * * @return Properties */ public static Properties initialProperties() { p = new Properties(); // 業務系統可以從配置文件中取出該屬性,demo中寫死的 p.setProperty(MQCPConstant.NAME_SERVER_ADDRESS, "10.20.22.148:9876;10.20.22.149:9876"); // 消費者ID,業務系統需要在MQCP-ADMIN中注冊,否則無法正常發送消息 // demo中已經初始化好了的,后續請聯系MQCP開發注冊系統 p.setProperty(MQCPConstant.CONSUMER_ID, "CID_PARP_TEST_DEFAULT"); // INSTANCE_NAME屬性建議業務系統不設置,用默認的即可 //p.setProperty(MQCPConstant.INSTANCE_NAME,"_PACP"); return p; } public static MQCPConsumer initialConsumer(){ MQCPConsumer pushConsumer = null; try { // 初始化過濾對象 MQCPMessageFilter mqcpFilter = new MQCPMessageFilter(); List<String> list = new ArrayList<String>(); // 設置tag // tag的作用是過濾消息,如果設置改值,MQCP只會取出發送消息時設置了該tag值的消息 // 需要注意的是同一CID下的多個應用實例需要設置為同樣的tag列表來過濾消息,以保證消息不會被過濾取走但未被業務系統處理 list.add("testByWUJING754"); mqcpFilter.setTags(list); // 初始化MQCPPushConsumer並指定為集群消費(消息只會被消費一次,無論應用的實例有多少個) // PUSH消費模式下,客戶端包會啟動后台線程不斷的從MQCP中拉去消息(准實時方式,毫秒級延時) // 業務系統收取到消息后,需要實現MQCPMessageListener,來處理消息 pushConsumer = MQCPFactory .createConsumer(initialProperties()); // 設置第一次CID消費消息的時間點,這里設置從最后的消息開始獲取 // 如果不設置該值,默認是隊列的最開始出消費消息 pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 訂閱 T_PAFA5_LOG_P 主題的消息(創建topic需要在MQCP-ADMIN中,如需創建請聯系MQCP開發) pushConsumer.subscribe("T_PARP_TEST_DEMO", mqcpFilter, new MQCPMessageListener() { @Override public MQCPConsumeStatus pushMessage( List<MQCPMessage> messageList) { // 監聽到有消息抵達后,業務系統需要遍歷messageList對象,來獲取消息 // messageList的默認大小為1,即消息是一條一條的推送到客戶端的 for (MQCPMessage msg : messageList) { // 消息明細 MQCPMessage對象 // 建議業務系統將從消息平台拉取消息和處理消息的邏輯解耦, // 在consumer監聽器中只監聽到消息,建議簡單的將獲取的消息解析存儲,然后返回 MQCPConsumeStatus.CONSUME_OK, // 后端可以異步來處理接收到的消息 try { System.out.println("###########receive message\n[msgTopic:" + msg.getTopic() + "\nmsgId:" + msg.getMsgId() + "\nmsgContent:" + new String(msg.getConent(),"UTF-8") + "\nmsgKey:" + msg.getKey() + "]"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return MQCPConsumeStatus.CONSUME_OK; } }); } catch (Exception ex) { ex.printStackTrace(); } return pushConsumer; } /** * 入口 * @param args */ public static void main(String[] args) { // 初始化一個pushConsumer對象 MQCPConsumer pushConsumer = initialConsumer(); // 啟動該consumer對象,開始消費消息 startConsumingMsg(pushConsumer); // 業務系統不斷收取消息,處理消息過程中... // 根據業務需要,可以調用stopConsumingMsg()方法來停止收取消息 stopConsumingMsg(pushConsumer); // 主線程休眠30秒 try { Thread.sleep(30000L); } catch (InterruptedException e) { e.printStackTrace(); } // 模擬業務系統再次初始化一個consumer對象 pushConsumer = initialConsumer(); // 啟動consumer,開始收取消息 startConsumingMsg(pushConsumer); // 業務系統不斷收取消息,處理消息過程中... // 業務系統根據需要,停止取消息服務 stopConsumingMsg(pushConsumer); } /* * 啟動傳入的消費者對象 */ public static boolean startConsumingMsg(final MQCPConsumer pushConsumer){ boolean flag = true; try { pushConsumer.start(); System.out.println("-->start consumer success."); } catch (MQCPException e) { System.out.println("-->start consumer fail due to " + e.getMessage()); e.printStackTrace(); flag = false; } return flag; } /* * 關閉傳入的消費者對象 */ public static void stopConsumingMsg(final MQCPConsumer pushConsumer){ pushConsumer.shutdown(); System.out.println("-->shutdown consumer success."); } }
9、客戶端生產者是否有消息重發機制?
(摘至RocketMq官方文檔)
RocketMq消息發送失敗如何處理,Producer的send方法本身支持內部重試,重試邏輯如下:
1. 至多重試5次。
2. 如果發送失敗,則輪轉到下一個Broker。
3. 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認5s。
所以,如果本身向broker發送消息產生超時異常,就不會再做重試。
以上策略仍然不能保證消息一定發送成功,為保證消息一定成功,建議應用這樣做:如果調用send同步方法發送失敗,則嘗試將消息存儲到db,由后台線程定時重試,保證消息一定到達Broker。
上述DB重試方式為什么沒有集成到MQ客戶端內部做,而是要求應用自己去完成,我們基於以下幾點考慮
1. MQ的客戶端設計為無狀態模式,方便任意的水平擴展,且對機器資源的消耗僅僅是cpu、內存、網絡。
2. 如果MQ客戶端內部集成一個KV存儲模塊,那么數據只有同步落盤才能較可靠,而同步落盤本身性能開銷較大,所以通常會采用 異步落盤,又由於應用關閉過程不受MQ運維人員控制,可能經常會發生kill -9這樣暴力方式關閉,造成數據沒有及時落盤而丟 失。
3. Producer所在機器的可靠性較低,一般為虛擬機,不適合存儲重要數據。
綜上,建議重試過程交由應用來控制。
10、客戶端消費者是否有消息重發機制?
(摘至RocketMq官方文檔)
消息重試機制如下:
注意:重試的消息,MsgKey不變,MsgId會變。
11、業務消費端沒有取到消息,如何去定位問題?
正常情況下生產者發送消息到MQCP,消息被投遞到消費端的延時應該在毫秒級。如果消費端遲遲沒有收到消息,建議采用下面的步驟來排查問題:
1、 獲得消息的消息ID或者KEY,去MQCP-ADMIN的消息查詢模塊,根據自己的消費者ID找到其對應的消費狀態。
2、 常見的投遞狀態有:
SUBSCRIBED_AND_CONSUMED |
訂閱了,而且消費了(Offset越過了) |
SUBSCRIBED_BUT_FILTERD |
訂閱了,但是被過濾掉了 |
the consumer group[***] not online |
訂閱了,但是消費者未啟動 |
SUBSCRIBED_AND_NOT_CONSUME_YET |
訂閱了,但是沒有消費(Offset小) |
UNKNOW_EXCEPTION |
未知異常 |
注: SUBSCRIBED_AND_CONSUMED狀態,表示消息已被正常消費掉,如果此時有異常,需要業務系統檢查日志,分析看看是否因為解析消息時有異常,導致消息未被正確處理。
SUBSCRIBED_BUT_FILTERD狀態,需要業務系統檢查初始化Consumer對象時傳入的TagList是否和生產者定義的tag匹配。
SUBSCRIBED_AND_NOT_CONSUME_YET狀態,可能的原因是由於消息有積壓,消息還未被取走,可以稍等幾十秒再去查詢一下狀態。
the consumer group[***] not online狀態,表示對應CID的消費者還未正確啟動。業務系統需要檢查消費者是否已啟動,如果已啟動請檢查是否啟動時有報錯。有可能相關的配置項配置錯誤,導致consumer啟動時校驗失敗。
UNKNOW_EXCEPTION表示消息平台有異常,請聯系MQCP開發同事。
13、如何避免接收到的消息是亂碼?
對於生產者來說,建議將消息body轉為byte數組時顯示指定為UTF-8編碼。對於消費者來說,建議在接收到消息后將byte數組轉為String時指定UTF-8編碼。這樣可以避免因為消息body中有中文或者特殊字符,消費端解析時亂碼,進而造成消息解析失敗。
Example of producer
MQCPMessage msg = new MQCPMessage();
msg.setConent("test msg body".getBytes("UTF-8")); // 生產者組裝消息body時指定urf-8編碼
Example of consumer
14、MQCP中的消息標簽(tag)如何使用?
在消息中間件實際的使用場景中,消費者只需要消息隊列中的部分消息,其余消息希望默認不被接收,直接丟棄掉。針對類似這種場景 ,MQCP提供通過合理使用消息標簽(Tag)的方式來實現消費端靈活過濾隊列中消息的功能。
實現方式如下:
1、 生產者和消費者雙方約定消息標簽具體的設置值及其代表的含義。
2、 生產者在發送消息時,組裝消息對象的時候,需要給對應消息設置正確的消息標簽。
MQCPMessage msg = new MQCPMessage();
//設置過濾標簽---大小寫敏感
msg.setTag("SystemTag");
3、 消費者在組裝消費者對象時,需要正確設置消息過濾的過濾器
MQCPMessageFilter mqcpFilter = new MQCPMessageFilter();
List<String> list = new ArrayList<String>();
// 設置tag
// tag的作用是過濾消息,如果設置改值,MQCP只會取出發送消息時設置了該tag值的消息
// 需要注意的是同一CID下的多個應用實例需要設置為同樣的tag列表來過濾消息,以保證消息不會被過濾取走但未被業務系統處理list.add("SystemTag");
mqcpFilter.setTags(list);
上述生產端、消費端的實現代碼具體可查閱MQCP官網提供的Demo。
16、The consumer’s subscription not exist報錯?
現象:業務系統開發環境報異常:The consumer’s subscription not exist,檢查消費者狀態,多台實例在線,檢查消息消費情況,消息一直不被消費,狀態為:SUBSCRIBED_AND_NOT_CONSUME_YET(訂閱未消費)。
原因:業務系統之前已經接入過MQCP,且開發環境有多個開發人員在同時進行開發,多個主機同時連接MQCP,本次該consumer新增了1個訂閱關系(subscription-A),但業務系統只有1個同事(小Y)的代碼才新增了訂閱關系,其他幾個開發人員的代碼沒有配置,一旦他們的consumer啟動后,給broker保持通信時發送的訂閱關系中,並沒有subscription-A,broker會remove該訂閱關系,因此小Y的consumer啟動后,與broker通信時發現沒有subscription-A,就會報:The consumer’s subscription not exist。
解決方案:1、不同的cid,訂閱不同的topic,避免同一個cid訂閱多個topic
2、業務系統開發環境,只開發相關功能的主機才啟動consumer
17、業務系統消費端兩種常見的錯誤實現?
18、申請生產環境mq消息查詢權限
請使用IE瀏覽器,在itsm系統申請對應的權限,通道如下:
IT權限管理-申請->平安科技_消息協作平台監控應用(MQCP_ADMIN)=>帳戶管理組
19、測試環境admin平台查詢消息已消費,但消費端未查詢到消息
同一個消費者是集群消費模式,在測試環境中,只想在某一個環境測試MQ功能,需要每個環境配置不同的CID。比如某系統測試環境有多套環境:STG1和STG2環境,測試人員正在STG1測試,如果兩個環境的CID相同,則消息有可能就被STG2取走,測試人員在STG1上查不到該消息。
解決方法:
1、若為剛新增的發布訂閱關系,請聯系MQCP同事申請給每一個環境配置獨立的CID,並創建發布訂閱關系
2、若每個環境都申請了獨立的CID,請檢查是否其他某個環境使用了該環境的CID
20、java.lang.reflect.InvocationTargetException告警
業務系統引用客戶端包在1.0.15之前版本在啟動時可能會出現該異常,不影響消息的收發,1.0.15的客戶端版本修復了該問題
解決方法:
更新客戶端包為最新的版本,最新包版本可在包庫查詢。客戶端包不斷在迭代優化,建議業務系統及時更新客戶端包。客戶端包每個版本更新內容可在官網“相關文檔---CLIENT包版本線”查詢。
需注意:客戶端包在1.0.20優化了發布訂閱關系的校驗方式,配置文件的配置項有變化,從原來的name_server_address、cluster_id
、virtual_account變為了:server_address、virtual_account,具體請查閱demo
21、服務啟動時報錯:MQCP_CLIENT->Initial cache failed,no cache message received after waiting for 90s
客戶端在重啟的時報錯初始化發布訂閱關系緩存數據失敗。
解決方法:
1、檢查是否新增了配置文件:mqcp_client.properties,需新增
2、檢查mqcp_client.properties配置文件,客戶端版本為1.0.20之前的版本,需檢查name_server_address、cluster_id的值是否正確,前后是否包含空格,檢查virtual_account配置的虛擬用戶是否與admin平台配置的一致,客戶端版本為1.0.15之前的,還需在代碼里新增name_server_address的配置:p.setProperty(MQCPConstant.NAME_SERVER_ADDRESS,"不同環境配置不同的nameserver地址" ); 客戶端版本為1.0.20及之后的,需要檢查server_address、virtual_account是否正確,是否包含空格。
3、建議更新客戶端包為最新的包。