mq使用經驗


1、Producer使用指南--發送消息注意事項


 

1、正常情況下一個業務系統盡可能用一個Topic,消息子類型用tags來標識,tags可以由業務系統自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用tags在broker做消息過濾。

 

  MQCPMessage msg = new MQCPMessage(); // 初始化消息對象

  message.setTags("TagA"); // 設置消息TAG

 

2、每個消息在業務層面的唯一標識碼,要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic,key來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。     

 MQCPMessage msg = new MQCPMessage(); // 初始化消息對象

  message.setTags("TagA"); // 設置消息TAG

  String orderId = "20034568923546"; // 訂單Id

  message.setKeys(orderId);

 

3、消息發送成功或者失敗,要打印消息日志,務必要打印sendresult和key字段。

 

4、send消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態,在sendResult里定義

 

5、對於消息不可丟失應用,務必要有消息重發機制。例如如果消息發送失敗,存儲到數據庫,能有定時程序嘗試重發,或者人工觸發重發。

 

2、Consumer使用指南--消費端去重

 

RocketMQ無法避免消息重復,所以如果業務對消費重復非常敏感,務必要在業務層面去重,有以下幾種去重方式:

 

  1. 將消息的唯一鍵,可以是msgID,也可以是消息內容中的唯一標識字段,例如訂單Id等,消費之前判斷是否在Db或Tair(全局KV存儲)中存在,如果不存在則插入,並消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)msgId一定是全局唯一標識符,但是可能會存在同樣的消息有兩個不同msgId的情況(有多種原因),這種情況可能會使業務上重復消費,建議最好使用消息內容中的唯一標識字段去重。

 

  2. 使用業務層面的狀態機去重

5、如何判斷發送消息是否成功?

      客戶端Producer調用send消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態,在sendResult里定義。

 

返回狀態                

狀態釋義                

SEND_OK

消息發送成功                

FLUSH_DISK_TIMEOUT

消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,只有此時MASTER服務器宕機,消息才會丟失                

FLUSH_SLAVE_TIMEOUT

消息發送成功,但是服務器同步到Slave時超時,消息已經進入服務器隊列,只有此時SLAVE服務器宕機,消息才會丟失                

SLAVE_NOT_AVAILABLE

消息發送成功,但是此時SLAVE不可用,消息已經進入服務器隊列,只有此時SLAVE服務器宕機,消息才會丟失                

 

      目前MQCP測試和生產環境集群都采用兩主兩從共4台Broker機器,針對大部分業務系統來講,只要MQCP沒有拋出異常,可以默認消息已成功發送。建議業務系統針對發送消息后所有非SEND_OK狀態的消息,打印Warning日志,並在運營端設置對應的監控規則,及時發郵件提醒。

 

6、如何判斷消費消息是否成功?

 

     客戶端Consumer在MQCPMessageListener中實現pushMessage(),遍歷並處理消息后會返回給MQCP端消費的狀態,狀態只有消費成功或者消費失敗兩種狀態。

 

 

消費狀態                

狀態釋義                

CONSUME_OK

消費成功                

CONSUME_FAIL

消費失敗                

 

7、消費端如何實現定時消費?

     在某些業務場景下,消費端希望在業務低峰(例如半夜12點后)時開始從MQCP拉取消息,在業務高峰期前關閉掉消費功能,以此來降低系統負載。這種類似場景涉及到如何在不停業務服務的場景下,多次的開啟和關閉MQCP消費服務。

      MQCP的消費者本身是可以多實例初始化的,每個實例的消費者服務開啟和關閉也是獨立的,所以可以很良好的支持定時消費的場景。

如果業務系統有類似的需求,我們建議:

      1. 業務系統本身需要添加功能開關,支持配置化的方式來開啟或關閉消費服務。其實現本身比較簡單,就是調用MQCP消費者的start || shutdown方法。必要時需要對方法添加上層邏輯封裝,來實現定制化的需求。

      2. 調用完consumer對象的shutdown方法后,不要立即初始化下一個consumer對象並啟用服務,建議至少延遲幾秒種,等相關的資源回收完畢。

      3. 完善業務端開啟或關閉消息服務的日志,方便后續運維處理問題。

      4. 具體實現方式,可以參考下面TestTimedConsumeImpl類。

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.paic.mqcp.client.MQCPFactory;
import com.paic.mqcp.client.common.MQCPException;
import com.paic.mqcp.client.common.MQCPMessageListener;
import com.paic.mqcp.client.consumer.MQCPConsumer;
import com.paic.mqcp.client.dto.MQCPMessage;
import com.paic.mqcp.client.dto.MQCPMessageFilter;
import com.paic.mqcp.client.util.MQCPConstant;
import com.paic.mqcp.client.util.MQCPConsumeStatus;
import com.paic.mqcp.common.consumer.ConsumeFromWhere;

/**
 * 模擬業務系統需要在不停應用的場景下,開啟、關閉接受消息服務
 * 
 * @author WUJING754
 *
 */
public class TestTimedConsumeImpl {
	
	/** 屬性對象 */
	private static Properties p = null;
	
	/**
	 * 獲取屬性值對象
	 * 
	 * @return Properties
	 */
	public static Properties initialProperties() {
		
		p = new Properties();
		// 業務系統可以從配置文件中取出該屬性,demo中寫死的
		p.setProperty(MQCPConstant.NAME_SERVER_ADDRESS,
				"10.20.22.148:9876;10.20.22.149:9876");
		// 消費者ID,業務系統需要在MQCP-ADMIN中注冊,否則無法正常發送消息
		// demo中已經初始化好了的,后續請聯系MQCP開發注冊系統 
		p.setProperty(MQCPConstant.CONSUMER_ID, "CID_PARP_TEST_DEFAULT");
		// INSTANCE_NAME屬性建議業務系統不設置,用默認的即可
		//p.setProperty(MQCPConstant.INSTANCE_NAME,"_PACP");
		
		return p;
	}
	
	public static MQCPConsumer initialConsumer(){
		
		MQCPConsumer pushConsumer = null;
		
		try {
			// 初始化過濾對象
			MQCPMessageFilter mqcpFilter = new MQCPMessageFilter();
			List<String> list = new ArrayList<String>();
			// 設置tag 
			// tag的作用是過濾消息,如果設置改值,MQCP只會取出發送消息時設置了該tag值的消息
			// 需要注意的是同一CID下的多個應用實例需要設置為同樣的tag列表來過濾消息,以保證消息不會被過濾取走但未被業務系統處理
			list.add("testByWUJING754");
			mqcpFilter.setTags(list);
			// 初始化MQCPPushConsumer並指定為集群消費(消息只會被消費一次,無論應用的實例有多少個)
			// PUSH消費模式下,客戶端包會啟動后台線程不斷的從MQCP中拉去消息(准實時方式,毫秒級延時)
			// 業務系統收取到消息后,需要實現MQCPMessageListener,來處理消息
			pushConsumer = MQCPFactory
					.createConsumer(initialProperties());
			// 設置第一次CID消費消息的時間點,這里設置從最后的消息開始獲取
			// 如果不設置該值,默認是隊列的最開始出消費消息
			pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
			// 訂閱 T_PAFA5_LOG_P 主題的消息(創建topic需要在MQCP-ADMIN中,如需創建請聯系MQCP開發)
			pushConsumer.subscribe("T_PARP_TEST_DEMO", mqcpFilter,
					new MQCPMessageListener() {
						@Override
						public MQCPConsumeStatus pushMessage(
								List<MQCPMessage> messageList) {
							// 監聽到有消息抵達后,業務系統需要遍歷messageList對象,來獲取消息
							// messageList的默認大小為1,即消息是一條一條的推送到客戶端的
							for (MQCPMessage msg : messageList) {
								// 消息明細  MQCPMessage對象
								// 建議業務系統將從消息平台拉取消息和處理消息的邏輯解耦,
								// 在consumer監聽器中只監聽到消息,建議簡單的將獲取的消息解析存儲,然后返回 MQCPConsumeStatus.CONSUME_OK,
								// 后端可以異步來處理接收到的消息 
								try {
									System.out.println("###########receive message\n[msgTopic:"
													+ msg.getTopic()
													+ "\nmsgId:"
													+ msg.getMsgId()
													+ "\nmsgContent:"
													+ new String(msg.getConent(),"UTF-8")
													+ "\nmsgKey:"
													+ msg.getKey() + "]");
								} catch (UnsupportedEncodingException e) {
									e.printStackTrace();
								}
							}

							return MQCPConsumeStatus.CONSUME_OK;
						}
					});

		} catch (Exception ex) {
			ex.printStackTrace();
		}
		return pushConsumer;

	}
	
	/**
	 * 入口
	 * @param args
	 */
	public static void main(String[] args) {
		
		// 初始化一個pushConsumer對象
		MQCPConsumer pushConsumer = initialConsumer();
		
		// 啟動該consumer對象,開始消費消息
		startConsumingMsg(pushConsumer);
		
		// 業務系統不斷收取消息,處理消息過程中...
		
		// 根據業務需要,可以調用stopConsumingMsg()方法來停止收取消息
		stopConsumingMsg(pushConsumer);
		
		// 主線程休眠30秒
		try {
			Thread.sleep(30000L);
		} catch (InterruptedException e) {
			
			e.printStackTrace();
		}
		// 模擬業務系統再次初始化一個consumer對象
		pushConsumer = initialConsumer();
		// 啟動consumer,開始收取消息
		startConsumingMsg(pushConsumer);
		
		// 業務系統不斷收取消息,處理消息過程中...
		
		// 業務系統根據需要,停止取消息服務		
		stopConsumingMsg(pushConsumer);
		
	}
	/*
	 * 啟動傳入的消費者對象
	 */
	public static boolean startConsumingMsg(final MQCPConsumer pushConsumer){
		
		boolean flag = true;
		try {
			pushConsumer.start();
			System.out.println("-->start consumer success.");
		} catch (MQCPException e) {
			
			System.out.println("-->start consumer fail due to " + e.getMessage());
			e.printStackTrace();
			flag = false;
		}
		return flag;
	}
	
	/*
	 * 關閉傳入的消費者對象
	 */
	public static void stopConsumingMsg(final MQCPConsumer pushConsumer){
		
		pushConsumer.shutdown();
		System.out.println("-->shutdown consumer success.");
	}

}

9、客戶端生產者是否有消息重發機制?

 (摘至RocketMq官方文檔)
RocketMq消息發送失敗如何處理,Producer的send方法本身支持內部重試,重試邏輯如下:
  1. 至多重試5次。
  2. 如果發送失敗,則輪轉到下一個Broker。
  3. 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認5s。
所以,如果本身向broker發送消息產生超時異常,就不會再做重試。

  以上策略仍然不能保證消息一定發送成功,為保證消息一定成功,建議應用這樣做:如果調用send同步方法發送失敗,則嘗試將消息存儲到db,由后台線程定時重試,保證消息一定到達Broker。
     

  上述DB重試方式為什么沒有集成到MQ客戶端內部做,而是要求應用自己去完成,我們基於以下幾點考慮 
    1. MQ的客戶端設計為無狀態模式,方便任意的水平擴展,且對機器資源的消耗僅僅是cpu、內存、網絡。
    2. 如果MQ客戶端內部集成一個KV存儲模塊,那么數據只有同步落盤才能較可靠,而同步落盤本身性能開銷較大,所以通常會采用           異步落盤,又由於應用關閉過程不受MQ運維人員控制,可能經常會發生kill -9這樣暴力方式關閉,造成數據沒有及時落盤而丟           失。
    3. Producer所在機器的可靠性較低,一般為虛擬機,不適合存儲重要數據。
      綜上,建議重試過程交由應用來控制。

 

 

10、客戶端消費者是否有消息重發機制?

 (摘至RocketMq官方文檔)
消息重試機制如下:

       

 

  注意:重試的消息,MsgKey不變,MsgId會變。

 

11、業務消費端沒有取到消息,如何去定位問題?

 

     正常情況下生產者發送消息到MQCP,消息被投遞到消費端的延時應該在毫秒級。如果消費端遲遲沒有收到消息,建議采用下面的步驟來排查問題:

1、 獲得消息的消息ID或者KEY,去MQCP-ADMIN的消息查詢模塊,根據自己的消費者ID找到其對應的消費狀態。

2、 常見的投遞狀態有:

SUBSCRIBED_AND_CONSUMED

訂閱了,而且消費了(Offset越過了)                

SUBSCRIBED_BUT_FILTERD   

訂閱了,但是被過濾掉了                

the consumer group[***] not online

訂閱了,但是消費者未啟動                

SUBSCRIBED_AND_NOT_CONSUME_YET

訂閱了,但是沒有消費(Offset小)                

UNKNOW_EXCEPTION

未知異常                

  注: SUBSCRIBED_AND_CONSUMED狀態,表示消息已被正常消費掉,如果此時有異常,需要業務系統檢查日志,分析看看是否因為解析消息時有異常,導致消息未被正確處理。
     SUBSCRIBED_BUT_FILTERD狀態,需要業務系統檢查初始化Consumer對象時傳入的TagList是否和生產者定義的tag匹配。
     SUBSCRIBED_AND_NOT_CONSUME_YET狀態,可能的原因是由於消息有積壓,消息還未被取走,可以稍等幾十秒再去查詢一下狀態。
     the consumer group[***] not online狀態,表示對應CID的消費者還未正確啟動。業務系統需要檢查消費者是否已啟動,如果已啟動請檢查是否啟動時有報錯。有可能相關的配置項配置錯誤,導致consumer啟動時校驗失敗。
     UNKNOW_EXCEPTION表示消息平台有異常,請聯系MQCP開發同事。

 

 

13、如何避免接收到的消息是亂碼?

 

     對於生產者來說,建議將消息body轉為byte數組時顯示指定為UTF-8編碼。對於消費者來說,建議在接收到消息后將byte數組轉為String時指定UTF-8編碼。這樣可以避免因為消息body中有中文或者特殊字符,消費端解析時亂碼,進而造成消息解析失敗。

  

Example of producer

MQCPMessage msg = new MQCPMessage();

msg.setConent("test msg body".getBytes("UTF-8")); // 生產者組裝消息body時指定urf-8編碼

 

Example of consumer

       

14、MQCP中的消息標簽(tag)如何使用?

    在消息中間件實際的使用場景中,消費者只需要消息隊列中的部分消息,其余消息希望默認不被接收,直接丟棄掉。針對類似這種場景 ,MQCP提供通過合理使用消息標簽(Tag)的方式來實現消費端靈活過濾隊列中消息的功能。  
實現方式如下:
        1、 生產者和消費者雙方約定消息標簽具體的設置值及其代表的含義。
        2、 生產者在發送消息時,組裝消息對象的時候,需要給對應消息設置正確的消息標簽。

 

MQCPMessage msg = new MQCPMessage();

//設置過濾標簽---大小寫敏感

msg.setTag("SystemTag");

 

        3、 消費者在組裝消費者對象時,需要正確設置消息過濾的過濾器

 

MQCPMessageFilter mqcpFilter = new MQCPMessageFilter();

List<String> list = new ArrayList<String>();

// 設置tag 

// tag的作用是過濾消息,如果設置改值,MQCP只會取出發送消息時設置了該tag值的消息

// 需要注意的是同一CID下的多個應用實例需要設置為同樣的tag列表來過濾消息,以保證消息不會被過濾取走但未被業務系統處理list.add("SystemTag");

mqcpFilter.setTags(list);

 

   上述生產端、消費端的實現代碼具體可查閱MQCP官網提供的Demo。

 

16、The consumer’s subscription not exist報錯?

 

     現象:業務系統開發環境報異常:The consumer’s subscription not exist,檢查消費者狀態,多台實例在線,檢查消息消費情況,消息一直不被消費,狀態為:SUBSCRIBED_AND_NOT_CONSUME_YET(訂閱未消費)。

 

     原因:業務系統之前已經接入過MQCP,且開發環境有多個開發人員在同時進行開發,多個主機同時連接MQCP,本次該consumer新增了1個訂閱關系(subscription-A),但業務系統只有1個同事(小Y)的代碼才新增了訂閱關系,其他幾個開發人員的代碼沒有配置,一旦他們的consumer啟動后,給broker保持通信時發送的訂閱關系中,並沒有subscription-A,broker會remove該訂閱關系,因此小Y的consumer啟動后,與broker通信時發現沒有subscription-A,就會報:The consumer’s subscription not exist。

 

    解決方案:1、不同的cid,訂閱不同的topic,避免同一個cid訂閱多個topic

           2、業務系統開發環境,只開發相關功能的主機才啟動consumer

 

17、業務系統消費端兩種常見的錯誤實現?

 

       

       

 

18、申請生產環境mq消息查詢權限

 

請使用IE瀏覽器,在itsm系統申請對應的權限,通道如下:

 

IT權限管理-申請->平安科技_消息協作平台監控應用(MQCP_ADMIN)=>帳戶管理組 

 

 

 

19、測試環境admin平台查詢消息已消費,但消費端未查詢到消息

 

   同一個消費者是集群消費模式,在測試環境中,只想在某一個環境測試MQ功能,需要每個環境配置不同的CID。比如某系統測試環境有多套環境:STG1和STG2環境,測試人員正在STG1測試,如果兩個環境的CID相同,則消息有可能就被STG2取走,測試人員在STG1上查不到該消息。        

   解決方法:      

   1、若為剛新增的發布訂閱關系,請聯系MQCP同事申請給每一個環境配置獨立的CID,並創建發布訂閱關系 

   2、若每個環境都申請了獨立的CID,請檢查是否其他某個環境使用了該環境的CID

 

20、java.lang.reflect.InvocationTargetException告警

 

    業務系統引用客戶端包在1.0.15之前版本在啟動時可能會出現該異常,不影響消息的收發,1.0.15的客戶端版本修復了該問題

   解決方法:

   更新客戶端包為最新的版本,最新包版本可在包庫查詢。客戶端包不斷在迭代優化,建議業務系統及時更新客戶端包。客戶端包每個版本更新內容可在官網“相關文檔---CLIENT包版本線”查詢。

   需注意:客戶端包在1.0.20優化了發布訂閱關系的校驗方式,配置文件的配置項有變化,從原來的name_server_address、cluster_id 

、virtual_account變為了:server_address、virtual_account,具體請查閱demo

 

21、服務啟動時報錯:MQCP_CLIENT->Initial cache failed,no cache message received after waiting for 90s

 

     客戶端在重啟的時報錯初始化發布訂閱關系緩存數據失敗。 

     解決方法:     

     1、檢查是否新增了配置文件:mqcp_client.properties,需新增

     2、檢查mqcp_client.properties配置文件,客戶端版本為1.0.20之前的版本,需檢查name_server_address、cluster_id的值是否正確,前后是否包含空格,檢查virtual_account配置的虛擬用戶是否與admin平台配置的一致,客戶端版本為1.0.15之前的,還需在代碼里新增name_server_address的配置:p.setProperty(MQCPConstant.NAME_SERVER_ADDRESS,"不同環境配置不同的nameserver地址" ); 客戶端版本為1.0.20及之后的,需要檢查server_address、virtual_account是否正確,是否包含空格。     

     3、建議更新客戶端包為最新的包。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM