配置兩台Linux CentOS 6.7虛擬主機
CentOS6.7下載地址
https://pan.baidu.com/s/1i5GPg9n
安裝視頻下載
https://pan.baidu.com/s/1qYSgohQ
rabbitmq2
rabbitmq1
1、分別在兩台主機上修改/etc/hosts
192.168.169.100 rabbitmq1
192.168.169.110 rabbitmq2
2、從客戶端上傳RPM包
https://pan.baidu.com/s/1dE1iaGx
3、下載阿里雲Yum源
#wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
依次安裝Erlang,Rabbitmq
#yum -y install openssl
#yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm
#yum -y install erlang-17.4-1.el6.x86_64.rpm
#yum -y install rabbitmq-server-3.6.3-1.noarch.rpm
4、啟動rabbitmq1,rabbitmq2上的RabbitMQ
rabbitmq1
#service rabbitmq-server start
rabbitmq2
#service rabbitmq-server start
5、從rabbitmq1主機上拷貝文件到rabbitmq2
scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq
6、在rabbitmq1,rabbitmq2上分別關閉防火牆
[root@rabbitmq1 ~]# service iptables stop
[root@rabbitmq2 ~]# service iptables stop
7、在rabbitmq1,rabbitmq2上分別啟動RibbitMQ
[root@rabbitmq1 ~]# service rabbitmq-server start
[root@rabbitmq2 ~]# service rabbitmq-server start
8、在rabbitmq2上執行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
9、查看各節點上的狀態
rabbitmqctl cluster_status
10、在rabbitmq1,rabbitmq2節點上分別添加用戶和設置控制台插件
[root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
[root@rabbitmq1 ~]# rabbitmqctl add_user admin admin
[root@rabbitmq1 ~]# rabbitmqctl set_permissions admin ".*" ".*" ".*"
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
[root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management
11、在rabbitmq1節點上安裝haproxy
yum -y install haproxy
12、配置haproxy
cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
vi /etc/haproxy/haproxy.cfg
添加配置信息
listen rabbitmq_local_cluster 192.168.169.100:5670
mode tcp
balance roundrobin
server rabbit 192.168.169.100:5672 check inter 5000 rise 2 fall 3
server rabbit 192.168.169.110:5672 check inter 5000 rise 2 fall 3
listen private_monitoring :8100
mode http
option httplog
stats enable
stats uri /stats
stats refresh 60s
13、啟動haproxy
service haproxy start
14、查看haproxy控制台
http://192.168.169.142:8100/stats
15、建立RabbitMQ策略
16、建立持久隊列
測試代碼
Producer.java
package com.test.cluster;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//使用默認端口連接MQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.100"); //使用默認端口5672
factory.setPort(5670);
Connection conn = factory.newConnection(); //聲明一個連接
Channel channel = conn.createChannel(); //聲明消息通道
String exchangeName = "TestEXG";//交換機名稱
String routingKey = "RouteKey1";//RoutingKey關鍵字
channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
String queueName = "ClusterQueue";//隊列名稱
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象
byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發布消息
//關閉通道和連接
channel.close();
conn.close();
}
}
Customer.java
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
//通過channel.basicAck向服務器發送回執,刪除服務上的消息
public class Consumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.100"); //使用默認端口5672
factory.setPort(5670);
Connection conn = factory.newConnection(); //聲明一個連接
Channel channel = conn.createChannel(); //聲明消息通道
String exchangeName = "TestEXG";//交換機名稱
String queueName = "ClusterQueue";//隊列名稱
channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
channel.queueBind(queueName, exchangeName, "RouteKey1");
channel.basicQos(1); //server push消息時的隊列長度
//用來緩存服務器推送過來的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("Received " + new String(delivery.getBody()));
//回復ack包,如果不回復,消息不會在服務器刪除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
關閉掉其中一個RabbitMQ,測試群集效果