- 生產者:生產者是負責發送消息的
- 隊列:隊列是RabbitMQ用來存儲消息的,受主機內存和磁盤大小的限制,本質上是一個消息的緩沖區。生產者可以將消息發送至隊列中,消費者可以從隊列中接收到消息
- 消費者:消費者是用來等待接收消息
生產者,消費者,代理可以駐留在不同主機或同一主機,一個應用可以是生產者也可以是消費者
第一部分 常用操作函數
生產者
1.連接RabbitMQ
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatal(err) } defer conn.Close()
RabbitMQ的連接已經為我們抽象了socket的連接,同時為我們處理了協議版本號和身份認證等等
2.創建通道
ch,err := conn.Channel()
if err != nil { log.Fatal(err) } defer ch.Close()
在使用其他API完成任務的時候我們首先通過以上方式創建通道
3.聲明隊列並發送數據
在開始發送消息之前我們首先應該聲明一個隊列。聲明隊列之后我們就可以將消息發送至隊列當中
q, err := ch.QueueDeclare(
"hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )
說明: 其中durable設為true則queue持久化,否則不會做持久化。
if err != nil { log.Fatal(err) } body := "Hello World!"
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) if err != nil { log.Fatal(err) }
說明:
其中amqp.Publishing的DeliveryMode如果設為amqp.Persistent則消息會持久化。
需要注意的是如果需要消息持久化Queue也是需要設定為持久化才有效
隊列的聲明是一個冪等性操作,如果不存在該隊列的話則會創建。此處注意,如果隊列存在,修改了隊列參數並不會影響已經存在的隊列,並且會返回錯誤。消息內容是一個字節數組,所以我們必須進行編碼
接收者:
連接,創建通道,隊列
在接收端我們同樣需要像發送端一樣連接RabbitMQ,創建通道后再創建隊列,注意此處隊列的創建是跟發送端的隊列完全匹配的。隊列在接收端也創建是因為我們接收端有可能比發送端先啟動,所以為了保證我們要消費的隊列存在我們在此處也進行創建
1.消費消息
msgs, err := ch.Consume(
q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatal(err) } forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
第二部分 消費者消息應答模式
1.
接收消息
msgs, err := ch.Consume(
q.Name,
"MsgWorkConsumer",
false, //Auto Ack
false,
false,
false,
nil,
)
其中Auto ack可以設置為true。如果設為true則消費者一接收到就從queue中去除了,如果消費者處理消息中發生意外該消息就丟失了。
如果Auto ack設為false。consumer在處理完消息后,調用msg.Ack(false)后消息才從queue中去除。即便當前消費者處理該消息發生意外,只要沒有執行msg.Ack(false)那該消息就仍然在queue中,不會丟失。
生成的Queue在生成是設定的參數,下次使用時不能更改設定參數,否則會報錯