go 操作RabbitMQ


1.RMQ的安裝

docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=用戶名 -e RABBITMQ_DEFAULT_PASS=密碼 rabbitmq:3-management

  • 三個端口映射,分別表示
5672:連接生產者、消費者的端口
15672:WEB管理頁面的端口
25672:分布式集群的端口

2.基本概念

  • amqp:高級消息隊列協議,即一種消息中間件協議,RMQ是amqp協議的一個具體實現。RMQ使用Erlang語言實現的,具有很好的並發能力,具體歷史請百度,這里主要關心怎么用。
  • 生產者將消息發送至交換器;交換器再發送至隊列,最后發送至消費者
  • 交換器有四種類型,fanout、direct、topic三種類型,header類型沒用過,不關注。
fanout
一對多,根據綁定發送到每一個隊列,
常用於發布訂閱

direct
默認模式,一對一關系,根據routingkey與bindingjkey
一一對應匹配,發送消息

關於topic模式
以 ‘.’ 來分割單詞。
‘#’ 表示一個或多個單詞。
‘*’ 表示一個單詞。
如:
RoutingKey為:
aaa.bbb.ccc
BindingKey可以為:
*.bbb.ccc
aaa.#

3.庫中重要的方法

  • 創建交換器
func (ch *Channel) ExchangeDeclare(
	name string,  //交換器的名稱
	kind string, //表示交換器的類型。有四種常用類型:direct、fanout、topic、headers
	durable bool, //是否持久化,true表示是。持久化表示會把交換器的配置存盤,當RMQ Server重啟后,會自動加載交換器
	autoDelete bool, //是否自動刪除,true表示,當所有綁定都與交換器解綁后,會自動刪除此交換器。
	internal bool,  //是否為內部,true表示是。客戶端無法直接發送msg到內部交換器,只有交換器可以發送msg到內部交換器。
	noWait bool, //是否非阻塞, 阻塞:表示創建交換器的請求發送后,阻塞等待RMQ Server返回信息。非阻塞:不會阻塞等待RMQ
	args Table
) error
  • 創建隊列
func (ch *Channel) QueueDeclare(
	name string,  //隊列名稱
	durable bool,  //是否持久化,true為是。持久化會把隊列存盤,服務器重啟后,不會丟失隊列以及隊列內的信息
	autoDelete bool,  //是否刪除,當所有消費者都斷開時,隊列會自動刪除。
	exclusive bool,   //是否排他,true為是。如果設置為排他,則隊列僅對首次聲明他的連接可見,並在連接斷開時自動刪除。
	noWait bool, //是否非阻塞
	args Table) (Queue, error)
  • 隊列與交換器綁定,key,表示要綁定的鍵,交換器以此來分發
func (ch *Channel) QueueBind(
	name,  //隊列名字,確定哪個隊列
	key, // 對應圖中BandingKey,表示要綁定的鍵。
	exchange string,  //交換器的名字
	noWait bool,  //是否非阻塞
	args Table) error
  • 交換器之間的綁定
func (ch *Channel) ExchangeBind(
	destination,  //目的交換器,通常是內部交換器。
	key,    //對應BandingKey,表示要綁定的鍵。
	source string,  //源交換器
	noWait bool,   //是否非阻塞
	args Table) error
  • 發送消息
func (ch *Channel) Publish(
		exchange,  //要發送的交換機
		key string,  //路由鍵,與之相關的綁定鍵對應
		mandatory, 
		immediate bool, 
		msg Publishing   //要發送的消息,msg對應一個Publishing結構
		) error
		
//Publishing 結構體
type Publishing struct {
        Headers Table
        // Properties
        ContentType     string  //消息的類型,通常為“text/plain”
        ContentEncoding string  //消息的編碼,一般默認不用寫
        DeliveryMode    uint8   //消息是否持久化,2表示持久化,0或1表示非持久化。
        Body []byte  //消息主體
        Priority        uint8  //消息的優先級 0 to 9
        CorrelationId   string    // correlation identifier
        ReplyTo         string    // address to to reply to (ex: RPC)
        Expiration      string    // message expiration spec
        MessageId       string    // message identifier
        Timestamp       time.Time // message timestamp
        Type            string    // message type name
        UserId          string    // creating user id - ex: "guest"
        AppId           string    // creating application id
}
		
  • 消費者接收消息--推模式
func (ch *Channel) Consume(
	queue string,  //隊列名稱 
	consumer string,  //消費者標簽,用於區分不同的消費者
	autoAck string,  //是否自動回復ACK,true為是,回復ACK表示高速服務器我收到消息了。建議為false,手動回復,這樣可控性強
	exclusive bool,  //設置是否排他,排他表示當前隊列只能給一個消費者使用
	noLocal bool, //如果為true,表示生產者和消費者不能是同一個connect
	noWait bool, //是否非阻塞
	args Table) (<-chan Delivery, error)
  • 消費者接收消息--拉模式
func (ch *Channel) Get(
	queue string, 
	autoAck bool) (msg Delivery, ok bool, err error)
  • 手動回復消息
func (ch *Channel) Ack(tag uint64, multiple bool) error

func (me Delivery) Ack(multiple bool) error {
        if me.Acknowledger == nil {
                return errDeliveryNotInitialized
        }
        return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}

func (d Delivery) Reject(requeue bool) error
Publish – mandatory參數
  • false:當消息無法通過交換器匹配到隊列時,會丟棄消息。
  • true:當消息無法通過交換器匹配到隊列時,會調用basic.return通知生產者。
  • 注:不建議使用,因會使程序邏輯變得復雜,可以通過備用交換機來實現類似的功能。
Publish – immediate參數
  • true:當消息到達Queue后,發現隊列上無消費者時,通過basic.Return返回給生產者。

  • false:消息一直緩存在隊列中,等待生產者。

  • 注:不建議使用此參數,遇到這種情況,可用TTL和DLX方法代替(后面會介紹

  • Qos
    func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

  • 注意:這個在推送模式下非常重要,通過設置Qos用來防止消息堆積。

  • prefetchCount:消費者未確認消息的個數。

  • prefetchSize :消費者未確認消息的大小。

  • global :是否全局生效,true表示是。全局生效指的是針對當前connect里的所有channel都生效

4.代碼示例

生產者

package main

import (
        "fmt"
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

消費者

package main

import (
	"github.com/streadway/amqp"
	"log"
)

func main() {
	conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	DealWithError(err,"Failed to connect to RabbitMQ")
	defer conn.Close()

	ch,err := conn.Channel()
	DealWithError(err,"Failed to open a channel")
	defer ch.Close()
	//聲明交換器
	ch.ExchangeDeclare(
		"logs",
		"fanout",
		true,
		false,
		false,
		false,
		nil,
		)
	DealWithError(err,"Failed to declare an exchange")
	//聲明了隊列
	q,err := ch.QueueDeclare(
		"", //隊列名字為rabbitMQ自動生成
		false,
		false,
		true,
		false,
		nil,
		)
	DealWithError(err,"Failed to declare an exchange")
	//交換器跟隊列進行綁定,交換器將接收到的消息放進隊列中
	err = ch.QueueBind(
		q.Name,
		"",
		"logs",
		false,
		nil,
		)
	DealWithError(err,"Failed to bind a queue")
	msgs,err := ch.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
		)
	DealWithError(err,"Failed to register a consumer")
	forever := make(chan bool)
	go func() {
		for d := range msgs{
			log.Printf(" [x] %s",d.Body)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

func DealWithError(err error,msg string)  {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM