【順序消息】
順序消費是指消息的產生順序和消費順序相同。
比如訂單的生成、付款、發貨,這三個消息必須按順序處理才可以。
【順序消息的分類】
全局順序消息和部分順序消息。
上面訂單的例子,其實是部分順序消息,只要保證同一個訂單ID的三個消息能順序消費即可。
【全局順序消息】
【部分順序消費】
在實際的場景中,更多的是像訂單類消息那樣,只需要部分有序即可。
[ MessageQueueSelector ]
Producer發送端使用MessageQueueSelector類來控制把消息發往哪個MessageQueue。
[ MessageListenerOrderly ]
Consumer消費端使用MesageListenerOrderly類來解決單MessageQueue的消息被並發處理的問題。
MessageListenerOrderly並不是簡單地禁止並發處理,在MessageListenerOrderly的實現中,為每個Consumer Queue加個鎖,消費每個消息前,需要獲得這個消息對應的Consumer Queue所對應的鎖,這樣保證了同一時間,同一個Consumer Queue的消息不被並發消費,但不同的Consumer Queue的消息可以並發處理。
【消息重復問題】
消息重復一般情況下不會發生,但是消息量大、網絡有波動的情況下,消息重復消費就是個大概率事件。
比如Producer有個方法setRetryTimesWhenSendFailed,是設置在消息發送失敗時的自動重試次數,默認為2。
如果第一次發送消息時,Broker端收到了消息,但是沒有正確返回發送成功的狀態,就會造成消息重復。
[ 如何解決消息重復問題? ]
方法1:
確保消費邏輯的冪等性,即多次調用和一次調用的效果一樣。
方法2:
維護一個已消費消息的記錄,消費前查詢這個消息是否被消費過。
【動態增減機器1——NameServer】
1.集群的各個組件從NameServer獲取各種屬性和地址信息。
2.各個Broker會定時上報自己的狀態信息到NameServer。
為了提高可靠性,建議啟動多個NameServer,NameServer占用的資源不多,可以和Broker部署在同一台機器上。有多個NameServer后,減少NameServer不會對其他組件產生影響。
[ 設置NameServer地址的四種方式,優先級由高到低 ]
1.通過代碼設置,即通過 Producer.setNameSrvAddr("127.0.0.1:9876;127.0.0.2:9876") 來設置。
2.使用Java啟動參數設置,對應的是option的rocketmq.namesrv.addr。
3.通過Linux環境變量來設置,在啟動前設置變量:NAMESRV_ADDR。
4.通過HTTP服務來設置,(重點:這是唯一支持動態增加NameServer,無需重啟其它組件的方式!)如果上述的3個優先級更高的方式沒有使用,程序會向一個HTTP地址請求來獲取NameServer地址,默認URL是http://jmenv.tbsite.net:8080/rocketmq/nsaddr。
通過roketmq.namesrv.domain參數來覆蓋jmenv.tbsite.net;
通過rocketmq.namesrv.domain.subgroup參數來覆蓋nsaddr;
[ 注:]
上面第4中方式是唯一支持動態增加NameServer,無需重啟其他組件的方式,使用這種方式后其他組件會每隔2分鍾請求一次該URL,獲取最新的NameServer地址。
【動態增減機器2——Broker】
[ 動態增加Broker機器是否會對原來的Topic產生影響? ]
只增加Broker不會對原有的Topic產生影響,原來創建好的Topic中數據的讀寫依然在原來的那些Broker上進行。
[ 集群擴容(新增Broker機器)后,怎么做? ]
1.把新建的Topic指定到新的Broker機器上,均衡利用資源。
2.另一種方式是通過updateTopic命令更改現有的Topic配置,在新加的Broker上創建新的隊列。比如“TopicA”是現有的一個Topic,因為數據量增加后需要擴容,新增的一個Broker機器地址是127.0.0.3:10911,這個時候執行如下命令:
sh ./bin/mqadmin updateTopic -b 127.0.0.3:10911 -t TopicA -n 127.0.0.1:9876
結果就是在新增的Broker機器上,為T“TopicA”創建了8個讀寫隊列。
[ 如果需要減少Broker,怎么做? ]
減少Broker要看是否有持續運行的Producer,當一個Topic只有一個Master Broker,停掉這個Broker后,消息的發送必定會受影響,需要再停止Broker前,停止發消息。
[ 如果某個Topic下有多個Master Broker,停掉了其中一個,這時候是否會丟失消息呢? ]
和Producer使用的發消息的方式有關。
1.如果是同步方式發送:
在DefaultMQProducer內部有關而自動重試邏輯,其中一個Broker停了,會自動向另一個Broker發消息,不會出現丟消息的情況。
2.如果是異步方式或sendOneWay方式發送:
會丟失切換過程中的消息,因為在異步和sendOneWay方式下,Producer.setRetryTimesWhenSendFailed設置不會起作用,發送失敗不會重試。
DefaultMQProducer默認每30S到NameServer請求最新的路由消息,Producer如果獲取不到已停止的Broker下的隊列消息,后續就自動不再想這些隊列發送消息。
[ 如何置換Mster Broker機器? ]
如果Producer能夠暫停,在有一個Master和一個Slave的情況下也可以順利切換,可以關閉Producer后關閉Master Broker,這個時候所有的讀取會被定向到Slave Broker機器,消費消息不受影響。把Master Broker機器置換完后,基於原來的數據啟動這個Master Broker,然后再啟動Producer程序正常發送消息。
【各種故障對消息的影響,針對各種故障的處理方式 】
可能出現的故障:
1.Broker正常關閉,啟動。 2.Broker異常Crash,然后啟動。 3.OS Crash,重啟。 4.機器斷電,當能馬上恢復供電。 5.磁盤損壞。 6.CPU、主板、內存等關鍵設備損壞。
現有的RocketMQ集群,一般每個Topic都配有多Master角色的Broker寫入,並且每個Master都至少有一個Slave機器。
[ 1. Broker正常關閉,啟動 ]
這種情況內存中的數據不會丟失。
如果重啟過程中有持續運行的Consumer,Broker Master機器出故障后,Consumer會自動重連對應的Broker Slave機器,不會有消息丟失和偏差。
當Broker Master重啟成功后,Consumer又會重現連接到Broker Master機器上。
如果重啟過程中有持續運行的Producer,一台Broker Master出故障后,Producer會向該Topic下的其他Master機器發送消息,如果Producer采用的是同步發送的方式,不會有消息丟失。
[ 第2,3,4種情況的處理 ]
2,3,4屬於軟件故障,內存的數據有可能會丟失。
刷盤的策略不同,造成的影響不同。
如果Broker Master和Broker Slave都配置成SYNC_FLUSH同步刷盤,可以達到消息不丟失。
[ 第5,6種情況的處理 ]
第5,6種情況屬於硬件故障,原有機器的磁盤數據可能會丟失,如果Master和Slave機器間配置成同步復制方式,某一台機器發生這樣的故障,也可以達到消息不丟失的效果,如果Master和Slave之間是異步復制的方式,兩次Sync之間的消息會丟失。
【各種故障處理的總結】
1.多Master,每個Master都帶有多個Slave。
2.主從之間設置成SYNC_MASTER。同步復制。
3.Producer設置成同步方式寫。
4.刷盤策略設置成SYNC_FLUSH,同步刷盤。