RabbitMQ MQTT協議和AMQP協議
1 序言... 1
1.1 RabbitMq結構... 1
1.2 RabbitMq消息接收... 4
1.3 Exchange種類和消息發送模式... 4
1.4 RabbitMq的協議... 6
1.4.1 AMQP協議... 6
1.4.2 MQTT協議... 8
2 RabbitMq服務器安裝和使用... 9
2.1 Windows下安裝RabbitMQ.. 9
2.2 centos7 Linux安裝RabbitMq. 11
3 MQTT協議C++開發... 12
3.1 庫函數介紹... 12
3.2 MQTT協議實例代碼... 19
4 AMQP協議開發... 25
4.1 AMQP庫函數介紹... 25
4.2 AMQP實例代碼... 26
4.2.1 生產者實例... 27
4.2.2 消費者實例代碼... 29
1 序言
1.1 RabbitMq結構
RabbitMQ是一種異步通信機制,消息的發送者和接收者之間不建立直接的聯系,而是通過RabbitMQ服務器去做中間代理,生產者向服務器發布消息,消費者向服務器去訂閱消息;生產者與服務器建立連接,將消息發給服務器,服務器通過映射關系將消息緩存到指定隊列中,消費者再與RabbitMQ服務器建立連接,隊列中有消息時,服務器會將消息發給消費者;這樣做可以降低發送者和接受者之間的耦合度,一方斷開連接,消息也不會丟失,會在服務器中進行緩存;通過中間服務器代理可以做到負載均衡、集群擴展、 優先級分配等;
圖1.1.1Rabbitmq通訊機制
圖1.1.2 RabbitMQ服務網頁界面
如圖1.1.1所示,發布者和接收者都是作為客戶端和RabbitMq服務器建立一個TCP Connection連接,也可以建立多個TCP connection連接;在一個TCP連接之上又可以創建多個通道Channel與一個Exchange建立連接;發布者就像是淘寶賣家一樣,exchange就像是快遞公司,賣家與多家快遞公司建立合作連接,一個賣家的多個分店和快遞公司建立多個通道,rootingkey就像是快遞地址,queue就像是集散中心,接收者就像是收快遞的買家;相關概念如下所示:
Producer:消息的發布者;相當於淘寶賣家;
Consumer:消息的接收者;相當於淘寶買家;
Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。相當於賣家和快遞公司建立合作協議;
Channels: 虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。相當於賣家的分店與快遞公司之間的生意往來;建立和關閉TCP連接耗資源,影響性能,而且TCP的連接數也有限制,限制了系統處理高並發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對於Producer或者Consumer來說,可以並發的使用多個Channel進行Publish或者Receive;
Exchange:交換機,將消息路由到指定的消費者;相當於快遞公司,將接到的快遞集散之后發給各個城市的集散中心;有三種exchange,通過參數來設置;第一個模式是定向模式Direct exchange,只有當routing key 匹配時, 消息才會被傳遞到相應的queue中。第二種模式是廣播模式Fanout exchange, 會向所有綁定的隊列發送消息。第三種模式是模糊匹配模式Topic exchange,routing key由通配符構成,對routing key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。
RootingKey:消息發送給誰的標示符,用來連接Exchange和queue;相當於快遞中的地址,讓快遞公司知道將快遞發給哪個集散中心;
Queue:消息隊列,用於緩存消息的隊,Consumer和Procuder都可以創建queue,隊列的持久化也可以設置;相當於快遞的集散中心,用來暫時存放快遞;消費者從隊列中取消息;相當於買家從集散中心取快遞,多個買家可以從同一個集散中心取快遞;相當於多個客戶端從隊列里取消息;一個客戶端也可以創建多個通道從隊列里取消息,相當於一個家庭的不同成員去取快遞; 程序中就是開啟多個線程,通過通道從隊列中取消息,實現高並發;
Binding:綁定exchange和queue,建立聯系;
1.2 RabbitMq消息接收
同一個客戶端可以開啟多個線程consumer從一個隊列里獲取消息,隊列按照輪詢的方式發給每個consumer,假如隊列里有六條消息1、2、4、5、6,有兩個consumer,consumer1接收到1、3、5,consumer2接收到2、4、6。如果希望每個consumer都獲取完整的6條消息,需要進建立兩個對列;為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。Consumer正確處理數據后給Rabbit發送確認細信息,然后Rabbit再從queue中刪除消息;
1.3 Exchange種類和消息發送模式
(1)default exchange
default exchange是一個沒有名稱的(空字符串)被broker預先申明的direct exchange。它所擁有的一個特殊屬性使它對於簡單的應用程序很有作用:每個創建的queue會與它自動綁定,使用queue名稱作為routing key。舉例說,當你申明一個名稱為“search-indexing-online”的queue時,AMQP broker使用“search-indexing-online”作為routing key將它綁定到default exchange。因此,一條被發布到default exchange並且routing key為"search-indexing-online"將被路由到名稱為"search-indexing-online"的queue。
(2)定向模式direct exchange
direct exchange嚴格根據消息的routing key來傳送消息。direct exchange是單一傳播路由消息的最佳選擇,routing key將queue與exchange進行綁定,消息根據routing key分配到指定的queue中;一個rooting key可以綁定多個queue,多個rooting key也可以綁定到同一個queue上面;
圖1.3.1 定向模式隊列綁定示意圖
(3)廣播模式Fanout exchange
fanout exchange路由消息到所有的與其綁定的queue中,忽略routing key。如果N個queue被綁定到一個fanout exchange,當一條新消息被發布到exchange時,消息會被復制並且傳送到這N個queue。fanout exchange是廣播路由的最佳選擇。
因為一個fanout exchange傳送消息的副本到每一個與其綁定的queue,它的使用情況很相似:
1)大量的多用戶在線(multi-player online MMO)游戲使用它更新排行榜或者其他的全體事件
2)體育新聞網站使用fanout exchange向手機客戶端實時發送比分更新
3)分布式系統可以廣播各種狀態與配置更新
4)群聊可以使用fanout exchange讓消息在參與者之間傳輸
圖1.3.2廣播模式示意圖
(4)模糊匹配模式Topic exchange
Topic exchange路由消息到一個或者多個queue, routing key可以是包含通配符的字符串,用於模糊匹配多個rooting key。Topic exchange經常被用於實現各種發布/訂閱模式的變化。Topic exchanges通常被用於多路廣播路由消息。
topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,它約定:
1)routing key與binding key是用一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
2)binding key中可以存在兩種特殊字符“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)
圖1.3.3廣播模式示意圖
1.4 RabbitMq的協議
1.4.1 AMQP協議
AMQP是由各種數據幀組成的協議,包括方法幀、內容幀,心跳幀等,每一幀數據所有的幀都由一個頭(header,7個字節),任意大小的負載(payload),和一個檢測錯誤的幀結束(frame-end)字節組成:
圖1.4.1 AMQP協議結構
要讀取一個幀,我們必須:
(1)讀取header,檢查幀類型(frame type)和通道(channel)。
(2) 根據幀類型,我們讀取負載並進行處理。
(3)讀取幀結束字節。
在實際實現中,如果性能很關鍵的話,我們應該使用讀前緩沖(read-ahead buffering)”或“收集讀取(gathering reads)”,以避免為了讀一個幀而做三次獨立的系統調用。
AMQP是一套標准的底層協議,加入了許多其他特征來支持互用性,具有跨語言和跨平台的特性。AMQP協議的主要特性如下:
•獨立於平台的底層消息傳遞協議
•消費者驅動消息傳遞
•跨語言和平台的互用性
•它是底層協議的
•有5種交換類型direct,fanout,topic,headers,system
•面向緩存的
•可實現高性能
•支持長周期消息傳遞
•支持經典的消息隊列,循環,存儲和轉發
•支持事務(跨消息隊列)
•支持分布式事務(XA,X/OPEN,MS DTC)
•使用SASL和TLS確保安全性
•支持代理安全服務器
•元數據可以控制消息流
•不支持LVQ
•客戶端和服務端對等
•可擴展
1.4.2 MQTT協議
傳輸協議實際上是一種消息通訊機制,通訊的雙方進行約定消息內容的格式,一方發送,另一方接收,按照協議格式進行解析,獲取到正確的消息內容。傳輸協議一般包括協議頭,版本號,消息長度,消息內容等。不同的協議頭代表不同的消息類型。MQTT協議結構體如下:
typedef struct
{
/** The eyecatcher for this structure. must be MQTM.識別碼*/
char struct_id[4];
/** The version number of this structure. Must be 0 版本號*/
int struct_version;
/** The length of the MQTT message payload in bytes. 消息長度*/
int payloadlen;
/** A pointer to the payload of the MQTT message. 消息數據*/
void* payload;
int qos;
int retained;
int dup;
int msgid;
} MQTTAsync_message;
MQTT協議是它是專門為小設備設計的。計算性能不高的設備不能適應AMQP上的復雜操作,它們需要一種簡單而且可互用的方式進行通信。這是MQTT的基本要求,而如今,MQTT是物聯網(IOT)生態系統中主要成分之一。
MQTT的主要特性:
•面向流,內存占用低
•為小型無聲設備之間通過低帶寬發送短消息而設計
•不支持長周期存儲和轉發
•不允許分段消息(很難發送長消息)
•支持主題發布-訂閱
•消息實際上是短暫的(短周期)
•簡單用戶名和密碼,基於沒有足夠信息熵的安全
•不支持安全連接
•消息不透明
•Topic是全局的(一個全局的命名空間)
•支持最新值隊列(Last Value Queue (LVQ) )
•客戶端和服務端不對稱
•不能擴展
2 RabbitMq服務器安裝和使用
2.1 Windows下安裝RabbitMQ
(1):下載erlang,原因在於RabbitMQ服務端代碼是使用並發式語言erlang編寫的,下載地址:http://www.erlang.org/downloads,雙擊.exe文件進行安裝就好,安裝完成之后創建一個名為ERLANG_HOME的環境變量,其值指向erlang的安裝目錄,同時將%ERLANG_HOME%\bin加入到Path中,最后打開命令行,輸入erl,如果出現erlang的版本信息就表示erlang語言環境安裝成功;
圖2.1.1 環境變量設置
(2):下載RabbitMQ,下載地址:http://www.rabbitmq.com/,同樣雙擊.exe進行安裝就好(這里需要注意一點,默認的安裝目錄是C:/Program Files/....,這個目錄中是存在空格符的,我們需要改變安裝目錄,貌似RabbitMQ安裝目錄中是不允許有空格的,我之前踩過這個大坑);
(3):安裝RabbitMQ-Plugins,這個相當於是一個管理界面,方便我們在瀏覽器界面查看RabbitMQ各個消息隊列以及exchange的工作情況,安裝方法是:打開命令行cd進入rabbitmq的sbin目錄(我的目錄是:E:\software\rabbitmq\rabbitmq_server-3.6.5\sbin),輸入:rabbitmq-plugins enable rabbitmq_management命令,稍等會會發現出現plugins安裝成功的提示,默認是安裝6個插件,如果你在安裝插件的過程中出現了下面的錯誤:
圖2.1.2錯誤示意圖
解決方法是:首先在命令行輸入:rabbitmq-service stop,接着輸入rabbitmq-service remove,再接着輸入rabbitmq-service install,接着輸入rabbitmq-service start,最后重新輸入rabbitmq-plugins enable rabbitmq_management試試,我是這樣解決的;
(4):插件安裝完之后,本機可以在瀏覽器輸入http://localhost:15672進行驗證,其他電腦需要輸入http://IP地址:15672進行訪問,你會看到下面界面,輸入用戶名:guest,密碼:guest你就可以進入管理界面,當然用戶名密碼你都可以變的;
圖2.1.3 RabbitMq服務器登陸界面
2.2 centos7 Linux安裝RabbitMq
(1).首先需要安裝erlang
#rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
#yum install erlang
安裝過程中會有提示,一路輸入“y”即可。
(2).完成后安裝RabbitMQ:
先下載rpm:
#wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
下載完成后安裝:
#yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
完成后啟動服務:
#service rabbitmq-server start
可以查看服務狀態:
#service rabbitmq-server status
3 MQTT協議C++開發
3.1 庫函數介紹
在http://mqtt.org/網站下載MQTT協議的軟件開發包,paho.mqtt.c-master.rar;下載CMAKE編譯工具並安裝,編譯生成paho-mqtt3a.dll和paho-mqtt3a.lib文件,復制頭文件MQTTAsync.h到工程,進行軟件開發。函數的調用順序如下圖所示。
圖3.1.1 MQTT協議函數調用流程圖
RabbitMQ MQTT的函數調用步驟為:
(1) 創建MQTT客戶端
MQTTAsync_create輸入參數服務器的地址url,標識客戶端的clientid和持久化參數,輸出一個客戶端的句柄;這個句柄作為其他函數的的輸入參數;
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,int persistence_type, void* persistence_context);
/**
功能:創建客戶端句柄的函數
* @param handle out 返回的客戶端句柄
* @param serverURI in 服務器的URL tcp://localhost:1883
* @param clientId in 連接服務器時提供給服務器用來標識客戶端的id,需要是null結尾的UTF-8格式的字符串
* @param persistence_type in 持久化的類型
1)MQTTCLIENT_PERSISTENCE_NONE:無持久化,客戶端運行失敗或者關閉時,消息會丟失,persistence_context參數無效且應該被設置為NULL;
2)MQTTCLIENT_PERSISTENCE_DEFAULT:默認持久化,消息保存在硬盤中,有保護機制防止消息丟失,persistence_context指定保存數據的目錄路徑,如果persistence_context是NULL,采用默認的了路徑
3)MQTTCLIENT_PERSISTENCE_USER:指定的消息持久化實現,將持久化的控制交給程序,程序要實現MQTTClient_persistence 接口,persistence_context指定一個有效的MQTTClient_persistence結構體指針;
* @param persistence_context 持久化參數的上下文指針
* @return ::成功返回MQTTASYNC_SUCCESS,失敗返回錯誤碼;
*/
(2)設置回調函數
MQTTAsync_setCallbacks是設置回調函數的函數,handle為MQTTAsync_create創建的句柄,三個回調函數分別是連接斷開回調函數、接收到消息回調函數,發送完成回調函數;三個回調函數根據需要添加,不需要的直接設置為NULL;
int MQTTAsync_setCallbacks (MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
/**功能:設置回到函數
* @param handle in 客戶端句柄
* @param context in 回調函數返回時,上下文指針,一般傳入this指針
* @param cl in 斷開連接的回調函數指針
* @param ma in 接收到消息的回調函數指針
* @param dc in 發送成功的回調函數指針
* @return 成功返回MQTTASYNC_SUCCESS 失敗返回MQTTASYNC_FAILURE
*/
(3)連接到服務器
MQTTAsync_connect向服務器發起連接請求,返回成功后並不一定立刻連接成功,需要通過MQTTAsync_connectOptions中設置的成功或者失敗的回調函數來顯示是否連接成功;所以要等到回調函數onSuccess返回時,才可以調用其他的函數,例如發送消息,訂閱消息,測試是否連接正常;
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions * options);
/**功能:異步連接到服務器,成功失敗會調用參數中提供的指針
* @param handle in MQTTAsync_create()創建的有效的客戶端句柄
* @param options A pointer to a valid MQTTAsync_connectOptions
* structure.連接參數
* @return ::MQTTASYNC_SUCCESS if the client connect request was accepted.
* If the client was unable to connect to the server, an error code is
* returned via the onFailure callback, if set.返回值,以及錯誤碼
* Error codes greater than 0 are returned by the MQTT protocol:<br><br>
* <b>1</b>: Connection refused: Unacceptable protocol version<br>
* <b>2</b>: Connection refused: Identifier rejected<br>
* <b>3</b>: Connection refused: Server unavailable<br>
* <b>4</b>: Connection refused: Bad user name or password<br>
* <b>5</b>: Connection refused: Not authorized<br>
* <b>6-255</b>: Reserved for future use<br>
*/
MQTTAsync_connectOptions是連接結構體,里面可以設置很多連接參數;可以設置是否使用SSL,內部重連機制,內部重連時間間隔,用戶名,密碼等;具體看下面的結構體介紹;
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3 4 or 5.
* 0 signifies no SSL options and no serverURIs 0 表示沒有SSL選項和服務url
* 1 signifies no serverURIs 1表示沒有服務器UIRL
* 2 signifies no MQTTVersion 2表示沒有MQTT版本
* 3 signifies no automatic reconnect options 3表示沒有自動重連選項
* 4 signifies no binary password option (just string) 4表示沒有二進制密碼選項,只有字符串
*/
int struct_version;
/** 內部心跳間隔,以秒為單位;在心跳間隔內,客戶端至少發送一條
消息給服務器,如果沒有則發送默認的ping消息給服務器,這樣是為了檢測
客戶端和服務器之間的連接,避免時間較長的TCP/IP超時;設置為0時,表示沒有心跳機制
int keepAliveInterval;
/**
這是一個布爾值,當客戶端和服務器連接和斷開連接時起作用;客戶端和服務器都保存着session狀態信息,session值用來確定通訊消息至少發送一次,且僅發送一次;session狀態包括客戶端創建的訂閱,你可以選擇保存或者放棄session之間的狀態信息設置為true時,連接和斷開連接時清除session,為false時,保留session;客戶端連接服務器,客戶端通過客戶端標識和服務器地址識別本次連接服務器檢查這個客戶端連接的session是否存在;如果為ture,則清除之前的session,如果為false,則之前的session會被重新使用;之前沒有的則重新創建
int cleansession;
/**這個值設置最大的在飛行中的消息數量 */
int maxInflight;
/**這個參數設置斷開連接時的臨終遺囑消息,斷開連接時發送消息到LWP topic*/
MQTTAsync_willOptions* will;
/**服務器的用戶名*/
const char* username;
/**密碼*/
const char* password;
/**連接超時時間*/
int connectTimeout;
/***以秒為單位的內部時鍾間隔*/
int retryInterval;
/**指向ssl設置參數的結構體指針,為NULL時表示不使用SSL*/
MQTTAsync_SSLOptions* ssl;
/**連接成功的回調函數
MQTTAsync_onSuccess* onSuccess;
/**連接失敗時的回調函數
MQTTAsync_onFailure* onFailure;
/**上下文信息,一般傳入this指針
void* context;
/**服務器的url數組數量
int serverURIcount;
/**服務器url數組,null結尾的字符串數組,地址格式為protocol://host:port
protocol可以是tcp或者ssl,host可以是ip地址或者域名
char* const* serverURIs;
/**MQTT的版本
* Sets the version of MQTT to be used on the connect.
* MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1 先嘗試.1.1版本,失敗在嘗試.1
* MQTTVERSION_3_1 (3) = only try version 3.1只嘗試.1
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1只嘗試.1.1
*/
int MQTTVersion;
/**自動重新連接
int automaticReconnect;
/**最小的重試間隔,以秒為單位,沒失敗一次,值增加一倍
int minRetryInterval;
/**最大的重連時間間隔,最小重連時間間隔超過這個值時停止
int maxRetryInterval;
/**如果password為NULL時,才會采用這個結構體,二進制格式的密碼
struct {
int len; /**< binary password length */
const void* data; /**< binary password data */
} binarypwd;
} MQTTAsync_connectOptions;
(4)發送消息;
回調函數onSuccess返回連接成功后,就可以向服務器發送消息了;這個函數的功能是發布消息,輸入參數為客戶端句柄、發送的目的topic、發送消息結構體;發送消息也是異步的過程,函數返回成功並不代表消息已經傳遞到消費者,只有回調函數MQTTAsync_deliveryComplete回調函數返回的時候才表示傳遞成功,回調函數中返回參數有個token值,發送消息的結構體MQTTAsync_responseOptions中也有一個token值,回調返回token值,表示這個token值代表的這個消息已經傳遞成功;
DLLExport int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const MQTTAsync_message* msg, MQTTAsync_responseOptions* response);
/**發送信息,
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param destinationName in 發送消的topic
* @param msg in 發送消息MQTTAsync_message結構體
* @param response 設置回調函數的結構體MQTTAsync_responseOptions
* @return :成功返回MQTTASYNC_SUCCESS 失敗返回錯誤碼
*/
(5)訂閱消息
DLLExport int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);
/**訂閱消息函數,topic可能包含通配符
* @param handle in MQTTAsync_create().客戶端句柄
* @param topic in topic名稱,可能包含通配符
* @param qos in 服務器處理重復消息的機制0表示可能會丟失,1至少發一次,可能發多次;2嚴格只發一次;
* @param response out 返回結果函數
* @return 成功返回MQTTASYNC_SUCCESS 失敗返回錯誤碼
*/
(6)接收消息
訂閱消息之后就可以接收消息了,通過設置的回調函數接收消息;context就是設置回調函數時設置的上下文指針,傳入的一般是this,可以用這個指針來獲取消息接收的對象;topic名稱則是用於接收多個topic消息時,區分消息的作用;message則是接收到消息的結構體,
int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
(7)判斷是否正常連接
/**測試連接是否正常
* @param handle A valid client handle from a successful call to MQTTAsync_create().
* @return Boolean true if the client is connected, otherwise false.
*/
DLLExport int MQTTAsync_isConnected(MQTTAsync handle);
(8)重新連接函數
/**調用該函數前必須連接成功過,用之前的參數了重新連接,所以一定要等到成功連接一次后才能重連,否則會報錯;
DLLExport int MQTTAsync_reconnect(MQTTAsync handle);
(9)銷毀句柄函數
DLLExport void MQTTAsync_destroy(MQTTAsync* handle);
3.2 MQTT協議實例代碼
(1)初始化參數,設置回調函數,連接服務器
HPR_INT32 Init()
{
HPR_INT32 iRetVal = HPR_ERROR;
m_bStart=HPR_FALSE;
//從配置文件獲取參數
m_strAddress=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strAddress;
m_strTopic=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strTopic;
m_strUserName=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strUserName;
m_strPsw=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strPassword;
m_strClientId=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strClientId;
do
{
//傳入參數,創建客戶端句柄
int rc=MQTTAsync_create(&m_RmqMqttAClient, m_strAddress.c_str(),m_strClientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc!=MQTTASYNC_SUCCESS||m_RmqMqttAClient==NULL)
{
FIRE_ERROR("MQTTAsync_create failed");
break;
}
//設置回調函數
rc=MQTTAsync_setCallbacks(m_RmqMqttAClient, this,OnConnectLost, OnMessageArrived, NULL);
if (MQTTASYNC_SUCCESS!=rc)
{
FIRE_ERROR("MQTTAsync_setCallbacks failed errorcode %d",rc);
//break;
}
//連接到服務器
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 5;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnectSuccess;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = this;
conn_opts.username=m_strUserName.c_str();
conn_opts.password=m_strPsw.c_str();
conn_opts.retryInterval=1;
conn_opts.automaticReconnect=1;
int rc=0;
if ((rc = MQTTAsync_connect(m_RmqMqttAClient, &conn_opts)) != MQTTASYNC_SUCCESS)
{
FIRE_ERROR("Failed to start connect, return code %d\n", rc);
break;
}
}
(2)連接失敗的回調函數
static void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
if (context==NULL||response==NULL)
{
FIRE_ERROR("input para is null!");
return;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return ;
}
FIRE_ERROR("Connect failed, rc %d errror message%s\n", response->code ,response->message==NULL? "no msg":response->message);
return ;
}
(3)連接成功的回調函數
static void onConnectSuccess(void* context, MQTTAsync_successData* response)
{
if (context==NULL||response==NULL)
{
FIRE_ERROR("onConnectSuccess input para is NULL");
return ;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return ;
}
p->SetConnectState(HPR_TRUE);
//訂閱消息函數
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc=0;
opts.onSuccess = NULL;
opts.onFailure = NULL;
opts.context = p->this;
if ((rc = MQTTAsync_subscribe(p->m_RmqMqttAClient, p-> m_strTopic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{
FIRE_ERROR("MQTTAsync_subscribe failed return code %d",rc);
}
FIRE_INFO("Connect rabbitmq MQTT success!");
}
(4)連接斷開的回調函數
static void OnConnectLost(void* context, char* cause)
{
if (context==NULL||cause==NULL)
{
FIRE_ERROR("MQTTAsync_connectionLost input para is NULL");
return ;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return ;
}
p->SetConnectState(HPR_FALSE);
FIRE_INFO("Connect rabbitmq MQTT success!");
}
(5)接收到消息的回調函數
static int OnMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
if (context==NULL||topicName==NULL||message==NULL)
{
FIRE_ERROR("OnMessageArrived input para is NULL");
return false ;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return false ;
}
char* ReciveBuff=g_MemPool.MemAlloc((message->payloadlen)+1);
memmove_s(ReciveBuff,(message->payloadlen)+1,message->payload,message->payloadlen);
p->ProcessMsg(ReciveBuff);
g_MemPool.MemRstore(ReciveBuff);
return true;
}
(6)發送消息的函數
HPR_INT32 CRabbitMqQT::RabbitMqMQTTPublish(char* msg)
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc=0;
opts.onSuccess =NULL ;
opts.context = this;
pubmsg.payload = msg;
pubmsg.payloadlen =strlen(msg);
pubmsg.qos = 1;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(m_RmqMqttAClient, m_strTopic.c_str() ,&pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
FIRE_ERROR("Failed to start sendMessage, return code %d\n", rc);
return HPR_ERROR;
}
else
{
return HPR_OK;
}
}
(7)重新連接的函數,一般定義在定時器中
HPR_INT32 CRabbitMqQT::ReConnect()
{
if (MQTTAsync_isConnected(m_RmqMqttAClient)==false)
{
if (MQTTASYNC_SUCCESS==MQTTAsync_reconnect(m_RmqMqttAClient))
{
m_bConnectSuccess=HPR_TRUE;
FIRE_INFO("MQTTAsync_reconnect sucess!");
return HPR_OK;
}
else
{
m_bConnectSuccess=HPR_FALSE;
FIRE_ERROR("MQTTAsync_reconnect ERROR!");
return HPR_ERROR;
}
}
}
(8)結束連接,銷毀句柄
if(m_RmqMqttAClient!=NULL)
{
MQTTAsync_destroy(&m_RmqMqttAClient);
m_RmqMqttAClient=NULL;
}
4 AMQP協議開發
4.1 AMQP庫函數介紹
下載 rabbitmq-c源碼包 : https://github.com/alanxz/rabbitmq-c需要用到SSL庫文件和頭文件。下載CMAKE編譯工具並安裝,編譯生成rabbitmq.4.dll和rabbitmq.4.lib文件,和四個頭文件amqp.h、amqp_framing.h、amqp_tcp_socket.h、stdint.h一起復制到工程。AMQP協議的庫函數調用流程圖如下圖所示。
圖4.1庫函數調用流程圖
4.2 AMQP實例代碼
4.2.1 生產者實例
(1)建立連接
HPR_INT32 CRabbitMqQP::creat_amqp_publisher()
{
HPR_INT32 iRetVal = HPR_ERROR;
m_mqPublishConn = amqp_new_connection();
m_mqPublishSocket = amqp_tcp_socket_new(m_mqPublishConn);
if(!m_mqPublishSocket)
{
FIRE_ERROR("New amqp socket error");
return iRetVal;
}
int iStatus = amqp_socket_open(m_mqPublishSocket, m_strMqIp.c_str(), m_iMqPort);
if(iStatus)
{
FIRE_ERROR("amqp_socket open failed");
m_bConnectSuccess = HPR_FALSE;
return iRetVal;
}
string strRabbitMqName = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strUserName;
string strRabbitMqPwd = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strPassword;
m_mqReply = amqp_login(m_mqPublishConn,"/",AMQP_DEFAULT_MAX_CHANNELS,AMQP_DEFAULT_FRAME_SIZE,AMQP_DEFAULT_HEARTBEAT,AMQP_SASL_METHOD_PLAIN,strRabbitMqName.c_str(),strRabbitMqPwd.c_str());
if(m_mqReply.reply_type != AMQP_RESPONSE_NORMAL)
{
FIRE_ERROR("amqp login failed");
return iRetVal;
}
amqp_channel_open(m_mqPublishConn,1);
m_mqReply = amqp_get_rpc_reply(m_mqPublishConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("open channel faild");
return iRetVal;
}
string strExchange = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strExchange;
amqp_exchange_declare(m_mqPublishConn, 1, amqp_cstring_bytes(strExchange.c_str()), amqp_cstring_bytes("direct"),
0, 1, 0, 0, amqp_empty_table);
m_mqReply = amqp_get_rpc_reply(m_mqPublishConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("declare exchange faild\n");
return iRetVal;
}
return HPR_OK;
}
(2)發布消息
HPR_VOID CRabbitMqQP::DoPublishAlarm()
{
int iStatus = 0;
string strBindingKey = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strRountKey;
string strExchange = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strExchange;
std::string strData= "hello Rabbit";
while (!m_bExit)
{
if ((m_bConnectSuccess == HPR_TRUE&&m_mqPublishConn !=NULL))
{
char* msg=PopMq();
if (msg!=NULL)
{//發布消息到服務器
iStatus = amqp_basic_publish(m_mqPublishConn,1,amqp_cstring_bytes(strExchange.c_str()),\
amqp_cstring_bytes(strBindingKey.c_str()),0,0,NULL,amqp_cstring_bytes(msg));
if(iStatus<0)
{
FIRE_ERROR("send data %s faild\n",msg);
PushMq(msg);
continue;
}
g_MemPool.MemRstore(msg);
FIRE_INFO("send data sucess %s\n",msg);
}
}
else
{
FIRE_ERROR("RabbitMq is not connected !");
Sleep(1000);
}
}
}
(3)結束清理資源
HPR_VOID CRabbitMqQP::destory_amqp_publisher()
{
amqp_channel_close(m_mqPublishConn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(m_mqPublishConn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(m_mqPublishConn);
}
4.2.2 消費者實例代碼
(1) 建立連接
HPR_INT32 CRabbitMqQPRecv::creat_amqp_consumer()
{
HPR_INT32 iRetVal = HPR_ERROR;
//創建新的連接句柄
m_mqConsumConn = amqp_new_connection();
if (m_mqConsumConn==NULL)
{
FIRE_ERROR("New mq connecttion error");
return iRetVal;
}
//創建tcp socket
m_mqSocket = amqp_tcp_socket_new(m_mqConsumConn);
if(!m_mqSocket)
{
FIRE_ERROR("New amqp socket error");
return iRetVal;
}
//建立socket連接
int iStatus = amqp_socket_open(m_mqSocket, m_strMqIp.c_str(), m_iMqPort);
if(iStatus)
{
FIRE_ERROR("amqp_socket open failed");
m_bConnectSuccess = HPR_FALSE;
return iRetVal;
}
string strRabbitMqName = CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strUserName;
string strRabbitMqPwd = CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strPassword;
//登陸服務器
m_mqReply = amqp_login(m_mqConsumConn,"/",AMQP_DEFAULT_MAX_CHANNELS,AMQP_DEFAULT_FRAME_SIZE,AMQP_DEFAULT_HEARTBEAT,AMQP_SASL_METHOD_PLAIN,strRabbitMqName.c_str(),strRabbitMqPwd.c_str());
if(m_mqReply.reply_type != AMQP_RESPONSE_NORMAL)
{
FIRE_ERROR("amqp login failed");
return iRetVal;
}
//在連接上創建一個通道
amqp_channel_open(m_mqConsumConn, 1);
//異步查看是否調用成功
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("ConnectRabbitmq::amqp_channel_open error\n");
return iRetVal;
}
string strExchange = CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strExchange;
string strBindingKey =CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strRountKey;
//創建隊列queue
amqp_queue_declare_ok_t *r = amqp_queue_declare(m_mqConsumConn, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table);
FIRE_INFO("The test name is %s",r->queue.bytes);
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("ConnectRabbitmq::amqp_queue_declare error\n");
return iRetVal;
}
m_mqQueuename = amqp_bytes_malloc_dup(r->queue);
if (m_mqQueuename.bytes == NULL) {
FIRE_ERROR("Out of memory while copying queue name\n");
return iRetVal;
}
//綁定隊列到exchange
amqp_queue_bind(m_mqConsumConn,
1,
amqp_empty_bytes,
amqp_cstring_bytes(strExchange.c_str()),
amqp_cstring_bytes(strBindingKey.c_str()),
amqp_empty_table);
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("ConnectRabbitmq::amqp_queue_bind error\n");
return iRetVal;
}
//消費消息
amqp_basic_consume(m_mqConsumConn, 1, m_mqQueuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("amqp_basic_consume \n");
return iRetVal;
}
return HPR_OK;
}
(2) 接收消息
HPR_VOID CRabbitMqQPRecv::DoRevAlarm()
{
char* ReciveBuff=NULL;
while (!m_bExit)
{
if(HPR_FALSE == m_bConnectSuccess)
{
Sleep(1000);
continue;
}
if(m_mqConsumConn !=NULL)
{
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(m_mqConsumConn);
ret = amqp_consume_message(m_mqConsumConn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
amqp_destroy_envelope(&envelope);
FIRE_ERROR("amqp_consume_message fail");
Sleep(1000);
continue;
}
ReciveBuff=g_MemPool.MemAlloc(MSG_LEN);
if (ReciveBuff==NULL)
{
FIRE_ERROR("g_MemPool MemAlloc fail");
return;
}
HPR_ZeroMemory(ReciveBuff, MSG_LEN);
memmove_s(ReciveBuff,MSG_LEN,envelope.message.body.bytes,envelope.message.body.len);
m_ReciveCallback(ReciveBuff,m_pDialog);
FIRE_INFO("Receieve the alarm msg is%s",ReciveBuff);
g_MemPool.MemRstore(ReciveBuff);
ReciveBuff=NULL;
amqp_destroy_envelope(&envelope);
}
//HPR_Sleep(1);
}
}
(3) 結束清除資源
HPR_VOID CRabbitMqQPRecv::destory_amqp_consumer()
{
if (m_mqConsumConn!=NULL)
{
amqp_maybe_release_buffers(m_mqConsumConn);
amqp_channel_close(m_mqConsumConn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(m_mqConsumConn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(m_mqConsumConn);
}
}
自己開發了一個股票智能分析軟件,功能很強大,需要的點擊下面的鏈接獲取:
https://www.cnblogs.com/bclshuai/p/11380657.html
百度雲盤下載地址:
鏈接:https://pan.baidu.com/s/1swkQzCIKI3g3ObcebgpIDg
提取碼:mc8l
微信公眾號獲取最新的軟件和視頻介紹
QStockView