其實Rocketmq的給第三方的插件已經全了,如果大家有興趣的話請移步https://github.com/apache/rocketmq-externals。本文主要是結合筆者已有的rmq在spark中的應用經驗對rocketmq做簡單介紹以及經驗總結,當然免不了會將rocketmq和如今特別火爆的kafka做一些對比(Ps:為了方便打字rmq后面會是rocketmq的縮寫)。
首先對rocktmq做一些流行的消息隊列對比,見http://rocketmq.apache.org/docs/motivation/。
提到mq不得不提消息隊列,對應於數據結構里面的“先進先出”的隊列。而rocketmq就是應用於大數據時代擁有高吞吐低延遲特性的分布式消息擁有發布訂閱功能的隊列系統。這樣的分布式消息系統主要提供應用解耦、流量消峰、消息分發等功能。本片不會對安裝集群做過多的介紹,安裝單機版本rmq的教程移步官方文檔http://rocketmq.apache.org/docs/quick-start/。rocktmq是阿里研發主要作用於雙十一這樣的高峰期實時流數據處理,起初是基於activemq,但是隨着對吞吐量的要求逐步提高,阿里的開發者們逐漸把眼光向kafka轉移,但是kafka並不具備低延遲和高可靠性。因此阿里決定研究這樣一個兼並傳統的訂閱消息系統的發布訂閱場景與高並發零誤差低延時的傳輸系統。
下面這個表是官網在2016年提供的activemq、kafka以及rocketmq的對比圖。或許對比有點落后,或許開發者比較的眼光比較偏向於rockemq但是僅作為參考(比如數據的有序性,kafka因為需要要有序性和高並發獲得一個平衡只能保證一個partition下的消息通過offset來保持消費有序(當一個主題只有一個Partition的時候就能保持全局消息有序性),rocketmq是通過主題與消息隊列的一對一對應的來確保全局有序性的,實際上這兩種都是可以保證全局有序性,前提都是失去消息的多線程消費)。
上表的對比並不是最新的,對比於2016年。如今,擁有眾多粉絲的kafka在上千家公司得到應用,社區的活躍性讓kafka做了從架構等方面的優化。這里需要提及兩點,目前在kafka官網文檔沒有看到改進說明。一、kafka作為中間件而言,消費模式只有集群消費,廣播消費只存在於同一個主題下不同消費組之間,同一個消費組內的不同消費組進程必須且只能消費某個消息主題下的不同partition,這也造成當消費主題過多時,多個消費者在消費狀態下會有過多磁盤IO讀取文件操作,造成kafka的延時性遠遠高於rocketmq;但是作為高並發,一個主題分成多個partition會使得kafka的高吞吐能力遠遠高於其他中間件。二、消息的重新消費。rmq支持通過指定某個時間點或者offset甚至選擇特定消費決策(latest或者earliest)來重置offset的兩種方式來重新獲取消息,而當前了解是kafka只支持后者一種方式。研究rmq如何實現高並發低延遲的機制請移步http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/。
組成rmq的各個角色介紹。
Producer:生產者。類似於郵件系統中發消息的角色。
ProducerGroup:相同角色的生產者分為一組(考慮到生產者的高效率為了避免不必要的消息初始化,一個組內只允許一個生產者實例)。
Consumer:消費者。類似於郵件系統中收消息的角色。
ConsumerGroup:類似於生產者組,相同角色的消費組分為一個組(在集群模式下,同一個消費組內的消費者均衡的分攤隊列中的消息,不同消費組內不同消費者可以同時接受相同的消息,這就實現了加載平衡和高容錯的目標)。
Topic:主題。是生產者和消費這之間傳輸之前確定好的消息類別。生產者發消息之前需要創建Topic,然后消費者想要獲取這個Topic下的消息需要訂閱這個主題。一個消費者組可以訂閱多個主題,只要這個組內的所有消費者訂閱的主題保持一致性。
Message:消息。就是發送信息的載體,里面包含需要發送的具體信息以及必須要帶Topic(可以理解這里的Topic就是郵件的地址,生產者作為發信人需要寫對的收件人地址,消費者需要登陸對應收件人的郵箱才能收到生產者發送到這個郵箱地址上的郵件)。
MessageQueue:消息隊列。類似於kafka中的partition,只不過這里的分區是邏輯分區不是partition這樣的物理分區,因此如果某個topic下的數據量特別多,可以通過分為不同的消息隊列來獲得高並發量,生產者可以高並發的發送消息,消費者可以高並發的讀取消息,此外需要說明的每個隊列管理一個offset,這里的offset准確的定義是某個topic下的指定隊列里的位置,通過offset可以定位具體的消息,用來指示消費者從offset開始處理。
Broker:接受來自Produer的消息,存儲消息,提供管道給Consumer獲取消息。也會存儲元數據信息,包括消費組、消費進程的offset以及主題甚至隊列的相關信息(HA架構中Broker可以是M/S模式消除單點故障,甚至是多M/S模式可以提供存儲量和吞吐量)。
NameServer:管理Broker的路由信息。Producer和Cosumer需要拿Topics去NameServer中找到對應的Broker的清單(多NameServer可以消除單點故障)。
MessageModel:集群消費和廣播消費。集群消費就是同一個主題下的所有消費者均衡的分攤消息隊列中的消息從而做到負載均衡,廣播消費是所有消費者都消費這個隊列的全量消息。
講完了rocktmq,我們再簡單介紹sparkstreaming。
Spark Streaming是提供高吞吐,擁有容錯能力的實時數據量處理的基於Spark Core的擴展。輸入數據源可以是Kafka、Flume、HDFS以及TCP套接字,並且擁有許多高級算子比如map、reduce、join和window。輸出可以是HDFS、數據庫或者實時儀表盤。甚至可以在這些數據量上執行機器學習和圖論相關的算法。其實,與其說streaming是實時處理,更確切的描述應該是micro-batch的偽實時流數據處理引擎。
在實時性要求不高的場景,是可以秒級的護理該單位時間內的所有數據。具體的接口文檔見https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spark/spark-streaming-rocketmq.md,這里只介紹編寫入口函數RocketMqUtils.createMQPullStream時
需要重點關注的幾個參數。
forceSpecial:Boolean。默認情況下是false,每個消息隊列如果擁有checkpoint就不管我們是否指定offset消費者都會從checkpoint開始消費數據,但是如果設置為true,那么rmq就會從指定的可以獲取的offset開始消費,對於沒有指定offset的隊列默認從最小位移開始消費。
ConsumerStrategy:ConsumerStrategy。分為earliest、lastest、specificOffset(queueToOffset: ju.Map[MessageQueue, Long])以及specificTime(queueToTime: ju.Map[MessageQueue, String])這四種類型。如果是第一種則是從隊列的最小位移開始消費,這時候可能會重復消費之前以及消費過的消息;第二種是從最大位移開始消費也就是會錯過消費進程啟動前的生產者發的消息;第三種是直接設置指定隊列的offset,如果這個offset小於最小位移就直接從該隊列的最小位移開始消費,否則直接從指定offset開始消費;第四種就是獲取某個時間點轉換為時間戳的的offset。對於沒有指定offset的隊列默認從最小位移開始消費。
autoCommit:Boolean。是否自動提交offset給rmq服務器。true的情況是一旦接受到就自動提交offset;false的情況是異步提交,消息處理並callback后才會提交offset。
failOnDataLoss:Boolean。當查詢的數據丟失(比如topic被刪除或者offset超出范圍)是否報異常退出程序還是僅僅日志警告輸出;這里如果對數據的丟失特別嚴格建議設置為true,否則丟了消息也只是日志warn而已。
這里就對遇到的坑位做一些總結:
1、找不到Topic。要么是打包少了fastjson這個依賴,要么是nameserver地址寫錯了或者topic寫錯了。
2、數據丟失。以下有兩種數據丟失場景。
第一種情況。生產者發了幾條消息給rmq,但是此時消費者的進程還沒有啟動,啟動消費者無法從rmq種獲取那幾條消息,初始化時日志warn顯示"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset"。如果參數設置forceSpecial=true,則會導致每次消費者重新啟動不會按照他上次消費的的checkpointoffset開始消費而是按照指定offset來消費或者直接從最小位移開始消費。但是從代碼的角度看這個參數的優先級是低於ConsumerStrategy(下面源碼種第三段黃色背景標記顯示是在選擇消費決策為SpecificOffsetStrategy后才會用到參數forceSpecial)。而我此時的ConsumerStrategy=lastest,這樣就會讓消費者從他能獲取的最近的幾條消息的maxoffset開始消費(參照第一段黃色標記部分),明顯這些消息都會被略過,而后面我們取ConsumerStrategy=earliest(作用於第二段黃色標記部分),這些之前發的消息全都成功接收成功(前提是消息隊列里面還存儲着這些消息)。
private def computePullFromWhere(mq: MessageQueue): Long = {
var result = -1L
val offsetStore = kc.getOffsetStore
val minOffset = kc.minOffset(mq)
val checkpointOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
consumerStrategy match {
case LatestStrategy => {
if (checkpointOffset >= 0) {
//consider the checkpoint offset first
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = kc.maxOffset(mq)
} else {
result = checkpointOffset
}
} else {
// First start,no offset
if (mq.getTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0
} else {
result = kc.maxOffset(mq)
}
}
}
case EarliestStrategy => {
if (checkpointOffset >= 0) {
//consider the checkpoint offset first
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = checkpointOffset
}
} else {
// First start,no offset
result = minOffset
}
}
case SpecificOffsetStrategy(queueToOffset) => {
val specificOffset = queueToOffset.get(mq)
if (checkpointOffset >= 0 && !forceSpecial) {
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = checkpointOffset
}
} else {
specificOffset match {
case Some(ConsumerStrategy.LATEST) => {
result = kc.maxOffset(mq)
}
case Some(ConsumerStrategy.EARLIEST) => {
result = kc.minOffset(mq)
}
case Some(offset) => {
if (offset < minOffset) {
reportDataLoss(s"MessageQueue $mq's specific offset $offset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = offset
}
}
case None => {
if (checkpointOffset >= 0) {
//consider the checkpoint offset first
if (checkpointOffset < minOffset) {
reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
result = minOffset
} else {
result = checkpointOffset
}
} else {
logWarning(s"MessageQueue $mq's specific offset and checkpointOffset are none, then use the minOffset")
result = kc.minOffset(mq)
}
}
}
}
}
}
result
}
第二種情況是考慮這樣一個場景。生產者P1每天實時發某個主題T的消息給執行streaming任務的消費者C,主題T里面的消息由兩個key組成,一個是key1,一個key2,這兩個key分別代表消息的兩個不同的內容。消費者進程C會分別對這兩個Key做不同的處理然后將其分別轉化為RDD來做后面的計算,最后分別對計算后的內容包裝進成key1和key2的value,並以主題T2的形式發送回rmq讓另外一個消費者CC來消費。開發者考慮到key1和key2的計算資源充分利用,就將原來的消費者進程C拆成兩個消費者C1和C2來分別處理key1和key2的內容。這個時候,C2進程突然因為其他因素掛了,但是C1進程還在正常消費來自P1的消息,這就意味着C1會正常提交offset給P1,然后繼續接受來自P1的消息,但是實際上C1只會處理來自key1的內容,所以CC只能收到來自C1的消息,遲遲等不到C2的消息。最后導致就算重啟了C2也接受不到之前的消息(有一種想法是進程C1和C2同屬於一個消費組所以C1接收到了消息馬上提交offset給P1,這樣就算重啟C2由於C2的checkpoint早就被C1更改,除非重置offset。這件事情發生在第一個情況即沒有將消費決策改為earliest之前所以不好判斷是否兩個線程的同一個消費組的消費者是否會公用一個offset)。最后的解決方案是將C1和C2分成兩個消費者組來處理消息,當時認為這樣就相當於兩個隊列就不會互相干擾對方的offset。所以我在想這里的解決方案固然是分成兩個隊列處理最好,但是是否給者兩個key分為不同的tag處理是否也會生成兩個隊列?如果不同的tag里面的數據會輸入到不同的隊列,那么接收消息的時候對於C1和C2也不會收到對方的tag下的消息,比如C1只會接受tag1下的消息,並提交tag1對應隊列Q1的offset情況;C2只會接受tag2的消息,並提交tag2的對應隊列Q2的offset。后面我會嘗試這種方法,請大家有空也可以試驗一下,畢竟實踐是檢驗真理的標准。
針對上次第二種情況的丟數據問題,設計這樣一個實驗:
- 設置對照組1,也就是兩個不同消費組中消費者C1和C2分別先后接受來自同一個Topic的三個消息,其中C1先消費完生產者P1的消息,然后啟動消費進程C2;
- 設置實驗組2,同一個消費組不同消費者進程C1和C2分別先后接受來自同一個Topic的相同Tag的三個消息,消費次序同上;
- 設置實驗組3,同一個消費組不同消費者進程C1和C2分別同時接受來自同一個Topic下的tag1的兩個消息和tag2的三個消息;
- 設置實驗組4,同一個消費組不同消費者進程C1和C2分別先后接受來自同一個Topic下的tag1的兩個消息和tag2的三個消息。
實驗結果:
- 對照組1中的C1和C2分別能成功處理消息(其中C1接收的消息出現兩次重復而C2收到的消息有22次),並且接受的三個消息分別來自三個不同的隊列queueId=0、1、2,擁有不同的min_offset、max_offset以及queueOffset和commitLogOffset。其中隊列0的min_offset=2151、max_offset=2197、queueOffset=2196、commitLogOffset=247968272601、storeSize=884,隊列1的min_offset=2148、max_offset=2190、queueOffset=2189、commitLogOffset=247968273485、storeSize=894,隊列2的min_offset=2124、max_offset=2163、queueOffset=2162、commitLogOffset=247968274379、storeSize=901;
- 對照組1中的C1和C2分別能成功處理消息(其中C1收到3條消息,C2無消息接收),這里確實會存在同一個組內相同Tag中的消費者C1和C2存在先后接受消息的時差,導致C1先消費P1的3條消息提交完所有的offset,最后C2從隊列里獲取的commitLogOffset是C1消費完以后的offset,此時無新的消息發過來則無法收到新的消息,后面的進一步實驗證明了如果C1和C2同時在消費P1的數據則能同時獲取3個消息且commitLogOffset相同;
- 對照組1中的C1和C2分別能成功處理消息(C1只能接收到tag1的消息,C2只能接受tag2的消息),且這5個消息的commitLogOffset是連續的,其中C1接收的兩個消息分別是隊列0、3,C2接收的三個消息分別是隊列0、1、2,其中同一個隊列0下是有兩個不同tag下的消息a和b(其中a的min_offset=2151、max_offset=2200、queueOffset=2198,b的min_offset=2151、max_offset=2200、queueOffset=2199),其queueOffset相差1,且同一個隊列的這兩個消息的min_offset與max_offset相同,但是對比對照組1中的隊列1的min_offset=2151和max_offset=2197,這里的min_offset相同但是max_offset不同,本次實驗的queueOffset=2198是從上一次消費的max_offset=2197開始消費的(貌似max_offset是隨着接收消息的不斷遞增的取本次接收消息的最大queue_offset+1=max_offset,大概是因為我是在streaming中異步提交offset造成的);
- 顯而易見C1和C2分別能接受對應的不同的Tag下的消息,雖然消息是同時發送的,但是不同tag下的消費者實際上是可以接收到所有的tag的消息(雖然服務端會過濾一次,客戶端還會做二次過濾),但是C1消費了兩個Tag的數據,提交了commitLog后,C2就無法接收之前的數據。
結果分析:
分析結果1可知。每個send方法就是一個生產線程會產生三個消息隊列,並且者三個隊列是獨立的擁有各自的min_offset、max_offset以及queueOffset,但是commitLogOffset是由生產者統一存儲在commitlog文件中的,所有的消費者都需要從commitlog文件中根據上次消費的消息的commitLogOffset+storeSize的得到本次消息的commitLogOffset從而開始消費數據(如結果1中隊列0的commitLogOffset=247968272601以及storeSize=884,兩者相加等於隊列1的commitLogOffset從而開始消費隊列1的數據以此類推得到隊列2的數據)。兩個不同組下的消費者消費消息是獨立的;
分析結果2可知。同一個組中的不同消費者先后獲取同一個Tag的相同數據如果被某個消費者消費了一次那么其他消費者只能在該消費者消費的基礎上獲取下一個數據;
分析結果3和4可知。不同Tag下的消費者消費消息是獨立的,但是同一個Topic下不同Tag的消息是可以存在於同一個隊列,也就是一個Topic可以是分多個隊列存儲數據,不區分Tag,隊列是一個邏輯的概念,所以同一個隊列下的的queueOffset是邏輯遞增,但是commitLogOffset是實際上生產者存儲在commitLog中的消息存儲地址,是根據消息體的大小增加offset。
但是結果3和結果4依然沒有解決offset是否最小粒度只區別於tag,因為就算C2是后期啟動的,但是根據earliest消費決策在第一次啟動的時候仍然能獲取原來被C1提交了offset的部分數據,考慮到消費決策只在初次啟動消費者進程的時候起作用,再設置對照組5。
- 設置實驗組4,同一個消費組不同消費者進程C1和C2首先分別先后接受來自同一個Topic下的tag1的20個消息a1和tag2的三個消息b1,然后過段時間再分別發送一次tag1和tag2的三條消息a2、b2。
結果:在第一次發消息之后,C1剛開始啟動並向broker訂閱了tag1的消息,日志顯示處理了20條消息a1,但是C2日志頁面收到一條b1的消息,漏了另外兩條消息,從streaming接收的時間數量來看應該是被C1獲取並返回commitOffset,消息首先會在broker服務端過濾然后再在消費端的C1過濾,但是在streaming這邊確實是只會輸出20條消息,實際上是有接收另外2條消息的(在spark streaming的頁面顯示有22條記錄);同理C2在application 的spark streaming頁面是收到4條記錄,其中三條根據comimitLog計算確實是tag1的a1的最后三條消息並且這四條記錄(一條b1和三條a2)的commitLog依次遞增。在C1和C2都運行穩定后,在第二次發消息兩個客戶端分別都能接收到6條a1和3條b2(其中C1接收的a1有重復的兩條,C2收到的b2消息不重復)。
分析:這里的只要是同一個Topic的同一個消費組只會提交一次commitLogOffset,所以如果要徹底解決消息丟失的問題還是得分組,同一個組內如果出現不同的tag,這里的消息不會出現丟失(如果兩個客戶端不出現消費時差)但是可能會有重復數據。
總的來說目前不知道是rmq集群不穩定還是如何,streaming任務幾乎每隔幾天就會報錯連接不上rmq的nameserver或者連接不上broker。還有一個問題是由於我們的streaming任務每天會初始化數據持久化到內存種,然后每次這個時候都會warn找不到metadata,這個原因也可以研究一下,不知道是否跟unpersist方法有關。