RabbitMQ
簡介
RabbitMQ是一個消息代理,用來負責接收和轉發消息。
術語
- 生產者:生產者是負責發送消息的
- 隊列:隊列是RabbitMQ用來存儲消息的,受主機內存和磁盤大小的限制,本質上是一個消息的緩沖區。生產者可以將消息發送至隊列中,消費者可以從隊列中接收到消息
- 消費者:消費者是用來等待接收消息
生產者,消費者,代理可以駐留在不同主機或同一主機,一個應用可以是生產者也可以是消費者
Hello World
接下來我們來實現RabbitMQ的“Hello World”,生產者將“Hello World”發送進隊列中,消費者將其接收並打印
- RabbitMQ客戶端的安裝
- RabbitMQ實現了很多協議,在這里我們使用的是的AMQP 0-9-1,這是一種用於消息傳遞的開放通用協議。同時有很多關於RabbitMQ的客戶端,在這里我們使用的是Go amqp客戶端
- 安裝:
**go get github.com/streadway/amqp
發送
-
連接RabbitMQ
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatal(err) } defer conn.Close()
RabbitMQ的連接已經為我們抽象了socket的連接,同時為我們處理了協議版本號和身份認證等等
-
創建通道
ch,err := conn.Channel() if err != nil { log.Fatal(err) } defer ch.Close()
在使用其他API完成任務的時候我們首先通過以上方式創建通道
-
在開始發送消息之前我們首先應該聲明一個隊列。聲明隊列之后我們就可以將消息發送至隊列當中
q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments
)
if err != nil {
log.Fatal(err)
}
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatal(err)
}
```
隊列的聲明是一個冪等性操作,如果不存在該隊列的話則會創建。此處注意,如果隊列存在,修改了隊列參數並不會影響已經存在的隊列,並且會返回錯誤。消息內容是一個字節數組,所以我們必須進行編碼
接收
-
連接,創建通道,隊列
在接收端我們同樣需要像發送端一樣連接RabbitMQ,創建通道后再創建隊列,注意此處隊列的創建是跟發送端的隊列完全匹配的。隊列在接收端也創建是因為我們接收端有可能比發送端先啟動,所以為了保證我們要消費的隊列存在我們在此處也進行創建
-
消費消息
msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args
)
if err != nil {
log.Fatal(err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
```
使用通道消費隊列中的消息,當隊列有消息的時候將會異步的推送給我們