Golang調用Rabbitmq消息隊列和封裝


前言

介紹Rabbimq

Rabbitmq消息隊列是干嘛的?

簡單的說,消息隊列,引申一下就是傳遞消息用的隊列,也可以稱為傳遞消息的通信方法。用爭搶訂單的快車舉個例子,假如,A用戶發送了一個用車的消息,那么消息隊列要做的就是把A用戶用車的這個消息廣而告之,發送到一個公用隊列當中,司機只管取到消息,而不管是誰發布的,這就是一個簡單的消息隊列例子,Rabbitmq其實就是消息隊列的一種,用的比較多的還可能有Redis,kafka,ActiceMq等等,這個后面的博文里面我會說,這次我們只說Rabbimq消息隊列

Rabbitmq消息隊列的好處是什么?為什么我們要用他?

這個網上有很多類似的玩意,我不說太多,就只說我在使用中感覺比較好的地方。

  1. 分布式,多節點部署。一個集群,保證消息的持久化和高可用,某節點掛了,其他節點可以結力。

  2. 路由Exchange,這個已經提供了內部的幾種實現方法,可以指定路由,也就是指定傳遞的地址。

  3. 多語言支持,我以前干活兒用Python,現在用Go和java,人家無縫對接,多牛逼!

  4. Ack的消息確認機制,這樣就保證了,任務下發時候的穩定性,ack消息確認可以手動,也可以自動,這樣就保證了任務下發時候的可控和監控。

初步開始

簡單的生產者和消費者的模型

講那么多廢話理論,還不如直接開始寫代碼更直觀是吧,所以,奧莉給,干了兄弟們!我們實現一個簡答的生產者,消費者模型。這個不用我多解釋吧,基礎的流程就是,我們定義一個生產者,生產信息到Rabbitmq中,然后再定義一個消費者,把數據從Rabbitmq中取出來,就這么簡單,下面咱們就干了,先講幾個基礎。

Rabbitmq的基礎知識

發送 Publish

發送,你可以理解為上傳,意思就是,上傳一個消息到Rabbitmq當中。它這塊的基礎代碼比較簡單

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一個Rabbimtq連接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //創建一個channel的套接字連接
    ch, _ := conn.Channel()
    //創建一個指定的隊列
    _, err := ch.QueueDeclare(
        "work", // 隊列名
        false,   // durable
        false,   // 不使用刪除?
        false,   // exclusive
        false,   // 不必等待
        nil,     // arguments
    )
    if err != nil {
        fmt.Println(err)
        return
    }
    //定義上傳的消息
    body := "work message"
    //調用Publish上傳消息1到指定的work隊列當中
    err = ch.Publish(
        "",     // exchange
        "work", // 隊列名
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
                ContentType: "text/plain",
                //[]byte化body
                Body:        []byte(body),
        })
}

這樣就完成了上傳消息到work隊列當中。

接收 Consume

接收,顧名思義,就是接收到指定隊列中的信息,信息存在隊列當中,總要被拿出來用吧,放那里又不能下崽兒,所以,拿出來感覺用了才是最重要的。這塊的基礎代碼如下

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一個Rabbimtq連接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //創建一個channel的套接字連接
    ch, _ := conn.Channel()

    msgs, err := ch.Consume(
        "work" // 隊列名
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // 不等待
        nil,    // args
    )
    //定義一個forever,讓他駐留在后台,等待消息,來了就消費
    forever := make(chan bool)

    //執行一個go func 完成任務消費
    go func() {
        for d := range msgs {
            //打印body
            log.Printf("message %s", d.Body)
        }
    }()
    <-forever
}

生產者/消費者模型

上面簡單說了一下rabbimq的發送和接收,這下咱們就要實現一個生產者消費者模型了,這個模型的主要邏輯,就是生產者發送任務到指定的隊列,有一個,或者多個消費者,會在此留守,一有任務,就爭搶並且消費。

生產者邏輯

其實生產者邏輯和上面的發送邏輯差不多,這里給出寫法。

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一個Rabbimtq連接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //創建一個channel的套接字連接
    ch, _ := conn.Channel()
    //創建一個指定的隊列
    q, _ := ch.QueueDeclare(
        "work", // 隊列名
        false,   // durable
        false,   // 不使用刪除?
        false,   // exclusive
        false,   // 不必等待
        nil,     // arguments
    )
    //定義上傳的消息
    body := "work message"
    //調用Publish上傳消息1到指定的work隊列當中
    err = ch.Publish(
        "",     // exchange
        "work", // 隊列名
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
                ContentType: "text/plain",
                //[]byte化body
                Body:        []byte(body),
        })
}

消費者邏輯

消費者邏輯這邊,主要是加了一個qos控制和手動ack,代碼如下

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一個Rabbimtq連接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //創建一個channel的套接字連接
    ch, _ := conn.Channel()
    
    //創建一個qos控制
    err = ch.Qos(
            3,     // 同時最大消費數量(意思就是最多能消費幾個任務)
            0,     // prefetch size
            false, // 全局設定?
        )
    if err != nil {
            return err
            }
    msgs, err := ch.Consume(
        "work" // 隊列名
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // 不等待
        nil,    // args
    )
    //定義一個forever,讓他駐留在后台,等待消息,來了就消費
    forever := make(chan bool)

    //執行一個go func 完成任務消費
    go func() {
        for d := range msgs {
            //打印body
            log.Printf("message %s", string(d.Body))
            //手動ack,不管是否發送完畢。
            d.Ack(false)
        }
    }()
    <-forever
}

Golang封裝Rabbitmq的基礎接口

Rabbitmq會用了吧,上面那個估計比較簡單,但是估摸着你們還想要別的功能,好,那我就慣大家一次,干了兄弟們,奧莉給!

初始化Rabbitmq連接

為了避免每次重復調用Rabbitmq連接,我這里提供一個簡單寫法。

package main

import (
"context"
"fmt"

"github.com/streadway/amqp"
)

//Rabbitmq 初始化rabbitmq連接
type Rabbitmq struct {
        conn *amqp.Connection
        err  error
}

func New(ip string) (*Rabbitmq, error) {
        amqps := fmt.Sprintf("amqp://guest:guest@%s:5672/", ip)
        conn, err := amqp.Dial(amqps)
        if err != nil {
            return nil, err
        }
        rabbitmq := &Rabbitmq{
            conn: conn,
        }
        return rabbitmq, nil
}

創建一個Queue隊列

func (rabbitmq *Rabbitmq) CreateQueue(id string) error {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return err
        }
        _, err = ch.QueueDeclare(
            id,    // name
            true,  // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        if err != nil {
            return err
        }
        return nil
}

上傳消息到指定的queue中

func (rabbitmq *Rabbitmq) PublishQueue(id string, body string) error {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return err
        }
        err = ch.Publish(
            "",    // exchange
            id,    // routing key
            false, // mandatory
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent,
                ContentType:  "text/plain",
                Body:         []byte(body),
            })
        if err != nil {
            return err
        }
        return nil
}

從隊列中取出消息並且消費

func (rabbitmq *Rabbitmq) PublishQueue(id string, body string) error {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return err
        }
        err = ch.Publish(
            "",    // exchange
            id,    // routing key
            false, // mandatory
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent,
                ContentType:  "text/plain",
                Body:         []byte(body),
            })
        if err != nil {
            return err
        }
        return nil
}

統計隊列中預備消費的數據

func (rabbitmq *Rabbitmq) GetReadyCount(id string) (int, error) {
        count := 0
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return count, err
        }
        state, err := ch.QueueInspect(id)
        if err != nil {
            return count, err
        }
        return state.Messages, nil
    }

統計消費者/正在消費的數據

func (rabbitmq *Rabbitmq) GetConsumCount(id string) (int, error) {
        count := 0
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return count, err
        }
        state, err := ch.QueueInspect(id)
        if err != nil {
            return count, err
        }
        return state.Consumers, nil
}

清理隊列

func (rabbitmq *Rabbitmq) ClearQueue(id string) (string, error) {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return "", err
        }
        _, err = ch.QueuePurge(id, false)
        if err != nil {
            return "", err
        }
        return "Clear queue success", nil
}

總結

簡單講了一下Rabbimtq是啥,怎么用,我是怎么用的。
完整代碼請訪問我的Github: https://github.com/Alexanderklau/Go_poject/blob/master/Go-Rabbitmq/rabbitmq.go
如果有不懂的歡迎留言!如果能幫大家的我一定會幫!也希望你們指出我的錯誤!一起進步!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM