Docker部署RocketMQ(JDK11)


 

說起微服務,不談容器,不談雲,那還談個啥?容器中又以Docker最為流行,那么我們今天就來實踐下容器化微服務,然后順帶解決下各種疑難雜症。

環境: Idea2019.03/Gradle6.0.1/JDK11.0.4/RocketMQ4.6.0/Linux8.0/Docker19.03.5

難度: 新手--戰士--老兵--大師

目標:

  1. 理解RocketMQ的原理
  2. Linux8.0+JDK11+RocketMQ部署
  3. Docker部署RocketMQ集群
  4. MQ-Api應用測試

說明:

前面有涉及RocketMQ,本篇作為補充。為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼中大量使用注釋,理解更輕松。代碼地址:其中的day26,https://github.com/xiexiaobiao/dubbo-project.git

 

原創文章,謝絕一切形式轉載,否則追究法律責任。

 本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。 

 

正文:

01-RocketMQ原理

生產使用部署圖:

 

  • Producer/Consumer可多個成組,一般一個生產者組/消費者組對應一個微服務
  • Broker是RocketMQ的核心模塊,負責接收並存儲消息,同時提供Push/Pull接口來將消息發送給Consumer,主從間進行Async/Sync同步,Master節點之間不做數據交互,Master宕機,則該組只讀不可寫,slave不會自動轉為master
  • Name Server是一個幾乎無狀態節點,節點之間無任何信息同步
  • 每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server
  • Producer/Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息

存儲模式:

 

每個Topic在Broker上會划分成幾個邏輯隊列,每個邏輯隊列保存一部分消息數據,但是保存的消息數據實際是指向commitLog的消息索引,消息實際保存在commitLog上。這也是Rocket支持消息重發、回溯的關鍵。

 

02-Linux下單機安裝

Linux和JDK11,略!我使用rocketmq-all-4.6.0-bin-release.zip,解壓:

unzip rocketmq-all-4.6.0-source-release.zip

解壓后的目錄為:/usr/rocketmq/rocketmq-all-4.6.0-bin-release

2.1-啟動nameServer: sh bin/mqnamesrv

通過查看其shell腳本可以發現,實質是去調用runserver.sh,因rocketmq4.6啟動腳本和JDK11不兼容,典型報錯如下:

因此去修改runserver.sh

[root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/runserver.sh

GC的機制,JDK11使用G1不使用CMS,刪除JVM參數:

-XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection  -XX:-UseParNewGC -XX:+PrintGCDetails

刪除GCLog相關整行:

JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"

JDK11不再獨立分jre,刪除整行:

JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"

修改:`-Xloggc` 為 `-Xlog:gc`

修改類路徑,否則提示找不到mainClass,增加一行:

export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

刪除整行:
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

 

再啟動NameServer:

2.2-啟動broker實例,並用-n指定namesrv:

sh bin/mqbroker -n localhost:9876

通過查看其shell腳本可以發現實質是去調用runbroker.sh,有JDK11兼容問題,因此去修改runbroker.sh

[root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/runbroker.sh

刪除:#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
增加:export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

刪除:JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"

刪除:-XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+PrintGCDetails

刪除:JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"

 

再啟動Broker:

2.3-還要修改tools.sh

[root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/tools.sh

刪除:JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
刪除:#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
增加:export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

 

2.4-內存要足夠!如遇報錯信息:

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000700000000, 4294967296, 0) failed; error='Not enough space' (errno=12)

下圖顯示mmap默認需要4G內存:

加大內存!或者通過上面的兩個sh文件中參數進行修改: JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" 改為: -Xms2g -Xmx2g -Xmn1g

2.5-安裝RocketMq客戶端 https://github.com/apache/rocketmq-externals,其中的子項目:rocketmq-console,注意修改下pom依賴和properties文件,再打包成jar運行即可(具體見我往期文章)。 這個console比較簡陋,湊合下:

 

03-Docker部署RocketMQ

因Docker容器技術還是必須要掌握的,我也拿來用一下,先使用三個container來分別提供nameSrv,broker和console服務,部署圖如下:

安裝Docker到Linux,略!Docker安裝rocketMQ:

[root@server224 docker]# docker search rocketmq

這里選擇foxiswho/rocketmq,因為rocketmqinc/rocketmq里面沒有nameServer。 查看鏡像詳細信息命令:

[root@server224 docker]# docker inspect foxiswho/rocketmq

查看當前鏡像所有的版本shell命令:

curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags | tr -d '[\[\]" ]' | tr '}' '\n' | awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF && $3 != ""){printf("%s:%s\n",image,$3)}}'

 

下載鏡像:

[root@server224 docker]# docker pull foxiswho/rocketmq:server-4.5.2

[root@server224 docker]# docker pull foxiswho/rocketmq:broker-4.5.2

先啟動nameserver容器:

[root@server224 docker]# docker run -d -p 9876:9876 --name rmqnamesrv foxiswho/rocketmq:server-4.5.2

然后啟動broker容器:

[root@server224 docker]#docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" foxiswho/rocketmq:broker-4.5.2

再啟動console容器:

[root@server224 docker]# docker pull styletang/rocketmq-console-ng:latest

[root@server224 docker]# docker run -d --name rmqconsole -p 8180:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng

進入某個容器示例:

[root@server224 docker]# docker exec -it rmqnamesrv /bin/bash

[rocketmq@4c2ae3332c7c bin]$ ls

可見該image下版本為openJDK1.8!

代碼測試:

我這里只是用了一個簡單的Producer/Consumer,之前的文章中已有使用RocketMq實現分布式事務的項目,這里就不重復了。

寫個生產者,com.biao.rmq.Producer

public class Producer {
    public static void main(String[] args) throws Exception{
        // RPC hook to execute per each remoting command execution.
        // 添加鈎子函數
        RPCHook rpcHook = new RPCHook() {
            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                System.out.println("Producer RPC hook doBeforeRequest");
            }

            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                System.out.println("Producer RPC hook doAfterResponse");
            }
        };
        //DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        //        boolean enableMsgTrace, final String customizedTraceTopic)
        // Namespace for this MQ Producer instance.
        // DefaultMQProducer mqProducer = new DefaultMQProducer("namespace01","producerGroup01",
        //        rpcHook,true,"customizedTraceTopic01");

        // 以下為簡單類型MQProducer
        DefaultMQProducer mqProducer = new DefaultMQProducer("producerGroup01");
        // producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        mqProducer.setNamesrvAddr("192.168.1.224:9876");
        mqProducer.setInstanceName("rocket-Producer-instance01");
        mqProducer.start();

        for (int i = 0; i < 3; i++) {
            LocalDateTime localDateTime = LocalDateTime.now();
            Message msg = new Message("topic01","tag01",
                    ("message test. time:"+localDateTime).getBytes(StandardCharsets.UTF_8));
            // 獲取消息發送的狀態信息,包含了sendStatus/msgId/offsetMsgId/messageQueue/queueOffset,
            // 從而可以進一步根據SendResult內容做業務處理
            SendResult sendResult = mqProducer.send(msg,500L);
            System.out.println(sendResult);
        }

        mqProducer.shutdown();
    }
}

 

運行結果:

 

 

寫個消費者,com.biao.rmq.Consumer

public class Consumer {
    public static void main(String[] args) throws Exception{

/*        MQAdmin mqAdmin = new ;
        MQAdminImpl mqAdmin1 = new MQAdminImpl();*/

        // RPC hook to execute per each remoting command execution.
        // 添加鈎子函數

        RPCHook rpcHook = new RPCHook() {
            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                System.out.println("Consumer RPC hook doBeforeRequest");
            }

            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                System.out.println("Consumer RPC hook doAfterResponse");
            }
        };

        // Strategy Algorithm for message allocating between consumers
        AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueStrategy() {
            @Override
            public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
                return null;
            }

            @Override
            public String getName() {
                return null;
            }
        };

        // 2022年將移除DefaultMQPullConsumer,已@Deprecated,推薦未來使用 DefaultLitePullConsumer,但還沒實現,這是耍人??
        /*DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("namespace02",
                "consumerGroup02",rpcHook,allocateMessageQueueStrategy,
                true,"customizedTraceTopic01");*/
        // simple constructor
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup02");
        consumer.setNamesrvAddr("192.168.1.224:9876");
        // Specify where to start in case the specified consumer group is a brand new one.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // tag 的匹配模式 only support or operation such as "tag1 || tag2 || tag3"
        consumer.subscribe("topic01", "*");
        // A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // 對消息的處理業務
                System.out.printf("%s Receive New Messages:%n", Thread.currentThread().getName());
                for (MessageExt msg : msgs
                     ) {
                    System.out.println("host >> " + msg.getBornHost());
                    System.out.println("MsgId >> " + msg.getMsgId());
                    System.out.println("body >> "  + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.setInstanceName("rocket-Consumer-instance01");
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
        //defaultMQPushConsumer.shutdown();
    }
}

 

運行結果:

 

 

04-Docker部署HA集群

HA集群部署圖,全容器實現:

 

但這里有個比較麻煩的地方,NameSrv要指定不同端口需要 –c 參數,但此image里並沒有使用此參數,需重建image,這就扯的有點遠了,故我將一個NameSrv裝linux上,一個NameSrv裝container上,權當替代方案了,但不影響效果。 linux上啟動的NameSrv,可修改端口為9877,先建一個文件,只修改端口號: /usr/docker/docker/namesrv.properties

啟動時:

[root@server224 rocketmq-all-4.6.0-bin-release]# ./bin/mqnamesrv -c /usr/docker/docker/namesrv.properties

各容器啟動配置腳本,rocket提供了模板文件:

修改下啟動配置文件,即clusterName,IP和port,這里以asyn中的broker-a.properties/broker-a-s.properties為例,其他不需要改:

啟動容器命令示例,其他類似:

 

[root@server224 docker]#docker run -d -p 11911:11911 -p 11909:11909 --name rmqbroker3 --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -v /usr/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-async/a-s.properties:/etc/rocketmq/broker.conf foxiswho/rocketmq:broker-4.5.2

 

window上訪問地址:http://192.168.1.224:8180/ 

 

問題:

1.常規部署時無法第二次啟動broker,查看broker.log也無異常,這個問題,花了好長時間找答案!終於在store.log中發現有異常信息:

Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make public void jdk.internal.ref.Cleaner.clean() accessible: module java.base does not "exports jdk.internal.ref" to unnamed module @3590fc5b

然后查閱得知是JDK11模塊化引起的問題,需修改jvm啟動參數:

[root@server224 rocketmq-all-4.6.0-bin-release]# vim bin/runbroker.sh

增加: JAVA_OPT="${JAVA_OPT} --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED"

然后啟動broker,卻又發現broker使用了172.17.0.1這個IP地址,而nameSrv使用的是虛機另一ip 192.168.1.224,導致應用無法獲取topic,即broker與nameSrv通訊失敗,原因是安裝docker后,docker自動安裝了一個虛擬網卡,用於管理所有容器的:

修改broker配置,指定broker的ip:

[root@server224 rocketmq-all-4.6.0-bin-release]# vim conf/broker.conf

並在啟動時使用配置文件:

[root@server224 rocketmq-all-4.6.0-bin-release]# sh ./bin/mqbroker -n localhost:9876 -c ./conf/broker.conf

再次啟動nameSrv --> broker --> app終於成功!

 

2.Docker中broker無法啟動,狀態一直是Exited

[root@server224 docker]# docker logs rmqbroker

虛機本地建立broker.conf文件,

[root@server224 rocketmq-all-4.6.0-bin-release]# vim /usr/docker/docker/broker.conf

再啟動broker,命令中加入 -v 參數,做一個本地文件到容器的映射:

[root@server224 docker]#docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -v /usr/docker/docker/broker.conf:/etc/rocketmq/broker.conf foxiswho/rocketmq:broker-4.5.2

終於正常:

 

3.容器模式下應用發送消息報錯:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

原因是上面的映射文件broker.conf中brokerIP地址未設置,會導致brokerIP使用container的ip如172.17.0.3。

此篇完!

 

原創文章,謝絕任何形式轉載,否則追究法律責任。

我的微信公眾號,只發原創文章。

往期文章:

  1. 流式計算(五)-Flink 計算模型
  2. 流式計算(四)-Flink Stream API 篇二
  3. 流式計算(三)-Flink Stream 篇一
  4. 流式計算(一)-Java8Stream


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM