在工作中發現,有些時候消息因為某些原因在消費一次后,如果消息失敗,這時候不ack,消息就回一直重回隊列首部,造成消息擁堵。
如是有了如下思路:
消息進入隊列前,header默認有參數 retry_num=0 表示嘗試次數;
消費者在消費時候的,如果消息失敗,就把消息插入另外一個隊列(隊列abc);該隊列abc 綁定一個死信隊列(原始消費的隊列),這樣形成一個回路;
當消息失敗后,消息就進入隊列abc,隊列abc擁有ttl過期時間,ttl過期時間到了后,該消息進入死信隊列(死信隊列剛好是剛開始我們消費的隊列);
這樣消息就又回到原始消費隊列尾部了;
最后可以通過隊列消息頭部的header參數retry_num 可以控制消息消費多少次后,直接插入db日志;
db日志可以記錄交換機 路由,queuename,這樣,可以做一個后台管理,可以手動一次把消息重新放入隊列,進行消息(因為有時間消費隊列里面可能在請求其它服務,其它服務也可能會掛掉)
這時候消息無論你消費多少次都沒有用,但是入庫db后,可以一鍵重回隊列消息(當我們知道服務已經正常后)
圖解:
附上代碼
git clone https://github.com/sunlongv520/go-msgserver.git

package rabbitmq import ( "errors" "fmt" "github.com/streadway/amqp" "sync" "time" ) // 定義全局變量,指針類型 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定義生產者接口 type Producer interface { MsgContent() string } // 定義生產者接口 type RetryProducer interface { MsgContent() string } // 定義接收者接口 type Receiver interface { Consumer([]byte) error } // 定義RabbitMQ對象 type RabbitMQ struct { connection *amqp.Connection channel *amqp.Channel dns string queueName string // 隊列名稱 routingKey string // key名稱 exchangeName string // 交換機名稱 exchangeType string // 交換機類型 producerList []Producer retryProducerList []RetryProducer receiverList []Receiver mu sync.RWMutex wg sync.WaitGroup } // 定義隊列交換機對象 type QueueExchange struct { QuName string // 隊列名稱 RtKey string // key值 ExName string // 交換機名稱 ExType string // 交換機類型 Dns string //鏈接地址 } // 鏈接rabbitMQ func (r *RabbitMQ)mqConnect() (err error){ //var err error //RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/", "guest", "guest", "192.168.2.232", 5672) //mqConn, err = amqp.Dial(RabbitUrl) mqConn, err = amqp.Dial(r.dns) r.connection = mqConn // 賦值給RabbitMQ對象 if err != nil { return err //fmt.Printf("MQ打開鏈接失敗:%s \n", err) } mqChan, err = mqConn.Channel() r.channel = mqChan // 賦值給RabbitMQ對象 if err != nil { return err //fmt.Printf("MQ打開管道失敗:%s \n", err) } return err } // 關閉RabbitMQ連接 func (r *RabbitMQ)mqClose() { // 先關閉管道,再關閉鏈接 err := r.channel.Close() if err != nil { fmt.Printf("MQ管道關閉失敗:%s \n", err) } err = r.connection.Close() if err != nil { fmt.Printf("MQ鏈接關閉失敗:%s \n", err) } } // 創建一個新的操作對象 func New(q *QueueExchange) *RabbitMQ { return &RabbitMQ{ queueName:q.QuName, routingKey:q.RtKey, exchangeName: q.ExName, exchangeType: q.ExType, dns:q.Dns, } } // 啟動RabbitMQ客戶端,並初始化 func (r *RabbitMQ) Start() (err error){ // 開啟監聽生產者發送任務 for _, producer := range r.producerList { err = r.listenProducer(producer) } // 開啟監聽接收者接收任務 for _, receiver := range r.receiverList { //r.listenReceiver(receiver) r.wg.Add(1) go func() { err = r.listenReceiver(receiver) }() } r.wg.Wait() time.Sleep(time.Microsecond*100) return err } type SendRbmqPro struct { msgContent string } // 實現生產者 func (t *SendRbmqPro) MsgContent() string { return t.msgContent } // 注冊發送指定隊列指定路由的生產者 func (r *RabbitMQ) RegisterProducer(msg string) { a := &SendRbmqPro{msgContent:msg} a.MsgContent() r.producerList = append(r.producerList, a) } // 發送任務 func (r *RabbitMQ) listenProducer(producer Producer) (err error){ // 驗證鏈接是否正常,否則重新鏈接 if r.channel == nil { err = r.mqConnect() if err !=nil { return err } } err = r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, false, nil) if err != nil { fmt.Printf("MQ注冊交換機失敗:%s \n", err) } _, err = r.channel.QueueDeclare(r.queueName, true, false, false, false, nil) if err != nil { fmt.Printf("MQ注冊隊列失敗:%s \n", err) } // 隊列綁定 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil) if err != nil { fmt.Printf("MQ綁定隊列失敗:%s \n", err) } header := make(map[string]interface{},1) header["retry_nums"] = int32(0) // 發送任務消息 err = r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(producer.MsgContent()), Headers:header, }) if err != nil { fmt.Printf("MQ任務發送失敗:%s \n", err) } return err } func (r *RabbitMQ) listenRetryProducer(producer RetryProducer,retry_nums int32 ,args ...string) { fmt.Println("消息處理失敗,進入延時隊列.....") //defer r.mqClose() // 驗證鏈接是否正常,否則重新鏈接 if r.channel == nil { r.mqConnect() } err := r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, false, nil) if err != nil { fmt.Printf("MQ注冊交換機失敗:%s \n", err) return } //原始路由key oldRoutingKey := args[0] //原始交換機名 oldExchangeName := args[1] table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = oldRoutingKey table["x-dead-letter-exchange"] = oldExchangeName table["x-message-ttl"] = int64(20000) _, err = r.channel.QueueDeclare(r.queueName, true, false, false, false, table) if err != nil { fmt.Printf("MQ注冊隊列失敗:%s \n", err) return } // 隊列綁定 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil) if err != nil { fmt.Printf("MQ綁定隊列失敗:%s \n", err) return } header := make(map[string]interface{},1) header["retry_nums"] = retry_nums + int32(1) // 發送任務消息 err = r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(producer.MsgContent()), Headers:header, }) if err != nil { fmt.Printf("MQ任務發送失敗:%s \n", err) return } } // 注冊接收指定隊列指定路由的數據接收者 func (r *RabbitMQ) RegisterReceiver(receiver Receiver) { r.mu.Lock() r.receiverList = append(r.receiverList, receiver) r.mu.Unlock() } // 監聽接收者接收任務 消費者 func (r *RabbitMQ) listenReceiver(receiver Receiver) (err error) { // 處理結束關閉鏈接 defer r.mqClose() defer r.wg.Done() //defer // 驗證鏈接是否正常 if r.channel == nil { err = r.mqConnect() if err != nil{ return errors.New(fmt.Sprintf("MQ注冊隊列失敗:%s \n", err)) } } // 用於檢查隊列是否存在,已經存在不需要重復聲明 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, false, nil) if err != nil { return errors.New(fmt.Sprintf("MQ注冊隊列失敗:%s \n", err)) } // 綁定任務 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, false, nil) if err != nil { return errors.New(fmt.Sprintf("綁定隊列失敗:%s \n", err)) } // 獲取消費通道,確保rabbitMQ一個一個發送消息 err = r.channel.Qos(1, 0, false) msgList, err := r.channel.Consume(r.queueName, "", false, false, false, false, nil) if err != nil { return errors.New(fmt.Sprintf("獲取消費通道異常:%s \n", err)) } for msg := range msgList { retry_nums,ok := msg.Headers["retry_nums"].(int32) if(!ok){ retry_nums = int32(0) } // 處理數據 err := receiver.Consumer(msg.Body) if err!=nil { //消息處理失敗 進入延時嘗試機制 if retry_nums < 3{ r.retry_msg(msg.Body,retry_nums) }else{ //消息失敗 入庫db fmt.Printf("消息處理失敗 入庫db") } err = msg.Ack(true) if err != nil { return errors.New(fmt.Sprintf("確認消息未完成異常:%s \n", err)) } }else { // 確認消息,必須為false err = msg.Ack(true) if err != nil { return errors.New(fmt.Sprintf("確認消息完成異常:%s \n", err)) } return err } } return } type retryPro struct { msgContent string } // 實現生產者 func (t *retryPro) MsgContent() string { return t.msgContent } //消息處理失敗之后 延時嘗試 func(r *RabbitMQ) retry_msg(Body []byte,retry_nums int32){ queueName := r.queueName+"_retry_3" routingKey := r.queueName+"_retry_3" exchangeName := r.exchangeName queueExchange := &QueueExchange{ queueName, routingKey, exchangeName, "direct", r.dns, } mq := New(queueExchange) msg := fmt.Sprintf("%s",Body) t := &retryPro{ msg, } mq.listenRetryProducer(t,retry_nums,r.routingKey,exchangeName) }

package main import ( "fmt" "github.com/ichunt2019/go-msgserver/utils/rabbitmq" "time" "errors" ) type RecvPro struct { } //// 實現消費者 消費消息失敗 自動進入延時嘗試 嘗試3次之后入庫db func (t *RecvPro) Consumer(dataByte []byte) error { fmt.Println(string(dataByte)) return errors.New("頂頂頂頂") //return nil } func main() { //消費者實現 下面接口即可 //type Receiver interface { // Consumer([]byte) error //} t := &RecvPro{} queueExchange := &rabbitmq.QueueExchange{ "fengkong_static_count", "fengkong_static_count", "fengkong_exchange", "direct", "amqp://guest:guest@192.168.2.232:5672/", } for{ mq := rabbitmq.New(queueExchange) mq.RegisterReceiver(t) err :=mq.Start() if err != nil{ fmt.Println(err) } time.Sleep(time.Second) } }

package main import ( "fmt" _ "fmt" "github.com/ichunt2019/go-msgserver/utils/rabbitmq" "strconv" ) func main() { queueExchange := &rabbitmq.QueueExchange{ "b_test_rabbit", "b_test_rabbit", "b_test_rabbit_mq", "direct", "amqp://guest:guest@192.168.2.232:5672/", } mq := rabbitmq.New(queueExchange) for i := 0;i<10;i++{ mq.RegisterProducer("這是測試任務"+strconv.Itoa(i)) } err := mq.Start() if(err != nil){ fmt.Println("發送消息失敗") } }