一 . 環境說明
操作系統 : CentOS7
集群節點 :
172.18.0.231 集群節點1
172.18.0.232 集群節點2
172.18.0.233 集群節點3
EMQ X Broker版本 : 4.1.0
集群發現策略 : etcd
認證方式 : PostgreSQL方式
密碼加密 : SHA-265
SSL雙向認證 : 啟用
二 . 准備工作
准備etcd集群 , 可參考 : https://www.cnblogs.com/kreo/p/13155893.html
准備SSL證書 , 可參考 : https://www.cnblogs.com/kreo/p/13203973.html
三 . 下載EMQX
官網下載 : https://www.emqx.io/cn/products/broker
百度網盤下載 :
鏈接:https://pan.baidu.com/s/1mvs4M76cWeQh7VmkQXx9EA
提取碼:zfb5
四 . 解壓安裝包
注意 : 本文都采用root用戶進行安裝 , 如果需要切換其他用戶 , 也可以正常運行(注意權限問題)
#解壓 unzip emqx-centos7-v4.1.0.zip #可以移到任何你想移動到的目錄 , 本例子是移動到了/home下 , 即安裝目錄為 /home/emqx mv emqx /home/
PS : ***本文都是使用 /home/emqx 作為 ${emqx_home} 進行處理 ,如需改變 , 則需要修改相應的配置文件***
五. 修改主配置文件
主配置文件路徑 : /home/emqx/etc/emqx.conf
主要修改下面幾部分 :
1 . 節點名稱(node.name)
2 . PostgreSQL認證
3 . SSL認證
4 . etcd認證開啟
下面是我配置文件全文 , 可直接拿過去使用 , 需要修改部分我標了紅色 :
cluster.name = wanmaemqxcl cluster.proto_dist = inet_tcp cluster.discovery = etcd cluster.autoheal = on cluster.autoclean = 5m
#其他節點的server也可能要改 cluster.etcd.server = http://127.0.0.1:2379 cluster.etcd.prefix = emqxcl cluster.etcd.node_ttl = 1m
#其他節點的node.name要改 node.name = emqx@172.18.0.231 node.cookie = emqxsecretcookie node.data_dir = data node.global_gc_interval = 15m node.crash_dump = log/crash.dump node.dist_listen_min = 6369 node.dist_listen_max = 6369 rpc.mode = async rpc.async_batch_size = 256 rpc.tcp_server_port = 5369 rpc.tcp_client_port = 5369 rpc.connect_timeout = 5s rpc.send_timeout = 5s rpc.authentication_timeout = 5s rpc.call_receive_timeout = 15s rpc.socket_keepalive_idle = 900s rpc.socket_keepalive_interval = 75s rpc.socket_keepalive_count = 9 rpc.socket_sndbuf = 1MB rpc.socket_recbuf = 1MB rpc.socket_buffer = 1MB log.to = both log.level = notice log.dir = log log.file = emqx.log #log.chars_limit = 8192 log.rotation = on log.rotation.size = 10MB log.rotation.count = 5 #log.info.file = info.log log.error.file = error.log #log.sync_mode_qlen = 100 #log.drop_mode_qlen = 3000 #log.flush_qlen = 8000 #log.overload_kill = on #log.overload_kill_qlen = 20000 #log.overload_kill_mem_size = 30MB #log.overload_kill_restart_after = 5s #log.burst_limit = 20000, 1s allow_anonymous = false acl_nomatch = allow acl_file = etc/acl.conf enable_acl_cache = on acl_cache_max_size = 32 acl_cache_ttl = 1m acl_deny_action = ignore flapping_detect_policy = 30, 1m, 5m mqtt.max_packet_size = 1MB mqtt.max_clientid_len = 65535 mqtt.max_topic_levels = 0 mqtt.max_qos_allowed = 2 mqtt.max_topic_alias = 65535 mqtt.retain_available = true mqtt.wildcard_subscription = true mqtt.shared_subscription = true mqtt.ignore_loop_deliver = false mqtt.strict_mode = false zone.external.idle_timeout = 15s zone.external.enable_acl = on zone.external.enable_ban = on zone.external.enable_stats = on zone.external.acl_deny_action = ignore
#可根據實際情況修改 zone.external.force_gc_policy = 32000|32MB zone.external.keepalive_backoff = 0.75 zone.external.max_subscriptions = 0 zone.external.upgrade_qos = off zone.external.max_inflight = 32 zone.external.retry_interval = 30s zone.external.max_awaiting_rel = 100 zone.external.await_rel_timeout = 300s zone.external.session_expiry_interval = 2h zone.external.max_mqueue_len = 1000 zone.external.mqueue_priorities = none zone.external.mqueue_default_priority = highest zone.external.mqueue_store_qos0 = true zone.external.enable_flapping_detect = off zone.external.use_username_as_clientid = false zone.external.ignore_loop_deliver = false zone.external.strict_mode = false zone.internal.allow_anonymous = true zone.internal.enable_stats = on zone.internal.enable_acl = off zone.internal.acl_deny_action = ignore zone.internal.max_subscriptions = 0 zone.internal.max_inflight = 128 zone.internal.max_awaiting_rel = 1000 zone.internal.max_mqueue_len = 10000 zone.internal.mqueue_store_qos0 = true zone.internal.enable_flapping_detect = off zone.internal.ignore_loop_deliver = false zone.internal.strict_mode = false zone.internal.bypass_auth_plugins = true listener.tcp.external = 0.0.0.0:1883 listener.tcp.external.acceptors = 8 listener.tcp.external.max_connections = 1024000 listener.tcp.external.max_conn_rate = 1000 listener.tcp.external.active_n = 100 listener.tcp.external.zone = external listener.tcp.external.access.1 = allow all listener.tcp.external.backlog = 1024 listener.tcp.external.send_timeout = 15s listener.tcp.external.send_timeout_close = on listener.tcp.external.nodelay = true listener.tcp.external.reuseaddr = true listener.tcp.internal = 127.0.0.1:11883 listener.tcp.internal.acceptors = 4 listener.tcp.internal.max_connections = 1024000 listener.tcp.internal.max_conn_rate = 1000 listener.tcp.internal.active_n = 1000 listener.tcp.internal.zone = internal listener.tcp.internal.backlog = 512 listener.tcp.internal.send_timeout = 5s listener.tcp.internal.send_timeout_close = on listener.tcp.internal.recbuf = 64KB listener.tcp.internal.sndbuf = 64KB listener.tcp.internal.nodelay = false listener.tcp.internal.reuseaddr = true listener.ssl.external = 8883 listener.ssl.external.acceptors = 16 listener.ssl.external.max_connections = 102400 listener.ssl.external.max_conn_rate = 500 listener.ssl.external.active_n = 100 listener.ssl.external.zone = external listener.ssl.external.access.1 = allow all listener.ssl.external.tls_versions = tlsv1.3,tlsv1.2,tlsv1.1,tlsv1 listener.ssl.external.handshake_timeout = 15s
#證書文件要放到/home/emqx/etc/certs目錄下 listener.ssl.external.keyfile = etc/certs/server.key listener.ssl.external.certfile = etc/certs/server.pem listener.ssl.external.cacertfile = etc/certs/ca.pem listener.ssl.external.verify = verify_peer listener.ssl.external.fail_if_no_peer_cert = true listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA #listener.ssl.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA listener.ssl.external.reuseaddr = true listener.ws.external = 8083 listener.ws.external.mqtt_path = /mqtt listener.ws.external.acceptors = 4 listener.ws.external.max_connections = 102400 listener.ws.external.max_conn_rate = 1000 listener.ws.external.active_n = 100 listener.ws.external.zone = external listener.ws.external.access.1 = allow all listener.ws.external.verify_protocol_header = on listener.ws.external.backlog = 1024 listener.ws.external.send_timeout = 15s listener.ws.external.send_timeout_close = on listener.ws.external.nodelay = true listener.wss.external = 8084 listener.wss.external.mqtt_path = /mqtt listener.wss.external.acceptors = 4 listener.wss.external.max_connections = 16 listener.wss.external.max_conn_rate = 1000 listener.wss.external.active_n = 100 listener.wss.external.zone = external listener.wss.external.access.1 = allow all listener.wss.external.verify_protocol_header = on listener.wss.external.keyfile = etc/certs/server.key listener.wss.external.certfile = etc/certs/server.pem listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA listener.wss.external.backlog = 1024 listener.wss.external.send_timeout = 15s listener.wss.external.send_timeout_close = on modules.loaded_file = data/loaded_modules module.presence.qos = 1 plugins.etc_dir = etc/plugins/ plugins.loaded_file = data/loaded_plugins plugins.expand_plugins_dir = plugins/ broker.sys_interval = 1m broker.sys_heartbeat = 30s broker.session_locking_strategy = quorum broker.shared_subscription_strategy = random broker.shared_dispatch_ack_enabled = false broker.route_batch_clean = off sysmon.long_gc = 0 sysmon.long_schedule = 240ms sysmon.large_heap = 8MB sysmon.busy_port = false sysmon.busy_dist_port = true os_mon.cpu_check_interval = 60s os_mon.cpu_high_watermark = 80% os_mon.cpu_low_watermark = 60% os_mon.mem_check_interval = 60s os_mon.sysmem_high_watermark = 70% os_mon.procmem_high_watermark = 5% vm_mon.check_interval = 30s vm_mon.process_high_watermark = 80% vm_mon.process_low_watermark = 60%
六 . 開啟PostgreSQL認證
1 . 新建PostgreSQL數據庫 , 並新建表(為了和業務結合, 我改了官方的表名和部分結構)
新建數據庫 : mqttconfig
用戶名 : mqtt
密碼 : 123456
新建用戶表
CREATE TABLE "public"."ct_sys_emqx_user" ( "id" int4 NOT NULL, "is_superuser" bool NOT NULL, "username" varchar(100) COLLATE "pg_catalog"."default" NOT NULL, "password" varchar(255) COLLATE "pg_catalog"."default" NOT NULL, "salt" varchar(255) COLLATE "pg_catalog"."default", "enable" int2 NOT NULL DEFAULT 1, "add_user" varchar(64) COLLATE "pg_catalog"."default", "add_time" timestamp(6), "lm_user" varchar(64) COLLATE "pg_catalog"."default", "lm_time" timestamp(6), CONSTRAINT "ct_sys_emqx_user_pkey" PRIMARY KEY ("id") ); COMMENT ON COLUMN "public"."ct_sys_emqx_user"."id" IS '主鍵ID'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."is_superuser" IS '是否超級用戶'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."username" IS '用戶名'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."password" IS '密碼'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."salt" IS '鹽值'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."enable" IS '禁用0啟用1'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."add_user" IS '創建人'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."add_time" IS '創建時間'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."lm_user" IS '修改人'; COMMENT ON COLUMN "public"."ct_sys_emqx_user"."lm_time" IS '修改時間'; COMMENT ON TABLE "public"."ct_sys_emqx_user" IS 'EMQX用戶表';
新建ACL表(訪問控制表)
CREATE TABLE "public"."ct_sys_emqx_acl" ( "id" int4 NOT NULL, "allow" int4 NOT NULL, "ipaddr" varchar(60) COLLATE "pg_catalog"."default", "username" varchar(100) COLLATE "pg_catalog"."default", "clientid" varchar(100) COLLATE "pg_catalog"."default", "access" int4, "topic" varchar(100) COLLATE "pg_catalog"."default", "level" int4 NOT NULL DEFAULT 100, "enable" int2 NOT NULL DEFAULT 1, "add_user" varchar(64) COLLATE "pg_catalog"."default", "add_time" timestamp(6), "lm_user" varchar(64) COLLATE "pg_catalog"."default", "lm_time" timestamp(6), CONSTRAINT "ct_sys_emqx_acl_pkey" PRIMARY KEY ("id") ); COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."id" IS '主鍵ID'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."allow" IS '禁止0允許1'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."ipaddr" IS 'IP地址'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."username" IS '用戶名($all代表全部用戶)'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."clientid" IS '客戶端'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."access" IS '操作:訂閱1發布2全部3'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."topic" IS '控制的主題(允許占位符)'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."level" IS '等級,越高越優先'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."enable" IS '禁用0啟用1'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."add_user" IS '創建人'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."add_time" IS '創建時間'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."lm_user" IS '修改人'; COMMENT ON COLUMN "public"."ct_sys_emqx_acl"."lm_time" IS '修改時間';
添加相應數據到用戶表
注意 : password填入的是SHA-256加密后的字符串 , SHA-256加密方法參考:
public static String encryptSHA256(String content) { try { MessageDigest messageDigest = MessageDigest.getInstance("SHA-256"); return Hex.encodeHexString(messageDigest.digest(content.getBytes("UTF-8"))); } catch (UnsupportedEncodingException | NoSuchAlgorithmException e) { e.printStackTrace(); } return null; }
PS : 密碼的Hex一定要用全小寫字符 , 全大寫的經測試無效 , 所以采用Hex.encodeHexString方法 , 該方法生成全小寫的Hex字符串
2 . 修改emqx_auth_pgsql插件配置文件
配置文件路徑 : /home/emqx/etc/plugins/emqx_auth_pgsql.conf
auth.pgsql.server = 2.18.0.97:5432 auth.pgsql.pool = 8 auth.pgsql.username = mqtt auth.pgsql.password = 123456 auth.pgsql.database = mqttconfig auth.pgsql.encoding = utf8 auth.pgsql.ssl = false auth.pgsql.auth_query = select password from ct_sys_emqx_user where username = '%u' and enable = 1 limit 1 auth.pgsql.password_hash = sha256 auth.pgsql.super_query = select is_superuser from ct_sys_emqx_user where username = '%u' and enable = 1 limit 1 auth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from ct_sys_emqx_acl where enable = 1 and (ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c') order by level desc
3. 修改插件加載配置
修改插件加載配置 : /home/emqx/data/loaded_plugins
下面加上一行 : {emqx_auth_pgsql,true}.
注意有個點 .
完成配置文件如下 : (你如果已經加載了其他插件 , 則可能會不同)
{emqx_management, true}. {emqx_recon, true}. {emqx_retainer, true}. {emqx_dashboard, true}. {emqx_rule_engine, true}. {emqx_bridge_mqtt, false}. {emqx_auth_pgsql , true}.
也可能這樣 :
emqx_management.
emqx_recon.
emqx_retainer.
emqx_dashboard.
emqx_rule_engine.
emqx_auth_pgsql.
兩種配置效果一樣.
通過命令方式動態加載插件 :
# ** 需要在emqx啟動后 # 使用命令 加載插件 /home/emqx/bin/emqx_ctl plugins load emqx_auth_pgsql
七 . 啟動/關閉 EMQX
1 . 改變當前環境變量(.bash_profile) , 把/home/emqx/bin 加入PATH中 (非必須)
2 . 啟動關閉腳本
#啟動腳本 /home/emqx/bin/emqx start #關閉腳本 /home/emqx/bin/emqx stop #查看啟動狀態 /home/emqx/bin/emqx_ctl status
八 . 部署其他集群環境
改變第四步中的 主配置文件中的 node.name 和 cluster.etcd.server(根據情況) , 其他全部按照 三 ~ 七 步驟進行部署
九 . 其他常用命令
# 重啟EMQX emqx restart #///啟動關閉插件 # 查看插件加載情況 emqx_ctl plugins list # Show loaded plugins # 加載插件 emqx_ctl plugins load <Plugin> # 取消加載插件 emqx_ctl plugins unload <Plugin> # 重新加載插件 emqx_ctl plugins reload <Plugin>
十 . WEB管理端(DashBoard)
隨便打開集群某一台的18083端口即可訪問
http://172.18.0.231:18083
默認用戶名 : admin
默認密碼 : public (注意修改)
里面可以查看一些集群監控信息
注意 : 此DashBorad只作為監控使用 , 不要用作管理(比如加載插件) , 否則可能出現不可預計的后果
十一 . nginx代理
使用nginx代理即可實現集群的負載均衡
首先nginx必須加載with-stream和with-stream_ssl_module模塊
具體nginx安裝可參考 : https://www.cnblogs.com/kreo/p/4378086.html
注意 : 下面配置需要放在 stream 塊中
log_format proxy '$remote_addr [$time_local] ' '$protocol $status $bytes_sent $bytes_received ' '$session_time "$upstream_addr" ' '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"'; upstream emqx_servers { zone emqx_servers 64k; hash $remote_addr; server 172.18.0.231:1883 max_fails=2 fail_timeout=30s weight=100; server 172.18.0.232:1883 max_fails=2 fail_timeout=30s weight=100; server 172.18.0.233:1883 max_fails=2 fail_timeout=30s weight=100; } server { listen 8883 ssl; proxy_pass emqx_servers; proxy_buffer_size 4k; ssl_handshake_timeout 15s;
# 下面2個模塊在nginx商業授權中 , 土豪可以打開 # status_zone emqx_servers; # health_check;
# 證書需要復制過來放入相應的目錄中 ssl_certificate /usr/local/nginx/conf/certs/emqx/client.pem; ssl_certificate_key /usr/local/nginx/conf/certs/emqx/client.key; access_log /u01/log/nginx/emqx.access.log proxy; error_log /u01/log/nginx/emqx.error.log; }
假設nginx服務器所在的IP是 : 172.18.0.97
那么 , 統一可以使用 ssl://172.18.0.97:8883 進行訪問
十二 . JAVA訪問例子
創建連接:
package test.mqtt; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.Properties; /** * @author kreo * @description * @date 2020-6-23 23:15:16 */ public class MqttConnection { private final static String broker = "ssl://172.18.0.97:8883"; private final static String clientId = "LOCAL_JAVA_CLIENT"; private final static MemoryPersistence persistence = new MemoryPersistence(); private static MqttClient client; public static MqttClient getClient() { try { if (client == null) { client = new MqttClient(broker, clientId, persistence); // MQTT 連接選項 MqttConnectOptions connOptions = new MqttConnectOptions(); connOptions.setUserName("guest"); connOptions.setPassword("123456".toCharArray()); Properties sslProperties = new Properties(); sslProperties.put(SSLSocketFactoryFactory.KEYSTORE, "/usr/var/certs/client.jks"); sslProperties.put(SSLSocketFactoryFactory.KEYSTOREPWD, "client.wanmagroup.com"); sslProperties.put(SSLSocketFactoryFactory.KEYSTORETYPE, "JKS"); sslProperties.put(SSLSocketFactoryFactory.TRUSTSTORE, "/usr/var/certs/ca.jks"); sslProperties.put(SSLSocketFactoryFactory.TRUSTSTOREPWD, "wanmagroup.com"); sslProperties.put(SSLSocketFactoryFactory.TRUSTSTORETYPE, "JKS"); sslProperties.put(SSLSocketFactoryFactory.CLIENTAUTH, true); connOptions.setSSLProperties(sslProperties); // 保留會話 connOptions.setCleanSession(true); // 設置回調 client.setCallback(new OnMessageCallback()); // 建立連接 System.out.println("嘗試建立連接... Broker >> " + broker); client.connect(connOptions); System.out.println("建立連接成功"); } } catch (MqttException me) { System.out.println("原因代碼 " + me.getReasonCode()); System.out.println("信息 " + me.getMessage()); System.out.println("LOC " + me.getLocalizedMessage()); System.out.println("原因 " + me.getCause()); me.printStackTrace(); } return client; } public static void close() { try { client.disconnect(); System.out.println("斷開連接"); client.close(); System.out.println("連接關閉"); } catch (MqttException me) { System.out.println("原因代碼 " + me.getReasonCode()); System.out.println("信息 " + me.getMessage()); System.out.println("LOC " + me.getLocalizedMessage()); System.out.println("原因 " + me.getCause()); me.printStackTrace(); } } }
訂閱和發布例子:
package test.mqtt; import com.wanma.framework.util.IDate; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.io.UnsupportedEncodingException; /** * @author kreo * @description * @date 2020-6-23 13:45:22 */ public class MqttClientHandle { // private final static String subTopic = "/home/kreo/testsub"; private final static String pubTopic = "/home/kreo/testpub"; public MqttClientHandle() { } public static void subscribe() { try { // MqttAsyncConnection.getClient().subscribe(pubTopic, 0); MqttConnection.getClient().subscribe(pubTopic, 2); } catch (MqttException me) { System.out.println("原因代碼 " + me.getReasonCode()); System.out.println("信息 " + me.getMessage()); System.out.println("LOC " + me.getLocalizedMessage()); System.out.println("原因 " + me.getCause()); me.printStackTrace(); } } public static void publish(int qos) { try { String content = "發送:" + IDate.getNowMillis(); MqttMessage message = new MqttMessage(); message.setQos(qos); message.setPayload(content.getBytes("UTF-8")); MqttConnection.getClient().publish(pubTopic, message); } catch (MqttException me) { System.out.println("原因代碼 " + me.getReasonCode()); System.out.println("信息 " + me.getMessage()); System.out.println("LOC " + me.getLocalizedMessage()); System.out.println("原因 " + me.getCause()); me.printStackTrace(); } catch (UnsupportedEncodingException ue) { ue.printStackTrace(); } } // public static public static void main(String[] args) { // 訂閱 subscribe(); //發布 publish(0); //publish(1); //publish(2); } }
十三 . 其他特別知識點
1 . 消息的QOS可能會降級 , 比如使用QOS=2發布 , 但是接收方卻使用QOS = 0進行接收 , 那么消息就會降級成為QOS0被接收方接收