Netty構建分布式消息隊列(AvatarMQ)設計指南之架構篇


  目前業界流行的分布式消息隊列系統(或者可以叫做消息中間件)種類繁多,比如,基於Erlang的RabbitMQ、基於Java的ActiveMQ/Apache Kafka、基於C/C++的ZeroMQ等等,都能進行大批量的消息路由轉發。它們的共同特點是,都有一個消息中轉路由節點,按照消息隊列里面的專業術語,這個角色應該是broker。整個消息系統通過這個broker節點,進行從消息生產者Producer到消費者Consumer的消息路由。當然了,生產者和消費者可以是多對多的關系。消息路由的時候,可以根據關鍵字(專業的術語叫topic),進行關鍵字精確匹配、模糊匹配、廣播方式的消息路由。

  簡單來說,一個極簡的分布式消息隊列系統主要的構成模塊有:

  Broker:簡單來說就是消息隊列服務器實體。

  Producer:消息的生產者,主要用來發送消息給消費者。

  Consumer:消息的消費者,主要用來接收生產者的消息。

  Routing Key:路由關鍵字(Topic),主要用來控制生產者和消費者之間的發送與接收消息的對應關系。

  Channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

  到此為止,我們明白了一個分布式消息隊列系統的主要構成模塊,現在本人就通過Netty,這個優秀的Java NIO網絡通訊框架,構建一個支持上述應用場景的分布式消息隊列系統,本人把其命名為AvatarMQ。后續我會基於這個開源項目,連載出基於Netty構建分布式消息隊列系統系列相關的文章,闡明主要的設計思路、組織結構、模塊划分依據、類圖結構等等。為了說明方便,后續本文中,如果沒有特殊說明,有涉及基於Netty構建的分布式消息隊列系統,就是指代AvatarMQ。由於整個開源項目涉及的代碼量比較多,所以希望大家在本人編寫系列博客文章的基礎上,耐心地理解、分析其中的代碼模塊,相信一定不會讓您失望!

  AvatarMQ基於Netty,所以首先,你要能清楚的理解Netty是什么?它能做什么?有興趣的朋友可以關注一下Netty項目的官網(http://netty.io/),上面有很詳細的入門文章介紹。雖然都是英文的,但是這些一手的資料更具權威性,值得花時間深入研究探索,畢竟現在流行的雲計算、大數據領域成功的開源項目比如Hadoop、Storm等等,網絡通信層這塊全部依賴Netty,可見Netty的功能強大。

  基於Netty可以開發定制高性能、高可靠性的Java企業級服務端應用,而本文是我,在繼利用Netty構建高性能RPC服務器系列文章之后,又一個基於Netty開發的分布式消息隊列系統(AvatarMQ)。此外AvatarMQ還大量使用了Java多線程的相關類庫。所以希望在此之前,大家能回憶復習一下,這樣理解起來會更加得心應手、事半功倍。

  AvatarMQ是基於Netty構建的分布式消息隊列系統,支持多個生產者和多個消費者之間的消息路由、傳遞。主要特性如下:

  • AvatarMQ基於Java語言進行編寫,網絡通訊依賴Netty。
  • 生產者和消費者的關系可以是一對多、多對一、多對多的關系。
  • 若干個消費者可以組成消費者集群,生產者可以向這個消費者集群投遞消息。
  • 消費者集群對於有共同關注點的消費者支持消息的負載均衡策略。
  • 支持動態新增、刪除生產者、消費者。
  • 目前僅僅支持關鍵字的精確匹配路由,后續會逐漸完善。
  • 消息隊列服務器Broker基於Netty的主從事件線程池模型開發設計。
  • 網絡消息序列化采用Kryo進行消息的網絡序列化傳輸。
  • Broker的消息派發、負載均衡、應答處理(ACK)基於異步多線程模型進行開發設計。
  • Broker消息的投遞,目前支持嚴格的消息順序。其中Broker還支持消息的緩沖派發,即Broker會緩存一定數量的消息之后,再批量分配給對此消息感興趣的消費者。

  AvatarMQ項目開源網址:https://github.com/tang-jie/AvatarMQ

  整個開源項目依賴的jar包請參考:https://github.com/tang-jie/AvatarMQ/blob/master/nbproject/project.properties

  另外,值得注意的是:

  AvatarMQ使用的Netty是基於4.0版本(下載地址:http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2)。

  消息序列化使用的Kryo是基於kryo-3.0.3版本(下載地址:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3)。

  請大家自行去官網下載使用。

  現在,現在言歸正傳,我們先來看下整合AvatarMQ項目的軟件架構圖:

  

  從上述圖例中,我們可以很清楚的看到:生產者和消費者之間是通過Broker進行消息的路由和轉發,同時Broker還負責應答生產者和接收消費者的處理應答。

  在了解了,整個AvatarMQ的組織架構之后,我們再來實際運行一下AvatarMQ!

  首先,先啟動一下Broker服務器(對應代碼:https://github.com/tang-jie/AvatarMQ/blob/master/src/com/newlandframework/avatarmq/spring/AvatarMQServerStartup.java

  如果一切正常,終端控制台會打印如下輸出:

  

  接着,我們就來實際驗證一下AvatarMQ的消息推送功能。

  1、生產者發送1條消息給關注這條消息的消費者。我們先啟動消費者,再啟動生產者。

  其中消費者1的測試代碼(AvatarMQConsumer1.java)如下所示:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;
import com.newlandframework.avatarmq.consumer.ProducerMessageHook;
import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;

/**
 * @filename:AvatarMQConsumer1.java
 * @description:AvatarMQConsumer1功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer1 {

    private static ProducerMessageHook hook = new ProducerMessageHook() {
        public ConsumerAckMessage hookMessage(Message message) {
            System.out.printf("AvatarMQConsumer1 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody()));
            ConsumerAckMessage result = new ConsumerAckMessage();
            result.setStatus(ConsumerAckMessage.SUCCESS);
            return result;
        }
    };

    public static void main(String[] args) {
        AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-1", hook);
        consumer.init();
        consumer.setClusterId("AvatarMQCluster");
        consumer.receiveMode();
        consumer.start();
    }
}

  生產者1的測試代碼(AvatarMQProducer1.java)如下所示,其含義是發送1條消息,給關注“AvatarMQ-Topic-1”主題的消費者:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.producer.AvatarMQProducer;
import org.apache.commons.lang3.StringUtils;

/**
 * @filename:AvatarMQProducer1.java
 * @description:AvatarMQProducer1功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer1 {

    public static void main(String[] args) throws InterruptedException {
        AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-1");
        producer.setClusterId("AvatarMQCluster");
        producer.init();
        producer.start();

        System.out.println(StringUtils.center("AvatarMQProducer1 消息發送開始", 50, "*"));

        for (int i = 0; i < 1; i++) {
            Message message = new Message();
            String str = "Hello AvatarMQ From Producer1[" + i + "]";
            message.setBody(str.getBytes());
            ProducerAckMessage result = producer.delivery(message);
            if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {
                System.out.printf("AvatarMQProducer1 發送消息編號:%s\n", result.getMsgId());
            }

            Thread.sleep(100);
        }

        producer.shutdown();
        System.out.println(StringUtils.center("AvatarMQProducer1 消息發送完畢", 50, "*"));
    }
}

  首先我們先來啟動消費者,如果一切正常,控制台輸出結果為:

  這個時候我們再運行生產者,發送一條消息給消費者。啟動生產者之后,控制台輸出結果如下:

  那現在,我們切回去看下消費者是否收到生產者的消息了呢?

  非常正確,我們的消費者果然收到了生產者發送過來的消息。

 

  2、生產者發送1條消息給不關注這條消息的消費者。

  首先說明的是,代碼樣例還是基於上述的AvatarMQConsumer1.java、AvatarMQProducer1.java。只不過這次是生產者發送的主題改成:“AvatarMQ-Topic-Test”,消費者關注的主題改成“AvatarMQ-Topic-1”。然后依次啟動消費者、生產者。下面是實際的運行情況:

  生產者成功發送消息:

  那按照要求,消費者應該無法收到生產者的這條消息,實際情況是不是這樣呢?事實勝於雄辯,看如下截圖所示:

  消費者依然處理啟動監聽狀態,說明完全符合我們的預期。

 

  3、生產者發送N條消息(這里是發送100條消息)給一個消費者集群(有2個消費者組成,並且這2個消費者關注的消息主題topic是相同的)。

  我們先啟動2個消費者,再啟動生產者。消費者代碼參考:

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;
import com.newlandframework.avatarmq.consumer.ProducerMessageHook;
import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;

/**
 * @filename:AvatarMQConsumer2.java
 * @description:AvatarMQConsumer2功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer2 {

    private static ProducerMessageHook hook = new ProducerMessageHook() {
        public ConsumerAckMessage hookMessage(Message message) {
            System.out.printf("AvatarMQConsumer2 收到消息編號:%s,消息內容:%s\n", message.getMsgId(), new String(message.getBody()));
            ConsumerAckMessage result = new ConsumerAckMessage();
            result.setStatus(ConsumerAckMessage.SUCCESS);
            return result;
        }
    };

    public static void main(String[] args) {
        AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-2", hook);
        consumer.init();
        consumer.setClusterId("AvatarMQCluster2");
        consumer.receiveMode();
        consumer.start();
    }
}

  生產者代碼參考(目的是發送100條消息)給消費者集群。

package com.newlandframework.avatarmq.test;

import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.producer.AvatarMQProducer;
import org.apache.commons.lang3.StringUtils;

/**
 * @filename:AvatarMQProducer2.java
 * @description:AvatarMQProducer2功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer2 {

    public static void main(String[] args) throws InterruptedException {
        AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-2");
        producer.setClusterId("AvatarMQCluster2");
        producer.init();
        producer.start();

        System.out.println(StringUtils.center("AvatarMQProducer2 消息發送開始", 50, "*"));

        for (int i = 0; i < 100; i++) {
            Message message = new Message();
            String str = "Hello AvatarMQ From Producer2[" + i + "]";
            message.setBody(str.getBytes());
            ProducerAckMessage result = producer.delivery(message);
            if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {
                System.out.printf("AvatarMQProducer2 發送消息編號:%s\n", result.getMsgId());
            }

            Thread.sleep(100);
        }

        producer.shutdown();
        System.out.println(StringUtils.center("AvatarMQProducer2 消息發送完畢", 50, "*"));
    }
}

  我們依次啟動消費者AvatarMQConsumer2兩次,這個時候終端控制台依次輸出:

  這個時候我們再啟動生產者,運行截圖如下:

  說明生產者發送了100條消息出去,看下我們消費者1接收的情況:

  繼續看下我們的消費者2,消息接收的情況,截圖如下:

  最終統計一下,消費者1,接收的消息編號都是奇數,一共50個。消費者2,接收到的消息編號都是偶數,一共50個。兩個消費者接收的消息總數加起來,剛好等於生產者發送的消息總數100個,完全符合我們的預期!另外消費者1、消費者2都收到了來自生產者的消息,說明Broker進行了消息的路由傳遞。

  4、多個生產者和多個消費者的消息傳遞,以及動態新增、刪除生產者、消費者。

  這個就交給大家自行測試了,由於篇幅有限,在此本人就不一一闡述。

 

  到目前為止,相信大家對於AvatarMQ所具備的基本功能,有了一個大致的印象。當然,AvatarMQ還有一些美中不足,比如:

  • 不支持消息的刷盤存儲,可能由於系統Crash,造成消息的丟失。后續需要接入一個存儲系統(基於Java NIO),保證消息的持久序列化。
  • AvatarMQ的生產者、消費者模塊,要進一步支持,斷網重連Broker的功能,確保在Broker重啟的情況下,把在途的消息繼續發送、接收完畢。
  • Broker單點的問題,根據高可用性集群HA(High Available)的標准,Broker也要有主節點和從節點機制。在主節點宕機的情況,從節點要能灰度過渡,不至於Broker主節點宕機,整個AvatarMQ消息系統陷入癱瘓狀態。
  • 消息應答失敗,還未支持重試功能。
  • 當然還有一些未知的bug,有待發現和修復。
  • AvatarMQ的處理性能,未經歷過生產系統實際檢驗,暫時無法保證其安全和可靠性。

  由於代碼編寫、測試等等工作,都是本人利用工作之余的時間完成,時間點上比較倉促。加上本人的技術水平有限,難免有說的不對及寫得不好的地方,或者其中應該有更好的解決方案。歡迎廣大同行、愛好者在線下進行學習交流,有什么寶貴的建議和觀點,懇請批評指正,不吝賜教。雖然AvatarMQ和業界主流、久經考驗的消息隊列系統,在處理性能、可靠性上,肯定還有不小的差距。但是可以基於此,加深對分布式消息隊列的理解,做到知其然知其所以然,何樂而不為?

  最后,本人后續會逐漸推出“基於Netty構建的分布式消息隊列系統(AvatarMQ)”,架構設計、原理分析的詳解連載文章,敬請期待!

  PS:目前AvatarMQ已經開源,整個項目托管到github,對應的網址為:https://github.com/tang-jie/AvatarMQ,歡迎有興趣的同行朋友、愛好者關注支持。如果覺得還不錯,可以點擊Star收藏、關注。當然,你還可以點擊推薦本文,也算是對我辛苦付出的一點支持和回報,謝謝大家!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM