介紹
RabbitMQ是消息中間件:它接受並轉發消息。
您可以將其視為郵局系統:將要發送的郵件放在郵箱中時,
可以確保郵遞員最終將郵件傳遞給收件人。
以此類推,RabbitMQ是一個郵箱,一個郵局和一個郵遞員。
RabbitMQ與郵局之間的主要區別在於,
它不處理紙張,而是接收,存儲和轉發數據消息的二進制數據。
以下是RabbitMQ和消息發送的術語
- Producer:生產者。負責生產消息。
- Queue:隊列。負責存儲消息。隊列在RabbitMQ中充當郵箱的角色,消息傳遞到RabbitMQ中,只能存儲在隊列中。隊列受主機內存和磁盤大小的約束。本質是一個很大的消息緩沖區。
許多生產者可以將消息發送到一個隊列,許多消費者可以嘗試從一個隊列接收數據。
- Consumer:消費者。負責處理消息。
** 筆者補充
-
Connect:連接。生產者和RabbitMQ服務之間建立的TCP連接。
-
Channel:信道,一條連接可包含多條信道,不同信道之間通信互不干擾。考慮下多線程應用場景,每個線程對應一條信道,而不是對應一條連接,這樣可以提高性能。
-
body:消息主體,要傳遞的數據。
-
exchange:交換器,負責把消息轉發到對應的隊列。交換器本身沒有緩存消息的功能,消息是在隊列中緩存的,如果隊列不存在,則交換器會直接丟棄消息。常用的有四種類型的交換器:direct、fanout、topic、headers。不同類型的交換器有不同的交換規則,交換器會根據交換規則把消息轉發到對應的隊列。
-
exchangeName:交換器名稱,每個交換器對應一個名稱,發送消息時會附帶交換器名稱,根據交換器名稱選擇對應的交換器。
-
BandingKey:綁定鍵,一個隊列可以有一個到多個綁定鍵,通過綁定操作可以綁定交換器和隊列,交換器會根據綁定鍵的名稱找到對應的隊列。
-
RotingKey:路由鍵,發送消息時,需要附帶一條路由鍵,交換器會對路由鍵和綁定鍵進行匹配,如果匹配成功,則消息會轉發到綁定鍵對應的隊列中。
**簡而言之就是:
-
生產者指定路由Key和交換器的名字發送給RabbitMQ服務
-
指定名字的交換器根據路由key去找到綁定的隊列
-
將消息放入隊列當中
-
消費者從隊列中取出消息進行處理
實戰 "Hello World"
golang語言實現
在本教程的這一部分中,我們將用Go編寫兩個小程序。 發送單個消息的生產者和接收消息並打印出來的消費者。 我們將介紹Go RabbitMQ API中的一些細節,僅着眼於此非常簡單的事情。 這是消息傳遞的“ Hello World”。
在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。 中間的框是一個隊列-RabbitMQ代表使用者保留的消息緩沖區。
Go RabbitMQ客戶端庫
RabbitMQ使用多種協議。 本教程使用AMQP 0-9-1,這是一種開放的通用消息傳遞協議。 RabbitMQ有許多不同語言的客戶。 在本教程中,我們將使用Go amqp客戶端。
go get github.com/streadway/amqp
生產者發送數據到隊列
#send.go 生產者,發送消息到消息隊列中
package main
import (
"github.com/streadway/amqp"
"log"
)
func main(){
// 連接RabbitMQ服務器
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 創建一個channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 聲明一個隊列
q, err := ch.QueueDeclare(
"hello", // 隊列名稱
false, // 是否持久化
false, // 是否自動刪除
false, // 是否獨立
false,nil,
)
failOnError(err, "Failed to declare a queue")
// 發送消息到隊列中
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
fmt.Println("send message success\n"
}
// 幫助函數檢測每一個amqp調用
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
消費者從隊列接收數據
package main
import (
"github.com/streadway/amqp"
"log"
)
func main(){
// 連接RabbitMQ服務器
conn, err := amqp.Dial("amqp://admin:admin@47.97.215.189:5672/admin")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 創建一個channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 監聽隊列
q, err := ch.QueueDeclare(
"hello", // 隊列名稱
false, // 是否持久化
false, // 是否自動刪除
false, // 是否獨立
false,nil,
)
failOnError(err, "Failed to declare a queue")
// 消費隊列
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
// 申明一個goroutine,一遍程序始終監聽
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
// 幫助函數檢測每一個amqp調用
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}