RabbitMQ使用 prefetch_count優化隊列的消費,使用死信隊列和延遲隊列實現消息的定時重試,golang版本


RabbitMQ 的優化

channel

生產者,消費者和 RabbitMQ 都會建立連接。為了避免建立過多的 TCP 連接,減少資源額消耗。

AMQP 協議引入了信道(channel),多個 channel 使用同一個 TCP 連接,起到對 TCP 連接的復用。

不過 channel 的連接數是有上限的,過多的連接會導致復用的 TCP 擁堵。

const (
	maxChannelMax = (2 << 15) - 1
	defaultChannelMax = (2 << 10) - 1
)

通過http://github.com/streadway/amqp這個client來連接 RabbitMQ,這里面定義了最大值65535和默認最大值2047。

prefetch Count

什么是prefetch Count,先舉個栗子:

假定 RabbitMQ 隊列有 N 個消費隊列,RabbitMQ 隊列中的消息將以輪詢的方式發送給消費者。

消息的數量是 M,那么每個消費者得到的數據就是 M%N。如果某一台的機器中的消費者,因為自身的原因,或者消息本身處理所需要的時間很久,消費的很慢,但是其他消費者分配的消息很快就消費完了,然后處於閑置狀態,這就造成資源的浪費,消息隊列的吞吐量也降低了。

這時候prefetch Count就登場了,通過引入prefetch Count來避免消費能力有限的消息隊列分配過多的消息,而消息處理能力較好的消費者沒有消息處理的情況。

RabbitM 會保存一個消費者的列表,每發送一條消息都會為對應的消費者計數,如果達到了所設定的上限,那么 RabbitMQ 就不會向這個消費者再發送任何消息。直到消費者確認了某條消息之后 RabbitMQ 將相應的計數減1,之后消費者可以繼續接收消息,直到再次到達計數上限。這種機制可以類比於 TCP/IP 中的"滑動窗口"。

所以消息不會被處理速度很慢的消費者過多霸占,能夠很好的分配到其它處理速度較好的消費者中。通俗的說就是消費者最多從 RabbitMQ 中獲取的未消費消息的數量。

prefetch Count數量設置為多少合適呢?大概就是30吧,具體可以參見Finding bottlenecks with RabbitMQ 3.3

談到了prefetch Count,我們還要看了 global 這個參數,RabbitMQ 為了提升相關的性能,在 AMQPO-9-1 協議之上重新定義了 global 這個參數

global 參數 AMQPO-9-1 RabbitMQ
false 信道上所有的消費者都需要遵從 prefetchC unt 的限 信道上新的消費者需要遵從 prefetchCount 的限定值定值
true 當前通信鏈路(Connection) 上所有的消費者都要遵從 prefetchCount 的限定值,就是同一Connection上的消費者共享 信道上所有的消費者都需要遵從 prefetchCunt 的上限,就是同一信道上的消費者共享

prefetchSize:預讀取的單條消息內容大小上限(包含),可以簡單理解為消息有效載荷字節數組的最大長度限制,0表示無上限,單位為 B。

如果prefetch Count為 0 呢,表示預讀取的消息數量沒有上限。

舉個錯誤使用的栗子:

之前一個隊列的消費者消費速度過慢,prefetch Count為0,然后新寫了一個消費者,prefetch Count設置為30,並且起了10個pod,來處理消息。老的消費者還沒有下線也在處理消息。

但是發現消費速度還是很慢,有大量的消息處於 unacked 。如果明白prefetch Count的含義其實就已經可以猜到問題的原因了。

老的消費者prefetch Count為0,所以很多 unacked 消息都被它持有了,雖然新加了幾個新的消費者,但是都處於空閑狀態,最后停掉了prefetch Count為0的消費者,很快消費速度就正常了。

死信隊列

什么是死信隊列

一般消息滿足下面幾種情況就會消息變成死信

  • 消息被否定確認,使用 channel.basicNackchannel.basicReject ,並且此時 requeue 屬性被設置為false;

  • 消息過期,消息在隊列的存活時間超過設置的 TT L時間;

  • 隊列達到最大長度,消息隊列的消息數量已經超過最大隊列長度。

當一個消息滿足上面的幾種條件變成死信(dead message)之后,會被重新推送到死信交換器(DLX ,全稱為 Dead-Letter-Exchange)。綁定 DLX 的隊列就是死信隊列。

所以死信隊列也並不是什么特殊的隊列,只是綁定到了死信交換機中了,死信交換機也沒有什么特殊,我們只是用這個來處理死信隊列了,和別的交換機沒有本質上的區別。

對於需要處理死信隊列的業務,跟我們正常的業務處理一樣,也是定義一個獨有的路由key,並對應的配置一個死信隊列進行監聽,然后 key 綁定的死信交換機中。

使用場景

當消息的消費出現問題時,出問題的消息不被丟失,進行消息的暫存,方便后續的排查處理。

代碼實現

死信隊列的使用,可參看下文,配合延遲隊列實現消息重試的機制。

延遲隊列

什么是延遲隊列

延遲隊列就是用來存儲進行延遲消費的消息。

什么是延遲消息?

就是不希望消費者馬上消費的消息,等待指定的時間才進行消費的消息。

使用場景

1、關閉空閑連接。服務器中,有很多客戶端的連接,空閑一段時間之后需要關閉;

2、清理過期數據業務上。比如緩存中的對象,超過了空閑時間,需要從緩存中移出;

3、任務超時處理。在網絡協議滑動窗口請求應答式交互時,處理超時未響應的請求;

4、下單之后如果三十分鍾之內沒有付款就自動取消訂單;

5、訂餐通知:下單成功后60s之后給用戶發送短信通知;

6、當訂單一直處於未支付狀態時,如何及時的關閉訂單,並退還庫存;

7、定期檢查處於退款狀態的訂單是否已經退款成功;

8、新創建店鋪,N天內沒有上傳商品,系統如何知道該信息,並發送激活短信;

9、定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行。

總結下來就是一些延遲處理的業務場景

實現延遲隊列的方式

RabbitMQ 中本身並沒有直接提供延遲隊列的功能,可以通過死信隊列和 TTL 。來實現延遲隊的功能。

先來了解下過期時間 TTL,消息一旦超過設置的 TTL 值,就會變成死信。這里需要注意的是 TTL 的單位是毫秒。設置過期時間一般與兩種方式

  • 1、通過隊列屬性設置,隊列中的消息有相同的過期時間;

  • 2、通過消息本身單獨設置,每條消息有自己的的過期時間。

如果兩種一起設置,消息的 TTL 以兩者之間較小的那個數值為准。

上面兩種 TTL 過期時間,消息隊列的處理是不同的。第一種,消息一旦過期就會從消息隊列中刪除,第二種,消息過期了不會馬上進行刪除操作,刪除的操作,是在投遞到消費者之前進行判斷的。

第一種方式中相同過期時間的消息是在同一個隊列中,所以過期的消息總是在頭部,只要在頭部進行掃描就好了。第二種方式,過期的時間不同,但是消息是在同一個消息隊列中的,如果要清理掉所有過期的時間就需要遍歷所有的消息,當然這也是不合理的,所以會在消息被消費的時候,進行過期的判斷。這個處理思想和 redis 過期 key 的清理有點神似。

Queue TTL

通過 channel.queueDeclare 方法中的 x-expires 參數可以控制隊列被自動刪除前處於未使用狀態的時間。未使用的意思是隊列上沒有任何的消費者,隊列也沒有被重新聲明,並且在過期時間段內也未調用過 Basic.Get 命令。

	if _, err := channel.QueueDeclare("delay.3s.test",
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			"x-expires":                 3000,
		},
	); err != nil {
		return err
	}
Message TTL

對於 Message TTL 設置有兩種方式

  • Per-Queue Message TTL

通過在 queue.declare 中設置 x-message-ttl 參數,可以控制在當前隊列中,消息的過期時間。不過同一個消息被投到多個隊列中,設置x-message-ttl的隊列,里面消息的過期,不會對其他隊列中相同的消息有影響。不同隊列處理消息的過期是隔離的。

	if _, err := channel.QueueDeclare("delay.3s.test",
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			"x-message-ttl":             3000,
		},
	); err != nil {
		return err
	}
  • Per-Message TTL

通過 expiration 就可以設置每條消息的過期時間,需要注意的是 expiration 是字符串類型。

	delayQ := "delay.3s.test"
	if _, err := channel.QueueDeclare(delayQ,
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
		},
	); err != nil {
		return err
	}

	if err := channel.Publish("", delayQ, false, false, amqp.Publishing{
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
		Expiration:   "3000",
	}); err != nil {
		return err
	}

通過延遲隊列來處理延遲消費的場景,可以借助於死信隊列來處理

延遲隊列通常的使用:消費者訂閱死信隊列 deadQueue,然后需要延遲處理的消息都發送到 delayNormal 中。然后 delayNormal 中的消息 TTL 過期時間到了,消息會被存儲到死信隊列 deadQueue。我們只需要正常消費,死信隊列 deadQueue 中的數據就行了,這樣就實現對數據延遲消費的邏輯了。

使用 Queue TTL 設置過期時間

舉個線上處理消息重傳的的栗子:

消費者處理隊列中的消息,一個消息在處理的過程中,會出現錯誤,針對某些特性的錯誤,希望這些消息能夠退回到隊列中,過一段時間在進行消費。當然,如果不進行 Ack,或者 Ack 之后重推到隊列中,消費者就能再次進行重試消費。但是這樣會有一個問題,消費隊列中消息消費很快,剛重推的消息馬上就到了隊列頭部,消費者可能馬上又拿到這個消息,然后一直處於重試的死循環,影響其他消息的消費。這時候延遲隊列就登場了,我們可以借助於延遲隊列,設置特定的延遲時間,讓這些消息的重試,發生到之后某個時間點。並且重試一定次數之后,就可以選擇丟棄這個消息了。

來看下流程圖:

mq

具體的處理步驟:

1、生產者推送消息到 work-exchange 中,然后發送到 work-queue 隊列;

2、消費者訂閱 work-queue 隊列,這是正常的業務消費;

3、對於需要進行延遲重試的消息,發送到延遲隊列中;

4、延遲隊列會綁定一個死信系列,死信隊列的 exchange 和 routing-key,就是上面正常處理業務 work-queue 消息隊里的 exchange 和 routing-key,這樣過期的消息就能夠重推到業務的隊列中,每次重推到延遲隊列的時候會記錄消息重推的次數,如果達到我們設定的上限,就可以丟棄數據,落庫或其他的操作了;

5、所以消費者只需要監聽處理 work-queue 隊列就可以了;

6、無用的延遲隊列,到了刪除的時間節點,會進行自動的刪除。

上代碼,文中 Demo 的地址 👏🏻

func (b *Broker) readyConsumes(ps *params) (bool, error) {
	key := ps.key
	channel, err := b.getChannel(key)
	if err != nil {
		return true, err
	}

	queue, err := b.declare(channel, key, ps)
	if err != nil {
		return true, err
	}

	if err := channel.Qos(ps.prefetch, 0, false); err != nil {
		return true, fmt.Errorf("channel qos error: %s", err)
	}

	deliveries, err := channel.Consume(queue.Name, "", false, false, false, false, nil)
	if err != nil {
		return true, fmt.Errorf("queue consume error: %s", err)
	}

	channelClose := channel.NotifyClose(make(chan *amqp.Error))

	pool := make(chan struct{}, ps.concurrency)

	go func() {
		for i := 0; i < ps.concurrency; i++ {
			pool <- struct{}{}
		}
	}()

	for {
		select {
		case err := <-channelClose:
			b.channels.Delete(key)
			return true, fmt.Errorf("channel close: %s", err)
		case d := <-deliveries:
			if ps.concurrency > 0 {
				<-pool
			}
			go func() {
				var flag HandleFLag

				switch flag = ps.Handle(d.Body); flag {
				case HandleSuccess:
					d.Ack(false)
				case HandleDrop:
					d.Nack(false, false)
					// 處理需要延遲重試的消息
				case HandleRequeue:
					if err := b.retry(ps, d); err != nil {
						d.Nack(false, true)
					} else {
						d.Ack(false)
					}
				default:
					d.Nack(false, false)
				}

				if ps.concurrency > 0 {
					pool <- struct{}{}
				}
			}()
		}
	}
}

func (b *Broker) retry(ps *params, d amqp.Delivery) error {
	channel, err := b.conn.Channel()
	if err != nil {
		return err
	}
	defer channel.Close()

	retryCount, _ := d.Headers["x-retry-count"].(int32)
	// 判斷嘗試次數的上限
	if int(retryCount) >= len(ps.retryQueue) {
		return nil
	}

	delay := ps.retryQueue[retryCount]
	delayDuration := time.Duration(delay) * time.Millisecond
	delayQ := fmt.Sprintf("delay.%s.%s.%s", delayDuration.String(), b.exchange, ps.key)

	if _, err := channel.QueueDeclare(delayQ,
		true, false, false, false, amqp.Table{
			// 配置死信發送的exchange和routing-key
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			// 消息的過期時間
			"x-message-ttl":             delay,
			// 延遲隊列自動刪除的時間設置
			"x-expires":                 delay * 2,
		},
	); err != nil {
		return err
	}

	// exchange為空使用Default Exchange
	return channel.Publish("", delayQ, false, false, amqp.Publishing{
		// 設置嘗試的次數
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
	})
}

測試一下

先使用docker 啟動一個 RabbitMQ

$ sudo mkdir -p /usr/local/docker-rabbitmq/data

$ docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v /usr/local/docker-rabbitmq/data:/var/lib/rabbitmq --hostname rabbitmq -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.7.7-management

賬號,密碼是 admin

const (
	DeadTestExchangeQueue = "dead-test-delayed-queue_queue"
)

func main() {

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)

	broker := rabbitmq.NewBroker("amqp://admin:admin@127.0.0.1:5672", &rabbitmq.ExchangeConfig{
		Name: "worker-exchange",
		Type: "direct",
	})

	broker.LaunchJobs(
		rabbitmq.NewDefaultJobber(
			"dead-test-key",
			HandleMessage,
			rabbitmq.WithPrefetch(30),
			rabbitmq.WithQueue(DeadTestExchangeQueue),
			rabbitmq.WithRetry(help.FIBONACCI, help.Retry{
				Delay: "5s",
				Max:   6,
				Queue: []string{
					DeadTestExchangeQueue,
				},
			}),
		),
	)

	for {
		s := <-ch
		switch s {
		case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			fmt.Println("job-test-exchange service exit")
			time.Sleep(time.Second)
			return
		case syscall.SIGHUP:
		default:
			return
		}
	}
}

func HandleMessage(data []byte) error {
	fmt.Println("receive message", "message", string(data))

	return rabbitmq.HandleRequeue
}

接收到的消息,直接進行重試,我們來看下,延遲隊列的執行

啟動之后,先來看下消息隊列的面板

mq

通過控制面板 push 一條數據

mq

可以看到消息在延遲隊列中的執行過程,並且沒有再用的延遲隊列,會在設置的過期時間點,進行自動刪除

mq mq

最后可以看到這條消息被反復重試了多次

mq

最后達到我們設置的重試上限之后,消息就會被丟失了

使用 Message TTL 設置過期時間

使用 Message TTL這種方式,我們的隊列會存在時序的問題,這里來展開分析下:

使用 Message TTL這種方式,所有設置過期的消息是會放到一個隊列中的。因為消息的出隊是一條一條出的,只有第一個消息被消費了,才能處理第二條消息。如果第一條消息過期10s,第二條過期1s。第二條肯定比第一條先過期,理論上,應該先處理第二條。但是有上面討論的限制,如果第一條沒有被消費,第二條消息是不能被處理的。這就造成了時序問題,當然如果使用Queue TTL就不會有這種情況了,應為相同過期時間的消息在同一個隊列中,所以隊列頭部的消息總是最先過期的消息。那么這種情況如何去避免呢?

可以使用rabbitmq-delayed-message-exchange插件處理。rabbitmq-delayed-message-exchange插件地址

實現原理:

安裝插件后會生成新的Exchange類型x-delayed-message,處理的原則是延遲投遞。當接收到延遲消息之后,並不是直接投遞到目標隊列中,而是會把消息存儲到 mnesia 數據庫中,什么是 mnesia 可參考Mnesia 數據庫。當延遲時間到了的時候,通過x-delayed-message推送到目標隊列中。然后去消費目標隊列,就能避免過期的時序問題了。

來看下如何使用

這里使用一台虛擬機來演示,首先安裝 RabbitMQ,安裝過程可參考RabbitMQ 3.8.5

然后下載 rabbitmq-delayed-message-exchange 插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

$ cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/plugins

# 查看安裝的插件
$ rabbitmq-plugins list
  Listing plugins with pattern ".*" ...
   Configured: E = explicitly enabled; e = implicitly enabled
   | Status: * = running on rabbit@centos7-1
   |/
  [  ] rabbitmq_amqp1_0                  3.8.5
  [  ] rabbitmq_auth_backend_cache       3.8.5
  [  ] rabbitmq_auth_backend_http        3.8.5
  [  ] rabbitmq_auth_backend_ldap        3.8.5
  [  ] rabbitmq_auth_backend_oauth2      3.8.5
  [  ] rabbitmq_auth_mechanism_ssl       3.8.5
  [  ] rabbitmq_consistent_hash_exchange 3.8.5
  [E*] rabbitmq_delayed_message_exchange 3.9.0
  [  ] rabbitmq_event_exchange           3.8.5
  [  ] rabbitmq_federation               3.8.5

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
$ systemctl restart rabbitmq-server

修改上面的栗子,使用x-delayed-message

上代碼,demo地址 👏🏻

func (b *Broker) declareDelay(key string, job Jobber) error {
	keyNew := fmt.Sprintf("delay.%s", key)

	channel, err := b.getChannel(fmt.Sprintf("delay.%s", key))
	if err != nil {
		return err
	}
	defer channel.Close()

	exchangeNew := fmt.Sprintf("delay.%s", b.exchange)

	if err := channel.ExchangeDeclare(exchangeNew, "x-delayed-message", true, false, false, false, nil); err != nil {
		return fmt.Errorf("exchange declare error: %s", err)
	}

	queue, err := channel.QueueDeclare(fmt.Sprintf("delay.%s", job.Queue()), true, false, false, false, amqp.Table{
		"x-dead-letter-exchange":    b.exchange,
		"x-dead-letter-routing-key": key,
	})
	if err != nil {
		return fmt.Errorf("queue declare error: %s", err)
	}
	if err = channel.QueueBind(queue.Name, keyNew, exchangeNew, false, nil); err != nil {
		return fmt.Errorf("queue bind error: %s", err)
	}
	return nil
}

func (b *Broker) retry(ps *params, d amqp.Delivery) error {
	channel, err := b.conn.Channel()
	if err != nil {
		return err
	}
	defer channel.Close()

	retryCount, _ := d.Headers["x-retry-count"].(int32)

	if int(retryCount) >= len(ps.retryQueue) {
		return nil
	}
	fmt.Println("消息重試次數", retryCount+1)

	delay := ps.retryQueue[retryCount]

	if err := channel.ExchangeDeclare(fmt.Sprintf("delay.%s", b.exchange), "x-delayed-message", true, false, false, false, amqp.Table{
		"x-delayed-type": "direct",
	}); err != nil {
		return err
	}

	return channel.Publish(fmt.Sprintf("delay.%s", b.exchange), fmt.Sprintf("delay.%s", ps.key), false, false, amqp.Publishing{
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
		Expiration:   fmt.Sprintf("%d", delay),
	})
}

設置重試隊列中的消息類型是x-delayed-message,這樣就能使用剛剛下來的插件了。

通過面板推送一條消息之后,看下運行的結果

mq

其中dead-test-delayed-message_queue就是我們正常業務消費的隊列,delay.dead-test-delayed-message_queue存儲的是需要進行延遲消費的消息,這里面的消息,會在過期的時候通過死信的機制,被重推到dead-test-delayed-message_queue

看下控制台的輸出信息

mq

使用插件還是Queue TTL處理延遲隊列呢?

rabbitmq-delayed-message-exchange 相關限制:

  • 1、該插件不支持延遲消息的復制,在 RabbitMQ 鏡像集群模式下,如果其中的一個節點宕機,會存在消息不可用,只能等該節點重新啟動,才可以恢復;

  • 2、目前該插件只支持在磁盤節點上使用,當前還不支持ram節點;

  • 3、不適合具有大量延遲消息的情況(例如:數千或數百萬的延遲消息)。

This plugin is considered to be experimental yet fairly stable and potential suitable for production use as long as the user is aware of its limitations.
This plugin is not commercially supported by Pivotal at the moment but it doesn't mean that it will be abandoned or team RabbitMQ is not interested in improving it in the future. It is not, however, a high priority for our small team.
So, give it a try with your workload and decide for yourself.

這是官方對此的解釋,大概意思就是,這個還處於試驗階段,但還是相對穩定的。團隊對此插件的更新優先級不是很高,所以如果我們遇到問題了,可能還需要自己去修改。

如果有能力更改這個插件,畢竟這個是 erlang 寫的,那么就可以選擇這個了。

優點也是很明顯,開箱即用,處理邏輯比較簡單。

Queue TTL相關限制

如果我們需要處理的延遲數據的時間類型很多,那么就需要創建很多的隊列。當然,這個方案的優點就是透明,穩定,遇到問題容易排查。

參考

【Finding bottlenecks with RabbitMQ 3.3】https://blog.rabbitmq.com/posts/2014/04/finding-bottlenecks-with-rabbitmq-3-3
【你真的了解延時隊列嗎】https://juejin.cn/post/6844903648397525006
【RabbitMQ實戰指南】https://book.douban.com/subject/27591386/
【人工智能 rabbitmq 基於rabbitmq】https://www.dazhuanlan.com/ajin121212/topics/1209139
【rabbitmq-delayed-message-exchange】https://blog.51cto.com/kangfs/4115341
【Scheduling Messages with RabbitMQ】https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
【Centos7安裝RabbitMQ最新版3.8.5,史上最簡單實用安裝步驟】https://blog.csdn.net/weixin_40584261/article/details/106826044
【RabbitMQ中 prefetch_count,死信隊列和延遲隊列的使用】https://boilingfrog.github.io/2022/01/07/rabbitmq中高級特性的使用/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM