MQ疑難雜症小記


為什么使用消息隊列?

什么業務場景,這個業務場景有個什么技術挑戰,如果不用MQ可能會很麻煩,但是你現在用了MQ之后帶給了你很多的好處。消息隊列的常見使用場景,其實場景有很多,但是比較核心的有3個:解耦、異步、削峰。

解耦:

A系統發送個數據到BCD三個系統,接口調用發送,那如果E系統也要這個數據呢?那如果C系統現在不需要了呢?現在A系統又要發送第二種數據了呢?而且A系統要時時刻刻考慮BCDE四個系統如果掛了咋辦?要不要重發?我要不要把消息存起來?

你需要去考慮一下你負責的系統中是否有類似的場景,就是一個系統或者一個模塊,調用了多個系統或者模塊,互相之間的調用很復雜,維護起來很麻煩。但是其實這個調用是不需要直接同步調用接口的,如果用MQ給他異步化解耦,也是可以的,你就需要去考慮在你的項目里,是不是可以運用這個MQ去進行系統的解耦。

異步:

A系統接收一個請求,需要在自己本地寫庫,還需要在BCD三個系統寫庫,自己本地寫庫要30msBCD三個系統分別寫庫要300ms450ms200ms。最終請求總延時是30 + 300 + 450 + 200 = 980ms,接近1s,異步后,BCD三個系統分別寫庫的時間,A系統就不再考慮了。

削峰:

每天0點到16點,A系統風平浪靜,每秒並發請求數量就100個。結果每次一到16~23點,每秒並發請求數量突然會暴增到1萬條。但是系統最大的處理能力就只能是每秒鍾處理1000個請求啊。怎么辦?需要我們進行流量的削峰,讓系統可以平緩的處理突增的請求。

 

消息隊列有什么優點和缺點?

優點上面已經說了,就是在特殊場景下有其對應的好處,解耦、異步、削峰。

缺點呢? 

系統可用性降低

系統引入的外部依賴越多,越容易掛掉,本來你就是A系統調用BCD三個系統的接口就好了,ABCD四個系統好好的,沒啥問題,你偏加個MQ進來,萬一MQ掛了怎么辦?MQ掛了,整套系統崩潰了,業務也就停頓了。

系統復雜性提高

硬生生加個MQ進來,怎么保證消息沒有重復消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性? 

一致性問題

A系統處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是BCD三個系統那里,BD兩個系統寫庫成功了,結果C系統寫庫失敗了,你這數據就不一致了。

所以消息隊列實際是一種非常復雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉。 

常見消息隊列的比較

 

消息的重復

原因

第一類原因

消息發送端應用的消息重復發送,有以下幾種情況。

消息發送端發送消息給消息中間件,消息中間件收到消息並成功存儲,而這時消息中間件出現了問題,導致應用端沒有收到消息發送成功的返回因而進行重試產生了重復。

消息中間件因為負載高響應變慢,成功把消息存儲到消息存儲中后,返回“成功”這個結果時超時。

消息中間件將消息成功寫入消息存儲,在返回結果時網絡出現問題,導致應用發送端重試,而重試時網絡恢復,由此導致重復。

可以看到,通過消息發送端產生消息重復的主要原因是消息成功進入消息存儲后,因為各種原因使得消息發送端沒有收到“成功”的返回結果,並且又有重試機制,因而導致重復。

第二類原因

消息到達了消息存儲,由消息中間件進行向外的投遞時產生重復,有以下幾種情況。

消息被投遞到消息接收者應用進行處理,處理完畢后應用出問題了,消息中間件不知道消息處理結果,會再次投遞。

消息被投遞到消息接收者應用進行處理,處理完畢后網絡出現問題了,消息中間件沒有收到消息處理結果,會再次投遞。

消息被投遞到消息接收者應用進行處理,處理時間比較長,消息中間件因為消息超時會再次投遞。

消息被投遞到消息接收者應用進行處理,處理完畢后消息中間件出問題了,沒能收到消息結果並處理,會再次投遞

消息被投遞到消息接收者應用進行處理,處理完畢后消息中間件收到結果但是遇到消息存儲故障,沒能更新投遞狀態,會再次投遞。

可以看到,在投遞過程中產生的消息重復接收主要是因為消息接收者成功處理完消息后,消息中間件不能及時更新投遞狀態造成的。

如何解決重復消費

那么有什么辦法可以解決呢?主要是要求消息接收者來處理這種重復的情況,也就是要求消息接收者的消息處理是冪等操作。

什么是冪等性?

對於消息接收端的情況,冪等的含義是采用同樣的輸入多次調用處理函數,得到同樣的結果。例如,一個SQL操作

update stat_table set count= 10 where id =1

這個操作多次執行,id等於1的記錄中的 count字段的值都為10,這個操作就是冪等的,我們不用擔心這個操作被重復。

再來看另外一個SQL操作

update stat_table set count= count +1 where id= 1;

這樣的SQL操作就不是冪等的,一旦重復,結果就會產生變化。

常見辦法

因此應對消息重復的辦法是,使消息接收端的處理是一個冪等操作。這樣的做法降低了消息中間件的整體復雜性,不過也給使用消息中間件的消息接收端應用帶來了一定的限制和門檻。

1. MVCC:

多版本並發控制,樂觀鎖的一種實現,在生產者發送消息時進行數據更新時需要帶上數據的版本號,消費者去更新時需要去比較持有數據的版本號,版本號不一致的操作無法成功。例如博客點贊次數自動+1的接口:

public boolean addCount(Long id, Long version);

update blogTable set count= count+1,version=version+1 where id=321 and version=123

每一個version只有一次執行成功的機會,一旦失敗了生產者必須重新獲取數據的最新版本號再次發起更新。

2. 去重表:

利用數據庫表單的特性來實現冪等,常用的一個思路是在表上構建唯一性索引,保證某一類數據一旦執行完畢,后續同樣的請求不再重復處理了(利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。)

以電商平台為例子,電商平台上的訂單id就是最適合的token。當用戶下單時,會經歷多個環節,比如生成訂單,減庫存,減優惠券等等。每一個環節執行時都先檢測一下該訂單id是否已經執行過這一步驟,對未執行的請求,執行操作並緩存結果,而對已經執行過的id,則直接返回之前的執行結果,不做任何操作。這樣可以在最大程度上避免操作的重復執行問題,緩存起來的執行結果也能用於事務的控制等。

消息的可靠性傳輸

ActiveMQ

要保證消息的可靠性,除了消息的持久化,還包括兩個方面,一是生產者發送的消息可以被ActiveMQ收到,二是消費者收到了ActiveMQ發送的消息。

生產者

非持久化又不在事務中的消息,可能會有消息的丟失。為保證消息可以被ActiveMQ收到,我們應該采用事務消息或持久化消息。

消費者

對消息的確認有4種機制

1、 AUTO_ACKNOWLEDGE = 1    自動確認

2、 CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   

3、 DUPS_OK_ACKNOWLEDGE = 3    自動批量確認

4、 SESSION_TRANSACTED = 0    事務提交並確認

ACK_MODE描述了Consumerbroker確認消息的方式(時機),比如當消息被Consumer接收之后,Consumer將在何時確認消息。所以ack_mode描述的不是producerbroker之間的關系,而是customerbroker之間的關系。

對於broker而言,只有接收到ACK指令,才會認為消息被正確的接收或者處理成功了,通過ACK,可以在consumerBroker之間建立一種簡單的“擔保”機制.

AUTO_ACKNOWLEDGE  

自動確認

    “同步”(receive)方法返回message給消息時會立即確認。

     "異步"(messageListener)方式中,將會首先調用listener.onMessage(message),如果onMessage方法正常結束,消息將會正常確認。如果onMessage方法異常,將導致消費者要求ActiveMQ重發消息。

CLIENT_ACKNOWLEDGE :

客戶端手動確認,這就意味着AcitveMQ將不會“自作主張”的為你ACK任何消息,開發者需要自己擇機確認。

我們可以在當前消息處理成功之后,立即調用message.acknowledge()方法來"逐個"確認消息,這樣可以盡可能的減少因網絡故障而導致消息重發的個數;當然也可以處理多條消息之后,間歇性的調用acknowledge方法來一次確認多條消息,減少ack的次數來提升consumer的效率,不過需要自行權衡。

DUPS_OK_ACKNOWLEDGE

類似於AUTO_ACK確認機制,為自動批量確認而生,而且具有延遲確認的特點ActiveMQ會根據內部算法,在收到一定數量的消息自動進行確認。在此模式下,可能會出現重復消息什么時候consumer故障重啟后,那些尚未ACK的消息會重新發送過來。

SESSION_TRANSACTED

session使用事務時,就是使用此模式。當決定事務中的消息可以確認時,必須調用session.commit()方法,commit方法將會導致當前session的事務中所有消息立即被確認在事務開始之后的任何時機調用rollback(),意味着當前事務的結束,事務中所有的消息都將被重發。當然在commit之前拋出異常,也會導致事務的rollback。

RabbitMQ

1)生產者弄丟了數據

生產者將數據發送到RabbitMQ的時候,可能數據就在半路給搞丟了,因為網絡啥的問題,都有可能。此時可以選擇用RabbitMQ提供的事務功能,就是生產者發送數據之前開啟RabbitMQ事務(channel.txSelect),然后發送消息,如果消息沒有成功被RabbitMQ接收到,那么生產者會收到異常報錯,此時就可以回滾事務(channel.txRollback),然后重試發送消息;如果收到了消息,那么可以提交事務(channel.txCommit)。但是問題是,RabbitMQ事務機制一搞,基本上吞吐量會下來,因為太耗性能。

所以一般來說,如果要確保RabbitMQ的消息別丟,可以開啟confirm模式,在生產者那里設置開啟confirm模式之后,你每次寫的消息都會分配一個唯一的id,然后如果寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。如果RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存里維護每個消息id的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發。

事務機制和cnofirm機制最大的不同在於,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是confirm機制是異步的,你發送個消息之后就可以發送下一個消息,然后那個消息RabbitMQ接收了之后會異步回調你一個接口通知你這個消息接收到了。

所以一般在生產者這塊避免數據丟失,都是用confirm機制的。

2RabbitMQ弄丟了數據

就是RabbitMQ自己弄丟了數據,這個你必須開啟RabbitMQ的持久化,就是消息寫入之后會持久化到磁盤,哪怕是RabbitMQ自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。除非極其罕見的是,RabbitMQ還沒持久化,自己就掛了,可能導致少量數據會丟失的,但是這個概率較小。

設置持久化有兩個步驟,第一個是創建queue和交換器的時候將其設置為持久化的,這樣就可以保證RabbitMQ持久化相關的元數據,但是不會持久化queue里的數據;第二個是發送消息的時候將消息的deliveryMode設置為2,就是將消息設置為持久化的,此時RabbitMQ就會將消息持久化到磁盤上去。必須要同時設置這兩個持久化才行,RabbitMQ哪怕是掛了,再次重啟,也會從磁盤上重啟恢復queue,恢復這個queue里的數據。

而且持久化可以跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤之后,才會通知生產者ack了,所以哪怕是在持久化到磁盤之前,RabbitMQ掛了,數據丟了,生產者收不到ack,你也是可以自己重發的。

哪怕是你給RabbitMQ開啟了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,但是還沒來得及持久化到磁盤上,結果不巧,此時RabbitMQ掛了,就會導致內存里的一點點數據會丟失。

3)消費端弄丟了數據

RabbitMQ如果丟失了數據,主要是因為你消費的時候,剛消費到,還沒處理,結果進程掛了,比如重啟了,那么就尷尬了,RabbitMQ認為你都消費了,這數據就丟了。

這個時候得用RabbitMQ提供的ack機制,簡單來說,就是你關閉RabbitMQ自動ack,可以通過一個api來調用就行,然后每次你自己代碼里確保處理完的時候,再程序里ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認為你還沒處理完,這個時候RabbitMQ會把這個消費分配給別的consumer去處理,消息是不會丟的。

Kafka

1)消費端弄丟了數據

唯一可能導致消費者弄丟數據的情況,就是說,你那個消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經消費好了這個消息,其實你剛准備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。

大家都知道kafka會自動提交offset,那么只要關閉自動提交offset,在處理完之后自己手動提交offset,就可以保證數據不會丟。但是此時確實還是會重復消費,比如你剛處理完,還沒提交offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。

生產環境碰到的一個問題,就是說我們的kafka消費者消費到了數據之后是寫到一個內存的queue里先緩沖一下,結果有的時候,你剛把消息寫入內存queue,然后消費者會自動提交offset

然后此時我們重啟了系統,就會導致內存queue里還沒來得及處理的數據就丟失了

2kafka弄丟了數據

這塊比較常見的一個場景,就是kafka某個broker宕機,然后重新選舉partitonleader時。大家想想,要是此時其他的follower剛好還有些數據沒有同步,結果此時leader掛了,然后選舉某個followerleader之后,他不就少了一些數據?這就丟了一些數據啊。

所以此時一般是要求起碼設置如下4個參數:

給這個topic設置replication.factor參數:這個值必須大於1,要求每個partition必須有至少2個副本。

kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確保leader掛了還有一個follower吧。

producer端設置acks=all:這個是要求每條數據,必須是寫入所有replica之后,才能認為是寫成功了。

producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。

3)生產者會不會弄丟數據

如果按照上述的思路設置了ack=all,一定不會丟,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。

消息的順序性

從根本上說,異步消息是不應該有順序依賴的。在MQ上估計是沒法解決。要實現嚴格的順序消息,簡單且可行的辦法就是:保證生產者 - MQServer - 消費者是一對一對一的關系

ActiveMQ

1、通過高級特性consumer獨有消費者(exclusive consumer

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

consumer = session.createConsumer(queue);

當在接收信息的時候,有多個獨占消費者的時候,只有一個獨占消費者可以接收到消息。

 

獨占消息就是在有多個消費者同時消費一個queue時,可以保證只有一個消費者可以消費消息,這樣雖然保證了消息的順序問題,不過也帶來了一個問題,就是這個queue的所有消息將只會在這一個主消費者上消費,其他消費者將閑置,達不到負載均衡分配,而實際業務我們可能更多的是這樣的場景,比如一個訂單會發出一組順序消息,我們只要求這一組消息是順序消費的,而訂單與訂單之間又是可以並行消費的,不需要順序,因為順序也沒有任何意義,有沒有辦法做到呢?可以利用activemq的另一個高級特性之messageGroup

2、利用Activemq的高級特性:messageGroups

Message Groups特性是一種負載均衡的機制。在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker會檢查是否有某個consumer擁有這個message group。如果沒有,那么broker會選擇一個consumer,並將它關聯到這個message group。此后,這個consumer會接收這個message group的所有消息,直到:Consumer被關閉。Message group被關閉,通過發送一個消息,並設置這個消息的JMSXGroupSeq-1

 

bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");

bytesMessage.setIntProperty("JMSXGroupSeq", -1);

如上圖所示,同一個queue中,擁有相同JMSXGroupID的消息將發往同一個消費者,解決順序問題,不同分組的消息又能被其他消費者並行消費,解決負載均衡的問題。

RabbitMQ

如果有順序依賴的消息,要保證消息有一個hashKey,類似於數據庫表分區的的分區key列。保證對同一個key的消息發送到相同的隊列。A用戶產生的消息(包括創建消息和刪除消息)都按AhashKey分發到同一個隊列。只需要把強相關的兩條消息基於相同的路由就行了,也就是說經過m1m2的在路由表里的路由是一樣的,那自然m1會優先於m2去投遞。而且一個queue只對應一個consumer。

Kafka

一個topic,一個partition,一個consumer,內部單線程消費

如何解決消息隊列的延時以及過期失效問題?

rabbitmqrabbitmq是可以設置過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間,而又沒有設置死信隊列機制,就會被rabbitmq給清理掉,這個數據就沒了。

ActiveMQ則通過更改配置,支持消息的定時發送。

有幾百萬消息持續積壓幾小時怎么解決?

發生了線上故障,幾千萬條數據在MQ里積壓很久。是修復consumer的問題,讓他恢復消費速度,然后等待幾個小時消費完畢?這是個解決方案。不過有時候我們還會進行臨時緊急擴容。

一個消費者一秒是1000條,一秒3個消費者是3000條,一分鍾是18萬條。1000多萬條,所以如果積壓了幾百萬到上千萬的數據,即使消費者恢復了,也需要大概1小時的時間才能恢復過來。

一般這個時候,只能操作臨時緊急擴容了,具體操作步驟和思路如下:

先修復consumer的問題,確保其恢復消費速度,然后將現有cnosumer都停掉。

新建一個topicpartition是原來的10倍,臨時建立好原先10倍或者20倍的queue數量。然后寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數量的queue

接着臨時征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據。

這種做法相當於是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據。

等快速消費完積壓數據之后,再恢復原先部署架構,重新用原先的consumer機器來消費消息。

Kafka是如何實現高性能的?

宏觀架構層面利用Partition實現並行處理

Kafka中每個Topic都包含一個或多個Partition,不同Partition可位於不同節點。同時Partition在物理上對應一個本地文件夾,每個Partition包含一個或多個Segment,每個Segment包含一個數據文件和一個與之對應的索引文件。在邏輯上,可以把一個Partition當作一個非常長的數組,可通過這個“數組”的索引(offset)去訪問其數據。

一方面,由於不同Partition可位於不同機器,因此可以充分利用集群優勢,實現機器間的並行處理。另一方面,由於Partition在物理上對應一個文件夾,即使多個Partition位於同一個節點,也可通過配置讓同一節點上的不同Partition置於不同的disk drive上,從而實現磁盤間的並行處理,充分發揮多磁盤的優勢。

利用多磁盤的具體方法是,將不同磁盤mount到不同目錄,然后在server.properties中,將log.dirs設置為多目錄(用逗號分隔)。Kafka會自動將所有Partition盡可能均勻分配到不同目錄也即不同目錄(也即不同disk)上。

Partition是最小並發粒度,Partition個數決定了可能的最大並行度。。

ISR實現可用性與數據一致性的動態平衡

常用數據復制及一致性方案

Master-Slave

- RDBMS的讀寫分離即為典型的Master-Slave方案

- 同步復制可保證強一致性但會影響可用性

- 異步復制可提供高可用性但會降低一致性

WNR

- 主要用於去中心化的分布式系統中。

- N代表總副本數,W代表每次寫操作要保證的最少寫成功的副本數,R代表每次讀至少要讀取的副本數

- W+R>N時,可保證每次讀取的數據至少有一個副本擁有最新的數據

- 多個寫操作的順序難以保證,可能導致多副本間的寫操作順序不一致。Dynamo通過向量時鍾保證最終一致性

Paxos及其變種

- GoogleChubbyZookeeper的原子廣播協議(Zab),RAFT

基於ISR的數據復制方案

Kafka的數據復制是以Partition為單位的。而多個備份間的數據復制,通過FollowerLeader拉取數據完成。從一這點來講,Kafka的數據復制方案接近於上文所講的Master-Slave方案。不同的是,Kafka既不是完全的同步復制,也不是完全的異步復制,而是基於ISR的動態復制方案。

ISR,也即In-sync Replica。每個PartitionLeader都會維護這樣一個列表,該列表中,包含了所有與之同步的Replica(包含Leader自己)。每次數據寫入時,只有ISR中的所有Replica都復制完,Leader才會將其置為Commit,它才能被Consumer所消費。

這種方案,與同步復制非常接近。但不同的是,這個ISR是由Leader動態維護的。如果Follower不能緊“跟上”Leader,它將被LeaderISR中移除,待它又重新“跟上”Leader后,會被Leader再次加加ISR中。每次改變ISR后,Leader都會將最新的ISR持久化到Zookeeper中。

由於Leader可移除不能及時與之同步的Follower,故與同步復制相比可避免最慢的Follower拖慢整體速度,也即ISR提高了系統可用性。

ISR中的所有Follower都包含了所有Commit過的消息,而只有Commit過的消息才會被Consumer消費,故從Consumer的角度而言,ISR中的所有Replica都始終處於同步狀態,從而與異步復制方案相比提高了數據一致性。

ISR可動態調整,極限情況下,可以只包含Leader,極大提高了可容忍的宕機的Follower的數量。與Majority Quorum方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。

具體實現層面高效使用磁盤特性和操作系統特性

將寫磁盤的過程變為順序寫

Kafka的整個設計中,Partition相當於一個非常長的數組,而Broker接收到的所有消息順序寫入這個大數組中。同時Consumer通過Offset順序消費這些數據,並且不刪除已經消費的數據,從而避免了隨機寫磁盤的過程。

由於磁盤有限,不可能保存所有數據,實際上作為消息系統Kafka也沒必要保存所有數據,需要刪除舊的數據。而這個刪除過程,並非通過使用“讀-寫”模式去修改文件,而是將Partition分為多個Segment,每個Segment對應一個物理文件,通過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操作。

在存儲機制上,使用了Log Structured Merge Trees(LSM)

注:Log Structured Merge Trees(LSM),谷歌 “BigTable” 的論文,中提出,LSM是當前被用在許多產品的文件結構策略:HBase, Cassandra, LevelDB, SQLite,KafkaLSM被設計來提供比傳統的B+樹或者ISAM更好的寫操作吞吐量,通過消去隨機的本地更新操作來達到這個目標。這個問題的本質還是磁盤隨機操作慢,順序讀寫快。這二種操作存在巨大的差距,無論是磁盤還是SSD,而且快至少三個數量級。

充分利用Page Cache

使用Page Cache的好處如下

- I/O Scheduler會將連續的小塊寫組裝成大塊的物理寫從而提高性能

- I/O Scheduler會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間

- 充分利用所有空閑內存(非JVM內存)。如果使用應用層Cache(即JVM堆內存),會增加GC負擔

- 讀操作可直接在Page Cache內進行。如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過Page Cache)交換數據

- 如果進程重啟,JVM內的Cache會失效,但Page Cache仍然可用

Broker收到數據后,寫磁盤時只是將數據寫入Page Cache,並不保證數據一定完全寫入磁盤。從這一點看,可能會造成機器宕機時,Page Cache內的數據未寫入磁盤從而造成數據丟失。但是這種丟失只發生在機器斷電等造成操作系統不工作的場景,而這種場景完全可以由Kafka層面的Replication機制去解決。如果為了保證這種情況下數據不丟失而強制將Page Cache中的數據Flush到磁盤,反而會降低性能。也正因如此,Kafka雖然提供了flush.messagesflush.ms兩個參數將Page Cache中的數據強制Flush到磁盤,但是Kafka並不建議使用。

如果數據消費速度與生產速度相當,甚至不需要通過物理磁盤交換數據,而是直接通過Page Cache交換數據。同時,FollowerLeader Fetch數據時,也可通過Page Cache完成。

注:Page Cache,又稱pcache,其中文名稱為頁高速緩沖存儲器,簡稱頁高緩。page cache的大小為一頁,通常為4K。在linux讀寫文件時,它用於緩存文件的邏輯內容,從而加快對磁盤上映像和數據的訪問。 Linux操作系統的一個特色。

支持多Disk Drive

Brokerlog.dirs配置項,允許配置多個文件夾。如果機器上有多個Disk Drive,可將不同的Disk掛載到不同的目錄,然后將這些目錄都配置到log.dirs里。Kafka會盡可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優勢。

零拷貝

Kafka中存在大量的網絡數據持久化到磁盤(ProducerBroker)和磁盤文件通過網絡發送(BrokerConsumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。

傳統模式下的四次拷貝與四次上下文切換

以將磁盤文件通過網絡發送為例。傳統模式下,一般使用如下偽代碼所示的方法先將文件數據讀入內存,然后通過Socket將內存中的數據發送出去。

buffer = File.readSocket.send(buffer)

這一過程實際上發生了四次數據拷貝。首先通過系統調用將文件數據讀入到內核態BufferDMA拷貝),然后應用程序將內存態Buffer數據讀入到用戶態BufferCPU拷貝),接着用戶程序通過Socket發送數據時將用戶態Buffer數據拷貝到內核態BufferCPU拷貝),最后通過DMA拷貝將數據拷貝到NIC Buffer。同時,還伴隨着四次上下文切換。

Linux 2.4+內核通過sendfile系統調用,提供了零拷貝。數據通過DMA拷貝到內核態Buffer后,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數據拷貝外,因為整個讀文件-網絡發送由一個sendfile調用完成,整個過程只有兩次上下文切換,因此大大提高了性能。

從具體實現來看,Kafka的數據傳輸通過Java NIOFileChanneltransferTotransferFrom方法實現零拷貝。

注: transferTotransferFrom並不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。

減少網絡開銷批處理

批處理是一種常用的用於提高I/O性能的方式。對Kafka而言,批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。

Kafka send方法並非立即將消息發送出去,而是通過batch.sizelinger.ms控制實際發送頻率,從而實現批量發送。

由於每次網絡傳輸,除了傳輸消息本身以外,還要傳輸非常多的網絡協議本身的一些內容(稱為Overhead),所以將多條消息合並到一起傳輸,可有效減少網絡傳輸的Overhead,進而提高了傳輸效率。

數據壓縮降低網絡負載

Kafka0.7開始,即支持將數據壓縮后再傳輸給Broker。除了可以將每條消息單獨壓縮然后傳輸外,Kafka還支持在批量發送時,將整個Batch的消息一起壓縮后傳輸。數據壓縮的一個基本原理是,重復數據越多壓縮效果越好。因此將整個Batch的數據一起壓縮能更大幅度減小數據量,從而更大程度提高網絡傳輸效率。

Broker接收消息后,並不直接解壓縮,而是直接將消息以壓縮后的形式持久化到磁盤。Consumer Fetch到數據后再解壓縮。因此Kafka的壓縮不僅減少了ProducerBroker的網絡傳輸負載,同時也降低了Broker磁盤操作的負載,也降低了ConsumerBroker間的網絡傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。

高效的序列化方式

Kafka消息的KeyPayload(或者說Value)的類型可自定義,只需同時提供相應的序列化器和反序列化器即可。因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如AvroProtocal Buffer)來減少實際網絡傳輸和磁盤存儲的數據規模,從而提高吞吐率。這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM