rabbitMQ工作隊列
在之前內容中我們通過一個隊列實現了消息的發送跟接收。接下來我們創建工作隊列(Work Queue),用於在多個工作者之間分配耗時的任務
工作隊列(任務隊列)背后的核心主要是避免立即執行資源密集型的任務,必須等待其工作完成。我們將任務封裝為消息后將其發送到隊列,后台的工作進程將彈出任務並最終執行,當我們運行很多Worker時候,任務將在它們之間共享
round-robin 調度
- 使用任務隊列的優點之一就是能夠輕松的並行化工作
- 默認情況下,RabbitMQ會將每一條信息按照消費者順序發送給一個消費者,這樣平均每個消費者會接收到相同數量的消息,這種消息分發的模式叫做round-robin(啟動多個接收端,然后發送多個消息試試)
message acknowledgment(消息確認)
為了確保消息不會丟失,RabbitMQ支持消息確認,消費者消費了一個消息之后會發送一個ack給RabbitMQ,這樣RabbitMQ就可以刪除掉這個消息
如果一個消費者異常(通道關閉或鏈接關閉或TCP鏈接丟失)沒有發送ACK給rabbitMQ,rabbitMQ會將該消息重新放入隊列當中。此時如果有其他消費者在線,rabbitMQ會重新將該消息再次投遞到另一個消費者
-
手動確認ACK
- 手動確認ACK我們可以在創建消費者的時候將auto-ack設置為false,一旦我們消費消息任務完畢的時候使用d.Ack(false)來確認ack,告訴RabbitMQ該消息可以刪除
msgs,err := ch.Consume( q.Name, "", false,//將autoAck設置為false,則需要在消費者每次消費完成 // 消息的時候調用d.Ack(false)來告訴RabbitMQ該消息已經消費 false, false, false, nil, ) FailError(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs{ log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") //multiple為true的時候:此次交付和之前沒有確認的交付都會在通過同一個通道交付,這在批量處理的時候很有用 //為false的時候只交付本次。只有該方法執行了,RabbitMQ收到該確認才會將消息刪除 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
使用以上設置后,我們可以保證即使worker在執行任務的時候意外退出也不會丟失消息。在worker意外退出的不久之后消息將會被重新投遞。確認ack必須使用接收到消息的通道,如果使用不同的通道將會導致一個通道協議異常
-
忘記確認ack
- 在開發的時候經常會忘記對消費過的消息進行ack確認,這是一個很嚴重的錯誤,可以使用以下命令查看RabbitMQ中有多少消息在准備中或是未確認的: sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
hello 0 1
```
* messages_ready:未投遞的消息
* messages_unacknowledged:投遞未收到回復的消息
消息持久
我們已經知道如何確保即使消費者意外退出的情況下保證任務不會丟失。但是如果RabbitMQ服務停止的話任務還是會丟失。當RabbitMQ退出或異常的時候,它將會丟失隊列和消息,除非你設置RabbitMQ的兩個地方:將隊列和消息進行標記為持久的
-
首先設置隊列durable為true
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
```
RabbitMQ不允許使用不同參數重新定義一個已經存在的隊列,所以隊列已經存在的話修改了上面的配置后運行程序是不會改變已經存在的隊列的
-
然后設置消息為持久化存儲:
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
```
注意:設置消息持久化並不能保證消息不會丟失,因為仍然有一小段時間片處於RabbitMQ收到消息但是還沒保存,它可能只是保存在內存當中。但是已經滿足我們的基本使用,如果你需要強保證的話可以使用**publisher confirms**
公平調度(Fair dispatch)
- RabbitMQ的默認消息分配不能夠滿足我們的需要,比如有兩個消費者,其中一個消費者經常忙碌的狀態,另外一個消費者幾乎不做任何工作,但是RabbitMQ仍然均勻的在兩者之間調度消息。這是因為RabbitMQ只做隊列當中的消息調度而沒有查看某個消費者中未確認的消息,它只是盲目的將第n條消息發送給第n個消費者
- 解決以上問題我們可以設置prefetch count數值為1,這樣只有當消費者消費完消息並返回ack確認后RabbitMQ才會給其分發消息,否則只會將消息分發給其他空閑狀態的消費者
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
注意:消費者必須要設置,生產者不用設置
完整代碼
- new_task.go
func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failError(err,"send:Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
failError(err,"Failed to open a channel")
defer ch.Close()
q,err := ch.QueueDeclare(
"task_queue",
true,// 設置為true之后RabbitMQ將永遠不會丟失隊列,否則重啟或異常退出的時候會丟失
false,
false,
false,
nil,
)
failError(err,"Failed to declare a queue")
fmt.Println(q.Name)
body := bodyFrom(os.Args)
//生產者將消息發送到默認交換器中,不是發送到隊列中
ch.Publish(
"",//默認交換器
q.Name,//使用隊列的名字來當作route-key是因為聲明的每一個隊列都有一個隱式路由到默認交換器
false,
false,
amqp.Publishing{
DeliveryMode:amqp.Persistent,
ContentType:"text/plain",
Body:[]byte(body),
})
failError(err,"Failed to publish a message")
log.Printf(" [x] Sent %s",body)
}
func bodyFrom(args []string)string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "hello"
}else {
s = strings.Join(args[1:]," ")
}
return s
}
func failError(err error,msg string) {
if err != nil {
log.Fatal("%s : %s",msg,err)
}
}
- Worker.go
func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailError1(err,"receive:Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
FailError1(err,"receive:Failed to open a channel")
defer ch.Close()
q,err := ch.QueueDeclare(
"task_queue",
true,
false,
false,
false,
nil,
)
err = ch.Qos(
1, //// 在沒有返回ack之前,最多只接收1個消息
0,
false,
)
FailError1(err,"Failed to set Qos")
msgs,err := ch.Consume(
q.Name,
"",
false,//將autoAck設置為false,則需要在消費者每次消費完成
// 消息的時候調用d.Ack(false)來告訴RabbitMQ該消息已經消費
false,
false,
false,
nil,
)
FailError1(err,"Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs{
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
fmt.Println()
time.Sleep(t * time.Second)
log.Printf("Done")
//multiple為true的時候:此次交付和之前沒有確認的交付都會在通過同一個通道交付,這在批量處理的時候很有用
//為false的時候只交付本次。只有該方法執行了,RabbitMQ收到該確認才會將消息刪除
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
func FailError1(err error,msg string) {
if err != nil {
log.Fatal("%s : %s",msg,err)
}
}