golang監聽rabbitmq消息隊列任務斷線自動重連接


需求背景:

goalng常駐內存任務腳本監聽rbmq執行任務

任務腳本由supervisor來管理

  當rabbitmq長時間斷開連接會出現如下圖 進程處於fatal狀態

 

 假如因為不可抗拒因素,rabbitmq服務器內存滿了或者其它原因導致rabbitmq消息隊列服務停止了

如果是短時間的停止重啟,supervisor是可以即時喚醒該程序。如果服務器長時間沒有恢復正常運行,程序就會出現fatal進程啟動失敗的狀態,此時可以通過告警來提醒開發人員

 

 如果以上告警能時時通知運維人員此問題可以略過了。今天討論的是如果在長時間斷開連接還能在服務器恢復正常情況下自動實現重連。

 

 

實現重連方式很多,下面實現方式比較簡單

使用通道實現通知

通知可以被看作是特殊的請求/回應用例。在一個通知用例中,我們並不關心回應的值,我們只關心回應是否已發生。 所以我們常常使用空結構體類型struct{}來做為通道的元素類型,因為空結構體類型的尺寸為零,能夠節省一些內存(雖然常常很少量)。

向一個通道發送一個值來實現單對單通知

我們已知道,如果一個通道中無值可接收,則此通道上的下一個接收操作將阻塞到另一個協程發送一個值到此通道為止。 所以一個協程可以向此通道發送一個值來通知另一個等待着從此通道接收數據的協程。

 

  1. Recv方法創建ampq鏈接
  2. 啟動協程開始執行任務
    1.   MqOpenChannel 打開一個channel通道處理amqp消息
    2.        拿到消息 處理任務

   3,協程中捕獲異常發送消息到 taskQuit <- struct{}{}

  4,主進程監聽taskQuit管道 開始嘗試重新鏈接amqp

    4.1如果在規定時間內鏈接成功  重新鏈接成功后啟動新的協程處理任務

       4.2如果在規定時間內沒有鏈接成功   則退出主進程

send.go 生產者

package main

import (
    "fmt"
    _ "fmt"
    "github.com/ichunt2019/golang-rabbitmq-2022/utils/rabbitmq"
)

func main() {


    for i := 0;i<20;i++{
        body := fmt.Sprintf("{\"order_id\":%d}",i)
        fmt.Println(body)

        /**
            使用默認的交換機
            如果是默認交換機
            type QueueExchange struct {
            QuName  string           // 隊列名稱
            RtKey   string           // key值
            ExName  string           // 交換機名稱
            ExType  string           // 交換機類型
            Dns     string              //鏈接地址
            }
            如果你喜歡使用默認交換機
            RtKey  此處建議填寫成 RtKey 和 QuName 一樣的值
         */

        queueExchange := rabbitmq.QueueExchange{
            "a_test_0001",
            "a_test_0001",
            "hello_go",
            "direct",
            "amqp://guest:guest@192.168.1.169:5672/",
        }

        _ = rabbitmq.Send(queueExchange,body)

    }
}
View Code

 

 

recv.go 消費者

package main

import (
    "fmt"
    "github.com/ichunt2019/golang-rabbitmq-2022/utils/rabbitmq"
    "time"
)

type RecvPro struct {

}

//// 實現消費者 消費消息失敗 自動進入延時嘗試  嘗試3次之后入庫db
/*
返回值 error 為nil  則表示該消息消費成功
否則消息會進入ttl延時隊列  重復嘗試消費3次
3次后消息如果還是失敗 消息就執行失敗  進入告警 FailAction
 */
func (t *RecvPro) Consumer(dataByte []byte) error {
    time.Sleep(time.Second*1)
    //return errors.New("頂頂頂頂")
    fmt.Println(string(dataByte))
    //time.Sleep(1*time.Second)
    //return errors.New("頂頂頂頂")
    return nil
}

//消息已經消費3次 失敗了 請進行處理
/*
如果消息 消費3次后 仍然失敗  此處可以根據情況 對消息進行告警提醒 或者 補償  入庫db  釘釘告警等等
 */
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
    fmt.Println(string(dataByte))
    fmt.Println(err)
    fmt.Println("任務處理失敗了,我要進入db日志庫了")
    fmt.Println("任務處理失敗了,發送釘釘消息通知主人")
    return nil
}



func main() {
    processTask := &RecvPro{}

    /*
        runNums: 表示任務並發處理數量  一般建議 普通任務1-3    就可以了
        maxTryConnTimeFromMinute:表示最大嘗試時間  分鍾
     */
    err := rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.1.169:5672/",
    },
    processTask,4,2)
    if(err != nil){
        fmt.Println(err)
    }

}
View Code

 

 

receiver.go   golang-rbmq工具包

package rabbitmq

import (
    "errors"
    "strconv"
    "time"

    //"errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
)


// 定義全局變量,指針類型
var mqConn *amqp.Connection
var mqChan *amqp.Channel

// 定義生產者接口
type Producer interface {
    MsgContent() string
}

// 定義生產者接口
type RetryProducer interface {
    MsgContent() string
}

// 定義接收者接口
type Receiver interface {
    Consumer([]byte)    error
    FailAction(error , []byte)  error
}

// 定義RabbitMQ對象
type RabbitMQ struct {
    connection *amqp.Connection
    Channel *amqp.Channel
    dns string
    QueueName   string            // 隊列名稱
    RoutingKey  string            // key名稱
    ExchangeName string           // 交換機名稱
    ExchangeType string           // 交換機類型
    producerList []Producer
    retryProducerList []RetryProducer
    receiverList []Receiver
}

// 定義隊列交換機對象
type QueueExchange struct {
    QuName  string           // 隊列名稱
    RtKey   string           // key值
    ExName  string           // 交換機名稱
    ExType  string           // 交換機類型
    Dns     string              //鏈接地址
}



// 鏈接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){

    mqConn, err = amqp.Dial(r.dns)
    r.connection = mqConn   // 賦值給RabbitMQ對象

    if err != nil {
        fmt.Printf("rbmq鏈接失敗  :%s \n", err)
    }

    return
}

// 關閉mq鏈接
func (r *RabbitMQ)CloseMqConnect() (err error){

    err = r.connection.Close()
    if err != nil{
        fmt.Printf("關閉mq鏈接失敗  :%s \n", err)
    }
    return
}

// 鏈接rabbitMQ
func (r *RabbitMQ)MqOpenChannel() (err error){
    mqConn := r.connection
    r.Channel, err = mqConn.Channel()
    //defer mqChan.Close()
    if err != nil {
        fmt.Printf("MQ打開管道失敗:%s \n", err)
    }
    return err
}

// 鏈接rabbitMQ
func (r *RabbitMQ)CloseMqChannel() (err error){
    r.Channel.Close()
    if err != nil {
        fmt.Printf("關閉mq鏈接失敗  :%s \n", err)
    }
    return err
}




// 創建一個新的操作對象
func NewMq(q QueueExchange) RabbitMQ {
    return RabbitMQ{
        QueueName:q.QuName,
        RoutingKey:q.RtKey,
        ExchangeName: q.ExName,
        ExchangeType: q.ExType,
        dns:q.Dns,
    }
}

func (mq *RabbitMQ) sendMsg (body string) (err error)  {
    err = mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }

    defer func() {
        _ = mq.Channel.Close()
    }()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
        }
    }


    // 用於檢查隊列是否存在,已經存在不需要重復聲明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
    if err != nil {
        log.Printf("QueueDeclare err :%s \n", err)
    }
    // 綁定任務
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s \n", err)
        }
    }

    if mq.ExchangeName != "" && mq.RoutingKey != ""{
        err = mq.Channel.Publish(
            mq.ExchangeName,     // exchange
            mq.RoutingKey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
                DeliveryMode: 2,
            })
    }else{
        err = mq.Channel.Publish(
            "",     // exchange
            mq.QueueName, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
                DeliveryMode: 2,
            })
    }
    return

}


/*
發送延時消息
 */
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
    err =mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }
    defer mq.Channel.Close()

    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            return
        }
    }


    if ttl <= 0{
        return errors.New("發送延時消息,ttl參數是必須的")
    }

    table := make(map[string]interface{},3)
    table["x-dead-letter-routing-key"] = mq.RoutingKey
    table["x-dead-letter-exchange"] = mq.ExchangeName
    table["x-message-ttl"] = ttl*1000

    //fmt.Printf("%+v",table)
    //fmt.Printf("%+v",mq)
    // 用於檢查隊列是否存在,已經存在不需要重復聲明
    ttlstring := strconv.FormatInt(ttl,10)
    queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    _, err = ch.QueueDeclare(queueName, true, false, false, false, table)
    if err != nil {
        return
    }
    // 綁定任務
    if routingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)
        if err != nil {
            return
        }
    }

    header := make(map[string]interface{},1)

    header["retry_nums"] = 0

    var ttl_exchange string
    var ttl_routkey string

    if(mq.ExchangeName != "" ){
        ttl_exchange = mq.ExchangeName
    }else{
        ttl_exchange = ""
    }


    if mq.RoutingKey != "" && mq.ExchangeName != ""{
        ttl_routkey = routingKey
    }else{
        ttl_routkey = queueName
    }

    err = mq.Channel.Publish(
        ttl_exchange,     // exchange
        ttl_routkey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:header,
        })
    if err != nil {
        return

    }
    return
}


func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {
    err :=mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }
    defer mq.Channel.Close()

    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
        }
    }

    //原始路由key
    oldRoutingKey := args[0]
    //原始交換機名
    oldExchangeName := args[1]

    table := make(map[string]interface{},3)
    table["x-dead-letter-routing-key"] = oldRoutingKey
    if oldExchangeName != "" {
        table["x-dead-letter-exchange"] = oldExchangeName
    }else{
        mq.ExchangeName = ""
        table["x-dead-letter-exchange"] = ""
    }

    table["x-message-ttl"] = int64(20000)

    //fmt.Printf("%+v",table)
    //fmt.Printf("%+v",mq)
    // 用於檢查隊列是否存在,已經存在不需要重復聲明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
    if err != nil {
        log.Printf("QueueDeclare err :%s \n", err)
    }
    // 綁定任務
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s \n", err)
        }
    }

    header := make(map[string]interface{},1)

    header["retry_nums"] = retry_nums + int32(1)

    var ttl_exchange string
    var ttl_routkey string

    if(mq.ExchangeName != "" ){
        ttl_exchange = mq.ExchangeName
    }else{
        ttl_exchange = ""
    }


    if mq.RoutingKey != "" && mq.ExchangeName != ""{
        ttl_routkey = mq.RoutingKey
    }else{
        ttl_routkey = mq.QueueName
    }

    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
    err = mq.Channel.Publish(
        ttl_exchange,     // exchange
        ttl_routkey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:header,
        })
    if err != nil {
        fmt.Printf("MQ任務發送失敗:%s \n", err)

    }

}


// 監聽接收者接收任務 消費者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
    err :=mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }
    defer mq.Channel.Close()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
        }
    }


    // 用於檢查隊列是否存在,已經存在不需要重復聲明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
    if err != nil {
        log.Printf("QueueDeclare err :%s \n", err)
    }
    // 綁定任務
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s \n", err)
        }
    }
    // 獲取消費通道,確保rabbitMQ一個一個發送消息
    err =  ch.Qos(1, 0, false)
    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)
    if err != nil {
        log.Printf("Consume err :%s \n", err)
    }
    for msg := range msgList {
        retry_nums,ok := msg.Headers["retry_nums"].(int32)
        if(!ok){
            retry_nums = int32(0)
        }
        // 處理數據
        err := receiver.Consumer(msg.Body)
        if err!=nil {
            //消息處理失敗 進入延時嘗試機制
            if retry_nums < 3{
                fmt.Println(string(msg.Body))
                fmt.Printf("消息處理失敗 消息開始進入嘗試  ttl延時隊列 \n")
                retry_msg(msg.Body,retry_nums,QueueExchange{
                        mq.QueueName,
                        mq.RoutingKey,
                        mq.ExchangeName,
                        mq.ExchangeType,
                        mq.dns,
                    })
            }else{
                //消息失敗 入庫db
                fmt.Printf("消息處理3次后還是失敗了 入庫db 釘釘告警 \n")
                receiver.FailAction(err,msg.Body)
            }
            err = msg.Ack(true)
            if err != nil {
                fmt.Printf("確認消息未完成異常:%s \n", err)
            }
        }else {
            // 確認消息,必須為false
            err = msg.Ack(true)

            if err != nil {
                fmt.Printf("消息消費ack失敗 err :%s \n", err)
            }
        }

    }
}

//消息處理失敗之后 延時嘗試
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
    //原始隊列名稱 交換機名稱
    oldQName := queueExchange.QuName
    oldExchangeName := queueExchange.ExName
    oldRoutingKey := queueExchange.RtKey
    if oldRoutingKey == "" || oldExchangeName == ""{
        oldRoutingKey = oldQName
    }

    if queueExchange.QuName != "" {
        queueExchange.QuName = queueExchange.QuName + "_retry_3";
    }

    if queueExchange.RtKey != "" {
        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
    }else{
        queueExchange.RtKey = queueExchange.QuName + "_retry_3";
    }

//fmt.Printf("%+v",queueExchange)

    mq := NewMq(queueExchange)
    _ = mq.MqConnect()

    defer func(){
        _ = mq.CloseMqConnect()
    }()
    //fmt.Printf("%+v",queueExchange)
    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)


}


func Send(queueExchange QueueExchange,msg string) (err error){
    mq := NewMq(queueExchange)
    err = mq.MqConnect()
    if err != nil{
        return
    }

    defer func(){
        _ = mq.CloseMqConnect()
    }()

    err = mq.sendMsg(msg)

    return
}

//發送延時消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
    mq := NewMq(queueExchange)
    err = mq.MqConnect()
    if err != nil{
        return
    }
    defer func(){
        _ = mq.CloseMqConnect()
    }()
    err = mq.sendDelayMsg(msg,ttl)
    return
}


/*
runNums  開啟並發執行任務數量
 */
func Recv(queueExchange QueueExchange,receiver Receiver,otherParams ...int) (err error){
    var (
        exitTask bool
        maxTryConnNums int  //rbmq鏈接失敗后多久嘗試一次
        runNums int
        maxTryConnTimeFromMinute int
    )

    if(len(otherParams) <= 0){
        runNums = 1
        maxTryConnTimeFromMinute = 0
    }else if(len(otherParams) == 1){
        runNums = otherParams[0]
        maxTryConnTimeFromMinute = 0
    }else if(len(otherParams) == 2){
        runNums = otherParams[0]
        maxTryConnTimeFromMinute = otherParams[1]
    }


    //maxTryConnNums := 360 //rbmq鏈接失敗后最大嘗試次數
    //maxTryConnTime := time.Duration(10) //rbmq鏈接失敗后多久嘗試一次
    maxTryConnNums = maxTryConnTimeFromMinute * 10 * maxTryConnTimeFromMinute//rbmq鏈接失敗后最大嘗試次數
    maxTryConnTime := time.Duration(6) //rbmq鏈接失敗后多久嘗試一次
    mq := NewMq(queueExchange)
    //鏈接rabbitMQ
    err = mq.MqConnect()
    if(err != nil){
        return
    }

    defer func() {
        if panicErr := recover(); panicErr != nil{
            fmt.Println(recover())
            err = errors.New(fmt.Sprintf("%s",panicErr))
        }
    }()

    //rbmq斷開鏈接后 協程退出釋放信號
    taskQuit:= make(chan struct{}, 1)
    //嘗試鏈接rbmq
    tryToLinkC := make(chan struct{}, 1)

    //最大嘗試次數
    tryToLinkMaxNums := make(chan struct{}, 1)

    maxTryNums := 0 //嘗試重啟次數

    //開始執行任務
    for i:=1;i<=runNums;i++{
        go Recv2(mq,receiver,taskQuit);
    }

    //如果rbmq斷開連接后 嘗試重新建立鏈接
    var tryToLink = func() {
        for {
            maxTryNums += 1
            err = mq.MqConnect()
            if(err == nil){
                tryToLinkC <- struct{}{}
                break
            }
            if(maxTryNums > maxTryConnNums){
                tryToLinkMaxNums <- struct{}{}
                break
            }
            //如果鏈接斷開了 10秒重新嘗試鏈接一次
            time.Sleep(time.Second * maxTryConnTime)
        }
        return
    }
    scheduleTimer := time.NewTimer(time.Millisecond*300)
    exitTask = true
    for{
        select {
        case <-tryToLinkC: //建立鏈接成功后 重新開啟協程執行任務
            fmt.Println("重新開啟新的協程執行任務")
            go Recv2(mq,receiver,taskQuit);
        case <-tryToLinkMaxNums://rbmq超出最大鏈接次數 退出任務
            fmt.Println("rbmq鏈接超過最大嘗試次數!")
            exitTask = false
            err = errors.New("rbmq鏈接超過最大嘗試次數!")
        case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接
            fmt.Println("rbmq斷開連接后 開始嘗試重新建立鏈接")
             go tryToLink()
        case <- scheduleTimer.C:
            //fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~")
        }
        // 重置調度間隔
        scheduleTimer.Reset(time.Millisecond*300)
        if !exitTask{
            break
        }
    }
    fmt.Println("exit")
    return
}


func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
        defer func() {
            fmt.Println("rbmq鏈接失敗,協程任務退出~~~~~~~~~~~~~~~~~~~~")
            taskQuit <- struct{}{}
            return
        }()
        // 驗證鏈接是否正常
        err := mq.MqOpenChannel()
        if(err != nil){
            return
        }
        mq.ListenReceiver(receiver)
}


type retryPro struct {
    msgContent   string
}
View Code

 

 

核心代碼分析:

/*
runNums  開啟並發執行任務數量
 */
func Recv(queueExchange QueueExchange,receiver Receiver,otherParams ...int) (err error){
    var (
        exitTask bool
        maxTryConnNums int  //rbmq鏈接失敗后多久嘗試一次
        runNums int
        maxTryConnTimeFromMinute int
    )

    if(len(otherParams) <= 0){
        runNums = 1
        maxTryConnTimeFromMinute = 0
    }else if(len(otherParams) == 1){
        runNums = otherParams[0]
        maxTryConnTimeFromMinute = 0
    }else if(len(otherParams) == 2){
        runNums = otherParams[0]
        maxTryConnTimeFromMinute = otherParams[1]
    }


    //maxTryConnNums := 360 //rbmq鏈接失敗后最大嘗試次數
    //maxTryConnTime := time.Duration(10) //rbmq鏈接失敗后多久嘗試一次
    maxTryConnNums = maxTryConnTimeFromMinute * 10 * maxTryConnTimeFromMinute//rbmq鏈接失敗后最大嘗試次數
    maxTryConnTime := time.Duration(6) //rbmq鏈接失敗后多久嘗試一次
    mq := NewMq(queueExchange)
    //鏈接rabbitMQ
    err = mq.MqConnect()
    if(err != nil){
        return
    }

    defer func() {
        if panicErr := recover(); panicErr != nil{
            fmt.Println(recover())
            err = errors.New(fmt.Sprintf("%s",panicErr))
        }
    }()

    //rbmq斷開鏈接后 協程退出釋放信號
    taskQuit:= make(chan struct{}, 1)
    //嘗試鏈接rbmq
    tryToLinkC := make(chan struct{}, 1)

    //最大嘗試次數
    tryToLinkMaxNums := make(chan struct{}, 1)

    maxTryNums := 0 //嘗試重啟次數

    //開始執行任務
    for i:=1;i<=runNums;i++{
        go Recv2(mq,receiver,taskQuit);
    }

    //如果rbmq斷開連接后 嘗試重新建立鏈接
    var tryToLink = func() {
        for {
            maxTryNums += 1
            err = mq.MqConnect()
            if(err == nil){
                tryToLinkC <- struct{}{}
                break
            }
            if(maxTryNums > maxTryConnNums){
                tryToLinkMaxNums <- struct{}{}
                break
            }
            //如果鏈接斷開了 10秒重新嘗試鏈接一次
            time.Sleep(time.Second * maxTryConnTime)
        }
        return
    }
    scheduleTimer := time.NewTimer(time.Millisecond*300)
    exitTask = true
    for{
        select {
        case <-tryToLinkC: //建立鏈接成功后 重新開啟協程執行任務
            fmt.Println("重新開啟新的協程執行任務")
            go Recv2(mq,receiver,taskQuit);
        case <-tryToLinkMaxNums://rbmq超出最大鏈接次數 退出任務
            fmt.Println("rbmq鏈接超過最大嘗試次數!")
            exitTask = false
            err = errors.New("rbmq鏈接超過最大嘗試次數!")
        case <- taskQuit ://rbmq斷開連接后 開始嘗試重新建立鏈接
            fmt.Println("rbmq斷開連接后 開始嘗試重新建立鏈接")
             go tryToLink()
        case <- scheduleTimer.C:
            //fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~")
        }
        // 重置調度間隔
        scheduleTimer.Reset(time.Millisecond*300)
        if !exitTask{
            break
        }
    }
    fmt.Println("exit")
    return
}


func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
        defer func() {
            fmt.Println("rbmq鏈接失敗,協程任務退出~~~~~~~~~~~~~~~~~~~~")
            taskQuit <- struct{}{}
            return
        }()
        // 驗證鏈接是否正常
        err := mq.MqOpenChannel()
        if(err != nil){
            return
        }
        mq.ListenReceiver(receiver)
}

 

 

 

場景一:rbmq斷開連接  ,開始嘗試鏈接,鏈接次數超過最大次數  放棄鏈接  

 

場景二:rbmq斷開連接  ,開始嘗試鏈接,鏈接次數未超過最大次數,重新建議鏈接 開始消費任務

 

最新源碼倉庫地址:https://github.com/sunlongv520/golang-rabbitmq

 

其它:該rabbitmq包實現中包含了,消息處理失敗重試機制,有興趣的同學可以看看

  (重試和重連接是兩個概念)

  重連接 :rabbitmq鏈接失敗導致任務失敗,此時要等待rabbitmq服務器恢復正常后才能再次啟動協程處理任務

  重試:rabbitmq服務正常,消息消費進程也正常,但是消息處理失敗。嘗試多次消費消息后還是失敗就ack消息,在整個重試過程中不會阻塞消費

 

消息重試機制:https://www.cnblogs.com/sunlong88/p/11982741.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM