Rabbitmq 消費者的推模式與拉模式(go語言版本)


 

 

 RabbitMQ的消費者有兩種模式

實現RabbitMQ的消費者有兩種模式,推模式(Push)拉模式(Pull)

  • 推模式:消息中間件主動將消息推送給消費者
  • 拉模式:消費者主動從消息中間件拉取消息

推模式將消息提前推送給消費者,消費者必須設置一個緩沖區緩存這些消息。好處很明顯,消費者總是有一堆在內存中待處理的消息,所以效率高。缺點是緩沖區可能會溢出。
拉模式在消費者需要時才去消息中間件拉取消息,這段網絡開銷會明顯增加消息延遲,降低系統吞吐量。
選擇推模式還是拉模式需要考慮使用場景。

推模式是最常用的,但是有些情況下推模式並不適用的,比如說:由於某些限制,消費者在某個條件成立時才能消費消息

接受消息 – 推模式
RMQ Server主動把消息推給消費者

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
  • queue:隊列名稱。
  • consumer:消費者標簽,用於區分不同的消費者。
  • autoAck:是否自動回復ACK,true為是,回復ACK表示高速服務器我收到消息了。建議為false,手動回復,這樣可控性強。
  • exclusive:設置是否排他,排他表示當前隊列只能給一個消費者使用。
  • noLocal:如果為true,表示生產者和消費者不能是同一個connect。
  • nowait:是否非阻塞,true表示是。阻塞:表示創建交換器的請求發送后,阻塞等待RMQ Server返回信息。非阻塞:不會阻塞等待RMQ Server的返回信息,而RMQ Server也不會返回信息。(不推薦使用)
  • args:直接寫nil,沒研究過,不解釋。

注意下返回值:返回一個<- chan Delivery類型,遍歷返回值,有消息則往下走, 沒有則阻塞。

接受消息 – 拉模式
消費者主動從RMQ Server拉消息

func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
  • queue:隊列名稱 。
  • autoAck:是否開啟自動回復。

 

消費者消息確認機制(關於Ack機制)

如果其中一個消費者開始一項漫長的任務而僅部分完成就掛掉,會發生什么情況?

在這種情況下,消費者掛掉,我們將丟失正在處理的消息。我們還將丟失所有發送給該特定消費者但尚未處理的消息。

但是我們不想丟失任何消息。如果一個消費者掛掉,我們希望將消息交付給另一個消費者。

為了確保消息永不丟失,RabbitMQ支持 消息確認消費者發送回一個確認(ACK),告知RabbitMQ特定的消息已被接收處理,並且RabbitMQ才可以刪除此條消息。

在消費者從隊列接收消息時,可以設置 autoAck 來指定是否自動確認。

  • autoAck = true,當消費者收到消息后,會自動發送ACK,RabbitMQ然后從內存或磁盤中刪除消息。
  • autoAck = false,RabbitMQ 會一直等到持有消息的消費者顯式地調用 Ack 命令為止。而且 RabbitMQ 也不會為消息設置過期時間,判斷消息是否需要重新投遞的依據是消費該消息的消費者已斷開連接。這時候,隊列中的消息分為兩個部分:一部分是等待投遞給消費者的消息;一部分是已經投遞,但還沒收到消費者確認信號的消息。如果一直沒有受到確認信號,並且消費消息的消費者斷開連接,RabbitMQ 會重新安排該消息進入隊列,等待投遞給下一個消費者(有可能還是原來的那個消費者)。
  • 推薦手動回復,盡量不要使用autoACK,因autoACK不可控。

如果設置為false后,主要使用兩個函數進行手動回復:

func (ch *Channel) Ack(tag uint64, multiple bool) error
func (me Delivery) Ack(multiple bool) error {
    if me.Acknowledger == nil {
    return errDeliveryNotInitialized
  }
  return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}

簡單看一眼,函數2調用了函數1,本質上兩個函數沒區別。 

這里推薦使用第二個,因為方便。
另外說一下multiple參數。true表示回復當前信道所有未回復的ack,用於批量確認。false表示回復當前條目。

手動回復的例子

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

ACK機制的開發注意事項

如果你設置為手動回復,而且在程序中忘記了ACK,那么后果很嚴重。當消費者退出時候,Message會一直重新分發。然后RabbitMQ會占用越來越多的內容,由於RabbitMQ會長時間運行,因此這個"內存泄漏"是致命的。

 

消費者拒絕消費

func (d Delivery) Reject(requeue bool) error

拒絕本條消息。如果requeue為true,則RMQ會把這條消息重新加入隊列,如果requeue為false,則RMQ會丟棄本條消息。

RMQ官網提供的教程:https://www.rabbitmq.com/getstarted.html 
go-amqp庫函數手冊:https://godoc.org/github.com/streadway/amqp 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM