MQTT--Paho C Client 的實現和詳解


概述
  在文章Paho - MQTT C Cient的實現中,我介紹了如何使用Paho開源項目創建MQTTClient_pulish客戶端。但只是簡單的介紹了使用方法,而且客戶端的結果與之前介紹的並不吻合,今天我就結合新的例子,給大家講解一下Paho使用MQTT客戶端的主要過程。
  如同前面介紹的,MQTT客戶端分為同步客戶端和異步客戶端。今天主要講解的是同步客戶端,結構還是如同步客戶端中介紹的:

  1.創建一個客戶端對象;
  2.設置連接MQTT服務器的選項;
  3.如果多線程(異步模式)操作被使用則設置回調函數(詳見 Asynchronous >vs synchronous client applications);
  4.訂閱客戶端需要接收的任意話題;
  5.重復以下操作直到結束:
    a.發布客戶端需要的任意信息;
    b.處理所有接收到的信息;
  6.斷開客戶端連接;
  7.釋放客戶端使用的所有內存。

實現
  好,直接上代碼,MQTT簡單的同步客戶端。

  1 #include <pthread.h>
  2 #include <stdio.h>
  3 #include <stdlib.h>
  4 #include <string.h>
  5 #include "MQTTClient.h"
  6 #if !defined(WIN32)
  7 #include <unistd.h>
  8 #else
  9 #include <windows.h>
 10 #endif
 11 
 12 #define NUM_THREADS 2
 13 #define ADDRESS "tcp://localhost:1883" //更改此處地址
 14 #define CLIENTID "aaabbbccc_pub" //更改此處客戶端ID
 15 #define SUB_CLIENTID "aaabbbccc_sub" //更改此處客戶端ID
 16 #define TOPIC "topic01" //更改發送的話題
 17 #define PAYLOAD "Hello Man, Can you see me ?!" //
 18 #define QOS 1
 19 #define TIMEOUT 10000L
 20 #define USERNAME "test_user"
 21 #define PASSWORD "jim777"
 22 #define DISCONNECT "out"
 23 
 24 int CONNECT = 1;
 25 volatile MQTTClient_deliveryToken deliveredtoken;
 26 
 27 void delivered(void *context, MQTTClient_deliveryToken dt)
 28 {
 29 printf("Message with token value %d delivery confirmed\n", dt);
 30 deliveredtoken = dt;
 31 }
 32 
 33 int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
 34 {
 35 int i;
 36 char* payloadptr;
 37 
 38 printf("Message arrived\n");
 39 printf(" topic: %s\n", topicName);
 40 printf(" message: ");
 41 
 42 payloadptr = message->payload;
 43 if(strcmp(payloadptr, DISCONNECT) == 0){
 44 printf(" \n out!!");
 45 CONNECT = 0;
 46 }
 47 
 48 for(i=0; i<message->payloadlen; i++)
 49 {
 50 putchar(*payloadptr++);
 51 }
 52 printf("\n");
 53 
 54 MQTTClient_freeMessage(&message);
 55 MQTTClient_free(topicName);
 56 return 1;
 57 }
 58 
 59 void connlost(void *context, char *cause)
 60 {
 61 printf("\nConnection lost\n");
 62 printf(" cause: %s\n", cause);
 63 }
 64 
 65 void *subClient(void *threadid){
 66 long tid;
 67 tid = (long)threadid;
 68 printf("Hello World! It's me, thread #%ld!\n", tid);
 69 
 70 MQTTClient client;
 71 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
 72 int rc;
 73 int ch;
 74 
 75 MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,
 76 MQTTCLIENT_PERSISTENCE_NONE, NULL);
 77 conn_opts.keepAliveInterval = 20;
 78 conn_opts.cleansession = 1;
 79 conn_opts.username = USERNAME;
 80 conn_opts.password = PASSWORD;
 81 
 82 MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
 83 
 84 if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
 85 {
 86 printf("Failed to connect, return code %d\n", rc);
 87 exit(EXIT_FAILURE);
 88 }
 89 printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
 90 "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
 91 MQTTClient_subscribe(client, TOPIC, QOS);
 92 
 93 do 
 94 {
 95 ch = getchar();
 96 } while(ch!='Q' && ch != 'q');
 97 
 98 MQTTClient_unsubscribe(client, TOPIC);
 99 MQTTClient_disconnect(client, 10000);
100 MQTTClient_destroy(&client);
101 
102 pthread_exit(NULL);
103 }
104 void *pubClient(void *threadid){
105 long tid;
106 tid = (long)threadid;
107 int count = 0;
108 printf("Hello World! It's me, thread #%ld!\n", tid);
109 //聲明一個MQTTClient
110 MQTTClient client;
111 //初始化MQTT Client選項
112 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
113 //#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
114 MQTTClient_message pubmsg = MQTTClient_message_initializer;
115 //聲明消息token
116 MQTTClient_deliveryToken token;
117 int rc;
118 //使用參數創建一個client,並將其賦值給之前聲明的client
119 MQTTClient_create(&client, ADDRESS, CLIENTID,
120 MQTTCLIENT_PERSISTENCE_NONE, NULL);
121 conn_opts.keepAliveInterval = 20;
122 conn_opts.cleansession = 1;
123 conn_opts.username = USERNAME;
124 conn_opts.password = PASSWORD;
125 //使用MQTTClient_connect將client連接到服務器,使用指定的連接選項。成功則返回MQTTCLIENT_SUCCESS
126 if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
127 {
128 printf("Failed to connect, return code %d\n", rc);
129 exit(EXIT_FAILURE);
130 }
131 pubmsg.payload = PAYLOAD;
132 pubmsg.payloadlen = strlen(PAYLOAD);
133 pubmsg.qos = QOS;
134 pubmsg.retained = 0;
135 while(CONNECT){
136 MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
137 printf("Waiting for up to %d seconds for publication of %s\n"
138 "on topic %s for client with ClientID: %s\n",
139 (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
140 rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
141 printf("Message with delivery token %d delivered\n", token);
142 usleep(3000000L);
143 }
144 
145 
146 MQTTClient_disconnect(client, 10000);
147 MQTTClient_destroy(&client);
148 }
149 int main(int argc, char* argv[])
150 {
151 pthread_t threads[NUM_THREADS];
152 long t;
153 pthread_create(&threads[0], NULL, subClient, (void *)0);
154 pthread_create(&threads[1], NULL, pubClient, (void *)1);
155 pthread_exit(NULL);
156 }

 

  在代碼中,我創建了兩個線程,分別用來處理訂閱客戶端和發布客戶端。

整體詳解
接下來我講解一下這個簡單的客戶端,其中,大體的流程如下:

  大體的流程如圖所示,在客戶端啟動之后,會啟動線程,創建一個訂閱客戶端,它會監聽消息的到達,在消息到達之后會觸發相應的回調函數以對消息進行處理;后在啟動一個線程,創建一個發送客戶端,用來發送消息的,每次發送消息之前會判斷是否要掉線,如CONNECT=0則會掉線,否則發送消息給topic01。

訂閱客戶端詳解
  以下函數完成的是訂閱的功能。

void *subClient(void *threadid)


過程大概如下:
  第一步:聲明客戶端,並通過函數給其賦值;

MQTTClient client;
MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);


  第二步:設置連接MQTT服務器的選項;

1 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

 

  第三步:設置回調函數;

1 MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
2 //相應的回調函數connlost,msgarrvd,delivered我的代碼中都有

 

  第四步:使用客戶端和連接選項連接服務器;

1 MQTTClient_connect(client, &conn_opts))

 

  第五步訂閱話題;

1 MQTTClient_subscribe(client, TOPIC, QOS);

  第六步一直等待,直到輸入’Q’ 或’q’;

1 do 
2 {
3 ch = getchar();
4 } while(ch!='Q' && ch != 'q');

 

  第七步取消訂閱;

1 MQTTClient_unsubscribe(client, TOPIC);


  第八步.斷開客戶端連接;

1 MQTTClient_disconnect(client, 10000);

 

  第九步.釋放客戶端使用的所有內存;

MQTTClient_destroy(&client);

  至此,訂閱客戶端就結束了。一般訂閱客戶端的大體結構都是這樣。不同的是回調函數的個性化上。

 

發送客戶端詳解
  以下函數完成的是發送的功能。

void *pubClient(void *threadid)

過程大概如下:
  第一步:聲明客戶端,並通過函數給其賦值;

1 MQTTClient client;
2 MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

  第二步:設置連接MQTT服務器的選項;

MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

 

  第三步:使用客戶端和連接選項連接服務器;

MQTTClient_connect(client, &conn_opts)


  第四步設置發送消息的屬性;

1 pubmsg.payload = PAYLOAD;
2 pubmsg.payloadlen = strlen(PAYLOAD);
3 pubmsg.qos = QOS;
4 pubmsg.retained = 0;

 

  第五步循環發送消息;

MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);

 

  第六步一直等待,當CONNECT=0時退出該客戶端;

  第七步.斷開客戶端連接;

MQTTClient_disconnect(client, 10000);

 

  第八步.釋放客戶端使用的所有內存;

MQTTClient_destroy(&client);

 

  至此,發送客戶端就結束了。一般的發送客戶端大體結構也如此,但異步客戶端可能有些許不同,無非就是設計回調函數,然后在連接,斷開連接等時可以使用回調函數做一些操作而已,具體的可以自己研究。

  為了讓大家能夠更深入了解,我把自己學到的一些函數和結構體大致在下面講解了一下。

相關結構體

MQTTClient
定義:typedef void* MQTTClient; 含義:代表MQTT客戶端的句柄。成功調用MQTTClient_create()后,可以得到有效的客戶端句柄。

 

MQTTClient_connectOptions
定義:
typedef struct
{
char struct_id[4];//結構體的識別序列,必須為MQTC
int struct_version;//結構體版本
/**
在0,1,2,3,4,5中取值:
0-表示沒有SSL選項且沒有serverURIs;
1-表示沒有serverURIs;
2-表示沒有MQTTVersion
3-表示沒有返回值;
4-表示沒有二進制密碼選項
*/
int keepAliveInterval;
/**
在這段時間內沒有數據相關的消息時,客戶端發送一個非常小的MQTT“ping”消息,服務器將會確認這個消息
*/
int cleansession;
/**
當cleansession為true時,會話狀態信息在連接和斷開連接時被丟棄。 將cleansession設置為false將保留會話狀態信息
*/
int reliable;
/*
將該值設置為true意味着必須完成發布的消息(已收到確認),才能發送另一個消息
*/
MQTTClient_willOptions* will;
/*
如果程序不使用最后的意願和遺囑功能,請將此指針設置為NULL。
*/
const char* username;//用戶名
const char* password;//密碼
int connectTimeout;//允許嘗試連接的過時時間
int retryInterval;//嘗試重連的時間
MQTTClient_SSLOptions* ssl;
/*
如果程序不使用最后的ssl,請將此指針設置為NULL。
*/
int serverURIcount;

char* const* serverURIs;
/*
連接服務器的url,以protocol:// host:port為格式
*/
int MQTTVersion;
/*
MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4) 
*/
struct
{
const char* serverURI; 
int MQTTVersion; 
int sessionPresent; 
} returned;
struct {
int len; 
const void* data; 
} binarypwd;
} MQTTClient_connectOptions;
含義:用來設置MQTTClient的連接選項的結構體。

 

MQTTClient_message
定義:
typedef struct { char struct_id[4];//結構體的識別序列,必須為MQTM
int struct_version;//結構體的版本,必須為0
int payloadlen;//MQTT信息的長度
void* payload;//指向消息負載的指針
int qos;//服務質量
int retained;//保留標志
int dup;dup//標志指示這個消息是否是重復的。 只有在收到QoS1消息時才有意義。 如果為true,則客戶端應用程序應采取適當的措施來處理重復的消息。
int msgid;//消息標識符通常保留供MQTT客戶端和服務器內部使用。
} MQTTClient_message;
含義:代表MQTT信息的結構體。

 



相關函數詳解

MQTTClient_create
定義:

DLLExport int MQTTClient_create( 
MQTTClient * handle,
const char * serverURI,
const char * clientId,
int persistence_type,
void * persistence_context 
) 
作用:該函數創建了一個用於連接到特定服務器,使用特定持久存儲的MQTT客戶端。

參數	含義
handle	指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充
serverURI	以空結尾的字符串,其指定客戶端將連接到的服務器。其格式為protocol://host:port。現在的(protocol)協議必須是tcp或ssl,而host可以指定為IP地址或域名。例如, 要使用默認 MQTT 端口連接到本地計算機上運行的服務器, 請指定為 tcp://localhost:1883。
clientId	客戶端標識符(clientId)是一個以空結尾的 UTF-8 編碼字符串,客戶端連接到服務器時將它傳遞過去。
persistence_type	客戶端所使用的持久類型。MQTTCLIENT_PERSISTENCE_NONE-使用內存持久化。如果客戶端運行的設備或系統出故障或關閉, 則任何正在運行的消息的當前狀態都將丟失, 甚至在 QoS1 和 QoS2 中也可能無法傳遞某些消息; MQTTCLIENT_PERSISTENCE_DEFAULT-使用默認的持久化機制(文件系統)。正在運行消息的狀態被保存在持久存儲中,以便在意外出現時對消息的丟失提供一些保護; MQTTCLIENT_PERSISTENCE_USER-使用程序指定的持久化實現。使用這種類型,應用程序可對持久化機制進行控制,應用程序必須實現MQTTClient_persistence 接口。
persistence_context	如果應用程序使用的是MQTTCLIENT_PERSISTENCE_NONE持久化,該參數不使用,而且值應該設置為NULL。對於MQTTCLIENT_PERSISTENCE_DEFAULT持久化,應該設置持久化目錄的位置(如果設置為NULL,則使用工作目錄作為持久化目錄)。使用MQTTCLIENT_PERSISTENCE_USER持久化,則將此參數指向有效的MQTTClient_persistence結構。
MQTTClient_setCallbacks

  

MQTTClient_setCallbacks 定義:

DLLExport int MQTTClient_setCallbacks ( MQTTClient handle, void * context, MQTTClient_connectionLost * cl, MQTTClient_messageArrived * ma, MQTTClient_deliveryComplete * dc ) 
作用:該函數為特定的客戶端創建回調函數。如果您的客戶端應用程序不使用特定的回調函數,請將相關參數設置為NULL。 調用MQTTClient_setCallbacks()使客戶端進入多線程模式。 任何必要的消息確認和狀態通信都在后台處理,而不需要客戶端應用程序的任何干預。

注意:在調用該函數時,MQTT客戶端必須斷開連接。(即先要調用該函數在連接客戶端)。

參數    含義
handle    指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充
context    指向任何應用程序特定上下文的指針。 上下文指針被傳遞給每個回調函數,以提供對回調中的上下文信息的訪問。
cl    指向MQTTClient_connectionLost()回調函數的指針。 如果您的應用程序不處理斷開連接,您可以將其設置為NULL。
ma    指向MQTTClient_messageArrived()回調函數的指針。 當您調用MQTTClient_setCallbacks()時,必須指定此回調函數。
dc    指向MQTTClient_deliveryComplete()回調函數的指針。 如果您的應用程序同步發布,或者您不想檢查是否成功發送,則可以將其設置為NULL。

 

 

MQTTClient_connect 定義: DLLExport int MQTTClient_connect ( MQTTClient handle, MQTTClient_connectOptions * options ) 
作用:此函數嘗試使用指定的選項將先前創建的客戶端連接到MQTT服務器。 參數 含義 handle 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 options 指向有效的MQTTClient_connectOptions結構的指針。 返回值 含義 0 連接成功 1 拒絕連接:不可接受的協議版本。 2 拒絕連接:標識符被拒絕。 3 拒絕連接:服務器不可用。 4 拒絕連接:用戶名或密碼錯誤。 5 拒絕連接:未經授權。 6    保留給未來用。

 

MQTTClient_subscribe 定義: DLLExport int MQTTClient_subscribe ( MQTTClient handle, const char * topic, int qos ) 
作用:此功能嘗試將客戶訂閱到單個主題,該主題可能包含通配符。 此函數還指定服務質量。

參數    含義
handle    指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充
topic    訂閱的主題,可使用通配符。
qos    訂閱的請求服務質量

 

 

 

DLLExport int MQTTClient_publishMessage ( MQTTClient handle, const char * topicName, MQTTClient_message * msg, MQTTClient_deliveryToken * dt ) 
作用:此功能嘗試將客戶訂閱到單個主題,該主題可能包含通配符。 此函數還指定服務質量。

參數    含義
handle    指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充
topicName    與信息相關的主題。
msg    指向有效的 MQTTClient_message 結構的指針, 其中包含要發布消息的有效負載和屬性
dt    指向MQTTClient_deliveryToken的指針。當函數成功返回時,dt會被賦值為代表消息的token。如果程序中沒有使用傳遞token,將其設置為NULL。

 

MQTTClient_waitForCompletion 定義: DLLExport int MQTTClient_waitForCompletion ( MQTTClient handle, MQTTClient_deliveryToken dt, unsigned long timeout ) 
作用:客戶端應用程序調用此函數來將主線程的執行與消息的完成發布同步。 被調用時,MQTTClient_waitForCompletion()阻塞執行,直到消息成功傳遞或已超過指定的時間。 參數 含義 handle 指向MQTT客戶端句柄的指針。句柄被成功從函數中返回的客戶端引用所填充 dt 代表消息的MQTTClient_deliveryToken用來檢測是否成功傳遞。傳遞token由發布函數MQTTClient_publish () 和 MQTTClient_publishMessage ()所產生。 timeout 等待的最大毫秒數。 返回值: 消息成功傳遞則返回MQTTCLIENT_SUCCESS(0) ,如果時間已過期或檢測token時出問題,則返回錯誤碼。

 

 

 

 

---------------------
參考鏈接:https://blog.csdn.net/weixin_37139197/article/details/78966249


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM