消息中間件MetaQ高性能原因分析-轉自阿里中間件


簡介

MetaQ是一款高性能的消息中間件,經過幾年的發展,已經非常成熟穩定,歷經多年雙11的零點峰值壓測,表現堪稱完美。

MetaQ當前最新最穩定的穩本是3.x系統,MetaQ 3.x重新設計和實現,比之前的版本更優秀。雖然MetaQ借鑒了linkedin 的消息中間件kafak思想,但已經是青出於藍而勝於藍。

本文不對MetaQ做全面的介紹,只選擇高性能這點來分析。

性能測試對比圖

以上測試圖片,來自消息測中間件試團隊 @以夕 妹子的性能測試結果

核心功能

MetaQ作為一款消息中間件,消息中間件該有的功能,MetaQ也有。本文並不全面介紹MetaQ方面方面,只是選取性能這一角度,來剖析其高性能的原因。

功能組件

  • MetaQ Server

    最為核心的組件,它主要可以接收應用程序發送過來的消息並存儲,然后再投遞。

  • MetaQ Master

    是MetaQ Server邏輯上的角色,和MySQL Master概念類型,對外提供發送消息、訂閱消息以及維護着管理信息。

  • MetaQ Slave

    是MetaQ Server邏輯上的角色,和MySQL Slave概念類型,對外提供訂閱消息功能。

  • MetaQ Client

    主要是應用程序使用,使用MetaQ Client來發送消息、訂閱消息、其它控制信息。

  • 其它無數據管理及控制信息組件

    提供訂閱關系管理功能,MetaQ Server服務發現功能。

發送消息

MetaQ Client 發送消息,MetaQ Server收到消息,並存儲到文件系統。也就是說MetaQ會有大量write系統調用。

訂閱消息

MetaQ Client 訂閱消息,因其是Pull的模型。MetaQ Server收到Pull消息的請求,會從磁盤上讀取出消息,然后返回給MetaQ Client。這一步有大量的read系統調用。

矛盾

從上面的功能上看,Metaq Server要支持大量的磁盤IO操作,因為其是構建文件系統之上的消息中間件。既然使用了文件系統來存儲數據,但磁盤QPS每秒也就是幾百。MetaQ Server又必須高性能(如MetaQ Server性能是10W級別的QPS),才能在可接收的成本范圍內,滿足業務需求(不丟消息)。如何在QPS只有幾百的磁盤上,構建出一個高性能的MetaQ消息間件正是本文的中心。

高性能

前面介紹了MetaQ高性能的難點,那么我們如何解決這些難點。要解決這些難點,就必須找出這些難點。那么要寫一個高性能的消息中間件,會有哪些會部分會對影響性能。

影響性能的關鍵幾點

序列化與反序列化

從MetaQ Cleint要發送消息,必須要先序列化,然后才能通過網絡發送出去。 MetaQ Server收到消息后,要進行反序列化,才能解析出消息內容,最后序列化存儲到文件系統。

MetaQ Client收到消息,首頁MetaQ Server必須從文件中讀取消息,然后通過網絡發送給MetaQ Client,收到消息,進行反序列化,應用才能識別消息內容。

MetaQ核心功能,都要通過序列化與反序列化,所以其性能,對MetaQ性能有關鍵性的影響,其實不是對MetaQ,只要使用了序列化與反序列化,其對性能影響都很大。

write性能

因為MetaQ Server會有大量的write系統調用 ,所以其性能對MetaQ性能有着重要的影響。

read性能

因為MetaQ Server會有大量的read系統調用 ,所以其性能對MetaQ性能有着重要的影響。

網絡框架

因為發送消息,訂閱消息都必須經過網絡,如果網絡組件性能不好,對MetaQ性能有着關鍵的影響。

如何高性能

序列化與反序列化

要解決序列化與反序列化性能問題,我們就必須尋種各種序列化與反序列化技術性能對比,從而選出一個高性能的序列化與反序列化技術來作為MetaQ。

我們來看下Java世界可以選擇的序列化與反序列化技術

從圖中性能數據,可以看出,個人認為Google出品的Protocol Buffers應該是最佳選擇,不管軟件的質量、社區活躍、軟件的后續發展上來說,都是不錯的選擇。

但MetaQ並沒有選擇Protocol Buffers作為其序列化與反序列化的技術,一個原因是Protocol Buffers居然在小版之間本都不兼容,2.3和2.5的版本都不兼容。這會帶來一個嚴重的問題,如果MetaQ選擇2.3的版本,應用程序選擇了2.5,都會導致沖突,反之亦然。

MetaQ消息元數據是通過JSON來序列化與反序列化,消息Body是交給應用自己序列化與反序列化。

雖然使用Protocol Buffers性能會更好,但帶給用戶帶來麻煩。所以MetaQ選擇使用JSON。

IO優化

前面也已經介紹了,MetaQ Server 存大大量的IO,那么怎么優化呢?

read優化

read優化主要是使用了mmap文件映射技術。這樣可以減少系統上下文切換和復制數據的開銷。。

同時文件系統提供了文件預讀的功能,也使的讀取文件開銷,特別是順序讀時,開銷比較低。

write優化

前面也介紹了,write可能存在並發問題,那么MetaQ是如何解決的?

MetaQ消息只保留在一個物理文件上,所有的消息都會寫一個物理文件,每個物理文件都是固定大小,超過設置的閥值后,自動創建新的一個文件。當磁盤快滿時,會自動刪除老的文件。

Group Commit技術

Group Commit也就是組提交,組提交是指可以多次分寫請求只要通過一次刷新數據,就可以實現這些請求的數據都已刷新到磁盤上。

MySQL數據庫能保證ACID,事務提交也使用了Group Commit來提高性能(為了保證D,數據需要持久化到文件系統)。

詳細見下圖

當寫請求1到MetaQ Server時,把線程寫入內核后,觸發flush線程刷新數據到磁盤,以保證數據的可靠性。
然后再向MetaQ Client 響應發送消息成功。這個時間,只要文件系統和磁盤不損壞,數據是不會丟失的。

正在flush線程要准備刷新數據時,寫請求2,寫請求3,寫請求4也到MetaQ Server且寫入數據,這樣因寫請求1寫數據,觸發的flush順便也把寫請求2,寫請求3,寫請求4的數據也刷新到磁盤。這樣減少了刷新磁盤的次數,性能自然就高了,同時也保證的數據的可靠性。

如何實現Group Commit,請看源碼

// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (msg.isWaitStoreMsgOK()) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
.getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
}
else {
service.wakeup();
}
}
// Asynchronous flush
else {
this.flushCommitLogService.wakeup();
}

並發安全

write如何保證並發安全,在寫數據前,需要搶占一個鎖,因為這只是把數據寫到文件系統緩存中,所以持有鎖的時間非常短,對性能友好。請看代碼

synchronized (this) {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();

// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);

MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
if (null == mapedFile) {
log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: "
+ msg.getBornHostString());
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
if (null == mapedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: "
+ msg.getBornHostString());
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}

DispatchRequest dispatchRequest = new DispatchRequest(//
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
/**
* Transaction
*/
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10

this.defaultMessageStore.putDispatchRequest(dispatchRequest);

eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
} // end of synchronized

網絡性能

MetaQ的網絡框架,選擇了Netty4。Netty4因出色的性能和易用性,成為高性能場景的不二選擇。

后記

MetaQ高性能的秘密,我們從其功能結構,從功能的作用,一個個解釋了可能影響性能的點,及怎么解決這些問題,提高性能。

雖然一個個點看起來簡單,但要實現一個穩定、高性能的消息系統,還是不容易的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM