Golang使用RabbitMQ


本文主要記錄在go語言中使用RabbitMQ的相關示例代碼

關於RabbitMQ的基礎知識參考:https://www.cnblogs.com/williamjie/p/9481774.html

RabbitMQ

本實例采用RabbitMQ中的訂閱模型(Fanout、Direct、Topic)中的Direct消息模型

在RabbitMQ中,無論是生產者和消費者實際上都屬於Client。一個Client發送消息,哪些Client可以收到消息,其核心就在於Exchange,RoutingKey,Queue的關系上

對於mq使用針對消費者和生產者來,主要步驟如下:

首先都需要的步驟(示例代碼中這一過程封裝到提供初始化功能的庫文件中):

  • 創建連接Conn
  • 創建通道Channel

生產者:

  • 通過Channel聲明Queue
  • 通過Channel聲明Exchange(需指定Exchange type)
  • 創建Binding(指定一個BindingKey將Queue綁定到Exchange上)
  • 發送消息(需指定RoutingKey和Exchange)

消費者:

  • 通過Channel聲明Queue
  • 從Queue中取消息

在兩個角色中:

  • 生產者需要關注的是Exchange名稱(因為消息需要指定發送到哪個Exchange)以及Exchange和Queue的綁定關系即Binding,所以下面的示例代碼中將Exchange的定義以及Binding關系都寫在了生產者中(實際上這里的代碼可以放到功能庫中,因為在項目中,這些關系都是通過配置的方式提前寫好的)
  • 消費者只需要關注自己指定的一個Queue,從其中取消息,它對什么交換器,RoutingKey、Binding應該秉持關我毛事的態度

編寫MQ初始化庫

庫提供的功能因人而異設計

package rabbitMq

import (
	"log"

	"github.com/streadway/amqp"
) //導入mq包

// MQURL 格式 amqp://賬號:密碼@rabbitmq服務器地址:端口號/vhost (默認是5672端口)
// 端口可在 /etc/rabbitmq/rabbitmq-env.conf 配置文件設置,也可以啟動后通過netstat -tlnp查看
const MQURL = "amqp://admin:huan91uncc@172.21.138.131:5672/"

type RabbitMQ struct {
	Conn    *amqp.Connection
	Channel *amqp.Channel
	// 隊列名稱
	QueueName string
	// 交換機
	Exchange string
	// routing Key
	RoutingKey string
	//MQ鏈接字符串
	Mqurl string
}

// 創建結構體實例
func NewRabbitMQ(queueName, exchange, routingKey string) *RabbitMQ {
	rabbitMQ := RabbitMQ{
		QueueName:  queueName,
		Exchange:   exchange,
		RoutingKey: routingKey,
		Mqurl:      MQURL,
	}
	var err error
	//創建rabbitmq連接
	rabbitMQ.Conn, err = amqp.Dial(rabbitMQ.Mqurl)
	checkErr(err, "創建連接失敗")

	//創建Channel
	rabbitMQ.Channel, err = rabbitMQ.Conn.Channel()
	checkErr(err, "創建channel失敗")

	return &rabbitMQ

}

// 釋放資源,建議NewRabbitMQ獲取實例后 配合defer使用
func (mq *RabbitMQ) ReleaseRes() {
	mq.Conn.Close()
	mq.Channel.Close()
}

func checkErr(err error, meg string) {
	if err != nil {
		log.Fatalf("%s:%s\n", meg, err)
	}
}

生產者

package main

import (
	"fmt"
	"mq/rabbitMq"

	"github.com/streadway/amqp"
)

//生產者發布流程
func main() {
	// 初始化mq
	mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
	defer mq.ReleaseRes() // 完成任務釋放資源

	// 1.聲明隊列
	/*
		如果只有一方聲明隊列,可能會導致下面的情況:
			a)消費者是無法訂閱或者獲取不存在的MessageQueue中信息
			b)消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄

		為了避免上面的問題,所以最好選擇兩方一起聲明
		ps:如果客戶端嘗試建立一個已經存在的消息隊列,Rabbit MQ不會做任何事情,並返回客戶端建立成功的
	*/
	_, err := mq.Channel.QueueDeclare( // 返回的隊列對象內部記錄了隊列的一些信息,這里沒什么用
		mq.QueueName, // 隊列名
		true,         // 是否持久化
		false,        // 是否自動刪除(前提是至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。注意:生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列)
		false,        // 是否為排他隊列(排他的隊列僅對“首次”聲明的conn可見[一個conn中的其他channel也能訪問該隊列],conn結束后隊列刪除)
		false,        // 是否阻塞
		nil,          //額外屬性(我還不會用)
	)
	if err != nil {
		fmt.Println("聲明隊列失敗", err)
		return
	}

	// 2.聲明交換器
	err = mq.Channel.ExchangeDeclare(
		mq.Exchange, //交換器名
		"topic",     //exchange type:一般用fanout、direct、topic
		true,        // 是否持久化
		false,       //是否自動刪除(自動刪除的前提是至少有一個隊列或者交換器與這和交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁)
		false,       //設置是否內置的。true表示是內置的交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式
		false,       // 是否阻塞
		nil,         // 額外屬性
	)
	if err != nil {
		fmt.Println("聲明交換器失敗", err)
		return
	}

	// 3.建立Binding(可隨心所欲建立多個綁定關系)
	err = mq.Channel.QueueBind(
		mq.QueueName,  // 綁定的隊列名稱
		mq.RoutingKey, // bindkey 用於消息路由分發的key
		mq.Exchange,   // 綁定的exchange名
		false,         // 是否阻塞
		nil,           // 額外屬性
	)
	// err = mq.Channel.QueueBind(
	// 	mq.QueueName,  // 綁定的隊列名稱
	// 	"routingkey2", // bindkey 用於消息路由分發的key
	// 	mq.Exchange,   // 綁定的exchange名
	// 	false,         // 是否阻塞
	// 	nil,           // 額外屬性
	// )
	if err != nil {
		fmt.Println("綁定隊列和交換器失敗", err)
		return
	}

	// 4.發送消息
	mq.Channel.Publish(
		mq.Exchange,   // 交換器名
		mq.RoutingKey, // routing key
		false,         // 是否返回消息(匹配隊列),如果為true, 會根據binding規則匹配queue,如未匹配queue,則把發送的消息返回給發送者
		false,         // 是否返回消息(匹配消費者),如果為true, 消息發送到queue后發現沒有綁定消費者,則把發送的消息返回給發送者
		amqp.Publishing{ // 發送的消息,固定有消息體和一些額外的消息頭,包中提供了封裝對象
			ContentType: "text/plain",           // 消息內容的類型
			Body:        []byte("hello jochen"), // 消息內容
		},
	)
}

消費者

package main

import (
	"fmt"
	"mq/rabbitMq"
)

// 消費者訂閱
func main() {
	// 初始化mq
	mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
	defer mq.ReleaseRes() // 完成任務釋放資源

	// 1.聲明隊列(兩端都要聲明,原因在生產者處已經說明)
	_, err := mq.Channel.QueueDeclare( // 返回的隊列對象內部記錄了隊列的一些信息,這里沒什么用
		mq.QueueName, // 隊列名
		true,         // 是否持久化
		false,        // 是否自動刪除(前提是至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。注意:生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列)
		false,        // 是否為排他隊列(排他的隊列僅對“首次”聲明的conn可見[一個conn中的其他channel也能訪問該隊列],conn結束后隊列刪除)
		false,        // 是否阻塞
		nil,          // 額外屬性(我還不會用)
	)
	if err != nil {
		fmt.Println("聲明隊列失敗", err)
		return
	}

	// 2.從隊列獲取消息(消費者只關注隊列)consume方式會不斷的從隊列中獲取消息
	msgChanl, err := mq.Channel.Consume(
		mq.QueueName, // 隊列名
		"",           // 消費者名,用來區分多個消費者,以實現公平分發或均等分發策略
		true,         // 是否自動應答
		false,        // 是否排他
		false,        // 是否接收只同一個連接中的消息,若為true,則只能接收別的conn中發送的消息
		true,         // 隊列消費是否阻塞
		nil,          // 額外屬性
	)
	if err != nil {
		fmt.Println("獲取消息失敗", err)
		return
	}

	for msg := range msgChanl {
		// 這里寫你的處理邏輯
		// 獲取到的消息是amqp.Delivery對象,從中可以獲取消息信息
		fmt.Println(string(msg.Body))
		// msg.Ack(true) // 主動應答

	}

}

web管理界面查看結果

關於rabbitMQ的web管理界面如何使用可以看這里

  1. 連接信息:

  2. channel信息:

  3. 交換器信息:

  4. 隊列信息:

  5. Binding信息
    入口:Exchange -> 點擊想看的交換器


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM