go操作RabbitMQ


RabbitMQ服務器安裝
1、安裝erlang

wget https://www.rabbitmq.com/releases/erlang/erlang-18.2-1.el6.x86_64.rpm

2、安裝RabbitMQ

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el6.noarch.rpm

3、 常用命令

systemctl start rabbitmq-server 啟動
rabbitmq stop 停止
rabbitmq-plugins list 插件命令
rabbitmq-plugins enable rabbitmq_management 安裝管理插件
rabbitmq-plugins disable rabbitmq_management 卸載管理插件

4、 瀏覽器打開

端口號默認:15672<br/>
密碼和用戶名默認:guest
http://127.0.0.1:15672/#/
5、常見錯誤
錯誤提示:zsh: command not found: rabbitmq-plugins<br/>
解決辦法:
第一種:export PATH=/usr/local/Cellar/rabbitmq/3.8.2/sbin/:$PATH<br/>
第二種:1: vim .bash_profile(前提是存在該文件,如果不存在,可以先創建mkdir .bash_profile,之后再執行vi編輯)<br/>
2:export PATH=/usr/local/Cellar/rabbitmq/3.8.2/sbin/sbin/:$PATH

最后:source ~/.bash_profile

## RabbitMQ核心概念
### Virtual Hosts管理
像mysql擁有數據庫的概念並且可以指定用戶對庫和表等操作的權限。那RabbitMQ呢?RabbitMQ也有類似的權限管理。在RabbitMQ中可以虛擬消息服務器VirtualHost,每個VirtualHost相當於一個相對獨立的RabbitMQ服務器,每個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。 相當於mysql的db。Virtual Name一般以/開頭<br/>
1、創建Virtual Hosts:
Admin->Virtual Hosts->Add a new virtual host<br/>
2、創建用戶:Admin->Users->Add a user<br/>
2、對用戶進行授權,點擊需要授權的vhosts->Permissions->Set permission

## RabbitMQ五種模式
url格式:amqp:// 賬號 密碼@地址:端口號/vhost <br/>
1、Simple模式 最簡單最常用的模式,一個消息只能被一個消費者消費<br/>

 

 

2、Work模式,一個消息只能被一個消費者消費

 

 

3、Publish/Subscribe訂閱模式,消息被路由投遞給多個隊列,一個消息被多個消費者獲取,生產端不允許指定消費

 

 

4、Routing路由模式,一個消息被多個消費者獲取,並且消息的目標隊列可以被生產者指定

 

 

5、Topic話題模式,一個消息被多個消息獲取,消息的目標queue可用BindKey以通配符,(#:一個或多個詞,*:一個詞)的方式指定。

 

 

 

示例代碼

package RabbitMQ

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)
//amqp:// 賬號 密碼@地址:端口號/vhost
const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc"

type RabbitMQ struct {
    //連接
    conn *amqp.Connection
    //管道
    channel *amqp.Channel
    //隊列名稱
    QueueName string
    //交換機
    Exchange string
    //key Simple模式 幾乎用不到
    Key string
    //連接信息
    Mqurl string
}

//創建RabbitMQ結構體實例
func NewRabbitMQ(queuename string, exchange string,key string) *RabbitMQ {
    rabbitmq := &RabbitMQ{QueueName:queuename,Exchange:exchange,Key:key,Mqurl:MQURL}
    var err error
    //創建rabbitmq連接
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "創建連接錯誤!")
    rabbitmq.channel,err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err,"獲取channel失敗")
    return rabbitmq
}

//斷開channel和connection
func (r *RabbitMQ) Destory() {
    r.channel.Close()
    r.conn.Close()
}

//錯誤處理函數
func (r *RabbitMQ) failOnErr (err error,message string)  {
    if err !=nil {
        log.Fatalf("%s:%s",message,err)
        panic(fmt.Sprintf("%s:%s",message, err))
    }
}

//簡單模式step:1。創建簡單模式下RabbitMQ實例
func NewRabbitMQSimple(queueName string) * RabbitMQ  {
    return NewRabbitMQ(queueName, "", "")
}

//訂閱模式創建rabbitmq實例
func NewRabbitMQPubSub(exchangeName string) * RabbitMQ  {
    //創建rabbitmq實例
    rabbitmq := NewRabbitMQ("", exchangeName, "")
    var err error
    //獲取connection
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
    //獲取channel
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "failed to open a channel!")
    return rabbitmq
}

//訂閱模式生成
func (r *RabbitMQ) PublishPub(message string) {
    //嘗試創建交換機,不存在創建
    err := r.channel.ExchangeDeclare(
        //交換機名稱
        r.Exchange,
        //交換機類型 廣播類型
        "fanout",
        //是否持久化
        true,
        //是否字段刪除
        false,
        //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
        false,
        //是否阻塞 true表示要等待服務器的響應
        false,
        nil,
        )
    r.failOnErr(err, "failed to declare an excha" + "nge")

    //2 發送消息
    err = r.channel.Publish(
        r.Exchange,
        "",
        false,
        false,
        amqp.Publishing{
            //類型
            ContentType:"text/plain",
            //消息
            Body:[]byte(message),
        }, )
}

//訂閱模式消費端代碼
func (r * RabbitMQ) RecieveSub()  {
    //嘗試創建交換機,不存在創建
    err := r.channel.ExchangeDeclare(
        //交換機名稱
        r.Exchange,
        //交換機類型 廣播類型
        "fanout",
        //是否持久化
        true,
        //是否字段刪除
        false,
        //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
        false,
        //是否阻塞 true表示要等待服務器的響應
        false,
        nil,
    )
    r.failOnErr(err, "failed to declare an excha" + "nge")
    //2試探性創建隊列,創建隊列
    q, err := r.channel.QueueDeclare(
        "",//隨機生產隊列名稱
        false,
        false,
        true,
        false,
        nil,
        )
    r.failOnErr(err, "Failed to declare a queue")
    //綁定隊列到exchange中
    err = r.channel.QueueBind(
        q.Name,
        //在pub/sub模式下,這里的key要為空
        "",
        r.Exchange,
        false,
        nil,
        )
    //消費消息
    message, err := r.channel.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
        )
    forever := make(chan bool)
     go func() {
         for d := range message {
             log.Printf("Received a message:%s,", d.Body)
        }
    }()
    fmt.Println("退出請按 Ctrl+C")
    <- forever
}

//話題模式 創建RabbitMQ實例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
    //創建rabbitmq實例
    rabbitmq := NewRabbitMQ("", exchagne, routingKey)
    var err error
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err,"failed     to connect rabbingmq!")
    rabbitmq.channel,err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "failed to open a channel")
    return  rabbitmq
}



//話題模式發送信息
func (r * RabbitMQ) PublishTopic(message string) {
    //嘗試創建交換機,不存在創建
    err := r.channel.ExchangeDeclare(
        //交換機名稱
        r.Exchange,
        //交換機類型 話題模式
        "topic",
        //是否持久化
        true,
        //是否字段刪除
        false,
        //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
        false,
        //是否阻塞 true表示要等待服務器的響應
        false,
        nil,
    )
    r.failOnErr(err, "topic failed to declare an excha" + "nge")
    //2發送信息
    err = r.channel.Publish(
        r.Exchange,
        //要設置
        r.Key,
        false,
        false,
        amqp.Publishing{
            //類型
            ContentType:"text/plain",
            //消息
            Body:[]byte(message),
        }, )
}

//話題模式接收信息
//要注意key
//其中* 用於匹配一個單詞,#用於匹配多個單詞(可以是零個)
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic()  {
    //嘗試創建交換機,不存在創建
    err := r.channel.ExchangeDeclare(
        //交換機名稱
        r.Exchange,
        //交換機類型 話題模式
        "topic",
        //是否持久化
        true,
        //是否字段刪除
        false,
        //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
        false,
        //是否阻塞 true表示要等待服務器的響應
        false,
        nil,
    )
    r.failOnErr(err, "failed to declare an excha" + "nge")
    //2試探性創建隊列,創建隊列
    q, err := r.channel.QueueDeclare(
        "",//隨機生產隊列名稱
        false,
        false,
        true,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare a queue")
    //綁定隊列到exchange中
    err = r.channel.QueueBind(
        q.Name,
        //在pub/sub模式下,這里的key要為空
        r.Key,
        r.Exchange,
        false,
        nil,
    )
    //消費消息
    message, err := r.channel.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    forever := make(chan bool)
    go func() {
        for d := range message {
            log.Printf("Received a message:%s,", d.Body)
        }
    }()
    fmt.Println("退出請按 Ctrl+C")
    <- forever
}


//路由模式 創建RabbitMQ實例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
    //創建rabbitmq實例
    rabbitmq := NewRabbitMQ("", exchagne, routingKey)
    var err error
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err,"failed     to connect rabbingmq!")
    rabbitmq.channel,err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "failed to open a channel")
    return  rabbitmq
}

//路由模式發送信息
func (r * RabbitMQ) PublishRouting(message string) {
    //嘗試創建交換機,不存在創建
    err := r.channel.ExchangeDeclare(
        //交換機名稱
        r.Exchange,
        //交換機類型 廣播類型
        "direct",
        //是否持久化
        true,
        //是否字段刪除
        false,
        //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
        false,
        //是否阻塞 true表示要等待服務器的響應
        false,
        nil,
    )
    r.failOnErr(err, "failed to declare an excha" + "nge")
    //發送信息
    err = r.channel.Publish(
        r.Exchange,
        //要設置
        r.Key,
        false,
        false,
        amqp.Publishing{
            //類型
            ContentType:"text/plain",
            //消息
            Body:[]byte(message),
        }, )
}

//路由模式接收信息
func (r *RabbitMQ) RecieveRouting()  {
    //嘗試創建交換機,不存在創建
    err := r.channel.ExchangeDeclare(
        //交換機名稱
        r.Exchange,
        //交換機類型 廣播類型
        "direct",
        //是否持久化
        true,
        //是否字段刪除
        false,
        //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
        false,
        //是否阻塞 true表示要等待服務器的響應
        false,
        nil,
    )
    r.failOnErr(err, "failed to declare an excha" + "nge")
    //2試探性創建隊列,創建隊列
    q, err := r.channel.QueueDeclare(
        "",//隨機生產隊列名稱
        false,
        false,
        true,
        false,
        nil,
    )
    r.failOnErr(err, "Failed to declare a queue")
    //綁定隊列到exchange中
    err = r.channel.QueueBind(
        q.Name,
        //在pub/sub模式下,這里的key要為空
        r.Key,
        r.Exchange,
        false,
        nil,
    )
    //消費消息
    message, err := r.channel.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    forever := make(chan bool)
    go func() {
        for d := range message {
            log.Printf("Received a message:%s,", d.Body)
        }
    }()
    fmt.Println("退出請按 Ctrl+C")
    <- forever
}
//簡單模式Step:2、簡單模式下生產代碼
func (r *RabbitMQ) PublishSimple (message string) {
    //1、申請隊列,如果隊列存在就跳過,不存在創建
    //優點:保證隊列存在,消息能發送到隊列中
    _, err := r.channel.QueueDeclare(
        //隊列名稱
        r.QueueName,
        //是否持久化
        false,
        //是否為自動刪除 當最后一個消費者斷開連接之后,是否把消息從隊列中刪除
        false,
        //是否具有排他性 true表示自己可見 其他用戶不能訪問
        false,
        //是否阻塞 true表示要等待服務器的響應
        false,
        //額外數學系
        nil,
        )
    if err != nil {
        fmt.Println(err)
    }

    //2.發送消息到隊列中
    r.channel.Publish(
        //默認的Exchange交換機是default,類型是direct直接類型
        r.Exchange,
        //要賦值的隊列名稱
        r.QueueName,
        //如果為true,根據exchange類型和routkey規則,如果無法找到符合條件的隊列那么會把發送的消息返回給發送者
        false,
        //如果為true,當exchange發送消息到隊列后發現隊列上沒有綁定消費者,則會把消息還給發送者
        false,
        //消息
        amqp.Publishing{
            //類型
            ContentType:"text/plain",
            //消息
            Body:[]byte(message),
        })
}

func (r *RabbitMQ) ConsumeSimple() {
    //1、申請隊列,如果隊列存在就跳過,不存在創建
    //優點:保證隊列存在,消息能發送到隊列中
    _, err := r.channel.QueueDeclare(
        //隊列名稱
        r.QueueName,
        //是否持久化
        false,
        //是否為自動刪除 當最后一個消費者斷開連接之后,是否把消息從隊列中刪除
        false,
        //是否具有排他性
        false,
        //是否阻塞
        false,
        //額外數學系
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }
    //接收消息
    msgs, err := r.channel.Consume(
        r.QueueName,
        //用來區分多個消費者
        "",
        //是否自動應答
        true,
        //是否具有排他性
        false,
        //如果設置為true,表示不能同一個connection中發送的消息傳遞給這個connection中的消費者
        false,
        //隊列是否阻塞
        false,
        nil,
        )
    if err!=nil {
        fmt.Println(err)
    }
    forever := make(chan bool)

    //啟用協程處理
    go func() {
        for d := range msgs {
            //實現我們要處理的邏輯函數
            log.Printf("Received a message:%s",d.Body)
            //fmt.Println(d.Body)
        }
    }()

    log.Printf("【*】warting for messages, To exit press CCTRAL+C")
    <- forever
}

測試

//Simple模式 發送者
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
rabbitmq.PublishSimple("hello imooc!")
//接收者
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
rabbitmq.ConsumeSimple()

//訂閱模式發送者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
    for i :=0; i<=100 ; i++ {
        rabbitmq.PublishPub("訂閱模式生產第" + strconv.Itoa(i) + "條數據")
        fmt.Println(i)
        time.Sleep(1 * time.Second)
    }
//接收者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
rabbitmq.RecieveSub()

//路由模式發送者
imoocOne := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
    imoocTwo := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two")

    for i :=0; i<=10; i++  {
        imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))
        imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))
        time.Sleep(1 * time.Second)
        fmt.Println(i)
    }
//接收者
rabbitmq := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
rabbitmq.RecieveRouting()

//Topic模式發送者
imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")
    imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.four")

    for i :=0; i<=10; i++  {
        imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))
        imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))
        time.Sleep(1 * time.Second)
        fmt.Println(i)
    }
//Topic接收者
rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#")
rabbitmq.RecieveTopic()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM