1. rocketmq源碼安裝
參考官方文檔:http://rocketmq.apache.org/docs/quick-start/
安裝好jdk和maven rocketmq安裝包:https://pan.baidu.com/s/1I3CqWaxFnxtUX1kJpIJkcQ 密碼: vu5m
代碼:https://github.com/2466845324/repository/tree/master/rocketmq
1. 解壓源碼包 unzip rocketmq-all-4.4.0-source-release.zip
2. 重命名並進目錄 mv rocketmq-all-4.4.0 rocketmq && cd rocketmq
3. 編譯打包(會下載很多依賴,大概10分鍾) mvn -Prelease-all -DskipTests clean install -U
4. 啟動nameserver服務 cd distribution/target/apache-rocketmq/bin && sh mqnamesrv
* 默認配置是4g, 如果你的服務器配置較低則會報錯, 則要修改
vim runbroker.sh 和 vim runserver.sh 這兩個文件,具體參數自己設定。
例如配置:JAVA_OPTAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
5. 后台啟動 nohup sh mqnamesrv & 查看啟動日志 tail -f nohup.out
6. 啟動brokerserver服務 nohup sh mqbroker -n localhost:9876 &
7. 查看狀態 jps

rocketmq控制台安裝
1. 解壓安裝包 unzip rocketmq-externals-master.zip && cd rocketmq-externals-master
2. 修改pom文件版本號 cd rocketmq-console && vim pom.xml 修改為 <rocketmq.version>4.4.0</rocketmq.version>
3. 指定機器地址 vim src/main/resources/application.properties 修改為rocketmq.config.namesrvAddr=127.0.0.1:9876
4. 在rocketmq-console目錄下編譯 mvn clean package -Dmaven.test.skip=true
5. 啟動 cd target && nohup java -jar rocketmq-console-ng-1.0.0.jar &
6. 查看日志 tail -f nohup.out 啟動測試 192.168.100.100:8080

2. 消息冪等性
生產者發送消息之后,為了確保消費者消費成功 我們通常會采用手動簽收方式確認消費,MQ就是使用了消息超時、重傳、確認機制來保證消息必達。
場景:
1. 訂單服務(生產者),點擊結算訂單之后需要付款,這時就會發送一條“結算”的消息到mq的broker中。
2. 此時支付服務(消費者)監聽到這條消息之后就會處理結算扣款的邏輯,然后手動簽收訂單告訴mq我已經消費完成了。
3. 如果在結算的過程中程序出現了異常,我們就返回mq一個消費失敗的狀態,此時mq就會重新發送這條消息;
或者是由於網絡波動支付服務一直沒有響應消息的消費狀態,mq也照樣會重新發送這條消息。
4. 那么這種情況下,支付服務(消費者)就會重復收到這條消息,如果不做任何判斷就有可能會重復消費出現多次扣款的情況。
解決方案:
在發送消息的時候,我們可以生成一個唯一ID標識每一條消息,將消息處理成功和去重日志通過事物的形式寫入去重表或緩存中。每次消費之前都先查一遍,如果存在就說明消費過了直接返回消費成功。
3. 消息順序性
消息有序是指按照消息的發送順序來消費。例如我們有這樣一個需求:我們通過讀取sql的日志文件得到它的所有sql,然后通過發送消息給其他服務去執行這些sql來同步數據。這個時候順序性就顯得尤為重要了。比如我們先修改這條數據,然后刪除數據。倘若消費的時候先刪除了再修改這時候得到的數據就不一致了。

解決方案:
我們往一個topic里面發送消息時,它默認會維護幾個隊列,是隨機發送到這些隊列里面的。消費者集群消費時,實際上是一個服務監聽一個隊列。我們發送消息時,對於同個數據的操作指定發送到一個隊列就好了,這時消費者就是按照順序來消費的。
4. 順序冪等案例實戰
比如我們現在我們有下面9條消息要發送,分別是對用戶、訂單、商品的操作,最終的結果應該是“用戶被刪除”,“訂單已完成”,“商品被下架”。如果就條消息隨機指定隊列發送的話,就可能用戶先被刪除了,然后進行修改;也有可能支付訂單的過程比較慢一直沒反饋,從而收到多條消息而重復支付。
生產者對同一數據的操作發送到同一隊列
public List<Data> getOrderList(){ List<Data> list = new ArrayList<Data>(); list.add(new Data(1,"注冊用戶")); list.add(new Data(2,"創建訂單")); list.add(new Data(1,"修改用戶")); list.add(new Data(2,"支付訂單")); list.add(new Data(1,"刪除用戶")); list.add(new Data(3,"商品上架")); list.add(new Data(2,"完成訂單")); list.add(new Data(3,"商品打折")); list.add(new Data(3,"商品下架")); return list; } /** * @Title:順序發送 * @author:吳磊 * @date:2019年5月4日 下午10:28:31 */ @RequestMapping("/send") public Object send() throws Exception{ // 1. 獲取操作數據 List<Data> list = getOrderList(); Message message = null; for(int i=0; i<list.size(); i++) { Data data = list.get(i); // keys是唯一標識, 用作重試冪等判斷. String keys = data.getId()+""; // 2. 裝載消息 (topic, 標簽, key, 消息體) message = new Message(PayOrderlyProducer.TOPIC,"test_tag",keys,data.getType().getBytes()); /* 3. 投遞消息 * 每一個topic默認為4個queue, 我們可以指訂隊列發送. * send(Message msg, MessageQueueSelector selector, Object arg),會將arg作為隊列下標傳 * 給MessageQueueSelector中MessageQueue的arg,從而選出具體的queue. */ SendResult sendResult = payOrderlyProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int id = (int) arg; int index = id % mqs.size();//對queue總數取模 return mqs.get(index); } },data.getId()); System.out.printf("發送結果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),data.getId(),data.getType()); } return "OK"; }
消費者手動簽收消息,如果消費失敗就重復消費,如果重試次數過多就通知運營人員手動處理。
public PayOrderlyConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); // 指定mq服務器地址 consumer.setNamesrvAddr(NAME_SERVER); // 指定消費策略:這里是從最后一條消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //默認是集群方式,可以更改為廣播,但是廣播方式不支持重試 //consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(TOPIC, "test_tag"); /** * MessageListenerOrderly是單線程的消費 */ consumer.registerMessageListener( new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { MessageExt msg = msgs.get(0); int times = msg.getReconsumeTimes(); try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); //模擬異常 String str = new String(msgs.get(0).getBody()); if(str.contains("訂單")) { int i=1/0; } //做業務邏輯操作 System.out.println("消費成功"); return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { System.out.println("重試次數"+times); //如果重試2次不成功,則記錄,人工介入 if(times >= 2){ System.out.println("重試次數大於2,記錄數據庫,發短信通知開發人員或者運營人員"); return ConsumeOrderlyStatus.SUCCESS; } e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); consumer.start(); System.out.println("consumer start ..."); }
執行結果:可以看到id相同的數據(對同一數據的操作),都發送到同一隊列了,然后消費者對每一個隊列進行順序消費,消費失敗就會觸發重試機制。

5. 消息積壓解決方案
如果生產環境中consumer半夜掛掉了,項目數據量大的情況下第二天可能有一千萬條消息積壓在broker中。過多的數據積壓不僅占用磁盤空間,還會影響MQ性能。那么這個時候應該怎么辦呢。
修復consumer,然后慢慢消費嗎?比如rocketmq默認一個topic是4個queue,假如處理一條消息需要1秒,用4個consumer同時進行消費,也需要10000000÷4÷60=42分鍾才能消費完。那新的消息怎么辦呢?
解決方案:
由於堆積的topic里面message queue數量固定,即使我們這時增加consumer的數量,它也不能分配到message queue。這時我們可以寫一個分發程序做一個臨時topic隊列擴充,來提高消費者能力。程序從舊的topic中讀取到新的topic,只是新的topic的queue可以指定多一點(理論上可以無限擴充,正常1000以內),然后啟動更多的consumer在臨時新的topic里面消費。

