MQTT C Client實現消息推送(入門指南)【轉】


MQTT C Client實現消息推送(入門指南)

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,通過MQTT協議,目前已經擴展出了數十個MQTT服務器端程序,可以通過PHP,JAVA,Python,C,C#等系統語言來向MQTT發送相關消息。隨着移動互聯網的發展,MQTT由於開放源代碼,耗電量小等特點,將會在移動消息推送領域會有更多的貢獻,在物聯網領域,傳感器與服務器的通信,信息的收集,MQTT都可以作為考慮的方案之一。在未來MQTT會進入到我們生活的各各方面。The Paho MQTT C Client is a fully fledged MQTT client written in ANSI standard C. It avoids C++ in order to be as portable as possible. A C++ layer over this library is also available in Paho.


目錄:

 

 

何為MQTT?

MQTT主要用於服務端對客戶端進行消息推送,根據這個具體要求,很容易知道它包括兩個部分:客戶端、服務端。

MQTT消息推送是基於主題topic模式的,可以分開來說:

  • 客戶端發布一條消息時,必須指定消息主題。(如,topic=”天氣”,payload=”北京今天霧霾好大啊~~嗚嗚”),其中topic就是主題,payload是發送的具體內容。
  • 服務端推送消息,也是基於主題的。當服務器發現有主題(如,topic=“天氣”)時,就會給所有訂閱該主題的客戶端推送payload內容。 
    • 這里需要個前提,就是有客戶端訂閱topic=”天氣”這個主題;
    • 一旦客戶端訂閱該主題,服務端就會每收到該主題的消息,都會推送給訂閱該主題的客戶端。如果客戶端不需要關注該主題了,也就是說不想接受到這樣的推送消息了,只要取消otpic=”天氣”的主題訂閱即可。

MQTT協議是為大量計算能力有限,且工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:

  1. 使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合;
  2. 對負載內容屏蔽的消息傳輸;
  3. 使用 TCP/IP 提供網絡連接;
  4. 有三種消息發布服務質量: 
    • “至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。
    • “至少一次”,確保消息到達,但消息重復可能會發生。
    • “只有一次”,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。(在實際編程中,只需要設置QoS值即可實現以上幾種不同消息發布服務質量模式)
  5. 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以降低網絡流量;
  6. 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制;

生成dll庫?混合編程?

在開始開發之前需要做一些准備工作,MQTT已經把所有的APIs封裝好了,我們可以使用它的dll庫,也可以直接導入源碼進行混合編程,一般要求不高的話(因為不太懂得話,最好不要修改源碼)可以直接將源碼生成dll,然后使用即可,下文就是使用該方式:

git clone https://github.com/eclipse/paho.mqtt.c.git

從這里獲得C Client源碼之后,可以直接使用VS打開(我是VS2013):

這里寫圖片描述

對於上圖的說明,下載源碼后,打開將是以上界面,包括十來個工程,這里講解幾個:

  • paho-mqtt3a : 一般實際開發中就是使用這個,a表示的是異步消息推送(asynchronous)。
  • paho-mqtt3as : as表示的是 異步+加密(asynchronous+OpenSSL)。
  • paho-mqtt3c : c 表示的應該是同步(Synchronize),一般性能較差,是發送+等待模式。
  • paho-mqtt3cs : 同上,增加了一個OpenSSL而已。

這里根據自身的需要選擇不同的項目生成DLL即可,右擊單個項目->生成。由於你電腦中可能沒有OPenSSL環境,如果點擊VS工具欄中的生成解決方案,十有八九會失敗,因為它會生成所有項目的解決方案,其實你根本用不着這么多。

另外,上圖中無法打開包括文件VersionInfo.h,你只需要在src文件夾中找到VersionInfo.h.in文件,去掉.in后綴->重新生成即可。

MQTT C Client實戰

了解更多可以閱讀《MQTT C Client for Posix and Windows》一文,下面根據官網資料,摘錄了幾個C語言實現MQTT的小DEMO。

MQTT使用起來也十分容易,基本上就那四五個函數:MQTTClient_create(創建客戶端)、MQTTClient_connect(連接服務端)、MQTTClient_publishMessage(客戶端->服務端發送消息)、MQTTClient_subscribe(客戶端訂閱某個主題)等等。其中,很多異步回調函數,需要自己去實現,如,

MQTTAsync_setCallbacks(mqtt->_client, mqtt->_client, connlost, msgarrvd, NULL);

MQTTAsync_setCallbacks中,

  • connlost函數指針,是當MQTT意外斷開鏈接時會回調的函數,由自己實現;
  • msgarrvd函數指針,是當服務器有消息推送回來時,客戶端在此處接受服務端消息內容。

另外,就是一些函數執行是否成功的回調函數,C語言封裝回調之后,就是這么寫法,看起來有些變扭。有興趣的可以看《淺談C/C++回調函數(Callback)& 函數指針》文章,再了解以下回調函數。

mqtt->_conn_opts.onSuccess = onConnect; mqtt->_conn_opts.onFailure = onConnectFailure;

 

最后,不得不說的就是,MQTT有些發送或者是訂閱的內容時(某些函數中),在編程最好將參數中傳進來的值在內存中拷貝一份再操作,筆者當時開發時,就是因為這樣的問題,折騰了較長時間,后來在wireshark中發現數據包中根本沒有內容,才知道是由於函數參數是指針形式,直接在異步中使用可能會發生一些未知的錯誤。

Synchronous publication example

#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://localhost:1883" #define CLIENTID "ExampleClientPub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }

 

 

Asynchronous publication example

#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://localhost:1883" #define CLIENTID "ExampleClientPub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L volatile MQTTClient_deliveryToken deliveredtoken; void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; for(i=0; i<message->payloadlen; i++) { putchar(*payloadptr++); } putchar('\n'); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; deliveredtoken = 0; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for publication of %s\n" "on topic %s for client with ClientID: %s\n", PAYLOAD, TOPIC, CLIENTID); while(deliveredtoken != token); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }

 

 

Asynchronous subscription example

#include "stdio.h" #include "stdlib.h" #include "string.h" #include "MQTTClient.h" #define ADDRESS "tcp://localhost:1883" #define CLIENTID "ExampleClientSub" #define TOPIC "MQTT Examples" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L volatile MQTTClient_deliveryToken deliveredtoken; void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; for(i=0; i<message->payloadlen; i++) { putchar(*payloadptr++); } putchar('\n'); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; int ch; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS); MQTTClient_subscribe(client, TOPIC, QOS); do { ch = getchar(); } while(ch!='Q' && ch != 'q'); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM