golang操作RabbitMQ--Simple模式與Work模式


MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。RabbitMQ是MQ的一種。RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。

1.創建RabbitMQ實例

package RabbitMQ

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const MQURL = "amqp://testuser:123456@127.0.0.1:5672/my_vhost"

type RabbitMQ struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	//隊列名稱
	QueueName string
	//交換機
	Exchange string
	//key
	key string
	//連接信息
	Mqurl string
}

//創建RabbitMQ結構體實例
func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, key: key, Mqurl: MQURL}
	var err error
	//創建RabbitMQ連接
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "創建連接錯誤!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "獲取channel失敗!")
	return rabbitmq
}

//斷開channel和connection
func (r *RabbitMQ) Destroy() {
	r.channel.Close()
	r.conn.Close()
}

//錯誤處理函數
func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", message, err)
		panic(fmt.Sprintf("%s,%s", message, err))
	}
}

2.Simple模式

//創建簡單模式下RabbitMQ實例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

//簡單模式下生產代碼
func (r *RabbitMQ) PublishSimple(message string) {
	//1.申請隊列,如果隊列不存在會自動創建,如果存在則跳過創建
	//保證隊列存在,消息隊列能發送到隊列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//額外屬性
		nil,
	)
	if err != nil {
		fmt.Println("QueueDeclare:", err)
	}

	//2.發送消息到隊列中
	err = r.channel.Publish(
		r.Exchange,
		r.QueueName,
		//如果為true,根據exchange類型和routekey規則,如果無法找到符合條件的隊列那么會把發送的消息返回給發送者
		false,
		//如果為true,當exchange發送消息隊列到隊列后發現隊列上沒有綁定消費者,則會把消息發還給發送者
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	if err != nil {
		fmt.Println("Publish:", err)
	}
}

//簡單模式下消費代碼
func (r *RabbitMQ) ConsumeSimple() {
	//1.申請隊列,如果隊列不存在會自動創建,如果存在則跳過創建
	//保證隊列存在,消息隊列能發送到隊列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//額外屬性
		nil)
	if err != nil {
		fmt.Println("QueueDeclare:", err)
	}

	//2.接受消息
	msgs, err := r.channel.Consume(
		r.QueueName,
		//用來區分多個消費者
		"",
		//是否自動應答
		true,
		//是否具有排他性
		false,
		//如果設置為true,表示不能將同一個connection中發送消息傳遞給這個connection中的消費者
		false,
		//隊列消費是否阻塞
		false,
		nil)
	if err != nil {
		fmt.Println("Consume:", err)
	}

	forever := make(chan bool)
	//3.啟用協程處理消息
	go func() {
		for d := range msgs {
			//實現我們要處理的邏輯函數
			log.Printf("Received a message:%s", d.Body)
		}
	}()
	log.Printf("[*] Waiting for messages,To exit press CTRL+C\n")
	<-forever
}
  • 簡單模式publish
package main

import (
	"fmt"
	"go-rabbitmq/RabbitMQ"
)

func main()  {
	rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
	rabbitmq.PublishSimple("Hello test!")
	fmt.Println("發送成功!")
}

  • 簡單模式recevie
package main

import "go-rabbitmq/RabbitMQ"

func main() {
	rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
	rabbitmq.ConsumeSimple()
}

3.Work模式

work模式生產消費代碼與simple模式相同

  • work模式publish
package main

import (
	"fmt"
	"go-rabbitmq/RabbitMQ"
	"strconv"
	"time"
)

func main() {
	rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")

	for i := 0; i <= 100; i++ {
		rabbitmq.PublishSimple("Hello test!" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Println(i)
	}
}
  • work模式receive1
package main

import "go-rabbitmq/RabbitMQ"

func main() {
	rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
	rabbitmq.ConsumeSimple()
}
  • work模式receive2
package main

import "go-rabbitmq/RabbitMQ"

func main() {
	rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
	rabbitmq.ConsumeSimple()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM