本文主要記錄在go語言中使用RabbitMQ的相關示例代碼
關於RabbitMQ的基礎知識參考:https://www.cnblogs.com/williamjie/p/9481774.html
RabbitMQ
本實例采用RabbitMQ中的訂閱模型(Fanout、Direct、Topic)中的Direct消息模型
在RabbitMQ中,無論是生產者和消費者實際上都屬於Client。一個Client發送消息,哪些Client可以收到消息,其核心就在於Exchange,RoutingKey,Queue的關系上
對於mq使用針對消費者和生產者來,主要步驟如下:
首先都需要的步驟(示例代碼中這一過程封裝到提供初始化功能的庫文件中):
- 創建連接Conn
- 創建通道Channel
生產者:
- 通過Channel聲明Queue
- 通過Channel聲明Exchange(需指定Exchange type)
- 創建Binding(指定一個
BindingKey
將Queue綁定到Exchange上) - 發送消息(需指定
RoutingKey
和Exchange)
消費者:
- 通過Channel聲明Queue
- 從Queue中取消息
在兩個角色中:
- 生產者需要關注的是Exchange名稱(因為消息需要指定發送到哪個Exchange)以及Exchange和Queue的綁定關系即Binding,所以下面的示例代碼中將Exchange的定義以及Binding關系都寫在了生產者中(實際上這里的代碼可以放到功能庫中,因為在項目中,這些關系都是通過配置的方式提前寫好的)
- 消費者只需要關注自己指定的一個Queue,從其中取消息,它對什么交換器,RoutingKey、Binding應該秉持關我毛事的態度
編寫MQ初始化庫
庫提供的功能因人而異設計
package rabbitMq
import (
"log"
"github.com/streadway/amqp"
) //導入mq包
// MQURL 格式 amqp://賬號:密碼@rabbitmq服務器地址:端口號/vhost (默認是5672端口)
// 端口可在 /etc/rabbitmq/rabbitmq-env.conf 配置文件設置,也可以啟動后通過netstat -tlnp查看
const MQURL = "amqp://admin:huan91uncc@172.21.138.131:5672/"
type RabbitMQ struct {
Conn *amqp.Connection
Channel *amqp.Channel
// 隊列名稱
QueueName string
// 交換機
Exchange string
// routing Key
RoutingKey string
//MQ鏈接字符串
Mqurl string
}
// 創建結構體實例
func NewRabbitMQ(queueName, exchange, routingKey string) *RabbitMQ {
rabbitMQ := RabbitMQ{
QueueName: queueName,
Exchange: exchange,
RoutingKey: routingKey,
Mqurl: MQURL,
}
var err error
//創建rabbitmq連接
rabbitMQ.Conn, err = amqp.Dial(rabbitMQ.Mqurl)
checkErr(err, "創建連接失敗")
//創建Channel
rabbitMQ.Channel, err = rabbitMQ.Conn.Channel()
checkErr(err, "創建channel失敗")
return &rabbitMQ
}
// 釋放資源,建議NewRabbitMQ獲取實例后 配合defer使用
func (mq *RabbitMQ) ReleaseRes() {
mq.Conn.Close()
mq.Channel.Close()
}
func checkErr(err error, meg string) {
if err != nil {
log.Fatalf("%s:%s\n", meg, err)
}
}
生產者
package main
import (
"fmt"
"mq/rabbitMq"
"github.com/streadway/amqp"
)
//生產者發布流程
func main() {
// 初始化mq
mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
defer mq.ReleaseRes() // 完成任務釋放資源
// 1.聲明隊列
/*
如果只有一方聲明隊列,可能會導致下面的情況:
a)消費者是無法訂閱或者獲取不存在的MessageQueue中信息
b)消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄
為了避免上面的問題,所以最好選擇兩方一起聲明
ps:如果客戶端嘗試建立一個已經存在的消息隊列,Rabbit MQ不會做任何事情,並返回客戶端建立成功的
*/
_, err := mq.Channel.QueueDeclare( // 返回的隊列對象內部記錄了隊列的一些信息,這里沒什么用
mq.QueueName, // 隊列名
true, // 是否持久化
false, // 是否自動刪除(前提是至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。注意:生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列)
false, // 是否為排他隊列(排他的隊列僅對“首次”聲明的conn可見[一個conn中的其他channel也能訪問該隊列],conn結束后隊列刪除)
false, // 是否阻塞
nil, //額外屬性(我還不會用)
)
if err != nil {
fmt.Println("聲明隊列失敗", err)
return
}
// 2.聲明交換器
err = mq.Channel.ExchangeDeclare(
mq.Exchange, //交換器名
"topic", //exchange type:一般用fanout、direct、topic
true, // 是否持久化
false, //是否自動刪除(自動刪除的前提是至少有一個隊列或者交換器與這和交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁)
false, //設置是否內置的。true表示是內置的交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式
false, // 是否阻塞
nil, // 額外屬性
)
if err != nil {
fmt.Println("聲明交換器失敗", err)
return
}
// 3.建立Binding(可隨心所欲建立多個綁定關系)
err = mq.Channel.QueueBind(
mq.QueueName, // 綁定的隊列名稱
mq.RoutingKey, // bindkey 用於消息路由分發的key
mq.Exchange, // 綁定的exchange名
false, // 是否阻塞
nil, // 額外屬性
)
// err = mq.Channel.QueueBind(
// mq.QueueName, // 綁定的隊列名稱
// "routingkey2", // bindkey 用於消息路由分發的key
// mq.Exchange, // 綁定的exchange名
// false, // 是否阻塞
// nil, // 額外屬性
// )
if err != nil {
fmt.Println("綁定隊列和交換器失敗", err)
return
}
// 4.發送消息
mq.Channel.Publish(
mq.Exchange, // 交換器名
mq.RoutingKey, // routing key
false, // 是否返回消息(匹配隊列),如果為true, 會根據binding規則匹配queue,如未匹配queue,則把發送的消息返回給發送者
false, // 是否返回消息(匹配消費者),如果為true, 消息發送到queue后發現沒有綁定消費者,則把發送的消息返回給發送者
amqp.Publishing{ // 發送的消息,固定有消息體和一些額外的消息頭,包中提供了封裝對象
ContentType: "text/plain", // 消息內容的類型
Body: []byte("hello jochen"), // 消息內容
},
)
}
消費者
package main
import (
"fmt"
"mq/rabbitMq"
)
// 消費者訂閱
func main() {
// 初始化mq
mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
defer mq.ReleaseRes() // 完成任務釋放資源
// 1.聲明隊列(兩端都要聲明,原因在生產者處已經說明)
_, err := mq.Channel.QueueDeclare( // 返回的隊列對象內部記錄了隊列的一些信息,這里沒什么用
mq.QueueName, // 隊列名
true, // 是否持久化
false, // 是否自動刪除(前提是至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。注意:生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列)
false, // 是否為排他隊列(排他的隊列僅對“首次”聲明的conn可見[一個conn中的其他channel也能訪問該隊列],conn結束后隊列刪除)
false, // 是否阻塞
nil, // 額外屬性(我還不會用)
)
if err != nil {
fmt.Println("聲明隊列失敗", err)
return
}
// 2.從隊列獲取消息(消費者只關注隊列)consume方式會不斷的從隊列中獲取消息
msgChanl, err := mq.Channel.Consume(
mq.QueueName, // 隊列名
"", // 消費者名,用來區分多個消費者,以實現公平分發或均等分發策略
true, // 是否自動應答
false, // 是否排他
false, // 是否接收只同一個連接中的消息,若為true,則只能接收別的conn中發送的消息
true, // 隊列消費是否阻塞
nil, // 額外屬性
)
if err != nil {
fmt.Println("獲取消息失敗", err)
return
}
for msg := range msgChanl {
// 這里寫你的處理邏輯
// 獲取到的消息是amqp.Delivery對象,從中可以獲取消息信息
fmt.Println(string(msg.Body))
// msg.Ack(true) // 主動應答
}
}
web管理界面查看結果
關於rabbitMQ的web管理界面如何使用可以看這里
-
連接信息:
-
channel信息:
-
交換器信息:
-
隊列信息:
-
Binding信息
入口:Exchange -> 點擊想看的交換器