在鴻蒙系統上使用MQTT編程


我們使用的是paho mqtt軟件包,這里介紹一下怎么使用mqtt協議編程。關於鴻蒙系統的mqtt移植好的軟件包,相關github鏈接如下:

https://gitee.com/qidiyun/harmony_mqtt

這里提供一個簡單的編程示例:

這里我們使用MQTTClient編程模型,他支持多任務多線程,非常適合用在鴻蒙系統上。

1、網絡初始化

這里定義一個 Network 結構體,然后指定我們的MQTT服務器的IP和端口號。

Network n;
   //初始化結構體
 NetworkInit(&n);
 //連接到指定的MQTT服務器IP、端口號
 NetworkConnect(&n, “XXX.XXX.XXX.XXX”, XXXX);

2、設置MQTT緩存和啟動MQTT線程

我們這里使用的是MQTT線程功能。

 MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
 MQTTStartTask(&c);

3、設置MQTT相關參數

接下來我們設置MQTT的相關參數,包括版本號、客戶端ID、賬戶密碼等

MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 
 data.willFlag = 0;
 //MQTT版本為 v3
 data.MQTTVersion = 3;
 //設置客戶端ID
 data.clientID.cstring = opts.clientid;
 //設置客戶端賬戶
 data.username.cstring = opts.username;
 //設置客戶端密碼
 data.password.cstring = opts.password;
 data.keepAliveInterval = 10;
 data.cleansession = 1;

 //連接到MQTT服務器
 rc = MQTTConnect(&c, &data);

4、訂閱主題和接收消息

訂閱主題可以使用如下函數

MQTTSubscribe(&c, topic, opts.qos, messageArrived);

它的函數原型如下:

DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);

其中:

MQTTClient* c :我們前面定義的MQTTClient結構體

const char* topicFilter:訂閱的主題

messageHandler messageHandler :接收到主題信息后的回調處理函數。

例如上面我們的回調函數是 messageArrived ,它的原型如下:

void messageArrived(MessageData* md)
{
 MQTTMessage* message = md->message;
 //打印接收到的消息的長度、和消息內容
 printf("%.*s", (int)message->payloadlen, (char*)message->payload);
}

5、發送消息

發送消息也比較簡單,我們只需要設置好我們的主題和消息內容即可

memset(&pubmsg, '\0', sizeof(pubmsg));
 //消息內容為 hello harmonyOS !
   pubmsg.payload = (void*)"hello harmonyOS !";
 //消息長度
   pubmsg.payloadlen = strlen((char*)pubmsg.payload);
   pubmsg.qos = QOS0;
   pubmsg.retained = 0;
   pubmsg.dup = 0;

 //推送消息,主題為 pubtest
 MQTTPublish(&c, "pubtest", &pubmsg);

完整源碼如下:

#include <stdio.h>

#include <unistd.h>

#include "ohos_init.h"
#include "cmsis_os2.h"

#include <unistd.h>
#include "hi_wifi_api.h"
//#include "wifi_sta.h"
#include "lwip/ip_addr.h"
#include "lwip/netifapi.h"

#include "lwip/sockets.h"

#include "MQTTClient.h"

/**
 * MQTT URI farmat:
 * domain mode
 * tcp://iot.eclipse.org:1883
 *
 * ipv4 mode
 * tcp://192.168.10.1:1883
 * ssl://192.168.10.1:1884
 *
 * ipv6 mode
 * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
 * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
 */
#define MQTT_URI                "tcp://106.13.62.194:1883"

struct opts_struct
{
	char* clientid;
	int nodelimiter;
	char* delimiter;
	enum QoS qos;
	char* username;
	char* password;
	char* host;
	int port;
	int showtopics;
} opts =
{
	(char*)"stdout-subscriber", 0, (char*)"\n", QOS2, NULL, NULL, (char*)"106.13.62.194", 1883, 1
};


void messageArrived(MessageData* md)
{
	MQTTMessage* message = md->message;

	if (opts.showtopics)
		printf("%.*s\t", md->topicName->lenstring.len, md->topicName->lenstring.data);
	if (opts.nodelimiter)
		printf("%.*s", (int)message->payloadlen, (char*)message->payload);
	else
		printf("%.*s%s", (int)message->payloadlen, (char*)message->payload, opts.delimiter);
	//fflush(stdout);
}


unsigned char buf[100];
unsigned char readbuf[100];

int mqtt_test(void)
{
	int rc = 0;
	
	MQTTMessage pubmsg;

	
	char* topic = "test";

	if (strchr(topic, '#') || strchr(topic, '+'))
		opts.showtopics = 1;
	if (opts.showtopics)
		printf("topic is %s\n", topic);

	Network n;
	MQTTClient c;

	NetworkInit(&n);
	NetworkConnect(&n, opts.host, opts.port);
	
	MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
	MQTTStartTask(&c);

	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
	data.willFlag = 0;
	data.MQTTVersion = 3;
	data.clientID.cstring = opts.clientid;
	data.username.cstring = opts.username;
	data.password.cstring = opts.password;

	data.keepAliveInterval = 10;
	data.cleansession = 1;
	printf("Connecting to %s %d\n", opts.host, opts.port);
	
	rc = MQTTConnect(&c, &data);
	printf("Connected %d\n", rc);
    
    printf("Subscribing to %s\n", topic);
	rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);
	printf("Subscribed %d\n", rc);

	memset(&pubmsg, '\0', sizeof(pubmsg));
  	pubmsg.payload = (void*)"hello harmonyOS !";
  	pubmsg.payloadlen = strlen((char*)pubmsg.payload);
  	pubmsg.qos = QOS0;
  	pubmsg.retained = 0;
  	pubmsg.dup = 0;
	while (1)
	{
		MQTTPublish(&c, "pubtest", &pubmsg);
		sleep(1);	
	}
	
	printf("Stopping\n");

	MQTTDisconnect(&c);
	NetworkDisconnect(&n);

	return 0;
}

作者:連志安
想了解更多內容,請訪問:
51CTO和華為官方戰略合作共建的鴻蒙技術社區
https://harmonyos.51cto.com#bky


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM