Go語言系列之RabbitMQ消息隊列


1. RabbitMQ是什么?

  MQ 是什么?隊列是什么,MQ 我們可以理解為消息隊列,隊列我們可以理解為管道。以管道的方式做消息傳遞。

     生活場景:

    1.其實我們在雙11的時候,當我們凌晨大量的秒殺和搶購商品,然后去結算的時候,就會發現,界面會提醒我們,讓我們稍等,以及一些友好的圖片文字提醒。而不是像前幾年的時代,動不動就頁面卡死,報錯等來呈現給用戶。

    在這業務場景中,我們就可以采用隊列的機制來處理,因為同時結算就只能達到這么多。

    2.在我們平時的超市中購物也是一樣,當我們在結算的時候,並不會一窩蜂一樣涌入收銀台,而是排隊結算。這也是隊列機制。

2. RabbitMQ簡介

       AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。 RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 下面將重點介紹RabbitMQ中的一些基礎概念,了解了這些概念,是使用好RabbitMQ的基礎。

  • 可靠性(Reliablity):使用了一些機制來保證可靠性,比如持久化、傳輸確認、發布確認。
  • 靈活的路由(Flexible Routing):在消息進入隊列之前,通過Exchange來路由消息。對於典型的路由功能,Rabbit已經提供了一些內置的Exchange來實現。針對更復雜的路由功能,可以將多個Exchange綁定在一起,也通過插件機制實現自己的Exchange。
  • 消息集群(Clustering):多個RabbitMQ服務器可以組成一個集群,形成一個邏輯Broker。
  • 高可用(Highly Avaliable Queues):隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
  • 多種協議(Multi-protocol):支持多種消息隊列協議,如STOMP、MQTT等。
  • 多種語言客戶端(Many Clients):幾乎支持所有常用語言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用戶界面,使得用戶可以監控和管理消息Broker的許多方面。
  • 跟蹤機制(Tracing):如果消息異常,RabbitMQ提供了消息的跟蹤機制,使用者可以找出發生了什么。
  • 插件機制(Plugin System):提供了許多插件,來從多方面進行擴展,也可以編輯自己的插件。

2.1 定義和特征

  1. RbbitMQ是面向消息的中間件,用於組件之間的解耦,主要體現在消息的發送者和消費者之間無強依賴關系

  2. RabbitMQ特點:高可用,可擴展,多語言客戶端,管理界面等;

  3. 主要使用場景:流量削峰,異步處理,應用解耦等;

2.2 安裝

    安裝erlang

# centos7
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.17/rabbitmq-server-3.7.17-1.el7.noarch.rpm
yum install epel-release
yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
rpm -ivh esl-erlang_22.0.7-1~centos~7_amd64.rpm

 安裝rabbitmq

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-3.8.0-1.el7.noarch.rpm
yum -y install socat
rpm -ivh rabbitmq-server-3.8.0-1.el7.noarch.rpm

 啟動

chkconfig rabbitmq-server on                     # 開機啟動
systemctl start rabbitmq-server.service          # 啟動
systemctl stop rabbitmq-server.service    		# 停止
systemctl restart rabbitmq-server.service		# 重啟
rabbitmqctl status							  # 查看狀態
rabbitmq-plugins enable rabbitmq_management      # 啟動Web管理器

 

    修改配置

vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/ebin/rabbit.app

將:{loopback_users, [<<”guest”>>]}, 改為:{loopback_users, []}, 原因:rabbitmq從3.3.0開始禁止使用guest/guest權限通過   除localhost外的訪問

systemctl restart rabbitmq-server.service    # 重啟服務

3. RabbitMQ核心概念

  • Broker:標識消息隊列服務器實體.
  • Virtual Host:虛擬主機。標識一批交換機、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個vhost本質上就是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在鏈接時指定,RabbitMQ默認的vhost是 /。
  • Exchange:交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
  • Queue:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
  • Banding:綁定,用於消息隊列和交換機之間的關聯。一個綁定就是基於路由鍵將交換機和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
  • Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。新到是建立在真實的TCP連接內地虛擬鏈接,AMQP命令都是通過新到發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說,建立和銷毀TCP都是非常昂貴的開銷,所以引入了信道的概念,以復用一條TCP連接。
  • Connection:網絡連接,比如一個TCP連接。
  • Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
  • Consumer:消息的消費者,表示一個從一個消息隊列中取得消息的客戶端應用程序。
  • Message:消息,消息是不具名的,它是由消息頭和消息體組成。消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(優先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。

AMQP消息路由

 

       AMQP中消息的路由過程和JMS存在一些差別。AMQP中增加了Exchange和Binging的角色。生產者把消息發布到Exchange上,消息最終到達隊列並被消費者接收,而Binding決定交換器的消息應該發送到哪個隊列。

  • Exchange類型

      Exchange分發消息時,根據類型的不同分發策略有區別。目前共四種類型:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由鍵(Routing-key),此外headers交換器和direct交換器完全一致,但是性能差了很多,目前幾乎用不到了。所以直接看另外三種類型。)。

       direct

 

消息中的路由鍵(routing key)如果和Binding中的binding key一致,交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配。

     fanout

 

        每個發到fanout類型交換器的消息都會分到所有綁定的隊列上去。fanout交換器不處理該路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。fanout類型轉發消息是最快的。

        topic

 
       topic交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵(routing-key)和綁定鍵(bingding-key)的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:"#"和"*"。#匹配0個或多個單詞,匹配不多不少一個單詞。

4. RabbitMQ的運行模式

  • 簡單模式
       創建實例
package RabbitMQ

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)

//url格式  amqp://賬號:密碼@rabbitmq服務器地址:端口號/vhost
const MQURL = "amqp://zhangyafei:zhangyafei@182.254.179.186:5672/imooc"

type RabbitMQ struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    // 隊列名稱
    QueueName string
    //交換機
    Exchange string
    //key
    Key string
    // 連接信息
    Mqurl string
}

//創建結構體實例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
    rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
    var err error
    // 創建rabbitmq連接
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "創建連接錯誤!")
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "獲取channel失敗!")
    return rabbitmq
}

//斷開channel和connection
func (r *RabbitMQ) Destory() {
    r.channel.Close()
    r.conn.Close()
}

//錯誤處理函數
func (r *RabbitMQ) failOnErr(err error, message string) {
    if err != nil {
        log.Fatalf("%s:%s", message, err)
        panic(fmt.Sprintf("%s:%s", message, err))
    }
}

// 簡單模式step1: 1.創建簡單模式下的rabbitmq實例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
    return NewRabbitMQ(queueName, "", "")
}

// 簡單模式step2: 2.簡單模式下生產
func (r *RabbitMQ) PublishSimple(message string) {
    // 1. 申請隊列,如果隊列不存在則自動創建,如果存在則跳過創建
    // 保證隊列存在,消息能發送到隊列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        // 是否持久化
        false,
        // 是否為自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }
    // 2. 發送消息到隊列中
    r.channel.Publish(
        r.Exchange,
        r.QueueName,
        // 如果為true,根據exchange類型和routekey規則,如果無法找到符合條件的隊列,則會把發送的消息返回給發送者
        false,
        // 如果為true,當exchange發送消息到隊列后發現隊列上沒有綁定消費者,則會把消息發還給發送者
        false,
        amqp.Publishing{ContentType: "text/plain", Body: []byte(message)},
    )
}

// 簡單模式step3: 3.簡單模式下消費
func (r *RabbitMQ) ConsumeSimple() {
    // 1. 申請隊列,如果隊列不存在則自動創建,如果存在則跳過創建
    // 保證隊列存在,消息能發送到隊列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        // 是否持久化
        false,
        // 是否為自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }
    // 2. 接收消息
    msgs, err := r.channel.Consume(
        r.QueueName,
        // 用來區分多個消費者
        "",
        // 是否自動應答
        true,
        // 是否具有排他性
        false,
        // 如果為true,表示不能將同一個conn中的消息發送給這個conn中的消費者
        false,
        // 隊列是否阻塞
        false,
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }
    forever := make(chan bool)
    // 3. 啟用協程處理消息
    go func() {
        for d := range msgs {
            // 實現我們要處理的邏輯函數
            log.Printf("Received a message: %s", d.Body)
        }
    }()
    log.Printf("[*] waiting for messages, to exit process CTRL+C")
    <-forever
}
創建實例
        生產者
package main

import (
    "RabbitMQ/RabbitMQ/RabbitMQ"
    "fmt"
)

func main()  {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
    rabbitmq.PublishSimple("hello imooc!")
    fmt.Println("發送成功")
}
mainSimplePublish.go

  消費者

package main

import (
    "RabbitMQ/RabbitMQ/RabbitMQ"
)

func main()  {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
    rabbitmq.ConsumeSimple()
}
mainSimpleReceive.go

  • 工作模式
     生產者
package main

import (
    "RabbitMQ/RabbitMQ/RabbitMQ"
    "fmt"
    "strconv"
    "time"
)

func main()  {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
    for i := 0; i<= 100; i++ {
        rabbitmq.PublishSimple("hello imooc!" + strconv.Itoa(i))
        time.Sleep(1 * time.Second)
        fmt.Println(i)
    }
}
mainWorkPublish.go

  消費者

package main

import (
    "RabbitMQ/RabbitMQ/RabbitMQ"
)

func main()  {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
    rabbitmq.ConsumeSimple()
}
mainWorkReceive.go

  • 訂閱模式
      創建實例
// 訂閱模式下創建RabbitMQ實例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
    rabbitmq := NewRabbitMQ("", exchangeName, "")
    var err error
    // 獲取connection
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
    // 獲取channel
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "failed to open a channel")
    return rabbitmq
}

// 訂閱模式下生產
func (r *RabbitMQ) PublishPub(message string) {
    // 1. 嘗試創建交換機
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        "fanout", // 廣播類型
        true,     // 持久化
        false,    // 是否刪除
        false,    // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare a exchange")
    // 2. 發送消息
    err = r.channel.Publish(
        r.Exchange,
        "",
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })
}

// 訂閱模式消費端的代碼
func (r *RabbitMQ) ReceiveSub() {
    // 1. 嘗試創建交換機
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        "fanout", // 廣播類型
        true,     // 持久化
        false,    // 是否刪除
        false,    // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare a exchange")
    // 2. 試探性創建隊列
    q, err := r.channel.QueueDeclare(
        "", // 隨機生產隊列名稱
        false,
        false,
        true,
        false,
        nil,
    )
    r.failOnErr(err, "failed to declare a queue")
    // 綁定隊列到 exchange中
    err = r.channel.QueueBind(
        q.Name,
        "", // 在訂閱模式下,這里的key為空
        r.Exchange,
        false,
        nil)
    // 消費消息
    messages, err := r.channel.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }
    forever := make(chan bool)
    // 3. 啟用協程處理消息
    go func() {
        for d := range messages {
            // 實現我們要處理的邏輯函數
            log.Printf("Received a message: %s", d.Body)
        }
    }()
    log.Printf("[*] waiting for messages, to exit process CTRL+C")
    <-forever
}
創建實例
      生產者
package main

import (
    "RabbitMQ/RabbitMQ/RabbitMQ"
    "fmt"
    "strconv"
    "time"
)

func main() {
    rabbitmq := RabbitMQ.NewRabbitMQPubSub("NewProduct")
    for i := 0; i <= 100; i++ {
        rabbitmq.PublishPub("訂閱模式生產第" + strconv.Itoa(i) + "條數據")
        fmt.Println("訂閱模式生產第" + strconv.Itoa(i) + "條數據")
        time.Sleep(1 * time.Second)
    }
}
mainPub.go
        消費者
package main

import (
    "RabbitMQ/RabbitMQ/RabbitMQ"
)

func main() {
    rabbitmq := RabbitMQ.NewRabbitMQPubSub("NewProduct")
    rabbitmq.ReceiveSub()
}
mainSub.go
 
              
              
              
              
            
  • 路由模式
 
   創建實例、
// 路由模式下創建RabbitMQ實例
func NewRabbitMQRouting(exchangeName string, routingkey string) *RabbitMQ {
    // 創建RabbitMQ實例
    rabbitmq := NewRabbitMQ("", exchangeName, routingkey)
    var err error
    // 獲取connection
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
    // 獲取channel
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "failed to open a channel")
    return rabbitmq
}

// 路由模式發送消息
func (r *RabbitMQ) PublishRouting(message string) {
    // 1. 嘗試創建交換機
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        "direct", // 定向類型
        true,     // 持久化
        false,    // 是否刪除
        false,    // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare a exchange")
    // 2. 發送消息
    err = r.channel.Publish(
        r.Exchange,
        r.Key,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })
}

// 路由模式消費端的代碼
func (r *RabbitMQ) ReceiveRouting() {
    // 1. 試探性的創建交換機
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        "direct", // 廣播類型
        true,     // 持久化
        false,    // 是否刪除
        false,    // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare a exchange")
    // 2. 試探性創建隊列
    q, err := r.channel.QueueDeclare(
        "", // 隨機生產隊列名稱
        false,
        false,
        true,
        false,
        nil,
    )
    r.failOnErr(err, "failed to declare a queue")
    // 綁定隊列到 exchange中
    err = r.channel.QueueBind(
        q.Name,
        r.Key, // 在訂閱模式下,這里的key為空
        r.Exchange,
        false,
        nil)
    // 消費消息
    messages, err := r.channel.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }
    forever := make(chan bool)
    // 3. 啟用協程處理消息
    go func() {
        for d := range messages {
            // 實現我們要處理的邏輯函數
            log.Printf("Received a message: %s", d.Body)
        }
    }()
    log.Printf("[*] waiting for messages, to exit process CTRL+C")
    <-forever
}
創建實例
     生產者
package main

import (
    "RabbitMQ/RabbitMQ/RabbitMQ"
    "fmt"
    "strconv"
    "time"
)

func main()  {
    rabbit_imooc_one := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
    rabbit_imooc_two := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two")
    for i := 0; i <= 100; i++ {
        rabbit_imooc_one.PublishRouting("Hello imooc one!" + strconv.Itoa(i))
        rabbit_imooc_two.PublishRouting("Hello imooc two!" + strconv.Itoa(i))
        time.Sleep(1 * time.Second)
        fmt.Println(i)
    }
}
PublishRouting.go

  消費者1

package main

import "RabbitMQ/RabbitMQ/RabbitMQ"

func main()  {
    rabbitmq_imooc_one := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
    rabbitmq_imooc_one.ReceiveRouting()
}
ReceiveRouting1.go

  消費者2

package main

import "RabbitMQ/RabbitMQ/RabbitMQ"

func main()  {
    rabbitmq_imooc_two := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two")
    rabbitmq_imooc_two.ReceiveRouting()
}
ReceiveRouting2.go




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM