1.RMQ的安裝
docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=用戶名 -e RABBITMQ_DEFAULT_PASS=密碼 rabbitmq:3-management
- 三個端口映射,分別表示
5672:連接生產者、消費者的端口
15672:WEB管理頁面的端口
25672:分布式集群的端口
2.基本概念
- amqp:高級消息隊列協議,即一種消息中間件協議,RMQ是amqp協議的一個具體實現。RMQ使用Erlang語言實現的,具有很好的並發能力,具體歷史請百度,這里主要關心怎么用。
- 生產者將消息發送至交換器;交換器再發送至隊列,最后發送至消費者
- 交換器有四種類型,fanout、direct、topic三種類型,header類型沒用過,不關注。
fanout
一對多,根據綁定發送到每一個隊列,
常用於發布訂閱
direct
默認模式,一對一關系,根據routingkey與bindingjkey
一一對應匹配,發送消息
關於topic模式
以 ‘.’ 來分割單詞。
‘#’ 表示一個或多個單詞。
‘*’ 表示一個單詞。
如:
RoutingKey為:
aaa.bbb.ccc
BindingKey可以為:
*.bbb.ccc
aaa.#
3.庫中重要的方法
- 創建交換器
func (ch *Channel) ExchangeDeclare(
name string, //交換器的名稱
kind string, //表示交換器的類型。有四種常用類型:direct、fanout、topic、headers
durable bool, //是否持久化,true表示是。持久化表示會把交換器的配置存盤,當RMQ Server重啟后,會自動加載交換器
autoDelete bool, //是否自動刪除,true表示,當所有綁定都與交換器解綁后,會自動刪除此交換器。
internal bool, //是否為內部,true表示是。客戶端無法直接發送msg到內部交換器,只有交換器可以發送msg到內部交換器。
noWait bool, //是否非阻塞, 阻塞:表示創建交換器的請求發送后,阻塞等待RMQ Server返回信息。非阻塞:不會阻塞等待RMQ
args Table
) error
- 創建隊列
func (ch *Channel) QueueDeclare(
name string, //隊列名稱
durable bool, //是否持久化,true為是。持久化會把隊列存盤,服務器重啟后,不會丟失隊列以及隊列內的信息
autoDelete bool, //是否刪除,當所有消費者都斷開時,隊列會自動刪除。
exclusive bool, //是否排他,true為是。如果設置為排他,則隊列僅對首次聲明他的連接可見,並在連接斷開時自動刪除。
noWait bool, //是否非阻塞
args Table) (Queue, error)
- 隊列與交換器綁定,key,表示要綁定的鍵,交換器以此來分發
func (ch *Channel) QueueBind(
name, //隊列名字,確定哪個隊列
key, // 對應圖中BandingKey,表示要綁定的鍵。
exchange string, //交換器的名字
noWait bool, //是否非阻塞
args Table) error
- 交換器之間的綁定
func (ch *Channel) ExchangeBind(
destination, //目的交換器,通常是內部交換器。
key, //對應BandingKey,表示要綁定的鍵。
source string, //源交換器
noWait bool, //是否非阻塞
args Table) error
- 發送消息
func (ch *Channel) Publish(
exchange, //要發送的交換機
key string, //路由鍵,與之相關的綁定鍵對應
mandatory,
immediate bool,
msg Publishing //要發送的消息,msg對應一個Publishing結構
) error
//Publishing 結構體
type Publishing struct {
Headers Table
// Properties
ContentType string //消息的類型,通常為“text/plain”
ContentEncoding string //消息的編碼,一般默認不用寫
DeliveryMode uint8 //消息是否持久化,2表示持久化,0或1表示非持久化。
Body []byte //消息主體
Priority uint8 //消息的優先級 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
}
- 消費者接收消息--推模式
func (ch *Channel) Consume(
queue string, //隊列名稱
consumer string, //消費者標簽,用於區分不同的消費者
autoAck string, //是否自動回復ACK,true為是,回復ACK表示高速服務器我收到消息了。建議為false,手動回復,這樣可控性強
exclusive bool, //設置是否排他,排他表示當前隊列只能給一個消費者使用
noLocal bool, //如果為true,表示生產者和消費者不能是同一個connect
noWait bool, //是否非阻塞
args Table) (<-chan Delivery, error)
- 消費者接收消息--拉模式
func (ch *Channel) Get(
queue string,
autoAck bool) (msg Delivery, ok bool, err error)
- 手動回復消息
func (ch *Channel) Ack(tag uint64, multiple bool) error
func (me Delivery) Ack(multiple bool) error {
if me.Acknowledger == nil {
return errDeliveryNotInitialized
}
return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}
func (d Delivery) Reject(requeue bool) error
Publish – mandatory參數
- false:當消息無法通過交換器匹配到隊列時,會丟棄消息。
- true:當消息無法通過交換器匹配到隊列時,會調用basic.return通知生產者。
- 注:不建議使用,因會使程序邏輯變得復雜,可以通過備用交換機來實現類似的功能。
Publish – immediate參數
-
true:當消息到達Queue后,發現隊列上無消費者時,通過basic.Return返回給生產者。
-
false:消息一直緩存在隊列中,等待生產者。
-
注:不建議使用此參數,遇到這種情況,可用TTL和DLX方法代替(后面會介紹
-
Qos
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
-
注意:這個在推送模式下非常重要,通過設置Qos用來防止消息堆積。
-
prefetchCount:消費者未確認消息的個數。
-
prefetchSize :消費者未確認消息的大小。
-
global :是否全局生效,true表示是。全局生效指的是針對當前connect里的所有channel都生效
4.代碼示例
生產者
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
消費者
package main
import (
"github.com/streadway/amqp"
"log"
)
func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
DealWithError(err,"Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
DealWithError(err,"Failed to open a channel")
defer ch.Close()
//聲明交換器
ch.ExchangeDeclare(
"logs",
"fanout",
true,
false,
false,
false,
nil,
)
DealWithError(err,"Failed to declare an exchange")
//聲明了隊列
q,err := ch.QueueDeclare(
"", //隊列名字為rabbitMQ自動生成
false,
false,
true,
false,
nil,
)
DealWithError(err,"Failed to declare an exchange")
//交換器跟隊列進行綁定,交換器將接收到的消息放進隊列中
err = ch.QueueBind(
q.Name,
"",
"logs",
false,
nil,
)
DealWithError(err,"Failed to bind a queue")
msgs,err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
DealWithError(err,"Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs{
log.Printf(" [x] %s",d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
func DealWithError(err error,msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}