背景
Javashop電商系統的消息總線使用的事rabbitmq,在訂單創建、靜態頁生成、索引生成等等業務中大量采用異步消息系統,這個對於mq高可用的要求有兩個重要的考量:
1、集群化
2、可擴容
3、冗災
冗災就要實現rabbitmq的持久化,要考慮到rabbitmq宕機的情況,當rabbitmq因不可抗因素掛掉了,這時有一些消息還沒來得及被消費,當我們再恢復了rabbitmq的運行后,這些消息應該同時被恢復,可以再次被消費。
本文着重討論rabbitmq的k8s的持久化部署方案,當然提供在方案也支持了集群及擴容。
思路
1、數據的存儲
在k8s中的持久化部署不可避免的要用到持久卷,我們采用nfs方式的持久卷來存儲es數據。
持久卷的詳細介紹請見這里:
https://kubernetes.io/docs/concepts/storage/persistent-volumes/
2、多節點的權限問題
rabbit的數據目錄默認只允許一個節點訪問,但在k8s上采用了持久卷,所有節點的數據都存儲在這個卷上,這會導致rabbitmq的數據目錄訪問權限問題:
1 {{failed_to_cluster_with, 2 [rabbit@b8c4d82b52bc], 3 "Mnesia could not connect to any nodes."}, 4 {rabbit,start,[normal,[]]}}
我們通過指定節點名稱的方式來解決,稍后可以在配置文件中看到具體的配置項。
部署過程
一、pv(持久卷的建立)
先要建立nfs服務器
對於持久卷的結構規划如下:
1 /nfs/data/mqdata
根據如上規划建立nfs服務:
1 #master節點安裝nfs 2 yum -y install nfs-utils 3 #創建nfs目錄 4 mkdir -p /nfs/data/{mqdata,esmaster,esdata} 5 #修改權限 6 chmod -R 777 /nfs/data/ 7 8 #編輯export文件 9 vim /etc/exports 10 11 粘貼如下內容: 12 /nfs/data/mqdata *(rw,no_root_squash,sync) 13 14 #配置生效 15 exportfs -r 16 #查看生效 17 exportfs 18 19 #啟動rpcbind、nfs服務 20 systemctl restart rpcbind && systemctl enable rpcbind 21 systemctl restart nfs && systemctl enable nfs 22 23 #查看 RPC 服務的注冊狀況 24 rpcinfo -p localhost 25 26 #showmount測試,這里的ip輸入master節點的局域網ip 27 showmount -e <your ip>
如果成功可以看到可被掛載的目錄:
1 # showmount -e 172.17.14.73 2 Export list for 172.17.14.73: 3 /nfs/data/esmaster * 4 /nfs/data/mqdata *
接下來,要在每一個節點上安裝nfs服務以便使k8s可以掛載nfs目錄
1 #所有node節點安裝客戶端 2 yum -y install nfs-utils 3 systemctl start nfs && systemctl enable nfs
這樣就為k8s的持久卷做好了准備。
建立持久卷
有了nfs的准備,我就可以建立持久卷了:
我們分享了javashop內部使用的yaml倉庫供大家參考:
https://gitee.com/enation/rabbitmq-on-kubernetes
在您的k8s maseter節點服務器上 clone我們准備好的yaml文件
https://gitee.com/javashop/rabbitmq-on-kubernetes
修改根目錄中的pv.yaml
修改其中的server配置為nfs服務器的IP:
1 nfs: 2 server: 192.168.1.100 #這里請寫nfs服務器的ip
通過下面的命令建立持久卷:
1 kubectl create -f pv.yaml
通過以下命令查看持久卷是否建立成功:
1 kubectl get pv
部署rabbitmq
在k8s master節點上執行下面的命令創建namespace:
1 kubectl create namespace ns-rabbitmq
執行下面的命令創建rabbitmq集群(執行整個目錄的所有配置文件)
1 kubectl create -f rabbitmq/
通過以上部署我們建立了一個ns-rabbitmq的namespace,並在其中創建了相應的pvc、角色賬號,有狀態副本集以及服務。
鏡像
使用的是javashop自己基於rabbitmq:3.8做的,加入了延遲消息插件,其他沒有變化。
服務
我們默認開啟了對外nodeport端口,對應關系:
31672->15672
30672->5672
k8s內部可以通過下面的服務名稱訪問:
rabbitmq.ns-rabbitmq:15672
rabbitmq.ns-rabbitmq:5672
等待容器都啟動成功后驗證。
驗證
使用附帶程序校驗
-
發送消息(注釋掉接收消息)
-
觀察mq的隊列中有消息堆積
-
刪除mq的副本集
-
恢復mq副本集
-
接收消息
關鍵技術點
1、集群發現:
使用rabbitmq提供的k8s對等發現插件:rabbitmq_peer_discovery_k8s
2、映射持久卷
映射到:/var/lib/rabbitmq/mnesia
3、自定義數據目錄
1 - name: RABBITMQ_MNESIA_BASE 2 value: /var/lib/rabbitmq/mnesia/$(MY_POD_NAME)
其中MY_POD_NAME是讀取的容器名稱,通過有狀態副本集保證唯一性的綁定:
1 - name: MY_POD_NAMESPACE 2 valueFrom: 3 fieldRef: 4 fieldPath: metadata.namespace
附帶驗證程序
1 private static CachingConnectionFactory connectionFactory; 2 private static void initConnectionFactory() { 3 connectionFactory = new CachingConnectionFactory(); 4 connectionFactory.setHost("localhost"); 5 connectionFactory.setPort(5672); 6 connectionFactory.setUsername("guest"); 7 connectionFactory.setPassword("guest"); 8 } 9 public static void main(String[] args) { 10 initConnectionFactory(); 11 //發送消息 12 send(); 13 //接收消息 14 receive(); 15 } 16 private static void receive() { 17 AmqpTemplate template = new RabbitTemplate(connectionFactory); 18 String foo = (String) template.receiveAndConvert("myqueue"); 19 System.out.println("get message : "+ foo); 20 } 21 private static void send() { 22 AmqpAdmin admin = new RabbitAdmin(connectionFactory); 23 admin.declareQueue(new Queue("myqueue",true)); 24 AmqpTemplate template = new RabbitTemplate(connectionFactory); 25 template.convertAndSend("myqueue", "foo"); 26 }
歡迎關注Javashop技術分享公眾號,觀看更多的視頻講解:

