Rocketmq選型
Rocket是一個專業的隊列服務,性能優於Rabbitmq,優勢是性能和並發,源於Kafka的擴展版,增強了數據的可靠性。
Rocketmq的隊列類型
普通隊列,廣播隊列、順序隊列,分區順序
2、同步機制
Rocketmq使用主從同步模式,同步分為同步和異步模式,這和mysql類似。
3、Rocketmq管理命令
rocketmq也可以通過web管理,坑中有說
創建topic
bin/mqadmin updateTopic -n '192.168.1.64:9876;192.168.1.107:9876' -c DefaultCluster -t tt -w 5 -r 5 -o=true #-o=true是有序隊列
Rocketmq坑
1、rocketmq 在本地搭建環境啟動時,內存溢出報了內存錯誤:
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
解決方法:修改bin/runbroker.sh,bin/runserver.sh
-server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
內存大的虛擬機沒問題,像我這渣渣筆記本,得節省點啊!
2、Rocketmq管理web
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
下載安裝Console項目
Rocketmq的普通隊列、順序隊列、分區順序隊列、廣播消息實現、事務隊列
https://www.jianshu.com/p/3afd610a8f7d
https://www.jianshu.com/p/790d6bc4a1c1
https://www.jianshu.com/p/53324ea2df92
我遇到的坑
阿里雲上面買的RocketMq和開源版本的Rocketmq的sdk不同,有着兼容問題。我們有一個自己機房和阿里的共用運行的服務,這時候發現只能在阿里機房上布,而在本地機房看來只能再寫一套抽象層了,這非常不友好。使用了阿里的 Rocketmq也就意味着你的程序移植不到其它雲或服務器上了。
slave 修改:
1、內存優化
bin/runbroker.sh MaxDirectMemorySize =15g 改成1g
2、host解析失敗
在rocketmq的主從是通過獲取hostname然后解析host到ip上進行互相訪問的,所以需要將各機器的hosts增加解析
192.168.1.102 localhost.ceontsPHP
192.168.1.101 localhost.bobby
hostname可以通過各服務器的/etc/hostsname查看
rocket會報錯:rocketmq Failed to obtain the host name
3、鎖文件沖突原因
因為rocketmq設計一個用戶運行一個示例,一個用戶鎖定了一個用戶store存儲目錄,所以在一起運行會沖突,提示:Lock failed,MQ already startedz
解決方法是每個示例使用一個用戶
或
broker.conf 指定 storePathRootDir目錄
4、端口會沖突
不同broker需使用不同端口,如果不是root用戶,端口號應該是1024+
一個broker不僅僅使用你指定的listenPort,上下最好空幾個,比如指定1081端口,他還占用了1079 1081 1082
5、配置文件坑
我直接拿官方配置文件2m-2s-async下的配置,不行,沒有提示,就是執行clusterList時,沒有集群,這很坑,然后我自己建配置文件成功
bin/mqbroker -c /root/rocketmq/release/conf/2m-2s-async/broker-a-s.properties > a.log 2>&1
bin/mqadmin clusterList -n centos:9876
bin/mqadmin updateTopic -n 'centos:9876' -c mycluster -t tt -w 5 -r 5 -o=true
# broker a
namesrvAddr=centos:9876
brokerIP1=192.168.56.101
brokerName=broker-a
brokerClusterName=mycluster
brokerId=0
listenPort=1081
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/storeA
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=500
commitIntervalCommitLog=200
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=48
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
transientStorePoolEnable=false
#broker a-s
namesrvAddr=centos:9876
brokerIP1=192.168.56.102
brokerName=broker-a
brokerClusterName=mycluster
brokerId=1
listenPort=1091
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/storeAs
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=500
commitIntervalCommitLog=200
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=48
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
transientStorePoolEnable=false
#broker b
namesrvAddr=centos:9876
brokerIP1=192.168.56.101
brokerName=broker-b
brokerClusterName=mycluster
brokerId=0
listenPort=1085
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/storeB
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=500
commitIntervalCommitLog=200
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=48
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
transientStorePoolEnable=false
#broker b-s
namesrvAddr=centos:9876
brokerIP1=192.168.56.102
brokerName=broker-b
brokerClusterName=mycluster
brokerId=1
listenPort=1095
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/storeBs
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=500
commitIntervalCommitLog=200
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=48
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
transientStorePoolEnable=false
rocketmq sdk使用newFixThreadPool線程池用於獲取server中的數據(線程數獲取的cpu核心數),拿到消息后,submit 給->ThreadPoolExecutor線程池進行執行消費,執行消費可以指定最大線程數和最小線程數。
rocketmq多消費端是需要通過tag指定的,如果你想和使用rabbitmq一樣使用,可能會瘋,百度搜出來的雞八狗屁文章,根本狗屁不通。
rocketmq 存儲結構:
https://blog.csdn.net/mr253727942/article/details/55805876
IO調度方式:
https://www.cnblogs.com/cobbliu/p/5389556.html