簡介
Erlang/OTP 最初是愛立信為開發電信設備系統設計的編程語言平台,電信設備 (路由器、接入網關...) 典型設計是通過背板連接主控板卡與多塊業務板卡的分布式系統。
Erlang/OTP 語言平台的分布式程序,由分布互聯的 Erlang 運行時系統組成,每個 Erlang 運行時系統被稱為節點(Node),節點間通過 TCP 兩兩互聯,組成一個網狀結構。
Erlang 節點由唯一的節點名稱標識,節點名稱由 @
分隔的兩部分組成:
<name>@<ip-address>
Erlang 節點間通過 cookie 進行互連認證。cookie 是一個字符串,只有 cookie 相同的兩個節點才能建立連接。
cookie的配置在emqx.conf
配置文件中,默認配置如下:
## Node name.
##
## See: http://erlang.org/doc/reference_manual/distributed.html
##
## Value: <name>@<host>
##
## Default: emqx@127.0.0.1
node.name = emqx@127.0.0.1
## Cookie for distributed node communication.
##
## Value: String
node.cookie = emqxsecretcookie
EMQ X 集群協議設置
Erlang 集群中各節點可通過 TCPv4、TCPv6 或 TLS 方式連接,可在 etc/emqx.conf
中配置連接方式:
配置名 | 類型 | 默認值 | 描述 |
---|---|---|---|
cluster.proto_dist | enum | inet_tcp |
分布式協議,可選值: - inet_tcp: 使用 TCP IPv4 - inet6_tcp: 使用 TCP IPv6 - inet_tls: 使用 TLS |
node.ssl_dist_optfile | 文件路徑 | etc/ssl_dist.conf |
當 cluster.proto_dist 選定為 inet_tls 時,需要配置 etc/ssl_dist.conf 文件,指定 TLS 證書等 |
EMQ X 分布式集群設計
EMQ X 分布式的基本功能是將消息轉發和投遞給各節點上的訂閱者,如下圖所示:
為實現此過程,EMQ X 維護了幾個與之相關的數據結構:訂閱表,路由表,主題樹。
訂閱表
主題 - 訂閱者
MQTT 客戶端訂閱主題時,EMQ X 會維護主題(Topic) -> 訂閱者(Subscriber) 映射的訂閱表。訂閱表只存在於訂閱者所在的 EMQ X 節點上,例如:
node1:
topic1 -> client1, client2
topic2 -> client3
node2:
topic1 -> client4
路由表
主題 - 節點
而同一集群的所有節點,都會復制一份主題(Topic) -> 節點(Node) 映射的路由表,例如:
topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4
主題樹
帶統配符的主題匹配
除路由表之外,EMQ X 集群中的每個節點也會維護一份主題樹(Topic Trie) 的備份。
例如下述主題訂閱關系:
客戶端 | 節點 | 訂閱主題 |
---|---|---|
client1 | node1 | t/+/x, t/+/y |
client2 | node2 | t/# |
client3 | node3 | t/+/x, t/a |
在所有訂閱完成時,EMQ X 中會維護如下主題樹 (Topic Trie) 和路由表 (Route Table):
消息派發過程
當 MQTT 客戶端發布消息時,所在節點會根據消息主題,檢索路由表並轉發消息到相關節點,再由相關節點檢索本地的訂閱表並將消息發送給相關訂閱者。
例如 client1 向主題 t/a
發布消息,消息在節點間的路由與派發流程:
- client1 發布主題為
t/a
的消息到節點 node1 - node1 通過查詢主題樹,得知
t/a
可匹配到現有的t/a
、t/#
這兩個主題。 - node1 通過查詢路由表,得知主題
t/a
只在 node3 上有訂閱者,而主題t/#
只在 node2 上有訂閱者。故 node1 將消息轉發給 node2 和 node3。 - node2 收到轉發來的
t/a
消息后,查詢本地訂閱表,獲取本節點上訂閱了t/#
的訂閱者,並把消息投遞給他們。 - node3 收到轉發來的
t/a
消息后,查詢本地訂閱表,獲取本節點上訂閱了t/a
的訂閱者,並把消息投遞給他們。 - 消息轉發和投遞結束。
集群策略
EMQ X 支持基於 Ekka 庫的集群自動發現 (Autocluster)。Ekka 是為 Erlang/OTP 應用開發的集群管理庫,支持 Erlang 節點自動發現 (Service Discovery)、自動集群 (Autocluster)、腦裂自動愈合 (Network Partition Autoheal)、自動刪除宕機節點 (Autoclean)。
EMQ X 支持多種節點發現策略:
策略 | 說明 |
---|---|
manual | 手動命令創建集群 |
static | 靜態節點列表自動集群 |
mcast | UDP 組播方式自動集群 |
dns | DNS A 記錄自動集群 |
etcd | 通過 etcd 自動集群 |
k8s | Kubernetes 服務自動集群 |
節點發現策略配置:在/etc/emqx/emqx.conf
的cluster.discovery
配置項中
默認是manual
[root@localhost emqx]# cat /etc/emqx/emqx.conf | grep cluster.discovery
cluster.discovery = manual
manual 手動創建集群
默認配置為手動創建集群,節點須通過 emqx_ctl join
cluster.discovery = manual
基於 static 節點列表自動集群
配置固定的節點列表,自動發現並創建集群:
cluster.discovery = static
cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1
基於 mcast 組播自動集群
基於 UDP 組播自動發現並創建集群:
cluster.discovery = mcast
cluster.mcast.addr = 239.192.0.1
cluster.mcast.ports = 4369,4370
cluster.mcast.iface = 0.0.0.0
cluster.mcast.ttl = 255
cluster.mcast.loop = on
基於 DNS A 記錄自動集群
基於 DNS A 記錄自動發現並創建集群:
cluster.discovery = dns
cluster.dns.name = localhost
cluster.dns.app = ekka
基於 etcd 自動集群
基於 etcd (opens new window)自動發現並創建集群:
cluster.discovery = etcd
cluster.etcd.server = http://127.0.0.1:2379
cluster.etcd.prefix = emqcl
cluster.etcd.node_ttl = 1m
基於 kubernetes 自動集群
Kubernetes (opens new window)下自動發現並創建集群:
cluster.discovery = k8s
cluster.k8s.apiserver = http://10.110.111.204:8080
cluster.k8s.service_name = ekka
cluster.k8s.address_type = ip
cluster.k8s.app_name = ekka
Kubernetes 不建議使用 Fannel 網絡插件,推薦使用 Calico 網絡插件。
manual 集群搭建
環境:
節點名 | ip地址 |
---|---|
emqx128@192.168.40.128 | 192.168.40.128 |
emqx129@192.168.40.129 | 192.168.40.129 |
配置節點1
修改配置文件emqx.conf
使用yum安裝方式,默認的集群策略就是manual,不需要修改
node1節點修改emqx.conf:
node.name = emqx128@192.168.40.128
防火牆
如果集群節點間存在防火牆,防火牆需要開啟 4369 端口和一個 TCP 端口段。4369 由 epmd 端口映射服務使用,TCP 端口段用於節點間建立連接與通信。
防火牆設置后,需要在 /etc/emqx/emqx.conf 中配置相同的端口段:
## Distributed node port range
node.dist_listen_min = 6369
node.dist_listen_max = 7369
啟動節點1:
emqx start
配置節點2
修改配置文件emqx.conf
node2節點修改emqx.conf:
node.name = emqx129@192.168.40.129
啟動node2:
emqx start
組成集群
在node1上操作:
[root@localhost emqx]# emqx_ctl cluster join emqx129@192.168.40.129
=CRITICAL REPORT==== 28-Jul-2021::16:15:39.528620 ===
[EMQ X] emqx shutdown for join
Join the cluster successfully.
Cluster status: #{running_nodes =>
['emqx128@192.168.40.128','emqx129@192.168.40.129'],
stopped_nodes => []}
集群搭建成功后,dashboard也可以發現節點的相關信息:

集群狀態
[root@localhost emqx]# emqx_ctl cluster status
Cluster status: #{running_nodes =>
['emqx128@192.168.40.128','emqx129@192.168.40.129'],
stopped_nodes => []}
測試集群收發
這個連接的是node1,並發送消息到testtopic/123主題中

這個連接的是node2,並監聽testtopic/#,接收到node1發送過來的消息:

退出集群
節點退出集群,兩種方式:
- leave: 讓本節點退出集群
- force-leave: 從集群刪除其他節點
讓節點2主動退出集群:
#在node2上執行
emqx_ctl cluster leave
或節點1上,從集群刪除節點2:
emqx_ctl cluster force-leave emqx129@192.168.40.129
集群節點自動清除
EMQ X 支持從集群自動刪除宕機節點 (Autoclean),可在 emqx.conf
中配置:
cluster.autoclean = 5m
集群腦裂與自動愈合
EMQ X 支持集群腦裂自動恢復(Network Partition Autoheal),可在 emqx.conf
中配置:
cluster.autoheal = on
集群腦裂自動恢復流程:
- 節點收到 Mnesia 的
inconsistent_database
事件 3 秒后進行集群腦裂確認; - 節點確認集群腦裂發生后,向 Leader 節點 (集群中最早啟動節點) 上報腦裂消息;
- Leader 節點延遲一段時間后,在全部節點在線狀態下創建腦裂視圖 (SplitView);
- Leader 節點在多數派 (majority) 分區選擇集群自愈的 Coordinator 節點;
- Coordinator 節點重啟少數派 (minority) 分區節點恢復集群。