docker 部署單節點rocketMQ


 

消息隊列的介紹

消息(Message)是指在應用間傳送的數據(比如字符串,json等),消息隊列(Message Queue,簡稱MQ)是一個古老的計算機術語,UNIX進程間通信就用到了消息隊列技術:一個進程把數據寫入某個特定隊列中,其它隊列讀取特定隊列中的數據實現異步通信。而現在我們所說的MQ通常指的是獨立的消息隊列中間件,利用高效可靠的消息傳遞機制進行與平台無關的數據交流,並基於數據通信來進行分布式系統的集成

消息隊列解決的問題

在當今互聯網微服務架構大行其道的情況下,對於各個服務之間的松耦合、高通信要求越來越高,或者要求各個服務之間的事務一致性等問題。以及高並發請求的流量控制等問題。從而分布式消息隊列可以解決大多數生產環境中遇到的問題。包括應用解耦、流量削峰、消息分發、最終一致性等問題。

  • 應用解耦
    比如一個商城系統,包括訂單,庫存。用戶等系統,如果這些系統是高耦合調用,怎么保證其中一個系統出現問題不影響用戶下單的行為。通過消息隊列就可以把庫存系統的數據緩存到消息隊列中,而不影響訂單系統和用戶系統。從而用戶可以正產下單,當庫存系統恢復之后,再從消息隊列中取出數據進行庫存的操作。
  • 流量削峰
    比如秒殺互動,在秒殺期間會短時間內有大量請求,這時候如果通過增加部署大量的機器處理這些請求,當秒殺活動結束之后,請求大量降低,會造成很多資源的浪費。通過消息隊列可以先保存部分請求,之后再勻速處理請求。從而保證了既不需要增加可多機器,又可以處理高並發的請求。
  • 消息分發
    如果一個服務產生的數據對於其他服務有用,在一個服務生產的數據時候通知其他服務,如果后期再有服務需要這些數據,就可能需要修改原服務才能擴展。通過消息隊列可以原服務只要把數據發到消息隊列。需要這些數據的服務訂閱此消息即可。
  • 最終一致性
    庫存系統有自己的數據庫,用戶系統有自己的數據庫,如何保證庫存系統扣完庫存,在用戶系統支付失敗的情況下保證庫存系統的數據正確性呢?在多個服務之間,每個服務都具有自己的本地事務,從而分布式事務就是我們要考慮的問題。通過消息隊列(RocketMq)消息事務可以保證最終的一致性。

消息隊列的組成

  • Broker
    消息服務器,作為server提供消息核心服務
  • Producer
    消息生產者,業務的發起方,生產消息傳輸給broker
  • Consumer
    消息消費者,業務處理方,負責從broker獲取消息並進行業務邏輯處理
  • Topic
    主題,發布訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ服務器分發到不同的訂閱者,實現消息的廣播
  • Queue
    隊列,點對點模式下,特定生產者向特定queue發送消息,消費者訂閱特定的queue完成指定消息的接收
  • Message
    消息體,根據不同通信協議定義的固定格式編碼數據包,來封裝業務數據,實現消息的傳輸。

RocketMq簡介

Apache RocketMQ是一個分布式消息傳遞和流媒體平台,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可伸縮性。

RocketMq特點

  • 靈活擴展高可用
    方便集群配置,其核心四組件(Name Server、Broker、Producer、Consumer)每一個都可以在沒有單點故障的情況下進行水平擴展。

  • 海量消息堆積能力
    RocketMQ 采用零拷貝原理實現超大的消息的堆積能力,據說單機可以支持億級消息堆積,而且在堆積了這么多消息后依然保持寫入低延遲。

  • 支持順序消息
    可以保證消息消費者按照消息發送的順序對消息進行消費。順序消息分為全局有序和局部有序,一般推薦使用局部有序,即生產者通過將某一類消息按順序發送至同一個隊列來實現。

  • 多種消息過濾方式
    消息過濾分為在服務器端過濾和在消費端過濾。服務器端過濾時可以按照消息消費者的要求做過濾,優點是減少不必要消息傳輸,缺點是增加了消息服務器的負擔,實現相對復雜。消費端過濾則完全由具體應用自定義實現,這種方式更加靈活,缺點是很多無用的消息會傳輸給消息消費者。

  • 支持事務消息
    RocketMQ 除了支持普通消息,順序消息之外還支持事務消息,這個特性對於分布式事務來說提供了又一種解決思路。

  • 延遲消費
    RocketMQ 可以針對消息設置延遲消費。在發送消息是也可以針對message設置setDelayTimeLevel

RocketMq基本概念

  • 名稱服務器
    名稱服務器(NameServer)用來保存 Broker 相關元信息並給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設計成幾乎無狀態的,可以橫向擴展,節點之間相互之間無通信,通過部署多台機器來標記自己是一個偽集群。每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發送消息前會根據 Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。所以從功能上看應該是和 ZooKeeper 差不多,據說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,后來改為了自己實現的 NameServer
  • 消息
    消息(Message)就是要傳輸的信息。一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。一條消息也可以擁有一個可選的標簽(Tag)和額處的鍵值對,它們可以用於設置一個業務 key 並在 Broker 上查找此消息以便在開發期間查找問題。
  • 主題
    主題(Topic)可以看做消息的規類,它是消息的第一級類型。比如一個電商系統可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產者和消費者的關系非常松散,一個 Topic 可以有0個、1個、多個生產者向其發送消息,一個生產者也可以同時向不同的 Topic 發送消息。一個 Topic 也可以被 0個、1個、多個消費者訂閱
  • 消息服務器
    消息服務器(Broker)是消息存儲中心,主要作用是接收來自 Producer 的消息並存儲, Consumer 從這里取得消息。它還存儲與消息相關的元數據,包括用戶組、消費進度偏移量、隊列信息等。從部署結構圖中可以看出 Broker 有 Master 和 Slave 兩種類型,Master 既可以寫又可以讀,Slave 不可以寫只可以讀。從物理結構上看 Broker 的集群部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。
  • 生產者組
    生產者組(Producer Group)是一類 Producer 的集合,這類 Producer 通常發送一類消息並且發送邏輯一致,所以將這些 Producer 分組在一起。從部署結構上看生產者通過 Producer Group 的名字來標記自己是一個集群
  • 消息隊列
    消息隊列(Message Queue),主題被划分為一個或多個子主題,即消息隊列。一個 Topic 下可以設置多個消息隊列,發送消息時執行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發出去。下圖 Broker 內部消息情況:

RocketMq集群部署結構

在這里插入圖片描述

    • Name Server是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
    • Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server
    • Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
    • Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。

 

 

創建目錄

要把容器內的日志目錄和數據目錄映射到宿主機上,防止重啟之后回到原始狀態

mkdir -p /data
cd /data
mkdir -p {conf,logs,store}

創建broker.conf文件,把文件放在conf目錄

vi broker.conf

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


# 所屬集群名字
brokerClusterName=DefaultCluster

# broker 名字,注意此處不同的配置文件填寫的不一樣,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId=0

# nameServer地址,分號分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 啟動IP,如果 docker 報 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解決方式1 加上一句 producer.setVipChannelEnabled(false);,解決方式2 brokerIP1 設置宿主機IP,不要使用docker 內部IP
brokerIP1=192.168.1.179 #####修改為自己的IP 

# 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4

# 是否允許 Broker 自動創建 Topic,建議線下開啟,線上關閉 !!!這里仔細看是 falsefalsefalse
autoCreateTopicEnable=true

# 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true

# Broker 對外服務的監聽端口
listenPort=10911

# 刪除文件時間點,默認凌晨4點
deleteWhen=04

# 文件保留時間,默認48小時
fileReservedTime=120

# commitLog 每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每個文件默認存 30W 條,根據業務情況調整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
# 存儲路徑
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存儲路徑
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消費隊列存儲
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存儲路徑
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存儲路徑
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存儲路徑
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH

# 發消息線程池數量
# sendMessageThreadPoolNums=128
# 拉消息線程池數量
# pullMessageThreadPoolNums=128

創建docker-compose.yml 文件,把文件放在 /data/rocketmq/ 目錄

vi docker-compose.yml

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
    networks:
        rmq:
          aliases:
            - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
      - ./conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq

運行

docker-compose -f /data/rocketmq/docker-compose.yml up -d

 

 

 

 

參考:https://zhuanlan.zhihu.com/p/133792711

https://blog.csdn.net/zevin_zheng/article/details/106096146

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM