Rust 是由 Mozilla 主導開發的通用、編譯型編程語言。該語言的設計准則為:安全、並發、實用,支持 函數式、並發式、過程式以及面向對象的編程風格。Rust 速度驚人且內存利用率極高。由於沒有運行時和垃圾回收,它能夠勝任對性能要求特別高的服務,可以在嵌入式設備上運行,還能輕松和其他語言集成。Rust 豐富的類型系統和所有權模型保證了內存安全和線程安全,讓您在編譯期就能夠消除各種各樣的錯誤。
MQTT 是一種基於發布/訂閱模式的 輕量級物聯網消息傳輸協議 ,可以用極少的代碼和帶寬為聯網設備提供實時可靠的消息服務,它廣泛應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等行業。
本文主要介紹如何在 Rust 項目中使用 paho-mqtt 客戶端庫 ,實現客戶端與 MQTT 服務器的連接、訂閱、取消訂閱、收發消息等功能。
項目初始化
本項目使用 Rust 1.44.0 進行開發測試,並使用 Cargo 1.44.0 包管理工具進行項目管理,讀者可用如下命令查看當前的 Rust 版本。
~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)
選擇 MQTT 客戶端庫
paho-mqtt 是目前 Rust 中,功能完善且使用較多的 MQTT 客戶端,最新的 0.7.1
版本支持 MQTT v5、3.1.1、3.1,支持通過標准 TCP、SSL / TLS、WebSockets 傳輸數據,QoS 支持 0、1、2 等。
初始化項目
執行以下命令創建名為 mqtt-example
的 Rust 新項目。
~ cargo new mqtt-example
Created binary (application) `mqtt-example` package
編輯項目中的 Cargo.toml
文件,在 dependencies
中添加 paho-mqtt
庫的地址,以及指定訂閱、發布代碼文件對應的二進制文件。
[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }
[[bin]]
name = "sub"
path = "src/sub/main.rs"
[[bin]]
name = "pub"
path = "src/pub/main.rs"
Rust MQTT 的使用
創建客戶端連接
本文將使用 EMQ X 提供的 免費公共 MQTT 服務器 作為測試連接的 MQTT 服務器,該服務基於 EMQ X 的 MQTT 物聯網雲平台 創建。服務器接入信息如下:
- Broker: broker.emqx.io
- TCP Port: 1883
- Websocket Port: 8083
配置 MQTT Broker 連接參數
配置 MQTT Broker 連接地址(包括端口)、topic (這里我們配置了兩個 topic ),以及客戶端 id。
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
編寫 MQTT 連接代碼
編寫 MQTT 連接代碼,為了提升使用體驗,可在執行二進制文件時通過命令行參數的形式傳入連接地址。通常我們需要先創建一個客戶端,然后將該客戶端連接到 broker.emqx.io
。
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
發布消息
這里我們總共發布五條消息,根據循環的奇偶性,分別向 rust/mqtt
、 rust/test
這兩個主題發布。
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
訂閱消息
在客戶端連接之前,需要先初始化消費者。這里我們會循環處理消費者中的消息隊列,並打印出訂閱的 topic 名稱及接收到的消息內容。
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
...
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
...
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
...
}
完整代碼
消息發布代碼
use std::{
env,
process,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Create a message and publish it.
// Publish message to 'test' and 'hello' topics.
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
// Disconnect from the broker.
let tok = cli.disconnect(None);
println!("Disconnect from the broker");
tok.unwrap();
}
消息訂閱代碼
為了提升使用體驗,消息訂閱做了斷開重連的處理,並在重新建立連接后對主題進行重新訂閱。
use std::{
env,
process,
thread,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];
// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
if cli.reconnect().is_ok() {
println!("Successfully reconnected");
return true;
}
}
println!("Unable to reconnect after several attempts.");
false
}
// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
// Define the set of options for the connection.
let lwt = mqtt::MessageBuilder::new()
.topic("test")
.payload("Consumer lost connection")
.finalize();
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(false)
.will_message(lwt)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
// If still connected, then disconnect now.
if cli.is_connected() {
println!("Disconnecting");
cli.unsubscribe_many(DFLT_TOPICS).unwrap();
cli.disconnect(None).unwrap();
}
println!("Exiting");
}
運行與測試
編譯二進制文件
執行以下命令,會在 mqtt-example/target/debug
目錄下生成消息訂閱、發布對應的 sub
、pub
二進制文件。
cargo build
消息訂閱
執行 sub
二進制文件,等待消費發布。
消息發布
執行 pub
二進制文件,可以看到分別往 rust/test
、rust/mqtt
這兩個主題發布了消息。
同時在消息訂閱中可看到發布的消息
至此,我們完成了使用 paho-mqtt 客戶端連接到 公共 MQTT 服務器,並實現了測試客戶端與 MQTT 服務器的連接、消息發布和訂閱。
版權聲明: 本文為 EMQ 原創,轉載請注明出處。