golang操作RabbitMQ--話題模式


創建連接及RabbitMQ結構體實例代碼見 https://www.cnblogs.com/prince5460/p/11895844.html

1.創建話題模式RabbitMQ實例

func NewRabbitMQTopic(exchangeName, routingKey string) *RabbitMQ {
	//創建RabbitMQ實例
	rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
	var err error
	//獲取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "Failed to connect rabbitmq!")
	//獲取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "Failed to open a channel!")
	return rabbitmq
}

2.話題模式發送消息

func (r *RabbitMQ) PublishTopic(message string) {
	//1.嘗試創建交換機
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		//要改成topic
		"topic",
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare an exchange!")

	//2.發送消息
	err = r.channel.Publish(
		r.Exchange,
		//要設置
		r.key,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
}

3.話題模式接收消息

//要注意key規則
//其中"*"用於匹配一個單詞,"#"用於匹配多個單詞(可以是零個)
//匹配test.*表示匹配test.hello,但是test.hello.one需要用test.#才能匹配到
func (r *RabbitMQ) ReceiveTopic() {
	//1.試探性創建交換機
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		//交換機類型
		"topic",
		true,
		false,
		//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
		false,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare an exchange!")

	//2.試探性創建隊列,注意隊列名稱不要寫
	q, err := r.channel.QueueDeclare(
		"", //隨機生成隊列名稱
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare an exchange!")

	//3.綁定隊列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//需要綁定key
		r.key,
		r.Exchange,
		false,
		nil,
	)

	//4.消費消息
	messages, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	forever := make(chan bool)

	go func() {
		for d := range messages {
			log.Printf("Received a message :%s", d.Body)
		}
	}()

	fmt.Println("[*] Waiting for messages,To exit press CTRL+C")
	<-forever
}

4.測試代碼

  • Publish
package main

import (
	"fmt"
	"go-rabbitmq/RabbitMQ"
	"strconv"
	"time"
)

func main() {
	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.one")
	testTwo := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.two")
	for i := 0; i <= 10; i++ {
		testOne.PublishTopic("Hello test topic one:" + strconv.Itoa(i))
		testTwo.PublishTopic("Hello test topic two:" + strconv.Itoa(i))
		time.Sleep(time.Second)
		fmt.Println("publish:", i)
	}
}
  • ReceiveAll
package main

import "go-rabbitmq/RabbitMQ"

func main() {
	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic","#")
	testOne.ReceiveTopic()
}

  • ReveiveOne
package main

import "go-rabbitmq/RabbitMQ"

func main() {
	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.*.one")
	testOne.ReceiveTopic()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM