說起微服務,不談容器,不談雲,那還談個啥?容器中又以Docker最為流行,那么我們今天就來實踐下容器化微服務,然后順帶解決下各種疑難雜症。
環境: Idea2019.03/Gradle6.0.1/JDK11.0.4/RocketMQ4.6.0/Linux8.0/Docker19.03.5
難度: 新手--戰士--老兵--大師
目標:
- 理解RocketMQ的原理
- Linux8.0+JDK11+RocketMQ部署
- Docker部署RocketMQ集群
- MQ-Api應用測試
說明:
前面有涉及RocketMQ,本篇作為補充。為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼中大量使用注釋,理解更輕松。代碼地址:其中的day26,https://github.com/xiexiaobiao/dubbo-project.git
原創文章,謝絕一切形式轉載,否則追究法律責任。
本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。
正文:
01-RocketMQ原理
生產使用部署圖:
- Producer/Consumer可多個成組,一般一個生產者組/消費者組對應一個微服務
- Broker是RocketMQ的核心模塊,負責接收並存儲消息,同時提供Push/Pull接口來將消息發送給Consumer,主從間進行Async/Sync同步,Master節點之間不做數據交互,Master宕機,則該組只讀不可寫,slave不會自動轉為master
- Name Server是一個幾乎無狀態節點,節點之間無任何信息同步
- 每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server
- Producer/Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息
存儲模式:
每個Topic在Broker上會划分成幾個邏輯隊列,每個邏輯隊列保存一部分消息數據,但是保存的消息數據實際是指向commitLog的消息索引,消息實際保存在commitLog上。這也是Rocket支持消息重發、回溯的關鍵。
02-Linux下單機安裝
Linux和JDK11,略!我使用rocketmq-all-4.6.0-bin-release.zip,解壓:
unzip rocketmq-all-4.6.0-source-release.zip
解壓后的目錄為:/usr/rocketmq/rocketmq-all-4.6.0-bin-release
2.1-啟動nameServer: sh bin/mqnamesrv
通過查看其shell腳本可以發現,實質是去調用runserver.sh,因rocketmq4.6啟動腳本和JDK11不兼容,典型報錯如下:
因此去修改runserver.sh
[root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/runserver.sh
GC的機制,JDK11使用G1不使用CMS,刪除JVM參數: -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:-UseParNewGC -XX:+PrintGCDetails 刪除GCLog相關整行: JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" JDK11不再獨立分jre,刪除整行: JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib" 修改:`-Xloggc` 為 `-Xlog:gc` 修改類路徑,否則提示找不到mainClass,增加一行: export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH} 刪除整行: export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
再啟動NameServer:
2.2-啟動broker實例,並用-n
指定namesrv:
sh bin/mqbroker -n localhost:9876
通過查看其shell腳本可以發現實質是去調用runbroker.sh,有JDK11兼容問題,因此去修改runbroker.sh
[root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/runbroker.sh
刪除:#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} 增加:export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH} 刪除:JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" 刪除:-XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+PrintGCDetails 刪除:JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
再啟動Broker:
2.3-還要修改tools.sh
[root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/tools.sh
刪除:JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext" 刪除:#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} 增加:export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
2.4-內存要足夠!如遇報錯信息:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000700000000, 4294967296, 0) failed; error='Not enough space' (errno=12)
下圖顯示mmap默認需要4G內存:
加大內存!或者通過上面的兩個sh文件中參數進行修改: JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
改為: -Xms2g -Xmx2g -Xmn1g
2.5-安裝RocketMq客戶端 https://github.com/apache/rocketmq-externals
,其中的子項目:rocketmq-console,注意修改下pom依賴和properties文件,再打包成jar運行即可(具體見我往期文章)。 這個console比較簡陋,湊合下:
03-Docker部署RocketMQ
因Docker容器技術還是必須要掌握的,我也拿來用一下,先使用三個container來分別提供nameSrv,broker和console服務,部署圖如下:
安裝Docker到Linux,略!Docker安裝rocketMQ:
[root@server224 docker]# docker search rocketmq
這里選擇foxiswho/rocketmq,因為rocketmqinc/rocketmq里面沒有nameServer。 查看鏡像詳細信息命令:
[root@server224 docker]# docker inspect foxiswho/rocketmq
查看當前鏡像所有的版本shell命令:
curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags | tr -d '[\[\]" ]' | tr '}' '\n' | awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF && $3 != ""){printf("%s:%s\n",image,$3)}}'
下載鏡像:
[root@server224 docker]# docker pull foxiswho/rocketmq:server-4.5.2
[root@server224 docker]# docker pull foxiswho/rocketmq:broker-4.5.2
先啟動nameserver容器:
[root@server224 docker]# docker run -d -p 9876:9876 --name rmqnamesrv foxiswho/rocketmq:server-4.5.2
然后啟動broker容器:
[root@server224 docker]#docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" foxiswho/rocketmq:broker-4.5.2
再啟動console容器:
[root@server224 docker]# docker pull styletang/rocketmq-console-ng:latest
[root@server224 docker]# docker run -d --name rmqconsole -p 8180:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng
進入某個容器示例:
[root@server224 docker]# docker exec -it rmqnamesrv /bin/bash
[rocketmq@4c2ae3332c7c bin]$ ls
可見該image下版本為openJDK1.8!
代碼測試:
我這里只是用了一個簡單的Producer/Consumer,之前的文章中已有使用RocketMq實現分布式事務的項目,這里就不重復了。
寫個生產者,com.biao.rmq.Producer:
public class Producer { public static void main(String[] args) throws Exception{ // RPC hook to execute per each remoting command execution. // 添加鈎子函數 RPCHook rpcHook = new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { System.out.println("Producer RPC hook doBeforeRequest"); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { System.out.println("Producer RPC hook doAfterResponse"); } }; //DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, // boolean enableMsgTrace, final String customizedTraceTopic) // Namespace for this MQ Producer instance. // DefaultMQProducer mqProducer = new DefaultMQProducer("namespace01","producerGroup01", // rpcHook,true,"customizedTraceTopic01"); // 以下為簡單類型MQProducer DefaultMQProducer mqProducer = new DefaultMQProducer("producerGroup01"); // producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); mqProducer.setNamesrvAddr("192.168.1.224:9876"); mqProducer.setInstanceName("rocket-Producer-instance01"); mqProducer.start(); for (int i = 0; i < 3; i++) { LocalDateTime localDateTime = LocalDateTime.now(); Message msg = new Message("topic01","tag01", ("message test. time:"+localDateTime).getBytes(StandardCharsets.UTF_8)); // 獲取消息發送的狀態信息,包含了sendStatus/msgId/offsetMsgId/messageQueue/queueOffset, // 從而可以進一步根據SendResult內容做業務處理 SendResult sendResult = mqProducer.send(msg,500L); System.out.println(sendResult); } mqProducer.shutdown(); } }
運行結果:
寫個消費者,com.biao.rmq.Consumer:
public class Consumer { public static void main(String[] args) throws Exception{ /* MQAdmin mqAdmin = new ; MQAdminImpl mqAdmin1 = new MQAdminImpl();*/ // RPC hook to execute per each remoting command execution. // 添加鈎子函數 RPCHook rpcHook = new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { System.out.println("Consumer RPC hook doBeforeRequest"); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { System.out.println("Consumer RPC hook doAfterResponse"); } }; // Strategy Algorithm for message allocating between consumers AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueStrategy() { @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { return null; } @Override public String getName() { return null; } }; // 2022年將移除DefaultMQPullConsumer,已@Deprecated,推薦未來使用 DefaultLitePullConsumer,但還沒實現,這是耍人?? /*DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("namespace02", "consumerGroup02",rpcHook,allocateMessageQueueStrategy, true,"customizedTraceTopic01");*/ // simple constructor DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup02"); consumer.setNamesrvAddr("192.168.1.224:9876"); // Specify where to start in case the specified consumer group is a brand new one. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // tag 的匹配模式 only support or operation such as "tag1 || tag2 || tag3" consumer.subscribe("topic01", "*"); // A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 對消息的處理業務 System.out.printf("%s Receive New Messages:%n", Thread.currentThread().getName()); for (MessageExt msg : msgs ) { System.out.println("host >> " + msg.getBornHost()); System.out.println("MsgId >> " + msg.getMsgId()); System.out.println("body >> " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.setInstanceName("rocket-Consumer-instance01"); // Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); //defaultMQPushConsumer.shutdown(); } }
運行結果:
04-Docker部署HA集群
HA集群部署圖,全容器實現:
但這里有個比較麻煩的地方,NameSrv要指定不同端口需要 –c 參數,但此image里並沒有使用此參數,需重建image,這就扯的有點遠了,故我將一個NameSrv裝linux上,一個NameSrv裝container上,權當替代方案了,但不影響效果。 linux上啟動的NameSrv,可修改端口為9877
,先建一個文件,只修改端口號: /usr/docker/docker/namesrv.properties
啟動時:
[root@server224 rocketmq-all-4.6.0-bin-release]# ./bin/mqnamesrv -c /usr/docker/docker/namesrv.properties
各容器啟動配置腳本,rocket提供了模板文件:
修改下啟動配置文件,即clusterName,IP和port,這里以asyn中的broker-a.properties/broker-a-s.properties為例,其他不需要改:
啟動容器命令示例,其他類似:
[root@server224 docker]#docker run -d -p 11911:11911 -p 11909:11909 --name rmqbroker3 --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -v /usr/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-async/a-s.properties:/etc/rocketmq/broker.conf foxiswho/rocketmq:broker-4.5.2
window上訪問地址:http://192.168.1.224:8180/
問題:
1.常規部署時無法第二次啟動broker,查看broker.log也無異常,這個問題,花了好長時間找答案!終於在store.log中發現有異常信息:
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make public void jdk.internal.ref.Cleaner.clean() accessible: module java.base does not "exports jdk.internal.ref" to unnamed module @3590fc5b
然后查閱得知是JDK11模塊化引起的問題,需修改jvm啟動參數:
[root@server224 rocketmq-all-4.6.0-bin-release]# vim bin/runbroker.sh
增加: JAVA_OPT="${JAVA_OPT} --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED"
然后啟動broker,卻又發現broker使用了172.17.0.1這個IP地址,而nameSrv使用的是虛機另一ip 192.168.1.224,導致應用無法獲取topic,即broker與nameSrv通訊失敗,原因是安裝docker后,docker自動安裝了一個虛擬網卡,用於管理所有容器的:
修改broker配置,指定broker的ip:
[root@server224 rocketmq-all-4.6.0-bin-release]# vim conf/broker.conf
並在啟動時使用配置文件:
[root@server224 rocketmq-all-4.6.0-bin-release]# sh ./bin/mqbroker -n localhost:9876 -c ./conf/broker.conf
再次啟動nameSrv --> broker --> app終於成功!
2.Docker中broker無法啟動,狀態一直是Exited
:
[root@server224 docker]# docker logs rmqbroker
虛機本地建立broker.conf文件,
[root@server224 rocketmq-all-4.6.0-bin-release]# vim /usr/docker/docker/broker.conf
再啟動broker,命令中加入 -v 參數,做一個本地文件到容器的映射:
[root@server224 docker]#docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -v /usr/docker/docker/broker.conf:/etc/rocketmq/broker.conf foxiswho/rocketmq:broker-4.5.2
終於正常:
3.容器模式下應用發送消息報錯:
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
原因是上面的映射文件broker.conf中brokerIP地址未設置,會導致brokerIP使用container的ip如172.17.0.3。
此篇完!
原創文章,謝絕任何形式轉載,否則追究法律責任。
我的微信公眾號,只發原創文章。
往期文章: