配置两台Linux CentOS 6.7虚拟主机
CentOS6.7下载地址
https://pan.baidu.com/s/1i5GPg9n
安装视频下载
https://pan.baidu.com/s/1qYSgohQ
rabbitmq2
rabbitmq1
1、分别在两台主机上修改/etc/hosts
192.168.169.100 rabbitmq1
192.168.169.110 rabbitmq2
2、从客户端上传RPM包
https://pan.baidu.com/s/1dE1iaGx
3、下载阿里云Yum源
#wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
依次安装Erlang,Rabbitmq
#yum -y install openssl
#yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm
#yum -y install erlang-17.4-1.el6.x86_64.rpm
#yum -y install rabbitmq-server-3.6.3-1.noarch.rpm
4、启动rabbitmq1,rabbitmq2上的RabbitMQ
rabbitmq1
#service rabbitmq-server start
rabbitmq2
#service rabbitmq-server start
5、从rabbitmq1主机上拷贝文件到rabbitmq2
scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq
6、在rabbitmq1,rabbitmq2上分别关闭防火墙
[root@rabbitmq1 ~]# service iptables stop
[root@rabbitmq2 ~]# service iptables stop
7、在rabbitmq1,rabbitmq2上分别启动RibbitMQ
[root@rabbitmq1 ~]# service rabbitmq-server start
[root@rabbitmq2 ~]# service rabbitmq-server start
8、在rabbitmq2上执行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
9、查看各节点上的状态
rabbitmqctl cluster_status
10、在rabbitmq1,rabbitmq2节点上分别添加用户和设置控制台插件
[root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
[root@rabbitmq1 ~]# rabbitmqctl add_user admin admin
[root@rabbitmq1 ~]# rabbitmqctl set_permissions admin ".*" ".*" ".*"
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
[root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management
11、在rabbitmq1节点上安装haproxy
yum -y install haproxy
12、配置haproxy
cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
vi /etc/haproxy/haproxy.cfg
添加配置信息
listen rabbitmq_local_cluster 192.168.169.100:5670
mode tcp
balance roundrobin
server rabbit 192.168.169.100:5672 check inter 5000 rise 2 fall 3
server rabbit 192.168.169.110:5672 check inter 5000 rise 2 fall 3
listen private_monitoring :8100
mode http
option httplog
stats enable
stats uri /stats
stats refresh 60s
13、启动haproxy
service haproxy start
14、查看haproxy控制台
http://192.168.169.142:8100/stats
15、建立RabbitMQ策略
16、建立持久队列
测试代码
Producer.java
package com.test.cluster;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//使用默认端口连接MQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.100"); //使用默认端口5672
factory.setPort(5670);
Connection conn = factory.newConnection(); //声明一个连接
Channel channel = conn.createChannel(); //声明消息通道
String exchangeName = "TestEXG";//交换机名称
String routingKey = "RouteKey1";//RoutingKey关键字
channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机
String queueName = "ClusterQueue";//队列名称
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
channel.queueBind(queueName, exchangeName, routingKey);//定义声明对象
byte[] messageBodyBytes = "Hello, world!".getBytes();//消息内容
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//发布消息
//关闭通道和连接
channel.close();
conn.close();
}
}
Customer.java
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
//通过channel.basicAck向服务器发送回执,删除服务上的消息
public class Consumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.100"); //使用默认端口5672
factory.setPort(5670);
Connection conn = factory.newConnection(); //声明一个连接
Channel channel = conn.createChannel(); //声明消息通道
String exchangeName = "TestEXG";//交换机名称
String queueName = "ClusterQueue";//队列名称
channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机
channel.queueBind(queueName, exchangeName, "RouteKey1");
channel.basicQos(1); //server push消息时的队列长度
//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("Received " + new String(delivery.getBody()));
//回复ack包,如果不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
关闭掉其中一个RabbitMQ,测试群集效果