RabbitMQ官方教程一Hello World(GOLANG語言實現)


介紹

RabbitMQ是消息中間件:它接受並轉發消息。
您可以將其視為郵局系統:將要發送的郵件放在郵箱中時,
可以確保郵遞員最終將郵件傳遞給收件人。
以此類推,RabbitMQ是一個郵箱,一個郵局和一個郵遞員。

RabbitMQ與郵局之間的主要區別在於,
它不處理紙張,而是接收,存儲和轉發數據消息的二進制數據。

以下是RabbitMQ和消息發送的術語

  • Producer:生產者。負責生產消息。

生產者

  • Queue:隊列。負責存儲消息。隊列在RabbitMQ中充當郵箱的角色,消息傳遞到RabbitMQ中,只能存儲在隊列中。隊列受主機內存和磁盤大小的約束。本質是一個很大的消息緩沖區。
    許多生產者可以將消息發送到一個隊列,許多消費者可以嘗試從一個隊列接收數據。

隊列

  • Consumer:消費者。負責處理消息。

** 筆者補充

  • 參考連接

  • Connect:連接。生產者和RabbitMQ服務之間建立的TCP連接。

  • Channel:信道,一條連接可包含多條信道,不同信道之間通信互不干擾。考慮下多線程應用場景,每個線程對應一條信道,而不是對應一條連接,這樣可以提高性能。

  • body:消息主體,要傳遞的數據。

  • exchange:交換器,負責把消息轉發到對應的隊列。交換器本身沒有緩存消息的功能,消息是在隊列中緩存的,如果隊列不存在,則交換器會直接丟棄消息。常用的有四種類型的交換器:direct、fanout、topic、headers。不同類型的交換器有不同的交換規則,交換器會根據交換規則把消息轉發到對應的隊列。

  • exchangeName:交換器名稱,每個交換器對應一個名稱,發送消息時會附帶交換器名稱,根據交換器名稱選擇對應的交換器。

  • BandingKey:綁定鍵,一個隊列可以有一個到多個綁定鍵,通過綁定操作可以綁定交換器和隊列,交換器會根據綁定鍵的名稱找到對應的隊列。

  • RotingKey:路由鍵,發送消息時,需要附帶一條路由鍵,交換器會對路由鍵和綁定鍵進行匹配,如果匹配成功,則消息會轉發到綁定鍵對應的隊列中。

**簡而言之就是:

  1. 生產者指定路由Key和交換器的名字發送給RabbitMQ服務

  2. 指定名字的交換器根據路由key去找到綁定的隊列

  3. 將消息放入隊列當中

  4. 消費者從隊列中取出消息進行處理

生產者

運行圖

**linux安裝RabbitMQ服務

**Docker安裝RabbitMQ服務

實戰 "Hello World"

golang語言實現

在本教程的這一部分中,我們將用Go編寫兩個小程序。 發送單個消息的生產者和接收消息並打印出來的消費者。 我們將介紹Go RabbitMQ API中的一些細節,僅着眼於此非常簡單的事情。 這是消息傳遞的“ Hello World”。

在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。 中間的框是一個隊列-RabbitMQ代表使用者保留的消息緩沖區。

運行圖

Go RabbitMQ客戶端庫
RabbitMQ使用多種協議。 本教程使用AMQP 0-9-1,這是一種開放的通用消息傳遞協議。 RabbitMQ有許多不同語言的客戶。 在本教程中,我們將使用Go amqp客戶端。

go get github.com/streadway/amqp

生產者發送數據到隊列

運行圖

#send.go 生產者,發送消息到消息隊列中

package main

import (
	"github.com/streadway/amqp"
	"log"
)

func main(){
	// 連接RabbitMQ服務器
	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	// 創建一個channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	
        // 聲明一個隊列
	q, err  := ch.QueueDeclare(
		"hello",			// 隊列名稱
		false,			// 是否持久化
		false,		// 是否自動刪除
		false,			// 是否獨立
		false,nil,
		)
	failOnError(err, "Failed to declare a queue")
	// 發送消息到隊列中
	body := "Hello World!"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing {
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	fmt.Println("send message success\n"
}

// 幫助函數檢測每一個amqp調用
func failOnError(err error, msg string)  {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}


消費者從隊列接收數據

運行圖


package main

import (
	"github.com/streadway/amqp"
	"log"
)

func main(){
	// 連接RabbitMQ服務器
	conn, err := amqp.Dial("amqp://admin:admin@47.97.215.189:5672/admin")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	// 創建一個channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// 監聽隊列
	q, err  := ch.QueueDeclare(
		"hello",			// 隊列名稱
		false,			// 是否持久化
		false,		// 是否自動刪除
		false,			// 是否獨立
		false,nil,
	)
	failOnError(err, "Failed to declare a queue")
	// 消費隊列
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")
    // 申明一個goroutine,一遍程序始終監聽
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

// 幫助函數檢測每一個amqp調用
func failOnError(err error, msg string)  {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM