EMQ X:集群


簡介

Erlang/OTP 最初是愛立信為開發電信設備系統設計的編程語言平台,電信設備 (路由器、接入網關...) 典型設計是通過背板連接主控板卡與多塊業務板卡的分布式系統。

Erlang/OTP 語言平台的分布式程序,由分布互聯的 Erlang 運行時系統組成,每個 Erlang 運行時系統被稱為節點(Node),節點間通過 TCP 兩兩互聯,組成一個網狀結構。

image-20210728151402948

Erlang 節點由唯一的節點名稱標識,節點名稱由 @ 分隔的兩部分組成:

<name>@<ip-address>

Erlang 節點間通過 cookie 進行互連認證。cookie 是一個字符串,只有 cookie 相同的兩個節點才能建立連接。

cookie的配置在emqx.conf配置文件中,默認配置如下:

## Node name.
##
## See: http://erlang.org/doc/reference_manual/distributed.html
##
## Value: <name>@<host>
##
## Default: emqx@127.0.0.1
node.name = emqx@127.0.0.1

## Cookie for distributed node communication.
##
## Value: String
node.cookie = emqxsecretcookie

EMQ X 集群協議設置

Erlang 集群中各節點可通過 TCPv4、TCPv6 或 TLS 方式連接,可在 etc/emqx.conf 中配置連接方式:

配置名 類型 默認值 描述
cluster.proto_dist enum inet_tcp 分布式協議,可選值:
- inet_tcp: 使用 TCP IPv4
- inet6_tcp: 使用 TCP IPv6
- inet_tls: 使用 TLS
node.ssl_dist_optfile 文件路徑 etc/ssl_dist.conf cluster.proto_dist 選定為 inet_tls 時,需要配置 etc/ssl_dist.conf 文件,指定 TLS 證書等

EMQ X 分布式集群設計

EMQ X 分布式的基本功能是將消息轉發和投遞給各節點上的訂閱者,如下圖所示:

image-20210728152832286

為實現此過程,EMQ X 維護了幾個與之相關的數據結構:訂閱表,路由表,主題樹。

訂閱表

主題 - 訂閱者

MQTT 客戶端訂閱主題時,EMQ X 會維護主題(Topic) -> 訂閱者(Subscriber) 映射的訂閱表。訂閱表只存在於訂閱者所在的 EMQ X 節點上,例如:

node1:

    topic1 -> client1, client2
    topic2 -> client3

node2:

    topic1 -> client4

路由表

主題 - 節點

而同一集群的所有節點,都會復制一份主題(Topic) -> 節點(Node) 映射的路由表,例如:

topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4

主題樹

帶統配符的主題匹配

除路由表之外,EMQ X 集群中的每個節點也會維護一份主題樹(Topic Trie) 的備份。

例如下述主題訂閱關系:

客戶端 節點 訂閱主題
client1 node1 t/+/x, t/+/y
client2 node2 t/#
client3 node3 t/+/x, t/a

在所有訂閱完成時,EMQ X 中會維護如下主題樹 (Topic Trie) 和路由表 (Route Table):

image-20210728153542314

消息派發過程

當 MQTT 客戶端發布消息時,所在節點會根據消息主題,檢索路由表並轉發消息到相關節點,再由相關節點檢索本地的訂閱表並將消息發送給相關訂閱者。

例如 client1 向主題 t/a 發布消息,消息在節點間的路由與派發流程:

  1. client1 發布主題為 t/a 的消息到節點 node1
  2. node1 通過查詢主題樹,得知 t/a 可匹配到現有的 t/at/# 這兩個主題。
  3. node1 通過查詢路由表,得知主題 t/a 只在 node3 上有訂閱者,而主題 t/# 只在 node2 上有訂閱者。故 node1 將消息轉發給 node2 和 node3。
  4. node2 收到轉發來的 t/a 消息后,查詢本地訂閱表,獲取本節點上訂閱了 t/# 的訂閱者,並把消息投遞給他們。
  5. node3 收到轉發來的 t/a 消息后,查詢本地訂閱表,獲取本節點上訂閱了 t/a 的訂閱者,並把消息投遞給他們。
  6. 消息轉發和投遞結束。

集群策略

EMQ X 支持基於 Ekka 庫的集群自動發現 (Autocluster)。Ekka 是為 Erlang/OTP 應用開發的集群管理庫,支持 Erlang 節點自動發現 (Service Discovery)、自動集群 (Autocluster)、腦裂自動愈合 (Network Partition Autoheal)、自動刪除宕機節點 (Autoclean)。

EMQ X 支持多種節點發現策略:

策略 說明
manual 手動命令創建集群
static 靜態節點列表自動集群
mcast UDP 組播方式自動集群
dns DNS A 記錄自動集群
etcd 通過 etcd 自動集群
k8s Kubernetes 服務自動集群

節點發現策略配置:在/etc/emqx/emqx.confcluster.discovery配置項中

默認是manual

[root@localhost emqx]# cat /etc/emqx/emqx.conf | grep cluster.discovery
cluster.discovery = manual

manual 手動創建集群

默認配置為手動創建集群,節點須通過 emqx_ctl join 命令加入:

cluster.discovery = manual

基於 static 節點列表自動集群

配置固定的節點列表,自動發現並創建集群:

cluster.discovery = static
cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1  

基於 mcast 組播自動集群

基於 UDP 組播自動發現並創建集群:

cluster.discovery = mcast
cluster.mcast.addr = 239.192.0.1
cluster.mcast.ports = 4369,4370
cluster.mcast.iface = 0.0.0.0
cluster.mcast.ttl = 255
cluster.mcast.loop = on

基於 DNS A 記錄自動集群

基於 DNS A 記錄自動發現並創建集群:

cluster.discovery = dns
cluster.dns.name = localhost
cluster.dns.app  = ekka

基於 etcd 自動集群

基於 etcd (opens new window)自動發現並創建集群:

cluster.discovery = etcd
cluster.etcd.server = http://127.0.0.1:2379
cluster.etcd.prefix = emqcl
cluster.etcd.node_ttl = 1m

基於 kubernetes 自動集群

Kubernetes (opens new window)下自動發現並創建集群:

cluster.discovery = k8s
cluster.k8s.apiserver = http://10.110.111.204:8080
cluster.k8s.service_name = ekka
cluster.k8s.address_type = ip
cluster.k8s.app_name = ekka
    

Kubernetes 不建議使用 Fannel 網絡插件,推薦使用 Calico 網絡插件。

manual 集群搭建

環境:

節點名 ip地址
emqx128@192.168.40.128 192.168.40.128
emqx129@192.168.40.129 192.168.40.129

配置節點1

修改配置文件emqx.conf

使用yum安裝方式,默認的集群策略就是manual,不需要修改

node1節點修改emqx.conf:

node.name = emqx128@192.168.40.128

防火牆

如果集群節點間存在防火牆,防火牆需要開啟 4369 端口和一個 TCP 端口段。4369 由 epmd 端口映射服務使用,TCP 端口段用於節點間建立連接與通信。

防火牆設置后,需要在 /etc/emqx/emqx.conf 中配置相同的端口段:

## Distributed node port range
node.dist_listen_min = 6369
node.dist_listen_max = 7369

啟動節點1:

emqx start

配置節點2

修改配置文件emqx.conf

node2節點修改emqx.conf:

node.name = emqx129@192.168.40.129

啟動node2:

emqx start

組成集群

在node1上操作:

[root@localhost emqx]# emqx_ctl cluster join emqx129@192.168.40.129
=CRITICAL REPORT==== 28-Jul-2021::16:15:39.528620 ===
[EMQ X] emqx shutdown for join
Join the cluster successfully.
Cluster status: #{running_nodes =>
                      ['emqx128@192.168.40.128','emqx129@192.168.40.129'],
                  stopped_nodes => []}

集群搭建成功后,dashboard也可以發現節點的相關信息:

image-20210728161644837

集群狀態

[root@localhost emqx]# emqx_ctl cluster status
Cluster status: #{running_nodes =>
                      ['emqx128@192.168.40.128','emqx129@192.168.40.129'],
                  stopped_nodes => []}

測試集群收發

這個連接的是node1,並發送消息到testtopic/123主題中

image-20210728165840815

這個連接的是node2,並監聽testtopic/#,接收到node1發送過來的消息:

image-20210728165951650

退出集群

節點退出集群,兩種方式:

  • leave: 讓本節點退出集群
  • force-leave: 從集群刪除其他節點

讓節點2主動退出集群:

#在node2上執行
emqx_ctl cluster leave

或節點1上,從集群刪除節點2:

emqx_ctl cluster force-leave emqx129@192.168.40.129

image-20210728162051574

集群節點自動清除

EMQ X 支持從集群自動刪除宕機節點 (Autoclean),可在 emqx.conf 中配置:

cluster.autoclean = 5m

集群腦裂與自動愈合

EMQ X 支持集群腦裂自動恢復(Network Partition Autoheal),可在 emqx.conf 中配置:

cluster.autoheal = on

集群腦裂自動恢復流程:

  1. 節點收到 Mnesia 的 inconsistent_database 事件 3 秒后進行集群腦裂確認;
  2. 節點確認集群腦裂發生后,向 Leader 節點 (集群中最早啟動節點) 上報腦裂消息;
  3. Leader 節點延遲一段時間后,在全部節點在線狀態下創建腦裂視圖 (SplitView);
  4. Leader 節點在多數派 (majority) 分區選擇集群自愈的 Coordinator 節點;
  5. Coordinator 節點重啟少數派 (minority) 分區節點恢復集群。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM