二、”工作隊列”
在第一節中我們發送接收消息直接從隊列中進行。這節中我們會創建一個工作隊列來分發處理多個工作者中的耗時性任務。
工作隊列主要是為了避免進行一些必須同步等待的資源密集型的任務。實際上我們將這些任務時序話稍后分發完成。我們將某個任務封裝成消息然后發送至隊列,后台運行的工作進程將這些消息取出然后執行這些任務。當你運行多個工作進程的時候,這些任務也會在它們之間共享。
前期准備
上一節的練習中我們發送的是簡單包含“Hello World!”的消息,這節我們還發送字符串不過用此代表更復雜的任務,實際我們這里並沒有真正的任務,像圖片縮放或pdf文件渲染之類的,這里我們假裝我們很忙(即處理的消息任務很耗時),使用time.Sleep函數實現。我們用字符串中的”.”符號的數量代表任務的復雜性,每一個”.”需要耗時1s來執行處理。比如:”Hello…”代表該消息處理耗時3s。
我們稍微修改下上節中send.go代碼,為了可以在命令行直接發送任意數量的消息。該程序將任務發送到我們的隊列,暫且命名為new_task.go:
body := bodyFrom(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing { DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body)
我們舊的receiver.go為程序也要坐下修改:對每個消息體中的”.”符號它需要偽造一個每秒執行的工作隊列。它將消息從隊列中取出並執行,所以這里暫且命名為work.go:
msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
請注意,我們這里的假任務模擬的是執行時間。如上一節中方式,運行:
shell1$ go run worker.go
shell2$ go run new_task.go
運行work.go:
運行new_task.go:
可以看到,work.go循環監聽消息並打印,new_task.go中,我們接收控制台參數作為消息內容並發送,消息接收后自動應答。
輪轉分發(Round-robin dispatching)
使用任務隊列的一個優點就是有能力更簡單的處理平行任務,如果工作任務堆積之后,我們只需要增加更多的工作進程,可以很簡單的實現規模拓展。
首先,我們同時運行2個工作隊列,都從消息隊列中獲取消息,實際會怎么樣呢?來看看。
你現在需要打開2個窗口,都運行work.go,即work1和work2,這就是我們的2個消費者:C1、C2。
第3個窗口我們用來發送消息到隊列,一旦消費者運行起來后便可以發送消息:
shell3$ go run new_task.go First message.
shell3$ go run new_task.go Second message..
shell3$ go run new_task.go Third message...
shell3$ go run new_task.go Fourth message....
shell3$ go run new_task.go Fifth message.....
然后看下work.go中接收的數據:
默認情況下,RabbitMQ會將隊列中的每條消息有序的分發給每一個消費者,比如這里的work1和work2,平均每個消費者都會獲得相同數量的消息(一個隊列中的同一條消息不會同時發送給超過2個消費者),這種分發消息的方式就是“輪轉分發”,可以開啟3個work試試。
至此完整代碼如下:
new_task.go:
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //連接服務器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明隊列 q, err := ch.QueueDeclare( "hello", // name 隊列名稱 false, // durable 是否持久化,這里false false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") //創建請求體 body := bodyFrom(os.Args) //發送消息 err = ch.Publish( "", // exchange 交換器名稱,使用默認 q.Name, // routing key 路由鍵,這里為隊列名稱 false, // mandatory false, amqp.Publishing{ ContentType: "text/plain", //消息類型,文本消息 Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello golang" } else { s = strings.Join(args[1:], " ") } return s }
work.go:
package main import ( "bytes" "fmt" "log" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //鏈接服務器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明隊列 q, err := ch.QueueDeclare( "hello", // name 隊列名稱 false, // durable 持久化標識 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") //聲明消費者 msgs, err := ch.Consume( q.Name, // queue 消費的隊列名稱 "", // consumer true, // auto-ack 自動應答 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) //主要用來防止主進程窗口退出 go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) //延時x秒 log.Printf("Done") } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
消息應答
完成一個任務處理可能會花費數秒時間,你可能會納悶如果其中一個消費者任務處理時間過長只部分完成就掛掉會怎樣。如果使用以上代碼,一旦RabbitMQ發送一個消息給消費者然后便迅速將該消息從隊列內存中移除。這種情況下,如果你殺掉其中一個工作進程,那該進程正在處理的消息也將丟失。我們同樣,也將丟失所有發送給該進程的未被處理的消息。
但我們並不想丟失這些任務或消息。如果某個進程掛掉,我們期望該消息仍會被發送至其它工作進程。
如果一個進程掛掉,我們希望該消息或任務可以被分發至其它工作進程。
為了確保消息永不丟失,RabbitMQ支持消息應答機制。當消息被接受,處理之后一條應答便會從消費者回傳至發送方,然后RabbitMQ將其刪除。
如果某個消費者掛掉(信道、鏈接關閉或者tcp鏈接丟失)且沒有發送ack應答,RabbitMQ會認為該消息沒有被處理完全然后會將其重新放置到隊列中。通過這種方式你就可以確保消息永不丟失,甚至某個工作進程偶然掛掉的情況。
永遠不會有消息超時這一說,RabbitMQ在工作進程處理掛掉后將會重發消息,這很不錯甚至處理消息要發送很長很長的時間。
默認情況下消息應答是關閉的。是時候使用false(auto-ack配置項)參數將其開啟了:
msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
這里唯一不同的是將auto-ack設置為了false,使用手動應答,然后在代碼中需要調用d.Ack(false),進行手動應答。
使用如上代碼后,即時消息處理時按了Ctrl+C結束了進程,什么也不會丟失。工作進程掛掉后所有未應答的消息將會被重新分發。
消息持久化
我們已經學了如何確保消費者掛掉后任務不丟失的情況,但是一旦RabbitMQ服務器重啟后我們的消息或任務依舊會丟失。
當RabbitMQ服務器停止或崩潰時,它將會丟失多有的隊列和消息,除非你告訴它不要這么做。要做到服務宕機消息不丟失需要做到兩點:我們需要將消息和隊列同時標為持久化。
首先,我們需要確保RabbitMQ不會丟失我們的隊列,為做到此,隊列聲明修改如下:
q, err := ch.QueueDeclare( "hello", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")
即使這里被我們這樣修改過,但是在先前的設置中此代碼並不會工作。因為我們已經命名了一個叫做hello的隊列,並且非持久。RabbitMQ不允許定義2個不同參數的隊列,一旦做了將會報錯。但是有一個快速的解決辦法:我們聲明隊列換個名字就行了,如下task_queue,new_task.go:
q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")
durable配置項的更改需要同時反映到生產者和消費者的代碼上。
基於這點我們可以確定RabbitMQ重啟后task_queue隊列不會丟失了。現在我們還需要將消息標記為持久:使用amqp.Publishing配置項中的amqp.Persistent值實現:
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing { DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), })
完整的new_task.go的代碼如下:
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //連接服務器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明隊列 q, err := ch.QueueDeclare( "task_queue", // name 隊列名稱 true, // durable 是否持久化,這里true false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") //創建請求體 body := bodyFrom(os.Args) //發送消息 err = ch.Publish( "", // exchange 交換器名稱,使用默認 q.Name, // routing key 路由鍵,這里為隊列名稱 false, // mandatory false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", //消息類型,文本消息 Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello golang" } else { s = strings.Join(args[1:], " ") } return s }
work.go如下:
package main import ( "bytes" "fmt" "log" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //鏈接服務器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明隊列 q, err := ch.QueueDeclare( "task_queue", // name 隊列名稱 true, // durable 持久化標識 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") //聲明消費者 msgs, err := ch.Consume( q.Name, // queue 消費的隊列名稱 "", // consumer false, // auto-ack 自動應答 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) //主要用來防止主進程窗口退出 go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) //延時x秒 log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
這里測試的話,可以使用RabbitMQ自帶的ctl命令進行RabbitMQ應用的重啟,然后看下消息會不會丟失。
公平調度
你可能已經注意到了這種消息分發機制並非我們實際想要的那種,舉例來說有兩個消費者或工作進程,所有奇數的消息都很難處理而所有偶數的消息都便於處理,那么一個工作進程就比較忙碌而另一個就比較輕松,好吧,RabbitMQ實際也不清楚實際的消息分發是怎樣的。
這種情況的發生是因為RabbitMQ僅僅負責分發隊列中的消息。並不查看消費者中的未應答的消息數量。它只是盲目的將消息均發給每個消費者。
為了避免這種情況我們可以將prefetch count項的值配置為1,這將會指示RabbitMQ在同一時間不要發送超過一條消息給每個消費者。換句話說,直到消息被處理和應答之前都不會發送給該消費者任何消息。取而代之的是,它將會發送消息至下一個比較閑的消費者或工作進程。
err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS")
所有完整的實例代碼如下:
首先是new_task.go:
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //連接服務器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明隊列 q, err := ch.QueueDeclare( "task_queue", // name 隊列名稱 true, // durable 是否持久化,這里true false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") //創建請求體 body := bodyFrom(os.Args) //發送消息 err = ch.Publish( "", // exchange 交換器名稱,使用默認 q.Name, // routing key 路由鍵,這里為隊列名稱 false, // mandatory false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", //消息類型,文本消息 Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello golang" } else { s = strings.Join(args[1:], " ") } return s }
然后是work.go:
package main import ( "bytes" "fmt" "log" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //鏈接服務器 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //聲明channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //聲明隊列 q, err := ch.QueueDeclare( "task_queue", // name 隊列名稱 true, // durable 持久化標識 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") //聲明消費者 msgs, err := ch.Consume( q.Name, // queue 消費的隊列名稱 "", // consumer false, // auto-ack 自動應答 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) //主要用來防止主進程窗口退出 go func() { for d := range msgs { d.Ack(false) log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) //延時x秒 log.Printf("Done") } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }