go mod init github.com/ichunt2019/go-rabbitmq
D:\gocode\go-rabbitmq\utils\rabbitmq\receiver.go
主要是實現了rabbimq 生產者 消費者
消費者:實現失敗嘗試機制

package rabbitmq import ( //"errors" "fmt" "github.com/streadway/amqp" "log" ) // 定義全局變量,指針類型 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定義生產者接口 type Producer interface { MsgContent() string } // 定義生產者接口 type RetryProducer interface { MsgContent() string } // 定義接收者接口 type Receiver interface { Consumer([]byte) error FailAction([]byte) error } // 定義RabbitMQ對象 type RabbitMQ struct { connection *amqp.Connection Channel *amqp.Channel dns string QueueName string // 隊列名稱 RoutingKey string // key名稱 ExchangeName string // 交換機名稱 ExchangeType string // 交換機類型 producerList []Producer retryProducerList []RetryProducer receiverList []Receiver } // 定義隊列交換機對象 type QueueExchange struct { QuName string // 隊列名稱 RtKey string // key值 ExName string // 交換機名稱 ExType string // 交換機類型 Dns string //鏈接地址 } // 鏈接rabbitMQ func (r *RabbitMQ)MqConnect() (err error){ mqConn, err = amqp.Dial(r.dns) r.connection = mqConn // 賦值給RabbitMQ對象 if err != nil { fmt.Printf("關閉mq鏈接失敗 :%s \n", err) } return } // 關閉mq鏈接 func (r *RabbitMQ)CloseMqConnect() (err error){ err = r.connection.Close() if err != nil{ fmt.Printf("關閉mq鏈接失敗 :%s \n", err) } return } // 鏈接rabbitMQ func (r *RabbitMQ)MqOpenChannel() (err error){ mqConn := r.connection r.Channel, err = mqConn.Channel() //defer mqChan.Close() if err != nil { fmt.Printf("MQ打開管道失敗:%s \n", err) } return err } // 鏈接rabbitMQ func (r *RabbitMQ)CloseMqChannel() (err error){ r.Channel.Close() if err != nil { fmt.Printf("關閉mq鏈接失敗 :%s \n", err) } return err } // 創建一個新的操作對象 func NewMq(q QueueExchange) RabbitMQ { return RabbitMQ{ QueueName:q.QuName, RoutingKey:q.RtKey, ExchangeName: q.ExName, ExchangeType: q.ExType, dns:q.Dns, } } func (mq *RabbitMQ) sendMsg (body string) { err :=mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err :%s \n", err) } defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) } } // 用於檢查隊列是否存在,已經存在不需要重復聲明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 綁定任務 if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } } if mq.ExchangeName != "" && mq.RoutingKey != ""{ err = mq.Channel.Publish( mq.ExchangeName, // exchange mq.RoutingKey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) }else{ err = mq.Channel.Publish( "", // exchange mq.QueueName, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) } } func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string) { err :=mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err :%s \n", err) } defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) } } //原始路由key oldRoutingKey := args[0] //原始交換機名 oldExchangeName := args[1] table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = oldRoutingKey if oldExchangeName != "" { table["x-dead-letter-exchange"] = oldExchangeName }else{ mq.ExchangeName = "" table["x-dead-letter-exchange"] = "" } table["x-message-ttl"] = int64(20000) //fmt.Printf("%+v",table) //fmt.Printf("%+v",mq) // 用於檢查隊列是否存在,已經存在不需要重復聲明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 綁定任務 if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } } header := make(map[string]interface{},1) header["retry_nums"] = retry_nums + int32(1) var ttl_exchange string var ttl_routkey string if(mq.ExchangeName != "" ){ ttl_exchange = mq.ExchangeName }else{ ttl_exchange = "" } if mq.RoutingKey != "" && mq.ExchangeName != ""{ ttl_routkey = mq.RoutingKey }else{ ttl_routkey = mq.QueueName } //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey) err = mq.Channel.Publish( ttl_exchange, // exchange ttl_routkey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), Headers:header, }) if err != nil { fmt.Printf("MQ任務發送失敗:%s \n", err) } } // 監聽接收者接收任務 消費者 func (mq *RabbitMQ) ListenReceiver(receiver Receiver,routineNum int) { err :=mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err :%s \n", err) } defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) } } // 用於檢查隊列是否存在,已經存在不需要重復聲明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 綁定任務 if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } } // 獲取消費通道,確保rabbitMQ一個一個發送消息 err = ch.Qos(1, 0, false) msgList, err := ch.Consume(mq.QueueName, "", false, false, false, false, nil) if err != nil { log.Printf("Consume err :%s \n", err) } for msg := range msgList { retry_nums,ok := msg.Headers["retry_nums"].(int32) if(!ok){ retry_nums = int32(0) } // 處理數據 err := receiver.Consumer(msg.Body) if err!=nil { //消息處理失敗 進入延時嘗試機制 if retry_nums < 3{ fmt.Println(string(msg.Body)) fmt.Printf("消息處理失敗 消息開始進入嘗試 ttl延時隊列 \n") retry_msg(msg.Body,retry_nums,QueueExchange{ mq.QueueName, mq.RoutingKey, mq.ExchangeName, mq.ExchangeType, mq.dns, }) }else{ //消息失敗 入庫db fmt.Printf("消息處理3次后還是失敗了 入庫db 釘釘告警 \n") receiver.FailAction(msg.Body) } err = msg.Ack(true) if err != nil { fmt.Printf("確認消息未完成異常:%s \n", err) } }else { // 確認消息,必須為false err = msg.Ack(true) if err != nil { fmt.Printf("消息消費ack失敗 err :%s \n", err) } } } } //消息處理失敗之后 延時嘗試 func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){ //原始隊列名稱 交換機名稱 oldQName := queueExchange.QuName oldExchangeName := queueExchange.ExName oldRoutingKey := queueExchange.RtKey if oldRoutingKey == "" || oldExchangeName == ""{ oldRoutingKey = oldQName } if queueExchange.QuName != "" { queueExchange.QuName = queueExchange.QuName + "_retry_3"; } if queueExchange.RtKey != "" { queueExchange.RtKey = queueExchange.RtKey + "_retry_3"; }else{ queueExchange.RtKey = queueExchange.QuName + "_retry_3"; } //fmt.Printf("%+v",queueExchange) mq := NewMq(queueExchange) mq.MqConnect() defer func(){ mq.CloseMqConnect() }() //fmt.Printf("%+v",queueExchange) mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName) } func Send(queueExchange QueueExchange,msg string){ mq := NewMq(queueExchange) mq.MqConnect() defer func(){ mq.CloseMqConnect() }() mq.sendMsg(msg) } /* runNums 開啟並發執行任務數量 */ func Recv(queueExchange QueueExchange,receiver Receiver,runNums int){ mq := NewMq(queueExchange) mq.MqConnect() defer func(){ mq.CloseMqConnect() }() forever := make(chan bool) for i:=1;i<=runNums;i++{ go func(routineNum int) { defer mq.Channel.Close() // 驗證鏈接是否正常 mq.MqOpenChannel() mq.ListenReceiver(receiver,routineNum) }(i) } <-forever } type retryPro struct { msgContent string }
D:\gocode\go-rabbitmq\demo\recv.go
package main import ( "fmt" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq" "time" ) type RecvPro struct { } //// 實現消費者 消費消息失敗 自動進入延時嘗試 嘗試3次之后入庫db /* 返回值 error 為nil 則表示該消息消費成功 否則消息會進入ttl延時隊列 重復嘗試消費3次 3次后消息如果還是失敗 消息就執行失敗 進入告警 FailAction */ func (t *RecvPro) Consumer(dataByte []byte) error { //time.Sleep(500*time.Microsecond) //return errors.New("頂頂頂頂") fmt.Println(string(dataByte)) time.Sleep(1*time.Second) //return errors.New("頂頂頂頂") return nil } //消息已經消費3次 失敗了 請進行處理 /* 如果消息 消費3次后 仍然失敗 此處可以根據情況 對消息進行告警提醒 或者 補償 入庫db 釘釘告警等等 */ func (t *RecvPro) FailAction(dataByte []byte) error { fmt.Println(string(dataByte)) fmt.Println("任務處理失敗了,我要進入db日志庫了") fmt.Println("任務處理失敗了,發送釘釘消息通知主人") return nil } func main() { t := &RecvPro{} //rabbitmq.Recv(rabbitmq.QueueExchange{ // "a_test_0001", // "a_test_0001", // "", // "", // "amqp://guest:guest@192.168.2.232:5672/", //},t,5) /* runNums: 表示任務並發處理數量 一般建議 普通任務1-3 就可以了 */ rabbitmq.Recv(rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.2.232:5672/", },t,3) }
D:\gocode\go-rabbitmq\demo\send.go
package main import ( "fmt" _ "fmt" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq" ) func main() { for i := 1;i<10;i++{ body := fmt.Sprintf("{\"order_id\":%d}",i) fmt.Println(body) /** 使用默認的交換機 如果是默認交換機 type QueueExchange struct { QuName string // 隊列名稱 RtKey string // key值 ExName string // 交換機名稱 ExType string // 交換機類型 Dns string //鏈接地址 } 如果你喜歡使用默認交換機 RtKey 此處建議填寫成 RtKey 和 QuName 一樣的值 */ //queueExchange := rabbitmq.QueueExchange{ // "a_test_0001", // "a_test_0001", // "", // "", // "amqp://guest:guest@192.168.2.232:5672/", //} /* 使用自定義的交換機 */ queueExchange := rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.2.232:5672/", } rabbitmq.Send(queueExchange,body) } }
使用說明:
go get github.com/ichunt2019/go-rabbitmq
發送消息
package main import ( "fmt" _ "fmt" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq" ) func main() { for i := 1;i<10;i++{ body := fmt.Sprintf("{\"order_id\":%d}",i) fmt.Println(body) /** 使用默認的交換機 如果是默認交換機 type QueueExchange struct { QuName string // 隊列名稱 RtKey string // key值 ExName string // 交換機名稱 ExType string // 交換機類型 Dns string //鏈接地址 } 如果你喜歡使用默認交換機 RtKey 此處建議填寫成 RtKey 和 QuName 一樣的值 */ //queueExchange := rabbitmq.QueueExchange{ // "a_test_0001", // "a_test_0001", // "", // "", // "amqp://guest:guest@192.168.2.232:5672/", //} /* 使用自定義的交換機 */ queueExchange := rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.2.232:5672/", } rabbitmq.Send(queueExchange,body) } }
消費消息
package main import ( "fmt" "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq" "time" ) type RecvPro struct { } //// 實現消費者 消費消息失敗 自動進入延時嘗試 嘗試3次之后入庫db /* 返回值 error 為nil 則表示該消息消費成功 否則消息會進入ttl延時隊列 重復嘗試消費3次 3次后消息如果還是失敗 消息就執行失敗 進入告警 FailAction */ func (t *RecvPro) Consumer(dataByte []byte) error { //time.Sleep(500*time.Microsecond) //return errors.New("頂頂頂頂") fmt.Println(string(dataByte)) time.Sleep(1*time.Second) //return errors.New("頂頂頂頂") return nil }
//消息已經消費3次 失敗了 請進行處理 /* 如果消息 消費3次后 仍然失敗 此處可以根據情況 對消息進行告警提醒 或者 補償 入庫db 釘釘告警等等 */ func (t *RecvPro) FailAction(dataByte []byte) error { fmt.Println(string(dataByte)) fmt.Println("任務處理失敗了,我要進入db日志庫了") fmt.Println("任務處理失敗了,發送釘釘消息通知主人") return nil } func main() { t := &RecvPro{} //rabbitmq.Recv(rabbitmq.QueueExchange{ // "a_test_0001", // "a_test_0001", // "", // "", // "amqp://guest:guest@192.168.2.232:5672/", //},t,5) /* runNums: 表示任務並發處理數量 一般建議 普通任務1-3 就可以了 */ rabbitmq.Recv(rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.2.232:5672/", },t,3) } 說明: rabbitmq.Recv(rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.2.232:5672/", },t,3)
第一個參數 QueueExchange說明
// 定義隊列交換機對象 type QueueExchange struct { QuName string // 隊列名稱 RtKey string // key值 ExName string // 交換機名稱 ExType string // 交換機類型 Dns string //鏈接地址 }
第二個參數 type Receiver interface說明
Consumer | FailAction |
---|---|
拿到消息后,用戶可以處理任務,如果消費成功 返回nil即可,如果處理失敗,返回一個自定義error即可 | 由於消息內部自帶消息失敗嘗試3次機制,3次如果失敗后就沒必要一直存儲在mq,所以此處擴展,可以用作消息補償和告警 |
// 定義接收者接口 type Receiver interface { Consumer([]byte) error FailAction([]byte) error }
第三個參數:runNusm
runNusm |
---|
消息並發數,同時可以處理多少任務 普通任務 設置為1即可 需要並發的設置成3-5即可 |