1、安裝前准備
由於RabbitMQ
使用的是Erlang
語言開發的,因此在安裝RabbitMQ
之前需要安裝Erlang
環境,Erlang
與RabbitMQ
的下載地址分別為:
Erlang
:https://github.com/rabbitmq/erlang-rpm/releases
RabbitMQ
:https://github.com/rabbitmq/rabbitmq-server/releases
注意:
RabbitMQ
與Erlang
安裝時是有版本對於關系,可以查看:https://www.rabbitmq.com/which-erlang.html
從版本對應關系可以看出,RabbitMQ
的最新3.9.13
需要Erlang
的最低23.2
版本,另外下載的時候還需要注意一點,安裝的系統為Centos7
,因此在下載RabbitMQ
與Erlang
版本的時候需要下載xxx.el7
的版本,我這里下載版本如下:
rabbitmq-server-3.9.13-1.el7.noarch.rpm
erlang-23.3.4.11-1.el7.x86_64.rpm
將下載好的文件上傳到服務器。
2、安裝Erlang
使用如下命令安裝Erlang
rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm
執行命令后,如果出現這樣的提示,則需要根據提示下載對應的依賴。
我這里安裝提示缺少了libcrypto.so.10(OPENSSL_1.0.2)(64bit)
依賴,可以到https://pkgs.org/下載
將下載好的版本上傳到服務器通過如下命令進行安裝
rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force
安裝結果:
再執行安裝Erlang
命令,出現如下結果則表示安裝成功:
可以使用命令erl -version
查看Erlang
版本。
3、安裝RabbitMQ
上一步我們已經把Erlang
安裝成功,現在安裝RabbitMQ
,如下:
rpm -ivh rabbitmq-server-3.9.13-1.el7.noarch.rpm
檢查是否安裝成功rabitmqctl version
4、運行RabbitMQ服務器
4.1、啟動服務器
- 添加開機啟動
RabbitMQ
服務
chkconfig rabbitmq-server on
- 啟動啟動
systemctl start rabbitmq-server
- 查看服務器狀態
systemctl status rabbitmq-server
如上結果表示RabbitMQ
服務已經成功啟動
- 停止服務器
systemctl stop rabbitmq-server
4.2、安裝WEB插件
RabbitMQ
默認提供了WEB
插件,方便通過頁面進行RabbitMQ
管理,需要執行如下命令啟用WEB
插件,啟用之前如果RabbitMQ
服務已經啟動,則先停止服務。
rabbitmq-plugins enable rabbitmq_management
重新啟動RabbitMQ
服務,通過地址:http://192.168.247.136:15672然后訪問RabbitMQ
RabbitMQ
默認提供了一個guest
賬戶,默認的賬戶沒有權限登錄不了,因此接下來我們需要創建用戶。
4、創建用戶
- 創建賬號
rabbitmqctl add_user admin 123456
- 設置用戶角色
rabbitmqctl set_user_tags admin administrator
- 設置用戶權限
命令格式:rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
上面命令表示用戶具有/vhost1
這個virtual host
中的所有資源配置、讀、寫權限
- 查看用戶和角色
rabbitmqctl list_users
可以看到用戶已經添加成功並設置了角色,現在就可以使用用戶登錄RabbitMQ
了。
- 刪除用戶
rabbitmqctl delete_user guest
5、RabbitMQ集群
在實際的工作場景,一台機器肯定是應付不了的,因此就需要多台機器搭建集群環境,這里我准備了3台虛擬機環境。搭建集群步驟如下:
- 修改3台機器的主機名稱
vim /etc/hostname
名稱根據自己的需要取名就行,這里我取名為node1
、node2
、node3
- 配置各個節點的
hosts
文件,讓各個節點都能互相識別
vim /etc/hosts
192.168.247.133 node1
192.168.247.134 node2
192.168.247.135 node3
- 確保各個節點的cookie文件使用的是同一個值
在node1
上執行遠程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
會提示是否繼續,然后輸入node2
的密碼
The authenticity of host 'node2 (192.168.247.134)' can't be established.
ECDSA key fingerprint is a2:a0:00:25:7a:3e:45:d1:42:5f:9f:90:09:22:92:17.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'node2,192.168.247.134' (ECDSA) to the list of known hosts.
root@node2's password:
.erlang.cookie 100% 20 0.0KB/s 00:00
將node3
的同步到node1
,與上面操作一樣
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
- 分別在三台機器上執行以下命令
rabbitmq-server -detached
- 將
node2
鏈接到node1
,在node2
機器下執行如下命令
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
- 將
node3
鏈接到node2
,在node2
機器下執行如下命令
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
當然節點node3
也可以鏈接到node1
。
- 查看集群狀態
rabbitmqctl cluster_status
------------------------
Cluster status of node rabbit@node2 ...
Basics
Cluster name: rabbit@node1
Disk Nodes
rabbit@node1
rabbit@node2
rabbit@node3
Running Nodes
rabbit@node1
rabbit@node2
rabbit@node3
Versions
rabbit@node1: RabbitMQ 3.8.5 on Erlang 21.3.8.1
rabbit@node2: RabbitMQ 3.8.5 on Erlang 21.3.8.1
rabbit@node3: RabbitMQ 3.8.5 on Erlang 21.3.8.1
這里是在node2
上查看集群狀態,可以看出有3個節點,由於搭建集群時,將RabbitMQ
重置過,需要重新添加用戶並設置權限。到這里我們的集群環境就已經搭建成功,可以登錄RabbitMQ
查看搭建結果:
可以看到有3台RabbitMQ
的服務了。
6、在Java中使用
6.1、引入RabbitMQ
包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.1</version>
</dependency>
6.1、定義RabbitMQ
鏈接工具類
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* ClassName RabbitMQUtil
*
* @Description RabbitMQ連接工具來
* @Author SIE_LiBiao
* @Date 2022/2/11 12:09
* @Version 1.0
*/
public class RabbitMQUtil {
private RabbitMQUtil(){
}
public static Channel getChannel() throws Exception{
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置主機
connectionFactory.setHost("192.168.247.136");
//設置用戶名
connectionFactory.setUsername("admin");
//設置密碼
connectionFactory.setPassword("123456");
//創建連接
Connection connection = connectionFactory.newConnection();
//創建信道
Channel channel = connection.createChannel();
return channel;
}
}
6.2、消息生產者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.tenghu.rc.utils.RabbitMQUtil;
import java.nio.charset.StandardCharsets;
/**
* ClassName Producers
*
* @Description 消息生產者
* @Author SIE_LiBiao
* @Date 2022/2/12 19:26
* @Version 1.0
*/
public class Producers {
/**
* 交換機
*/
public static final String HELLO_EXCHANGE = "hello_exchange";
/**
* 隊列
*/
public static final String HELLO_QUEUE = "hello_queue";
public static void main(String[] args) throws Exception {
//獲取信道
Channel channel = RabbitMQUtil.getChannel();
//消息發布確認
channel.confirmSelect();
//創建交換機
channel.exchangeDeclare(HELLO_EXCHANGE, BuiltinExchangeType.DIRECT);
//創建隊列
channel.queueDeclare(HELLO_QUEUE,false,false,false,null);
//綁定交換機與隊列
channel.queueBind(HELLO_QUEUE,HELLO_EXCHANGE,"");
//發送消息
channel.basicPublish(HELLO_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello word!".getBytes(StandardCharsets.UTF_8));
if(channel.waitForConfirms()){
System.out.println("消息發布成功!");
}
}
}
執行后,可以在管理頁面看到對應的交換機與隊列,隊列里面有一個未被消費的消息。
進入隊列頁面就可以看到剛才執行發送的一條消息。
6.2、消息消費者
import com.rabbitmq.client.Channel;
import com.tenghu.rc.utils.RabbitMQUtil;
/**
* ClassName Consumer
*
* @Description 消息消費者
* @Author SIE_LiBiao
* @Date 2022/2/12 19:37
* @Version 1.0
*/
public class Consumer {
/**
* 隊列
*/
public static final String HELLO_QUEUE = "hello_queue";
public static void main(String[] args) throws Exception {
//獲取信道
Channel channel = RabbitMQUtil.getChannel();
//接收消息
channel.basicConsume(HELLO_QUEUE, false, (consumerTag, message) -> {
System.out.println("獲取消息:" + new String(message.getBody()));
//手動確認消息被消費
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {
System.out.println("消息被取消時回調!");
});
}
}
執行后控制台輸出結果:獲取消息:hello word!
,消息消費成功后,我們手動確認消息已經被消費了,再去管理頁面查看隊列里面的消息已經不存在了。