消息接收+數據持久化:Mosquitto+MQTT+MySQL筆記


https://blog.csdn.net/yangkaizzz/article/details/112258463

消息接收+數據持久化:Mosquitto+MQTT+MySQL
文章目錄
消息接收+數據持久化:Mosquitto+MQTT+MySQL
業務需求:
具體實現
1 數據庫連接——C3P0數據庫連接池
1.1添加依賴
1.2 配置
1.3 編寫工具類
2 Mosquitto 客戶端的JAVA實現
2.1 MQTT協議實現方式
2.2 從Mosquitto存取數據
2.2.1 添加依賴
2.2.2 客戶端訂閱
2.2.3 消息接收后,業務邏輯添加
業務需求:
服務器能實時更新設備參數和加載最新數據
服務器能將從Mosquitto收到的數據持久化到本地的數據庫
具體實現
1 數據庫連接——C3P0數據庫連接池
數據庫連接池負責分配、管理和釋放數據庫連接,它允許應用程序重復使用一個現有的數據庫連接,而不是再重新建立一個;釋放空閑時間超過最大空閑時間的數據庫連接來避免因為沒有釋放數據庫連接而引起的數據庫連接遺漏。這項技術能明顯提高對數據庫操作的性能。

1.1添加依賴
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
1.2 配置
在src下的source配置目錄(maven項目)新建一個名叫 c3p0-config.xml 的文件(必須是這個文件名)
主要包括:初始化連接池時建立多少個連接、連接池最少多少個連接 最多容納多少連接、每個連接的生存時間、連接池能同時允許多少個操作進行, 以及對具體數據庫連接的配置:數據庫的驅動、 數據庫的URL、 數據庫登錄名、 數據庫密碼、對這個數據庫的連接池的細化配置(比如初始化時建立多少連接, 最多最少連接數等等)。
一個數據庫的連接池配置用一個 節點來定義。在C3P0Utils中創建連接池時把 “標識” 作為連接池的構造函數的參數傳入,則C3P0在配置文件中找到同名節點,按照這個節點的配置來創建相應配置的連接池。

<?xml version="1.0" encoding="UTF-8"?>
<c3p0-config>
<!-- 默認配置,如果沒有指定則使用這個配置 -->
<default-config>
<property name="driverClass">com.mysql.jdbc.Driver</property>
<property name="jdbcUrl">
<![CDATA[jdbc:mysql://127.0.0.1:3306/aa?useUnicode=true&characterEncoding=UTF-8]]>
</property>
<property name="user">root</property>
<property name="password">0000</property>
<!-- 初始化池大小 -->
<property name="initialPoolSize">2</property>
<!-- 最大空閑時間 -->
<property name="maxIdleTime">30</property>
<!-- 最多有多少個連接 -->
<property name="maxPoolSize">10</property>
<!-- 最少幾個連接 -->
<property name="minPoolSize">2</property>
<!-- 每次最多可以執行多少個批處理語句 -->
<property name="maxStatements">50</property>
</default-config>

<!-- 命名的配置 -->
<named-config name="yk">
<property name="driverClass">com.mysql.jdbc.Driver</property>
<property name="jdbcUrl">jdbc:mysql://127.0.0.1:3306/aa</property>
<property name="user">userName</property>
<property name="password">password</property>
<property name="acquireIncrement">5</property><!-- 如果池中數據連接不夠時一次增長多少個 -->
<property name="initialPoolSize">100</property>
<property name="minPoolSize">50</property>
<property name="maxPoolSize">1000</property>
<property name="maxStatements">0</property>
<property name="maxStatementsPerConnection">5</property>
</named-config>
</c3p0-config>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1.3 編寫工具類
需要定義獲取connection、釋放connection的方法

package data.c3p0Utils;

import java.sql.Connection;
import java.sql.SQLException;

import com.mchange.v2.c3p0.ComboPooledDataSource;

public class C3P0Utils {
private C3P0Utils() {}
private static ComboPooledDataSource ds = null;
private static ThreadLocal<Connection>tLocal = new ThreadLocal<>();
static {
ds = new ComboPooledDataSource("yk");//讀取名為yk的配置
public static Connection getConnection() {
Connection con = tLocal.get();
if(con==null) {
try {
con = ds.getConnection();
tLocal.set(con);
} catch (SQLException e) {
System.out.println("連接數據庫出錯");
e.printStackTrace();
}
}
return con;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2 Mosquitto 客戶端的JAVA實現
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的“輕量級”通訊協議,MQTT消息的發送和訂閱都是依賴MQTT服務器。

2.1 MQTT協議實現方式
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者、代理、訂閱者,其中消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者也可以是訂閱者。

MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:

主題:可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload)
負載:可以理解為消息內容,是指的訂閱者具體收到的數據。
1
2
2.2 從Mosquitto存取數據
MQ是消息中間件,是一種在分布式系統中應用程序借以傳遞消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka,Mosquitto (我們這里使用的Mosquitto)

2.2.1 添加依賴
了解Paho
Paho Java客戶端是一個用Java編寫的MQTT客戶端庫,用於開發在JVM或其他Java兼容平台(如Android)上運行的應用程序。
Paho Java客戶端提供了兩個API:MqttAsyncClient提供了一個完全異步的API,通過已注冊的回調通知完成活動。 MqttClient是MqttAsyncClient的一個同步包裝,其中函數與應用程序同步。

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
1
2
3
4
5
2.2.2 客戶端訂閱
package com.NWPU.wsn.data;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class MyMqttClient {

public static final String HOST = "tcp://10.70.143.83:1883";
public static final String TOPIC = "test";
private static final String clientid = "client1ID";
private MqttClient client;
private MqttConnectOptions options;
private String userName = "userName";
private String passWord = "password";

private ScheduledExecutorService scheduler;

//重新鏈接
public void startReconnect() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
if (!client.isConnected()) {
try {
client.connect(options);
} catch (MqttSecurityException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
}

private void start() {
try {
// host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接設置
options = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
options.setCleanSession(true);
// 設置連接的用戶名
options.setUserName(userName);
// 設置連接的密碼
options.setPassword(passWord.toCharArray());
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
// 設置回調
client.setCallback(new MqttReceriveCallback());
MqttTopic topic = client.getTopic(TOPIC);

//setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
options.setWill(topic, "close".getBytes(), 0, true);

client.connect(options);
//訂閱消息
int[] Qos = {1};
String[] topic1 = {TOPIC};
client.subscribe(topic1, Qos);
} catch (Exception e) {
e.printStackTrace();
}
}
public void disconnect() {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws MqttException {
MyMqttClient client = new MyMqttClient();
client.start();
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
2.2.3 消息接收后,業務邏輯添加
即編寫發布消息的回調類:

必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。

每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。

在回調中,將它用來標識已經啟動了該回調的哪個實例。

必須在回調類中實現三個方法:

public void connectionLost(Throwable cause) 在斷開連接時調用。調用在

public void deliveryComplete(MqttDeliveryToken token))

public void messageArrived(MqttTopic topic, MqttMessage message) 接收已經預訂的發布。在此部分需要將接收到的消息解析,數據的持久化操作,具體為流程為:先對收到的來自於網關的json字符串進行反序列化,將其轉化為sensor實體類,編寫SQL語句,實現寫數據庫

package data;

import data.c3p0Utils.C3P0Utils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.sql.Connection;
import java.sql.Statement;

public class MqttReceriveCallback implements MqttCallback {

public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進行重連
System.out.println("連接斷開,正在嘗試做重連.......");
MyMqttClient client = new MyMqttClient();
client.startReconnect();
System.out.println("重連成功..........");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息內容 : " + new String(message.getPayload()));

//json字符串返回值反序列化為實體類
Sensor sensor = JSONObject.parseObject(new String(message.getPayload()), Sensor.class);
//將消息存儲進入數據庫

String sql= "INSERT INTO sensor (sID,sTemperature,sHumidity) VALUES ("
+sensor.getsId()+","+sensor.getsTemperature()+","+sensor.getsHumidity()+");";
Connection conn = null;
try {
conn = C3p0Utils.getConnection();
} catch (MyError myError) {
myError.printStackTrace();
}
Statement stmt=conn.createStatement();
stmt.executeUpdate(sql);;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
**注:**Statement 接口提供了三種執行 SQL 語句的方法:executeQuery、executeUpdate 和execute。使用哪一個方法由 SQL 語句所產生的內容決定。

executeQuery 用於產生單個結果集的語句,例如 SELECT 語句

executeUpdate 用於執行 INSERT、UPDATE 或 DELETE 語句以及 SQLDDL(數據定義語言)語句。
INSERT、UPDATE 或 DELETE語句的效果是修改表中零行或多行中的一列或多列,executeUpdate 的返回值是一個整數,表示受影響的行數(即更新計數)。
對於CREATE TABLE 或 DROP TABLE 等不操作行的語句,executeUpdate 的返回值總為零

execute 用於執行返回多個結果集、多個更新計數或二者組合的語句,多數程序員不會需要該高級功能。

sensor實體類

package com.NWPU.wsn.data.pojo;

import java.util.Date;

public class Sensor {
private float sTemperature;
private float sHumidity;
private String sDate;
private int sId;

public Sensor(float sTemperature, float sHumidity, String sDate, int sId) {
this.sTemperature = sTemperature;
this.sHumidity = sHumidity;
this.sDate = sDate;
this.sId = sId;
}

@Override
public String toString() {
return "Sensor{" +
"sTemperature=" + sTemperature +
", sHumidity=" + sHumidity +
", sDate='" + sDate + '\'' +
", sId=" + sId +
'}';
}

public float getsTemperature() {
return sTemperature;
}

public void setsTemperature(float sTemperature) {
this.sTemperature = sTemperature;
}

public float getsHumidity() {
return sHumidity;
}

public void setsHumidity(float sHumidity) {
this.sHumidity = sHumidity;
}

public String getsDate() {
return sDate;
}

public void setsDate(String sDate) {
this.sDate = sDate;
}

public int getsId() {
return sId;
}

public void setsId(int sId) {
this.sId = sId;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

————————————————
版權聲明:本文為CSDN博主「木易豈幾」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/yangkaizzz/article/details/112258463


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM