MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。RabbitMQ是MQ的一種。RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。
1.創建RabbitMQ實例
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
const MQURL = "amqp://testuser:123456@127.0.0.1:5672/my_vhost"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
//隊列名稱
QueueName string
//交換機
Exchange string
//key
key string
//連接信息
Mqurl string
}
//創建RabbitMQ結構體實例
func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, key: key, Mqurl: MQURL}
var err error
//創建RabbitMQ連接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "創建連接錯誤!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "獲取channel失敗!")
return rabbitmq
}
//斷開channel和connection
func (r *RabbitMQ) Destroy() {
r.channel.Close()
r.conn.Close()
}
//錯誤處理函數
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s,%s", message, err))
}
}
2.Simple模式
//創建簡單模式下RabbitMQ實例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
//簡單模式下生產代碼
func (r *RabbitMQ) PublishSimple(message string) {
//1.申請隊列,如果隊列不存在會自動創建,如果存在則跳過創建
//保證隊列存在,消息隊列能發送到隊列中
_, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
false,
//是否為自動刪除
false,
//是否具有排他性
false,
//是否阻塞
false,
//額外屬性
nil,
)
if err != nil {
fmt.Println("QueueDeclare:", err)
}
//2.發送消息到隊列中
err = r.channel.Publish(
r.Exchange,
r.QueueName,
//如果為true,根據exchange類型和routekey規則,如果無法找到符合條件的隊列那么會把發送的消息返回給發送者
false,
//如果為true,當exchange發送消息隊列到隊列后發現隊列上沒有綁定消費者,則會把消息發還給發送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
if err != nil {
fmt.Println("Publish:", err)
}
}
//簡單模式下消費代碼
func (r *RabbitMQ) ConsumeSimple() {
//1.申請隊列,如果隊列不存在會自動創建,如果存在則跳過創建
//保證隊列存在,消息隊列能發送到隊列中
_, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
false,
//是否為自動刪除
false,
//是否具有排他性
false,
//是否阻塞
false,
//額外屬性
nil)
if err != nil {
fmt.Println("QueueDeclare:", err)
}
//2.接受消息
msgs, err := r.channel.Consume(
r.QueueName,
//用來區分多個消費者
"",
//是否自動應答
true,
//是否具有排他性
false,
//如果設置為true,表示不能將同一個connection中發送消息傳遞給這個connection中的消費者
false,
//隊列消費是否阻塞
false,
nil)
if err != nil {
fmt.Println("Consume:", err)
}
forever := make(chan bool)
//3.啟用協程處理消息
go func() {
for d := range msgs {
//實現我們要處理的邏輯函數
log.Printf("Received a message:%s", d.Body)
}
}()
log.Printf("[*] Waiting for messages,To exit press CTRL+C\n")
<-forever
}
- 簡單模式publish
package main
import (
"fmt"
"go-rabbitmq/RabbitMQ"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
rabbitmq.PublishSimple("Hello test!")
fmt.Println("發送成功!")
}
- 簡單模式recevie
package main
import "go-rabbitmq/RabbitMQ"
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
rabbitmq.ConsumeSimple()
}
3.Work模式
work模式生產消費代碼與simple模式相同
- work模式publish
package main
import (
"fmt"
"go-rabbitmq/RabbitMQ"
"strconv"
"time"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
for i := 0; i <= 100; i++ {
rabbitmq.PublishSimple("Hello test!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
- work模式receive1
package main
import "go-rabbitmq/RabbitMQ"
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
rabbitmq.ConsumeSimple()
}
- work模式receive2
package main
import "go-rabbitmq/RabbitMQ"
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
rabbitmq.ConsumeSimple()
}