rabbitmq在kubernetes中持久化集群部署


背景

Javashop電商系統的消息總線使用的事rabbitmq,在訂單創建、靜態頁生成、索引生成等等業務中大量采用異步消息系統,這個對於mq高可用的要求有兩個重要的考量:

1、集群化

2、可擴容

3、冗災

冗災就要實現rabbitmq的持久化,要考慮到rabbitmq宕機的情況,當rabbitmq因不可抗因素掛掉了,這時有一些消息還沒來得及被消費,當我們再恢復了rabbitmq的運行后,這些消息應該同時被恢復,可以再次被消費。

本文着重討論rabbitmq的k8s的持久化部署方案,當然提供在方案也支持了集群及擴容。 

思路

1、數據的存儲

在k8s中的持久化部署不可避免的要用到持久卷,我們采用nfs方式的持久卷來存儲es數據。

持久卷的詳細介紹請見這里:

https://kubernetes.io/docs/concepts/storage/persistent-volumes/

 

2、多節點的權限問題

rabbit的數據目錄默認只允許一個節點訪問,但在k8s上采用了持久卷,所有節點的數據都存儲在這個卷上,這會導致rabbitmq的數據目錄訪問權限問題:

1     {{failed_to_cluster_with,
2                          [rabbit@b8c4d82b52bc],
3                          "Mnesia could not connect to any nodes."},
4                      {rabbit,start,[normal,[]]}}

 

我們通過指定節點名稱的方式來解決,稍后可以在配置文件中看到具體的配置項。

部署過程

一、pv(持久卷的建立)

 先要建立nfs服務器

對於持久卷的結構規划如下:

1 /nfs/data/mqdata

根據如上規划建立nfs服務:

 1 #master節點安裝nfs
 2 yum -y install nfs-utils
 3 #創建nfs目錄
 4 mkdir -p /nfs/data/{mqdata,esmaster,esdata}
 5 #修改權限
 6 chmod -R 777 /nfs/data/
 7 
 8 #編輯export文件
 9 vim /etc/exports
10 
11 粘貼如下內容:
12 /nfs/data/mqdata *(rw,no_root_squash,sync)
13 
14 #配置生效
15 exportfs -r
16 #查看生效
17 exportfs
18 
19 #啟動rpcbind、nfs服務
20 systemctl restart rpcbind && systemctl enable rpcbind
21 systemctl restart nfs && systemctl enable nfs
22 
23 #查看 RPC 服務的注冊狀況
24 rpcinfo -p localhost
25 
26 #showmount測試,這里的ip輸入master節點的局域網ip
27 showmount -e <your ip>

如果成功可以看到可被掛載的目錄:

1 # showmount -e 172.17.14.73                                                                                                                  
2 Export list for 172.17.14.73:
3                                                                                                                                           /nfs/data/esmaster *                                                                                                                                            
4 /nfs/data/mqdata   *

接下來,要在每一個節點上安裝nfs服務以便使k8s可以掛載nfs目錄

1 #所有node節點安裝客戶端
2 yum -y install nfs-utils
3 systemctl start nfs && systemctl enable nfs

這樣就為k8s的持久卷做好了准備。

建立持久卷

有了nfs的准備,我就可以建立持久卷了:

我們分享了javashop內部使用的yaml倉庫供大家參考:

https://gitee.com/enation/rabbitmq-on-kubernetes

在您的k8s maseter節點服務器上 clone我們准備好的yaml文件

https://gitee.com/javashop/rabbitmq-on-kubernetes

修改根目錄中的pv.yaml

修改其中的server配置為nfs服務器的IP:

1  nfs:
2     server: 192.168.1.100 #這里請寫nfs服務器的ip

通過下面的命令建立持久卷:

1 kubectl create -f pv.yaml

通過以下命令查看持久卷是否建立成功:

1 kubectl get pv

部署rabbitmq

在k8s  master節點上執行下面的命令創建namespace:

1 kubectl create namespace ns-rabbitmq

執行下面的命令創建rabbitmq集群(執行整個目錄的所有配置文件)

1 kubectl create -f rabbitmq/

通過以上部署我們建立了一個ns-rabbitmq的namespace,並在其中創建了相應的pvc、角色賬號,有狀態副本集以及服務。

鏡像

使用的是javashop自己基於rabbitmq:3.8做的,加入了延遲消息插件,其他沒有變化。

 服務

我們默認開啟了對外nodeport端口,對應關系:

31672->15672

30672->5672

k8s內部可以通過下面的服務名稱訪問:

 

rabbitmq.ns-rabbitmq:15672

rabbitmq.ns-rabbitmq:5672

 

等待容器都啟動成功后驗證。

驗證

使用附帶程序校驗

  1. 發送消息(注釋掉接收消息)

  2. 觀察mq的隊列中有消息堆積

  3. 刪除mq的副本集

  4. 恢復mq副本集

  5. 接收消息

關鍵技術點

1、集群發現:

使用rabbitmq提供的k8s對等發現插件:rabbitmq_peer_discovery_k8s

2、映射持久卷

映射到:/var/lib/rabbitmq/mnesia

3、自定義數據目錄

1           - name: RABBITMQ_MNESIA_BASE
2             value: /var/lib/rabbitmq/mnesia/$(MY_POD_NAME)

 其中MY_POD_NAME是讀取的容器名稱,通過有狀態副本集保證唯一性的綁定:

1           - name: MY_POD_NAMESPACE
2             valueFrom:
3               fieldRef:
4                 fieldPath: metadata.namespace

 

附帶驗證程序

 1 private static CachingConnectionFactory connectionFactory;
 2 private static void initConnectionFactory() {
 3     connectionFactory = new CachingConnectionFactory();
 4     connectionFactory.setHost("localhost");
 5     connectionFactory.setPort(5672);
 6     connectionFactory.setUsername("guest");
 7     connectionFactory.setPassword("guest");
 8 }
 9 public static void main(String[] args) {
10     initConnectionFactory();
11     //發送消息
12     send();
13     //接收消息
14     receive();
15 }
16 private static void  receive() {
17     AmqpTemplate template = new RabbitTemplate(connectionFactory);
18     String foo = (String) template.receiveAndConvert("myqueue");
19     System.out.println("get message : "+ foo);
20 }
21 private static void send() {
22      AmqpAdmin admin = new RabbitAdmin(connectionFactory);
23     admin.declareQueue(new Queue("myqueue",true));
24     AmqpTemplate template = new RabbitTemplate(connectionFactory);
25     template.convertAndSend("myqueue", "foo");
26 }

 

 歡迎關注Javashop技術分享公眾號,觀看更多的視頻講解:

                                                                             


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM