12.RabbitMQ多機集群


配置兩台Linux CentOS 6.7虛擬主機

CentOS6.7下載地址
https://pan.baidu.com/s/1i5GPg9n
 
安裝視頻下載

https://pan.baidu.com/s/1qYSgohQ
 

rabbitmq2
12.RabbitMQ多機群集
rabbitmq1
12.RabbitMQ多機群集


1、分別在兩台主機上修改/etc/hosts
192.168.169.100 rabbitmq1
192.168.169.110 rabbitmq2
 
2、從客戶端上傳RPM包
12.RabbitMQ多機群集
RPM包下載地址
https://pan.baidu.com/s/1dE1iaGx
 

3、下載阿里雲Yum源

#wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo

依次安裝Erlang,Rabbitmq
#yum -y install openssl
#yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm
#yum -y install erlang-17.4-1.el6.x86_64.rpm
#yum -y install rabbitmq-server-3.6.3-1.noarch.rpm

4、啟動rabbitmq1,rabbitmq2上的RabbitMQ
rabbitmq1
#service rabbitmq-server start
rabbitmq2
#service rabbitmq-server start

5、從rabbitmq1主機上拷貝文件到rabbitmq2
scp /var/lib/rabbitmq/.erlang.cookie  rabbitmq2:/var/lib/rabbitmq
6、在rabbitmq1,rabbitmq2上分別關閉防火牆
[root@rabbitmq1 ~]# service iptables stop
[root@rabbitmq2 ~]# service iptables stop

7、在rabbitmq1,rabbitmq2上分別啟動RibbitMQ
[root@rabbitmq1 ~]# service rabbitmq-server start
[root@rabbitmq2 ~]# service rabbitmq-server start
 
8、在rabbitmq2上執行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
9、查看各節點上的狀態
rabbitmqctl cluster_status
 
10、在rabbitmq1,rabbitmq2節點上分別添加用戶和設置控制台插件
 
[root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
 
[root@rabbitmq1 ~]# rabbitmqctl add_user admin admin
 
[root@rabbitmq1 ~]# rabbitmqctl set_permissions admin ".*" ".*" ".*"
 
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
 
[root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management
 
 

11、在rabbitmq1節點上安裝haproxy

yum -y install haproxy
12、配置haproxy
cp /etc/haproxy/haproxy.cfg  /etc/haproxy/haproxy.cfg.bak
vi /etc/haproxy/haproxy.cfg
添加配置信息
12.RabbitMQ多機群集

listen rabbitmq_local_cluster 192.168.169.100:5670 
mode tcp 
balance roundrobin 
server rabbit 192.168.169.100:5672 check inter 5000 rise 2 fall 3
server rabbit 192.168.169.110:5672 check inter 5000 rise 2 fall 3 

listen private_monitoring :8100 
mode http 
option httplog 
stats enable 
stats uri /stats 
stats refresh 60s

13、啟動haproxy

service haproxy start

14、查看haproxy控制台
http://192.168.169.142:8100/stats
12.RabbitMQ多機群集
15、建立RabbitMQ策略
12.RabbitMQ多機群集

16、建立持久隊列
 
12.RabbitMQ多機群集
 
測試代碼
Producer.java
package com.test.cluster;
 
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
 
public class Producer {
 
    public static void main(String[] args) throws Exception {
   
    //使用默認端口連接MQ
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.100"); //使用默認端口5672
        factory.setPort(5670);
        Connection conn = factory.newConnection(); //聲明一個連接
        Channel channel = conn.createChannel(); //聲明消息通道
   
        String exchangeName = "TestEXG";//交換機名稱
        String routingKey = "RouteKey1";//RoutingKey關鍵字
        channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
        String queueName = "ClusterQueue";//隊列名稱
        Map arg = new HashMap();
        arg.put("x-ha-policy", "all");
        channel.queueDeclare(queueName, false, false, false, arg);
 
        channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象
        
        byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發布消息
        //關閉通道和連接
channel.close();
conn.close();
    }
 
}
 
 
Customer.java
 
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
 
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
//通過channel.basicAck向服務器發送回執,刪除服務上的消息
public class Consumer {
 
    public static void main(String[] args) throws IOException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.100"); //使用默認端口5672
        factory.setPort(5670);
        Connection conn = factory.newConnection(); //聲明一個連接
        Channel channel = conn.createChannel(); //聲明消息通道
        String exchangeName = "TestEXG";//交換機名稱
        String queueName = "ClusterQueue";//隊列名稱
        channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
        channel.queueBind(queueName, exchangeName, "RouteKey1");
 
        channel.basicQos(1); //server push消息時的隊列長度
 
        //用來緩存服務器推送過來的消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
 
        channel.basicConsume(queueName, false, consumer);
 
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("Received " + new String(delivery.getBody()));
 
            //回復ack包,如果不回復,消息不會在服務器刪除
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
 
關閉掉其中一個RabbitMQ,測試群集效果
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM