RocketMq入門
消息隊列的介紹
消息(Message)是指在應用間傳送的數據(比如字符串,json等),消息隊列(Message Queue,簡稱MQ)是一個古老的計算機術語,UNIX進程間通信就用到了消息隊列技術:一個進程把數據寫入某個特定隊列中,其它隊列讀取特定隊列中的數據實現異步通信。而現在我們所說的MQ通常指的是獨立的消息隊列中間件,利用高效可靠的消息傳遞機制進行與平台無關的數據交流,並基於數據通信來進行分布式系統的集成
消息隊列解決的問題
在當今互聯網微服務架構大行其道的情況下,對於各個服務之間的松耦合、高通信要求越來越高,或者要求各個服務之間的事務一致性等問題。以及高並發請求的流量控制等問題。從而分布式消息隊列可以解決大多數生產環境中遇到的問題。包括應用解耦、流量削峰、消息分發、最終一致性等問題。
- 應用解耦
比如一個商城系統,包括訂單,庫存。用戶等系統,如果這些系統是高耦合調用,怎么保證其中一個系統出現問題不影響用戶下單的行為。通過消息隊列就可以把庫存系統的數據緩存到消息隊列中,而不影響訂單系統和用戶系統。從而用戶可以正產下單,當庫存系統恢復之后,再從消息隊列中取出數據進行庫存的操作。 - 流量削峰
比如秒殺互動,在秒殺期間會短時間內有大量請求,這時候如果通過增加部署大量的機器處理這些請求,當秒殺活動結束之后,請求大量降低,會造成很多資源的浪費。通過消息隊列可以先保存部分請求,之后再勻速處理請求。從而保證了既不需要增加可多機器,又可以處理高並發的請求。 - 消息分發
如果一個服務產生的數據對於其他服務有用,在一個服務生產的數據時候通知其他服務,如果后期再有服務需要這些數據,就可能需要修改原服務才能擴展。通過消息隊列可以原服務只要把數據發到消息隊列。需要這些數據的服務訂閱此消息即可。 - 最終一致性
庫存系統有自己的數據庫,用戶系統有自己的數據庫,如何保證庫存系統扣完庫存,在用戶系統支付失敗的情況下保證庫存系統的數據正確性呢?在多個服務之間,每個服務都具有自己的本地事務,從而分布式事務就是我們要考慮的問題。通過消息隊列(RocketMq)消息事務可以保證最終的一致性。
消息隊列的組成
- Broker
消息服務器,作為server提供消息核心服務 - Producer
消息生產者,業務的發起方,生產消息傳輸給broker - Consumer
消息消費者,業務處理方,負責從broker獲取消息並進行業務邏輯處理 - Topic
主題,發布訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ服務器分發到不同的訂閱者,實現消息的廣播 - Queue
隊列,點對點模式下,特定生產者向特定queue發送消息,消費者訂閱特定的queue完成指定消息的接收 - Message
消息體,根據不同通信協議定義的固定格式編碼數據包,來封裝業務數據,實現消息的傳輸。
RocketMq簡介
Apache RocketMQ是一個分布式消息傳遞和流媒體平台,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可伸縮性。
RocketMq特點
-
靈活擴展高可用
方便集群配置,其核心四組件(Name Server、Broker、Producer、Consumer)每一個都可以在沒有單點故障的情況下進行水平擴展。 -
海量消息堆積能力
RocketMQ 采用零拷貝原理實現超大的消息的堆積能力,據說單機可以支持億級消息堆積,而且在堆積了這么多消息后依然保持寫入低延遲。 -
支持順序消息
可以保證消息消費者按照消息發送的順序對消息進行消費。順序消息分為全局有序和局部有序,一般推薦使用局部有序,即生產者通過將某一類消息按順序發送至同一個隊列來實現。 -
多種消息過濾方式
消息過濾分為在服務器端過濾和在消費端過濾。服務器端過濾時可以按照消息消費者的要求做過濾,優點是減少不必要消息傳輸,缺點是增加了消息服務器的負擔,實現相對復雜。消費端過濾則完全由具體應用自定義實現,這種方式更加靈活,缺點是很多無用的消息會傳輸給消息消費者。 -
支持事務消息
RocketMQ 除了支持普通消息,順序消息之外還支持事務消息,這個特性對於分布式事務來說提供了又一種解決思路。 -
延遲消費
RocketMQ 可以針對消息設置延遲消費。在發送消息是也可以針對message設置setDelayTimeLevel
RocketMq基本概念
- 名稱服務器
名稱服務器(NameServer)用來保存 Broker 相關元信息並給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設計成幾乎無狀態的,可以橫向擴展,節點之間相互之間無通信,通過部署多台機器來標記自己是一個偽集群。每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發送消息前會根據 Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。所以從功能上看應該是和 ZooKeeper 差不多,據說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,后來改為了自己實現的 NameServer - 消息
消息(Message)就是要傳輸的信息。一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。一條消息也可以擁有一個可選的標簽(Tag)和額處的鍵值對,它們可以用於設置一個業務 key 並在 Broker 上查找此消息以便在開發期間查找問題。 - 主題
主題(Topic)可以看做消息的規類,它是消息的第一級類型。比如一個電商系統可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產者和消費者的關系非常松散,一個 Topic 可以有0個、1個、多個生產者向其發送消息,一個生產者也可以同時向不同的 Topic 發送消息。一個 Topic 也可以被 0個、1個、多個消費者訂閱 - 消息服務器
消息服務器(Broker)是消息存儲中心,主要作用是接收來自 Producer 的消息並存儲, Consumer 從這里取得消息。它還存儲與消息相關的元數據,包括用戶組、消費進度偏移量、隊列信息等。從部署結構圖中可以看出 Broker 有 Master 和 Slave 兩種類型,Master 既可以寫又可以讀,Slave 不可以寫只可以讀。從物理結構上看 Broker 的集群部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。 - 生產者組
生產者組(Producer Group)是一類 Producer 的集合,這類 Producer 通常發送一類消息並且發送邏輯一致,所以將這些 Producer 分組在一起。從部署結構上看生產者通過 Producer Group 的名字來標記自己是一個集群 - 消息隊列
消息隊列(Message Queue),主題被划分為一個或多個子主題,即消息隊列。一個 Topic 下可以設置多個消息隊列,發送消息時執行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發出去。下圖 Broker 內部消息情況:
RocketMq集群部署結構
- Name Server是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
- Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server
- Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
- Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
創建目錄
要把容器內的日志目錄和數據目錄映射到宿主機上,防止重啟之后回到原始狀態
mkdir -p /data cd /data mkdir -p {conf,logs,store}
創建broker.conf文件,把文件放在conf目錄
vi broker.conf
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # 所屬集群名字 brokerClusterName=DefaultCluster # broker 名字,注意此處不同的配置文件填寫的不一樣,如果在 broker-a.properties 使用: broker-a, # 在 broker-b.properties 使用: broker-b brokerName=broker-a # 0 表示 Master,> 0 表示 Slave brokerId=0 # nameServer地址,分號分割 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 啟動IP,如果 docker 報 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed # 解決方式1 加上一句 producer.setVipChannelEnabled(false);,解決方式2 brokerIP1 設置宿主機IP,不要使用docker 內部IP brokerIP1=192.168.1.179 #####修改為自己的IP # 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 # 是否允許 Broker 自動創建 Topic,建議線下開啟,線上關閉 !!!這里仔細看是 false,false,false autoCreateTopicEnable=true # 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true # Broker 對外服務的監聽端口 listenPort=10911 # 刪除文件時間點,默認凌晨4點 deleteWhen=04 # 文件保留時間,默認48小時 fileReservedTime=120 # commitLog 每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue 每個文件默認存 30W 條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 # 存儲路徑 # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store # commitLog 存儲路徑 # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog # 消費隊列存儲 # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue # 消息索引存儲路徑 # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index # checkpoint 文件存儲路徑 # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint # abort 文件存儲路徑 # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 異步復制Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRole=ASYNC_MASTER # 刷盤方式 # - ASYNC_FLUSH 異步刷盤 # - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH # 發消息線程池數量 # sendMessageThreadPoolNums=128 # 拉消息線程池數量 # pullMessageThreadPoolNums=128
創建docker-compose.yml 文件,把文件放在 /data/rocketmq/ 目錄
vi docker-compose.yml
version: '3.5' services: rmqnamesrv: image: foxiswho/rocketmq:server container_name: rmqnamesrv ports: - 9876:9876 volumes: - ./logs:/opt/logs - ./store:/opt/store networks: rmq: aliases: - rmqnamesrv rmqbroker: image: foxiswho/rocketmq:broker container_name: rmqbroker ports: - 10909:10909 - 10911:10911 volumes: - ./logs:/opt/logs - ./store:/opt/store - ./conf/broker.conf:/etc/rocketmq/broker.conf environment: NAMESRV_ADDR: "rmqnamesrv:9876" JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" command: mqbroker -c /etc/rocketmq/broker.conf depends_on: - rmqnamesrv networks: rmq: aliases: - rmqbroker rmqconsole: image: styletang/rocketmq-console-ng container_name: rmqconsole ports: - 8080:8080 environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - rmqnamesrv networks: rmq: aliases: - rmqconsole networks: rmq: name: rmq
運行
docker-compose -f /data/rocketmq/docker-compose.yml up -d
參考:https://zhuanlan.zhihu.com/p/133792711
https://blog.csdn.net/zevin_zheng/article/details/106096146