RabbitMQ協議基礎及C++和Java混合開發


目前面對大多數的需要在異構系統間進行消息傳遞技術路線,大多會選擇socket或webservice。這兩種技術的共同特點是耦合緊,調試依賴雙方同步,但是效率高。除此以外,使用消息隊列(MQ)的應用場景也偶爾能遇到。本文就將要從AMQP協議說起,重點介紹利用RabbitMQ實現C++和Java跨系統開發的實踐。

一、AMQP是什么

AMQP又稱為高級消息隊列協議,是一種進程間進行異步消息的網絡協議。它的出現是為了讓各類消息中間件提供統一服務,以降低系統集成的開銷。目前,完全准尋AMQP協議的消息中間件只有RabbitMQ。雖然各大中間件產品也都針對不同的語言推出了客戶端。但是,無論是從業務適應性還是集成通用性上來說,比較推薦的還是RabbitMQ。不同的消息中間件在性能上的差異網上資料很多,這里不再贅述。

amqp協議和http協議一樣都是建立在TCP/IP協議簇之上的應用層協議。不同於http協議的,它是一個二進制協議,具有多信道,異步,高效等特點。amqp協議規定了從消息發布者到消息接收者之間的消息傳遞方式,並且提出了交換機(Exchange)隊列(Queue)以及他們之間的路由(Routing)。

作為一套標准協議,使用者甚至可以完全根據amqp的協議規范定制化的開發出客戶端和RabbitMQ通信,這一特點也讓RabbitMQ在業務通用性上具備了得天獨厚的優勢。標准的amqp協議格式如下:

amqp://<username>:<password>@<host>:<port>/<virtual>

username: 用戶名

password: 登錄密碼

host: 服務所在主機地址

port: 服務端口號

virtual: 虛擬路徑

AMQP協議最值得學習的地方在於,它定義了消息的發送和投遞過程:

  交換機(Exchange)負責接收消息,並根據提前指定的規則(Routing)投送消息到特定隊列(Queue)。消費者監聽隊列,並處理消息。如果多個消費者監聽同一個隊列,消息一般會輪流的發送給它們。以實現負載均衡。此外,通過虛擬路徑約束還允許在不同的虛擬路徑下建立同命隊列。

AMQP協議默認提供了四種類型的交換機:

直接交換機(Direct Exchange):根據路由鍵的不同將消息直接發送到不同隊列,未匹配路由鍵的消息會被丟棄。

扇形交換機(Funout Exchange):扇形交換機是實現廣播的基礎,它能夠同時將消息推送給多個隊列。

主題交換機(Topic Exchange):交換機會根據路由鍵進行模糊匹配,從而完成消息投送。

頭交換機(Header Exchange):它不依賴特定路由鍵,而是將投送目標寫在消息頭,支持字典類型,配置更加靈活。

二、C++開發指南

官網提供了其它常見語言的開發向導,對於C++個人推薦使用AMQP-CPP這套庫。另外還需要一套網絡庫支持,個人也推薦libevent。編譯方法可以參考github上的說明。發送方式區別於傳統的socket,你不應該將一條消息分多個部分發送。因此推薦使用對象序列化模型直接轉換為字節數組,同樣受到tcp/ip傳輸的制約,你應該選擇高效的序列化工具來進行。個人推薦使用protobuf,同樣作為一種跨平台的支持。

下面以一套RPC調用為例進行說明:

#include <iostream>
#include "event2/event.h"
#include "amqpcpp.h"
#include "amqpcpp/libevent.h"
#include "amqp_msg.pb.h"
#include <string>

using namespace std;
using namespace amqp;

int main()
{
    event_base *base = event_base_new(); // 通過libevent啟動實踐循環
    AMQP::LibEventHandler handler(base);
    AMQP::TcpConnection connection(&handler, AMQP::Address("localhost", 5672, AMQP::Login("guest", "guest"), "/"));
    AMQP::TcpChannel channel(&connection); // 創建一條通道
    channel.setQos(1);
    // 監聽login.rpc隊列
    channel.consume("login.rpc").onReceived([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
    {
        cout << "login.rpc" << endl;
        Login login;
        login.ParseFromArray(message.body(), message.bodySize());
        Response resp; // 創建應答對象
        resp.set_status(Response_RespType_OK);
        resp.set_session_id("acd");
        char data[1024] = {0};
        int data_size = resp.ByteSizeLong();
        resp.SerializeToArray(data, data_size);
        AMQP::Envelope env(data, data_size);
        env.setCorrelationID(message.correlationID()); // 獲取應答ID
        channel.publish("", message.replyTo(), env); // 發送給應答隊列
        channel.ack(deliveryTag); // 向MQ發送確認
    }).onSuccess([&](const std::string &consumertag)
    {

    }).onError([](const char *message)
    {
        event_base_loopbreak(base); // 發送錯誤中斷事件循環
        cout << message << endl;
    });
    // 監聽logout.rpc隊列
    channel.consume("logout.rpc")
            .onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool)
    {
        Logout logout;
        logout.ParseFromArray(message.body(), message.bodySize());
        Response resp;
        resp.set_status(Response_RespType_OK);
        char data[1024] = {0};
        int data_size = resp.ByteSizeLong();
        resp.SerializeToArray(data, data_size);
        AMQP::Envelope env(data, data_size);
        env.setCorrelationID(message.correlationID());
        channel.publish("", message.replyTo(), env);
        channel.ack(deliveryTag);
    }).onError([](const char *message)
    {
        event_base_loopbreak(base);
        cout << message << endl;
    });
    event_base_dispatch(base); // 事件循環
    event_base_free(base); // 釋放
    return 0;
}

AMQP-CPP庫直接主動連接,或者你也可以在繼承相應的Handler自己完成網絡連接。此外,Connection 和 Channel的創建也都支持回調函數。如:

channel.onError([&base](const char* message)
{
    std::cout << "Channel error: " << message << std::endl;
    event_base_loopbreak(base);
});
channel.declareQueue("queueName", AMQP::passive)
       .onSuccess([&](const string& name, uint32_t, uint32_t)
{
    cout << "Queue Name:" << name << endl;
});
channel.declareExchange("logs", AMQP::fanout)
       .onSuccess([&]() {})

三、Spring AMQP開發指南

與Spring整合的技巧,官網有很詳細的指導意見。這里只給出與上文C++配合的請求端如何發送以及等待應答的核心代碼:

@GetMapping("login")
public String loginRpc() throws InvalidProtocolBufferException {
    AmqpMsg.Login login = AmqpMsg.Login.newBuilder()
            .addParams(AmqpMsg.PairParams.newBuilder().setKey("username").setValue("admin").build())
            .addParams(AmqpMsg.PairParams.newBuilder().setKey("password").setValue("admin").build())
            .build();
    byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "login.rpc", login.toByteArray());

    AmqpMsg.Response response = AmqpMsg.Response.parseFrom(resp);
    if (response.getStatus() == AmqpMsg.Response.RespType.OK) {
        String sessionID = response.getSessionId();
        System.out.println("登錄成功 SessionID=" + sessionID);
        return "SUCCESS";
    }
    return "ERROR";
}

@GetMapping("logout")
public String logoutRpc() throws InvalidProtocolBufferException {
    AmqpMsg.Logout logout = AmqpMsg.Logout.newBuilder()
            .setSessionId("123456").build();
    byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "logout.rpc", logout.toByteArray());
    AmqpMsg.Response response = AmqpMsg.Response.parseFrom(resp);
    if(response.getStatus() == AmqpMsg.Response.RespType.OK) {
        System.out.println("注銷成功");
        return "SUCCESS";
    }
    return "ERROR";
}
@Configuration
public class RPCRabbitConfig {
    @Bean("simple")
    public Queue simpleQueue() {
        return new Queue("simple");
    }

    @Bean("login.rpc")
    public Queue loginRpcQueue() {
        return new Queue("login.rpc");
    }

    @Bean("logout.rpc")
    public Queue logoutRpcQueue() {
        return new Queue("logout.rpc");
    }

    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange("amq.direct");
    }

    @Bean
    public Binding loginRpcBinding(DirectExchange exchange, @Qualifier("login.rpc") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("login.rpc");
    }

    @Bean
    public Binding logoutRpcBind(DirectExchange exchange, @Qualifier("logout.rpc") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("logout.rpc");
    }
}

 

后記:可能是由於工作上與架構的關系比較密切,目前在博客中提供的大多數解決方案都以跨平台應用為主。如果您對文章中介紹的知識點有任何的疑問也可以與我聯系。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM