三、”發布訂閱”
上一節的練習中我們創建了一個工作隊列。隊列中的每條消息都會被發送至一個工作進程。這節,我們將做些完全不同的事情——我們將發送單個消息發送至多個消費者。這種模式就是廣為人知的“發布訂閱”模式。
為了說明這種模式,我們將構建一個簡單的日志系統。包括2個應用程序,一個傳送日志消息另一個接收並打印這些消息。
我們的日志系統中每一個運作的接收端程序都會收到這些消息。這種方式下,我們就可以運行一個接收端發送日志消息至硬盤,同時可以運行另一個接收端將日志打印到屏幕上。
理論上講,已發布的日志消息將會被廣播到所有的接收者。
交換器(Exchange)
之前的幾節練習中我們發送接收消息都是在隊列中進行,是時候介紹下RabbitMQ完整的消息傳遞模式了。
先來迅速的回顧下我們之前章節:
- 一個生產者就是一個用來發送消息的應用程序
- 一個 隊列好比存儲消息的緩存buffer
- 一個消費者就是一個用戶應用程序用來接收消息
RabbitMQ消息傳遞模型的核心思想是生產者從來不會直接發送消息至隊列。事實上,生產者經常都不知道消息會被分發至哪個隊列。
相反的是,生產者僅僅發送消息至交換器。交換器是非常簡單的東西:一邊從生產者那邊接收消息一邊發送這些消息至隊列。交換器必須准確的知道這些被接收的消息該如何處理。它應該被添加到某個特定隊列?或者添加到多個隊列?甚至直接放棄。具體的傳輸規則就是通過交換器類型來定義的。
交換器類型有四種:direct、topic、headers、fanout。這節我們主要關注最后一種——fanout。讓我們來創建一個fanout類型的交換器,命名為logs:
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )
正如你從名字中猜測的一樣,它僅僅廣播所有消息到所有已知的接收隊列。實際上這正是我們需要的日志系統。
備注:之前的幾節練習中我們並不知道交換器,但我們依然能夠將消息發送至隊列中,之所以可以實現是因為我們使用了默認的交換器,使用空字符串表示。
回顧下之前我們發送消息是這樣子的:
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
這里我們可以使用默認也可以自己命名交換器:如果路由鍵存在的話,消息會被路由到加上路由鍵參數的地址,注意fanout類型會直接忽略路由鍵的存在。
以下是修改后的代碼:
err = ch.ExchangeDeclare( "logs", // name 定義一個名為logs的交換器 "fanout", // type 交換器類型為fanout即廣播類型 true, // durable 持久化 false, // auto-deleted 無隊列綁定時是否自動刪除 false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange 指定消息發送的交換器名稱 "", // routing key 因為fanout類型會自動忽略路由鍵,所以這里的路由鍵參數任意,一般不填 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
臨時隊列
你可能記得之前我們聲明隊列的時候都會指定一個隊列名稱(記得hello和task_queue?)。隊列的命名對我們來說至關重要——我們需要將工作進程指向同一個隊列。當你需要在消費者和生產者之間共享隊列的話聲明隊列就顯得很重要。
但這對我們的日志系統來說無關重要。我們需要監聽的是所有的日志消息,而不是他們中的某一類。我們只關注當前流中的消息而不關注舊的那些。解決這個我們需要做兩件事。
首先,每當鏈接RabbitMQ的時候我們需要創建一個新的、空的隊列。為做到這點,我們必須創建一個名稱隨機的隊列,甚至更好的實現方式是——讓服務端給我們自動生成一個隨機的隊列。
其次,一旦消費者鏈接斷開,該隊列便會自動刪除。
在amqp客戶端中,當我們給一個隊列名稱設定為空字符串時,我們就創建了一個非持久化的生成隊列:
q, err := ch.QueueDeclare( "", // name 滿足第一點:服務端自動產生隨機隊列 false, // durable false, // delete when usused true, // exclusive 滿足第二點:連接斷開立即刪除 false, // no-wait nil, // arguments )
當該方法返回的時候,聲明好的隊列便包含一個由RabbitMQ生成的隨機隊列名稱。舉例來說,隊列名稱形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg這種的。
當消費者的鏈接宣布關閉后,隊列便像exclusive參數設置的那樣,自動刪除。
綁定
我們已經創建了一個fanout類型的交換器和一個隊列,現在我們需要告訴交換器將消息發送至我們的隊列。這種交換器和隊列中的關聯關系就叫做綁定。
err = ch.QueueBind( q.Name, // queue name 綁定的隊列名稱 "", // routing key 綁定的路由鍵 "logs", // exchange 綁定的交換器名稱 false, nil )
從現在起,logs交換器便能發送消息至我們的隊列。
糅合在一起
生產者的程序,也就是發送消息端,跟之前幾節的發送代碼差不多。最重要的是我們現在要發送消息到logs交換器而非默認的交換器。發送的時候我們可以設置一個路由鍵,但是對於fanout類型的交換器來說它將被忽略。下面就是發送日志方的代碼:
// rabbitmq_3_emit_log.go project main.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s:%s", err, msg) panic(fmt.Sprintf("%s:%s", err, msg)) } } func bodyForm(args []string) string { var s string if len(args) < 2 || os.Args[1] == "" { s = "Hello World! This is a test!" } else { s = strings.Join(args[1:], " ") } return s } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "failed to dial rabbitmq server") defer conn.Close() ch, err := conn.Channel() failOnError(err, "failed to declare the channel") defer ch.Close() //聲明一個交換器,交換器名稱logs,類型fanout err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err, "failed to declare the exchange") body := bodyForm(os.Args) //發送消息到交換器 err = ch.Publish("logs", "", false, false, amqp.Publishing{ Body: []byte(body), ContentType: "text/plain", }) failOnError(err, "failed to publish the message") }
備注:這里發送方並不需要聲明隊列之類的,不像之前的代碼需要聲明,這里的發送方唯一關聯的是交換器,所以只需聲明交換器並發送消息至交換器即可。
正如你想的那樣,鏈接建立后我們聲明交換器,這一步是必須的因為發送消息到一個不存在的交換器是完全禁止的。
如果該交換器上面沒有隊列綁定的話那么發送至該交換器的消息將全部丟失,但這對我們來時ok;如果沒有消費者我們會安全地丟棄這些消息。
下面是日志接收方的代碼:
// rabbitmq_3_receive_logs.go project main.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s:%s", err, msg) panic(fmt.Sprintf("%s:%s", err, msg)) } } func bodyForm(args []string) string { var s string if len(args) < 2 || os.Args[1] == "" { s = "Hello World! This is a test!" } else { s = strings.Join(args[1:], " ") } return s } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "failed to dial rabbitmq server") defer conn.Close() ch, err := conn.Channel() failOnError(err, "failed to declare the channel") defer ch.Close() //聲明一個交換器,交換器名稱logs,類型fanout err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err, "failed to declare the exchange") //聲明一個隊列 q, err := ch.QueueDeclare("", false, false, true, false, nil) failOnError(err, "failed to declare the queue") //設置綁定(第二個參數為路由鍵,這里為空) err = ch.QueueBind(q.Name, "", "logs", false, nil) failOnError(err, "failed to bind the queue") //注冊一個消費者 msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) failOnError(err, "Failed to register a consumer") forever := make(<-chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
如果你想將日志保存到文件,執行如下命令:
go run receive_logs.go > logs_from_rabbit.log
如果你僅僅想在屏幕上查看日志,開啟一個新的控制台執行如下命令:
go run receive_logs.go
當然了,你最后還要發出日志才行:
go run emit_log.go
使用rabbitmqctl list_bindings命令可以直接查看所有的綁定,如運行2個receive_logs.go程序你就會看到如下輸出:
rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
實際效果:
分別開啟兩個控制台,均監聽相同隊列,同時收到消息並打印了,說明兩個隨機的隊列均收到了logs交換器發來的消息,發送方略。