Go RabbitMQ(三)發布訂閱模式


RabbitMQ

  • 在上一節中我們創建了工作隊列,並且假設每一個任務都能夠准確的到達對應的worker。在本節中我們將介紹如何將一個消息傳遞到多個消費者,這也就是所說的發布訂閱模式
  • 為了驗證該模式我們使用兩個建立一個簡單的打印系統,一個負責發出消息,另一個負責接收並打印。在該系統多個receiver中,其中一個直接將日志寫入到硬盤,另一個負責從屏幕上查看日志
  • 在之前的簡介中,我們可以作以下簡單總結:
    • 生產者負責發送消息
    • 隊列是一個存儲消息的緩沖區
    • 消費者負責接收消息

RabbitMQ消息傳遞模型的核心思想是,生產者永遠不會將任何消息直接發送到隊列,實際上,通常生產者甚至不知道消息是否被傳遞到某個隊列。

相反,生產者只能向交換器發送消息。交換器一邊接收來自生產者發布的消息一邊將消息放入到隊列當中。可以通過exchangeType來設置交換器對消息的處理,比如拼接到指定的隊列,或是拼接到多個隊列中,或是丟棄。

exchange Type有以下幾種:direct,topic,headers,fanout。我們先使用最后一種創建相應的交換器並取名logs:

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

fanout模式就是廣播所有接收到的消息到它已知的所有隊列當中

使用以下命令可以羅列RabbitMQ中所有的交換器:
sudo rabbitmqctl list_exchanges

在之前的例子中我們沒有使用交換器但是依舊可以發送消息到隊列當中,說明我們已經使用了默認的交換器,我們可以看下以前的代碼:

err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})

在這里我們使用了默認的交換器:消息將被依據routering_key指定的名字路由到隊列中.

一旦我們定義好了交換器,則可以在生產者發送消息的時候使用:

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs", // exchange
  "",     // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

臨時隊列

我們想要獲取所有日志消息不只是子集,同時我們只對當前的信息流感興趣,為了解決這個問題我們需要兩個東西:

首先,我們需要一個新的空的隊列不管我們是否有鏈接Rabbit,我們可以使用一個隨機名字創建一個隊列,或是讓系統指定給我們

其次,一旦我們斷開與消費者的鏈接,隊列必須自動刪除。

在amqp客戶端中,當我們使用一個空的名字創建一個隊列的時候:

q, err := ch.QueueDeclare(
  "",    // name
  false, // durable
  false, // delete when usused
  true,  // exclusive
  false, // no-wait
  nil,   // arguments
)

當我們得到其返回的隊列的時候,隊列實例將會包含一個由RabbitMQ產生的名字,差不多這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
當我們鏈接關閉的時候,隊列將被刪除因為它被聲明為exclusive

綁定

在前面我們已經創建了一個fanout類型的交換器和一個隊列,接下來我們我們需要讓交換器將消息發送到我們隊列中,將交換器(exchange)和隊列(queue)關聯起來稱為綁定

err = ch.QueueBind(
  q.Name, // 隊列名 name
  "",     // routing key
  "logs", // 交換器名
  false,
  nil
)

經過以上關聯之后,logs交換器就會將消息拼接到我們的隊列當中。

羅列出所有的綁定:
rabbitmqctl list_bindings

完整代碼如下:

emit.go

package main

import (
        "fmt"
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

receive.go

package main

import (
	"github.com/streadway/amqp"
	"log"
)

func main() {
	conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	DealWithError(err,"Failed to connect to RabbitMQ")
	defer conn.Close()

	ch,err := conn.Channel()
	DealWithError(err,"Failed to open a channel")
	defer ch.Close()
	//聲明交換器
	ch.ExchangeDeclare(
		"logs",
		"fanout",
		true,
		false,
		false,
		false,
		nil,
		)
	DealWithError(err,"Failed to declare an exchange")
	//聲明了隊列
	q,err := ch.QueueDeclare(
		"", //隊列名字為rabbitMQ自動生成
		false,
		false,
		true,
		false,
		nil,
		)
	DealWithError(err,"Failed to declare an exchange")
	//交換器跟隊列進行綁定,交換器將接收到的消息放進隊列中
	err = ch.QueueBind(
		q.Name,
		"",
		"logs",
		false,
		nil,
		)
	DealWithError(err,"Failed to bind a queue")
	msgs,err := ch.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
		)
	DealWithError(err,"Failed to register a consumer")
	forever := make(chan bool)
	go func() {
		for d := range msgs{
			log.Printf(" [x] %s",d.Body)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

func DealWithError(err error,msg string)  {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM