Go RabbitMQ (一)


  • 生產者:生產者是負責發送消息的
  • 隊列:隊列是RabbitMQ用來存儲消息的,受主機內存和磁盤大小的限制,本質上是一個消息的緩沖區。生產者可以將消息發送至隊列中,消費者可以從隊列中接收到消息
  • 消費者:消費者是用來等待接收消息

生產者,消費者,代理可以駐留在不同主機或同一主機,一個應用可以是生產者也可以是消費者

 

 

第一部分   常用操作函數

生產者

 

1.連接RabbitMQ

conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatal(err) } defer conn.Close()

RabbitMQ的連接已經為我們抽象了socket的連接,同時為我們處理了協議版本號和身份認證等等

 

2.創建通道

ch,err := conn.Channel()
if err != nil { log.Fatal(err) } defer ch.Close()

在使用其他API完成任務的時候我們首先通過以上方式創建通道

 

3.聲明隊列並發送數據

在開始發送消息之前我們首先應該聲明一個隊列。聲明隊列之后我們就可以將消息發送至隊列當中

q, err := ch.QueueDeclare(
"hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )

說明: 其中durable設為true則queue持久化,否則不會做持久化。
if err != nil { log.Fatal(err) } body := "Hello World!"
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) if err != nil { log.Fatal(err) }

說明:
其中amqp.Publishing的DeliveryMode如果設為amqp.Persistent則消息會持久化。
需要注意的是如果需要消息持久化Queue也是需要設定為持久化才有效

隊列的聲明是一個冪等性操作,如果不存在該隊列的話則會創建。此處注意,如果隊列存在,修改了隊列參數並不會影響已經存在的隊列,並且會返回錯誤。消息內容是一個字節數組,所以我們必須進行編碼

 

接收者:

連接,創建通道,隊列

在接收端我們同樣需要像發送端一樣連接RabbitMQ,創建通道后再創建隊列,注意此處隊列的創建是跟發送端的隊列完全匹配的。隊列在接收端也創建是因為我們接收端有可能比發送端先啟動,所以為了保證我們要消費的隊列存在我們在此處也進行創建

 

1.消費消息

msgs, err := ch.Consume(
q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatal(err) } forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever

 

 

 

第二部分  消費者消息應答模式

 

1.

接收消息

msgs, err := ch.Consume(
                q.Name,
                "MsgWorkConsumer",
                false,  //Auto Ack
                false,
                false,
                false,
                nil,
            )

其中Auto ack可以設置為true。如果設為true則消費者一接收到就從queue中去除了,如果消費者處理消息中發生意外該消息就丟失了。
如果Auto ack設為false。consumer在處理完消息后,調用msg.Ack(false)后消息才從queue中去除。即便當前消費者處理該消息發生意外,只要沒有執行msg.Ack(false)那該消息就仍然在queue中,不會丟失。

生成的Queue在生成是設定的參數,下次使用時不能更改設定參數,否則會報錯

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM