mq有很多,近期買了《分布式消息中間件實踐》這本書,學習關於mq的相關知識。mq大致有有4個功能:
- 異步處理。比如業務端需要給用戶發送郵件,不需要等發送完了之后才讓業務端的調用代碼返回。
- 服務解耦。服務之間調用不需要在代碼上寫死調用某某服務,只需要發送一個消息即可。這種發送消息的處理一般都是立即返回。類似於生成一個后台job。
- 流量削峰。業務系統在做活動的時候短時間內的流量會特別大,基於mq的隊列的特性,可以處理這個瞬時流量過大的問題,減輕后端壓力。
- 消息通訊。主要是訂閱機制,類似聊天室。
內容介紹
低延時的分布式消息處理平台,高性能,可擴展,萬億級。由4個部分構成。name server,broker(消息代理人),producer,consumers。這4個都能水平擴展(做成集群的方式)來避免單點故障。
NameServers (集群)
NameServer是一個完整的功能性服務。主要包含以下兩個方面:
- Broker管理。接收Broker集群過來的注冊消息,然后提供心跳包機制來檢查Broker是否還存活。
- 路由管理。每一個NameServer都包含關於Broker的完整路由信息,並且隊列化處理客戶端的查詢請求。
有4種方式用於客戶端指定NameServer的地址。
- 編程方式。類似
producer.setNamesrvAddr("ip:port")
- Java 配置。使用
rocketmq.namesrv.addr
- 環境變量。使用
NAMESRV_ADDR
- http 端點。
提供輕量的服務發現與路由,每一個Name Server記錄完整的路由信息,提供相應的讀寫服務,同時支持快速存儲擴展。
Broker (集群)
Broker負責消息的存儲和分發,消息查詢,高可用保證等。模塊如下圖所示。
所以,他有以下幾個比較重要的子模塊:
- 遠程模塊。Broker的入口,用於接收客戶端過來的請求。
- 客戶端管理。用於(生產者/消費者)模式的客戶端管理,維護消費者對主題的訂閱。
- 存儲服務。提供一些簡單的api用於查詢or存儲物理磁盤上的消息。
- 高可用保證服務。提供主從Broker之間的數據同步特性。
- 索引服務。通過特定的key對消息構建索引,然后提供消息查詢。
通過提供輕量級的主題隊列機制來負責消息的儲存。支持推拉模式,包含錯誤容忍機制(2或3份拷貝)。消息的順序處理。提供容災回復,預警機制等。
Producer (集群)
生產者提供分布式部署。分布式的生產者通過負載均衡的方式發送消息給Broker。支持快速錯誤反饋,並且低延遲。
Consumer (集群)
消費者也支持在推/拉模式中的分布式部署。支持集群消費+消息廣播。提供實時的消息訂閱機制以滿足大部分需求。
如何運行(單機測試環境)
下載
安裝包就好(源碼也行),然后解壓
wget http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
unzip rocketmq-all-4.5.1-bin-release.zip -d /usr/lib/rocketmq/
改一些配置
環境變量
#配置64位jdk是必須的---這里跳過
export ROCKETMQ_HOME=/usr/lib/rocketmq/rocketmq #這個不是必須,只是方便下面操作
source ~/.bash_profile #使配置生效一下
rm -f ~/logs/rocketmqlogs/*.log # 把日志清了,方便查看
啟動參數
rocketmq默認的內存配置要求對我們的測試服務器要求太高了,改低一點。
vim $ROCKETMQ_HOME/bin/runserver.sh
# 把第39行改成
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g" #最小,啟動,最大都是1g
vim $ROCKETMQ_HOME/bin/runbroker.sh
# 把39行改成
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g" # 同上面
broker配置
vim $ROCKETMQ_HOME/conf/broker.conf
# 加這些配置
brokerIP1 = 192.168.3.20 #此broker的ip
namesvrAddr=192.168.3.20:9876 #nameServer地址,集群的話需要分號分隔。
啟動NameServer
因為需要NameServer來管理Broker,所以先啟動NameServer。默認的日志地址為~/logs/rocketmqlogs
。
nohup sh $ROCKETMQ_HOME/bin/mqnamesrv & # 以忽略掛起信號的方式啟動nameserver
tail -f ~/logs/rocketmqlogs/namesrv.log # 監控的方式查看日志尾部
啟動Broker
消息要通過Broker來分發,NameServer也已經啟動,所以現在可以啟動Broker。
nohup sh $ROCKETMQ_HOME/bin/mqbroker -c $ROCKETMQ_HOME/conf/broker.conf &
tail -f ~/logs/rocketmqlogs/broker.log
web站點查看
這里需要去下載這個,里面有個rocketmq-console。需要修改一個地方。rocketmq-externals/rocketmq-console/src/main/resources/application.properties
,
rocketmq.config.namesrvAddr=192.168.3.20:9876 #改成對應的namesrvAddr 地址
服務器上需要開放幾個端口用於console的連接,如果沒有配置,就是默認的:9876,10911,10909
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload
然后就能查看了。
發送、接收消息
后續通過代碼客戶端的方式來實踐。
關閉服務
sh $ROCKETMQ_HOME/bin/mqshutdown broker #先關了Broker
sh $ROCKETMQ_HOME/bin/mqshutdown namesrv #再關閉nameserver