前言:如果你對rabbitmq基本概念都不懂,可以移步此篇博文查閱消息隊列RabbitMQ
一、單發單收
在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。中間的框是一個隊列-RabbitMQ代表使用者保留的消息緩沖區。
單發單收模式下:一發一收
發送端只需要創建隊列,然后向隊列發送消息。
接收端也需要創建隊列,因為如果接收端先啟動,沒有此隊列就會報錯,雖然發送端和接收端都創建此隊列,但rabbitmq還是很智能的,它只會創建一次。
需要注意的地方:
1.發送端和接收端都需要創建同名隊列
2.接收端指定從這個同名隊列中接收消息

發送端
package main
import (
"RabbitMQ"
"strconv"
"strings"
"time"
)
func main(){
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
send_mq := rabbitMQ.New("鏈接","hello")
i := 0
for{
time.Sleep(time.Second*5)
greetings := []string{"Helloworld!",strconv.Itoa(i)}
send_mq.Send("hello",strings.Join( greetings, " "))
i = i+1
}
}
接收端
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
}
}
二、工作隊列Work Queue
工作隊列和單發單收模式比起來,接收端可以有多個,接收端多了以后就會出現數據分配問題,發過來的數據到底該被哪個接收端接收,所以有兩種模式:
公平分發:每個接收端接收消息的概率是相等的,發送端會循環依次給每個接收端發送消息,圖一是公平分發,公平分發是rabbitmq默認模式。
公平派遣:保證接收端在處理完某個任務,並發送確認信息后,RabbitMQ才會向它推送新的消息,在此之間若是有新的消息話,將會被推送到其它接收端,若所有的接收端都在處理任務,那么就會等待,圖二為公平派遣。
注意:使用公平派遣模式時,消費者設置atuoack為false,需要手動回復ack。
關閉自動應答是為了消費者邏輯處理結束前不接受下一條消息,這樣哪個消費者邏輯處理的快,接收的消息自然就多,從而實現公平分發。
圖一:

圖二:

公平分發模式下的發送端和接收端
發送端
package main
import (
"RabbitMQ"
"strconv"
"strings"
"time"
)
func main(){
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
send_mq := rabbitMQ.New("鏈接","hello")
i := 0
for{
time.Sleep(time.Second*5)
greetings := []string{"Helloworld!",strconv.Itoa(i)}
send_mq.Send("hello",strings.Join( greetings, " "))
i = i+1
}
}
接收端1
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie 1 Received a message: %s", d.Body)
}
}()
}
}
接收端2
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie 1 Received a message: %s", d.Body)
}
}()
}
}
公平派遣模式下的發送端和接收端
公平派遣模式下發送端與公平分發相同,接收端只需要加一端配置代碼
我們可以將預取計數設置為1。這告訴RabbitMQ一次不要給工人一個以上的消息。換句話說,在處理並確認上一條消息之前,不要將新消息發送給工作人員。而是將其分派給不忙的下一個工作程序。
//配置隊列參數
func (q *RabbitMQ)Qos(){
e := q.channel.Qos(1,0,false)
failOnError(e,"無法設置QoS")
}
接收端
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
//配置公平派遣
receive_mq.Qos()
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie 2 Received a message: %s", d.Body)
}
}()
}
}
官方在這里介紹了出現以下兩種問題的解決辦法:
1.當接收者掛掉的時候,我們將丟失發送給接收端還沒有處理的消息。
2.當rabbitmq服務器掛了,我們怎么保證我們的消息不丟失。
具體參考:https://www.rabbitmq.com/tutorials/tutorial-two-go.html
三、發布/訂閱 Publish/Subscribe
發布訂閱模式下多了一個概念:exchange,如何理解這個exchange,exchange的作用就是類似路由器,發送端發送消息需要帶有routing key 就是路由鍵,服務器會根據路由鍵將消息從交換器路由到隊列上去,所以發送端和接收端之間有了中介。

exchange有多個種類:direct,fanout,topic,header(非路由鍵匹配,功能和direct類似,很少用)。
首先介紹exchange下的fanout exchange,它會將發到這個exchange的消息廣播到關注此exchange的所有接收端上。
廣播模式下(1:N):
發送端連接到rabbitmq后,創建exchange,需要指定交換機的名字和類型,fanout為廣播,然后向此exchange發送消息,其它就不用管了。
接收端的執行流程在程序備注中。
注意:廣播模式下的exchange是發送端是不需要帶路由鍵的哦。
package main
import (
"RabbitMQ"
"strconv"
"strings"
"time"
)
func main(){
ch := rabbitMQ.Connect("amqp://user:password@ip:port/")
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
i := 0
for{
time.Sleep(1)
greetings := []string{"Helloworld!",strconv.Itoa(i)}
ch.Publish("exchange1",strings.Join( greetings, " "),"")
i = i+1
}
}
接收端1
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先創建自己隊列
// 2.創建交換機
// 3.將自己綁定到交換機上
// 4.接收交換機上發過來的消息
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")
//2
//第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
//3
// 隊列綁定到exchange
receive_mq.Bind("exchange1","")
//4
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
}
}
接收端2
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先創建自己隊列
// 2.創建交換機
// 3.將自己綁定到交換機上
// 4.接收交換機上發過來的消息
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
//3
// 隊列綁定到exchange
receive_mq.Bind("exchange1","")
//4
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
}
}
四、路由Routing
路由模式其實就是全值匹配模式(direct),發送端發送消息需要帶有路由鍵,就是下面發送端程序的routing key1,是一個字符串,發送端發給exchange,路由模式下的exchange會匹配這個路由鍵,如下面這個圖,發送者發送時帶有orange此路由鍵時,這條消息只會被轉發給Q1隊列,如果路由鍵沒有匹配上的怎么辦?,全值匹配,沒有匹配到,那么所有接收者都接收不到消息,消息只會發送給匹配的隊列,接收端的路由鍵是綁定exchange的時候用的。
注意:接收隊列可以綁定多個路由鍵到exchange上,比如下面,當發送路由鍵為black,green,會被Q2接收。
發送端
package main
import (
"RabbitMQ"
"strconv"
"strings"
"time"
)
func main(){
ch := rabbitMQ.Connect("amqp://user:password@ip:port/")
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")
i := 0
for{
time.Sleep(1)
greetings := []string{"Helloworld!",strconv.Itoa(i)}
if i%2 ==1 {
//如果是奇數
ch.Publish("exchange",strings.Join( greetings, " "),"routing key1")
} else{
ch.Publish("exchange",strings.Join( greetings, " "),"routing key2")
}
i = i+1
}
}
接收端1
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己隊列
// 2.創建交換機
// 3.將自己綁定到交換機上
// 4.接收交換機上發過來的消息
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")
//3
receive_mq.Bind("exchange","routing key1")
//4
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
}
}
接收端2
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己隊列
// 2.創建交換機
// 3.將自己綁定到交換機上
// 4.接收交換機上發過來的消息
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")
//3
receive_mq.Bind("exchange","routing key2")
//4
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
}
}
五、Topic類型的exchange
前面的direct是全值匹配,那么topic就可以部分匹配,又可以全值匹配,比direct更加靈活。
消息發送到topic類型的exchange上時不能隨意指定routing_key(一定是指由一系列由點號連接單詞的字符串,單詞可以是任意的,但一般都會與消息或多或少的有些關聯)。Routing key的長度不能超過255個字節。
Binding key也一定要是同樣的方式。Topic類型的exchange就像一個直接的交換:一個由生產者指定了確定routing key的消息將會被推送給所有Binding key能與之匹配的消費者。然而這種綁定有兩種特殊的情況:
- *(星號):可以(只能)匹配一個單詞
- #(井號):可以匹配多個單詞(或者零個)
下邊來舉個例子:

在這個例子中,我們將會發送一些描述動物的消息。Routing key的第一個單詞是描述速度的,第二個單詞是描述顏色的,第三個是描述物種的:“<speed>.<colour>.<species>”。
這里我們創建三個Binding:Binding key為”*.orange.*”的Q1,和binding key為”*.*.rabbit”和”lazy.#”的Q2。
這些binding可以總結為:
- Q1對所有橘色的(orange)的動物感興趣;
- Q2希望能拿到所有兔子的(rabbit)信息,還有比較懶惰的(lazy.#)動物信息。
一條以” quick.orange.rabbit”為routing key的消息將會推送到Q1和Q2兩個queue上,routing key為“lazy.orange.elephant”的消息同樣會被推送到Q1和Q2上。但如果routing key為”quick.orange.fox”的話,消息只會被推送到Q1上;routing key為”lazy.brown.fox”的消息會被推送到Q2上,routing key為"lazy.pink.rabbit”的消息也會被推送到Q2上,但同一條消息只會被推送到Q2上一次。
如果在發送消息時所指定的exchange和routing key在消費者端沒有對應的exchange和binding key與之綁定的話,那么這條消息將會被丟棄掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing為”lazy.orange.male.rabbit”的消息,將會被推到Q2上。
Topic類型的exchange:
Topic類型的exchange是很強大的,也可以實現其它類型的exchange。
- 當一個隊列被綁定為binding key為”#”時,它將會接收所有的消息,此時和fanout類型的exchange很像。
- 當binding key不包含”*”和”#”時,這時候就很像direct類型的exchange。
發送端
package main
import (
"RabbitMQ"
"time"
)
func main(){
ch := rabbitMQ.Connect("amqp://user:password@ip/")
rabbitMQ.NewExchange("amqp://user:password@ip/","exchange","topic")
for{
time.Sleep(1)
ch.Publish("exchange","hello world","lazy.brown.fox")
}
}
接收端
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己隊列
// 2.創建交換機
// 3.將自己綁定到交換機上
// 4.接收交換機上發過來的消息
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")
//2
//第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.orange.*")
//4
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
}
}
接收端2
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己隊列
// 2.創建交換機
// 3.將自己綁定到交換機上
// 4.接收交換機上發過來的消息
//第一個參數指定rabbitmq服務器的鏈接,第二個參數指定創建隊列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.*.rabbit")
receive_mq.Bind("exchange","lazy.#")
//4
for{
//接收消息時,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
}
}
六、rabbitmq部分封裝代碼及准備工作
目錄參考:

准備工作:
1.我們再創建go項目時,首先指定gopath目錄,然后在目錄下創建bin、src、pkg目錄。
2.下載github.com/streadway/amqp包,會自動添加到項目的pkg目錄下。
go get github.com/streadway/amqp
3.在rabbitmq服務器上創建用戶,指定管理員,並賦予訪問權限。
4.rabbitmq封裝
package rabbitMQ
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
)
//聲明隊列類型
type RabbitMQ struct {
channel *amqp.Channel
Name string
exchange string
}
//連接服務器
func Connect(s string) * RabbitMQ{
//連接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"連接Rabbitmq服務器失敗!")
ch ,e :=conn.Channel()
failOnError(e,"無法打開頻道!")
mq := new(RabbitMQ)
mq.channel =ch
return mq
}
//初始化單個消息隊列
//第一個參數:rabbitmq服務器的鏈接,第二個參數:隊列名字
func New(s string,name string) * RabbitMQ{
//連接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"連接Rabbitmq服務器失敗!")
ch ,e :=conn.Channel()
failOnError(e,"無法打開頻道!")
q,e := ch.QueueDeclare(
name,//隊列名
false,//是否開啟持久化
true,//不使用時刪除
false, //排他
false, //不等待
nil, //參數
)
failOnError(e,"初始化隊列失敗!")
mq := new(RabbitMQ)
mq.channel =ch
mq.Name =q.Name
return mq
}
//批量初始化消息隊列
//第一個參數:rabbitmq服務器的鏈接,第二個參數:隊列名字列表
//聲明交換機
func (q *RabbitMQ)QueueDeclare(queue string){
_,e := q.channel.QueueDeclare(queue,false,true,false,false,nil)
failOnError(e,"聲明交換機!")
}
//刪除交換機
func (q *RabbitMQ)QueueDelete(queue string){
_,e := q.channel.QueueDelete(queue,false,true,false)
failOnError(e,"刪除隊列失敗!")
}
//配置隊列參數
func (q *RabbitMQ)Qos(){
e := q.channel.Qos(1,0,false)
failOnError(e,"無法設置QoS")
}
//配置交換機參數
//初始化交換機
//第一個參數:rabbitmq服務器的鏈接,第二個參數:交換機名字,第三個參數:交換機類型
func NewExchange(s string,name string,typename string){
//連接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"連接Rabbitmq服務器失敗!")
ch ,e :=conn.Channel()
failOnError(e,"無法打開頻道!")
e = ch.ExchangeDeclare(
name, // name
typename, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(e,"初始化交換機失敗!")
}
//刪除交換機
func (q *RabbitMQ)ExchangeDelete(exchange string){
e := q.channel.ExchangeDelete(exchange,false,true)
failOnError(e,"綁定隊列失敗!")
}
//綁定消息隊列到哪個exchange
func (q *RabbitMQ)Bind(exchange string,key string){
e := q.channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(e,"綁定隊列失敗!")
q.exchange = exchange
}
//向消息隊列發送消息
//Send方法可以往某個消息隊列發送消息
func (q *RabbitMQ) Send(queue string,body interface{}){
str,e := json.Marshal(body)
failOnError(e,"消息序列化失敗!")
e = q.channel.Publish(
"",//交換
queue,//路由鍵
false, //必填
false, //立即
amqp.Publishing{
ReplyTo:q.Name,
Body:[]byte(str),
})
msg := "向隊列:"+q.Name+"發送消息失敗!"
failOnError(e,msg)
}
//向exchange發送消息
//Publish方法可以往某個exchange發送消息
func (q *RabbitMQ) Publish(exchange string,body interface{},key string) {
str,e := json.Marshal(body)
failOnError(e,"消息序列化失敗!")
e = q.channel.Publish(
exchange,
key,
false,
false,
amqp.Publishing{ReplyTo:q.Name,
Body:[]byte(str)},
)
failOnError(e,"向路由發送消息失敗!")
}
//接收某個消息隊列的消息
func (q * RabbitMQ) Consume() <-chan amqp.Delivery{
c,e :=q.channel.Consume(
q.Name,//指定從哪個隊列中接收消息
"",
true,
false,
false,
false,
nil,
)
failOnError(e,"接收消息失敗!")
return c
}
//關閉隊列連接
func (q *RabbitMQ) Close() {
q.channel.Close()
}
//錯誤處理函數
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
其中函數參數解析可以參考:Rabbitmq詳解(基於go語言)

