目前面對大多數的需要在異構系統間進行消息傳遞技術路線,大多會選擇socket或webservice。這兩種技術的共同特點是耦合緊,調試依賴雙方同步,但是效率高。除此以外,使用消息隊列(MQ)的應用場景也偶爾能遇到。本文就將要從AMQP協議說起,重點介紹利用RabbitMQ實現C++和Java跨系統開發的實踐。
一、AMQP是什么
AMQP又稱為高級消息隊列協議,是一種進程間進行異步消息的網絡協議。它的出現是為了讓各類消息中間件提供統一服務,以降低系統集成的開銷。目前,完全准尋AMQP協議的消息中間件只有RabbitMQ。雖然各大中間件產品也都針對不同的語言推出了客戶端。但是,無論是從業務適應性還是集成通用性上來說,比較推薦的還是RabbitMQ。不同的消息中間件在性能上的差異網上資料很多,這里不再贅述。
amqp協議和http協議一樣都是建立在TCP/IP協議簇之上的應用層協議。不同於http協議的,它是一個二進制協議,具有多信道,異步,高效等特點。amqp協議規定了從消息發布者到消息接收者之間的消息傳遞方式,並且提出了交換機(Exchange)隊列(Queue)以及他們之間的路由(Routing)。
作為一套標准協議,使用者甚至可以完全根據amqp的協議規范定制化的開發出客戶端和RabbitMQ通信,這一特點也讓RabbitMQ在業務通用性上具備了得天獨厚的優勢。標准的amqp協議格式如下:
amqp://<username>:<password>@<host>:<port>/<virtual>
username: 用戶名
password: 登錄密碼
host: 服務所在主機地址
port: 服務端口號
virtual: 虛擬路徑
AMQP協議最值得學習的地方在於,它定義了消息的發送和投遞過程:
交換機(Exchange)負責接收消息,並根據提前指定的規則(Routing)投送消息到特定隊列(Queue)。消費者監聽隊列,並處理消息。如果多個消費者監聽同一個隊列,消息一般會輪流的發送給它們。以實現負載均衡。此外,通過虛擬路徑約束還允許在不同的虛擬路徑下建立同命隊列。
AMQP協議默認提供了四種類型的交換機:
直接交換機(Direct Exchange):根據路由鍵的不同將消息直接發送到不同隊列,未匹配路由鍵的消息會被丟棄。
扇形交換機(Funout Exchange):扇形交換機是實現廣播的基礎,它能夠同時將消息推送給多個隊列。
主題交換機(Topic Exchange):交換機會根據路由鍵進行模糊匹配,從而完成消息投送。
頭交換機(Header Exchange):它不依賴特定路由鍵,而是將投送目標寫在消息頭,支持字典類型,配置更加靈活。
二、C++開發指南
官網提供了其它常見語言的開發向導,對於C++個人推薦使用AMQP-CPP這套庫。另外還需要一套網絡庫支持,個人也推薦libevent。編譯方法可以參考github上的說明。發送方式區別於傳統的socket,你不應該將一條消息分多個部分發送。因此推薦使用對象序列化模型直接轉換為字節數組,同樣受到tcp/ip傳輸的制約,你應該選擇高效的序列化工具來進行。個人推薦使用protobuf,同樣作為一種跨平台的支持。
下面以一套RPC調用為例進行說明:
#include <iostream> #include "event2/event.h" #include "amqpcpp.h" #include "amqpcpp/libevent.h" #include "amqp_msg.pb.h" #include <string> using namespace std; using namespace amqp; int main() { event_base *base = event_base_new(); // 通過libevent啟動實踐循環 AMQP::LibEventHandler handler(base); AMQP::TcpConnection connection(&handler, AMQP::Address("localhost", 5672, AMQP::Login("guest", "guest"), "/")); AMQP::TcpChannel channel(&connection); // 創建一條通道 channel.setQos(1); // 監聽login.rpc隊列 channel.consume("login.rpc").onReceived([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { cout << "login.rpc" << endl; Login login; login.ParseFromArray(message.body(), message.bodySize()); Response resp; // 創建應答對象 resp.set_status(Response_RespType_OK); resp.set_session_id("acd"); char data[1024] = {0}; int data_size = resp.ByteSizeLong(); resp.SerializeToArray(data, data_size); AMQP::Envelope env(data, data_size); env.setCorrelationID(message.correlationID()); // 獲取應答ID channel.publish("", message.replyTo(), env); // 發送給應答隊列 channel.ack(deliveryTag); // 向MQ發送確認 }).onSuccess([&](const std::string &consumertag) { }).onError([](const char *message) { event_base_loopbreak(base); // 發送錯誤中斷事件循環 cout << message << endl; }); // 監聽logout.rpc隊列 channel.consume("logout.rpc") .onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool) { Logout logout; logout.ParseFromArray(message.body(), message.bodySize()); Response resp; resp.set_status(Response_RespType_OK); char data[1024] = {0}; int data_size = resp.ByteSizeLong(); resp.SerializeToArray(data, data_size); AMQP::Envelope env(data, data_size); env.setCorrelationID(message.correlationID()); channel.publish("", message.replyTo(), env); channel.ack(deliveryTag); }).onError([](const char *message) { event_base_loopbreak(base); cout << message << endl; }); event_base_dispatch(base); // 事件循環 event_base_free(base); // 釋放 return 0; }
AMQP-CPP庫直接主動連接,或者你也可以在繼承相應的Handler自己完成網絡連接。此外,Connection 和 Channel的創建也都支持回調函數。如:
channel.onError([&base](const char* message) { std::cout << "Channel error: " << message << std::endl; event_base_loopbreak(base); });
channel.declareQueue("queueName", AMQP::passive) .onSuccess([&](const string& name, uint32_t, uint32_t) { cout << "Queue Name:" << name << endl; });
channel.declareExchange("logs", AMQP::fanout) .onSuccess([&]() {})
三、Spring AMQP開發指南
與Spring整合的技巧,官網有很詳細的指導意見。這里只給出與上文C++配合的請求端如何發送以及等待應答的核心代碼:
@GetMapping("login") public String loginRpc() throws InvalidProtocolBufferException { AmqpMsg.Login login = AmqpMsg.Login.newBuilder() .addParams(AmqpMsg.PairParams.newBuilder().setKey("username").setValue("admin").build()) .addParams(AmqpMsg.PairParams.newBuilder().setKey("password").setValue("admin").build()) .build(); byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "login.rpc", login.toByteArray()); AmqpMsg.Response response = AmqpMsg.Response.parseFrom(resp); if (response.getStatus() == AmqpMsg.Response.RespType.OK) { String sessionID = response.getSessionId(); System.out.println("登錄成功 SessionID=" + sessionID); return "SUCCESS"; } return "ERROR"; } @GetMapping("logout") public String logoutRpc() throws InvalidProtocolBufferException { AmqpMsg.Logout logout = AmqpMsg.Logout.newBuilder() .setSessionId("123456").build(); byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "logout.rpc", logout.toByteArray()); AmqpMsg.Response response = AmqpMsg.Response.parseFrom(resp); if(response.getStatus() == AmqpMsg.Response.RespType.OK) { System.out.println("注銷成功"); return "SUCCESS"; } return "ERROR"; }
@Configuration public class RPCRabbitConfig { @Bean("simple") public Queue simpleQueue() { return new Queue("simple"); } @Bean("login.rpc") public Queue loginRpcQueue() { return new Queue("login.rpc"); } @Bean("logout.rpc") public Queue logoutRpcQueue() { return new Queue("logout.rpc"); } @Bean public DirectExchange defaultExchange() { return new DirectExchange("amq.direct"); } @Bean public Binding loginRpcBinding(DirectExchange exchange, @Qualifier("login.rpc") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("login.rpc"); } @Bean public Binding logoutRpcBind(DirectExchange exchange, @Qualifier("logout.rpc") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("logout.rpc"); } }
后記:可能是由於工作上與架構的關系比較密切,目前在博客中提供的大多數解決方案都以跨平台應用為主。如果您對文章中介紹的知識點有任何的疑問也可以與我聯系。