一,消息服務中間件的概述
1,大多應用中,可通過消息服務中間件來提升系統異步通信、擴展解耦能力。
2,消息服務中有兩個概念:消息代理和目的地
當消息發送者發送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。
3,消息隊列主要由兩種形式的目的地。
(1)隊列:點對點消息通信(一對一)
(2)主題:發布/訂閱消息通信(一對多)
1,消息隊列的應用場景:
一個網站,用戶需要注冊,注冊后還需要發送注冊郵件,還要發送注冊短信,假如每一步都需要花費5秒的話,那么注冊使用同步的方式,需要花費15秒,如果使用異步的方式注冊,讓注冊郵件和注冊短信同時進行,那么花費10秒。
那么問題來了,哪里用到了消息服務中間件呢?
用戶注冊信息之后,花費0.5秒的時間把信息寫入消息中間件,然后提示用戶注冊成功,用戶注冊就只花費了5.5秒的時間。
寫入消息中間件之后就不用管了,剩下的就交給消息隊列自己執行了。
2,消息服務中間件的作用詳述
應用解耦:
訂單系統---------庫存系統
訂單系統每次減少一個庫存,都要訪問庫存系統,告訴他已經賣了一單了,庫存要減少一個。
訂單系統----------消息隊列-----------庫存系統
訂單系統減少一個庫存時,寫入消息隊列,然后庫存系統訂閱該消息隊列,得到訂單系統寫入的信息,然后庫存系統自己減少一個庫存。
流量削峰:
雙十一那天會同時有超級多的用戶同時搶購某一個東西,比如淘寶,那么如果特別多的用戶同時訪問服務器,服務器肯定是受不了的,解決辦法就是把接到的請求訂單信息寫入消息隊列,然后讓服務器訂閱該消息隊列,自己取信息,如果請求數量超過了消息隊列的上限,那么消息隊列會拒收,所以減小了服務器的壓力。
3,消息隊列的模式
點對點式:
消息發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取后被移出隊列。
消息只有唯一的發送者和接受者。
發布訂閱式:
發送者(發布者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那么就會在消息到達時同時收到消息。
JMS,JAVA消息服務,基於JVM消息代理的規范,ActiveMQ,HornetMQ是JMS實現。
AMQP,高級消息隊列協議,也是一個消息代理的規范,兼容JMS,RabbitMQ是AMQP的實現。
二,RabbitMQ
的安裝和使用
1,下載和安裝
到RabbitMQ
官網下載安裝包:https://www.rabbitmq.com/
在安裝RabbitMQ
之前,首先需要安裝ErLang
包,因為RabbitMQ
是基於ErLang
語言的。
下載的RabbitMQ
安裝包和RabbitMQ
安裝包的版本要一致,否則環境會搭建失敗,出現很多問題。
如果查看對應一致的版本?
在RabbitMQ
官網有和ErLang
版本對照表。
本次安裝選擇在Linux
虛擬機進行安裝,在安裝之前要准備三個安裝包。
ErLang的 rpm包
RabbitMQ是Erlang語言編寫,所以Erang環境必須要有,注:Erlang環境一定要與RabbitMQ版本匹配:https://www.rabbitmq.com/which-erlang.html
Erlang下載地址:https://www.rabbitmq.com/releases/erlang/(根據自身需求及匹配關系,下載對應rpm包)
RabbitMQ的 rpm包
RabbitMQ下載地址:https://www.rabbitmq.com/releases/rabbitmq-server/(根據自身需求及匹配關系,下載對應rpm包)
socat的 rpm包
rabbitmq安裝依賴於socat,所以需要下載socat。
socat下載地址:http://repo.iotti.biz/CentOS/6/x86_64/socat-1.7.3.2-1.el6.lux.x86_64.rpm
根據自身需求下載對應系統socat依賴:(http://repo.iotti.biz/CentOS/)
下載好之后的rpm包
打開Linux
虛擬機,進入/usr/local/software
文件夾,使用xftp
工具把rpm
安裝包上傳到該文件夾下。
安裝順序:1:erlang 2:socat(密鑰) 3:rabbitmq
首先安裝erlang
的rpm
安裝包
[fanjiangfeng@localhost software]$ rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
注意一點:要切換到root
用戶才有權限安裝,否則沒有權限。
其次安裝socat
的rpm
安裝包(該密鑰為RabbitMQ
服務所依賴,如果不安裝的話,rabbitMQ
會安裝失敗)
[root@localhost software]# rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
最后安裝RabbitMQ
的rpm
安裝包
[root@localhost software]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安裝到此結束。
2,修改配置文件
安裝完成后,rabbitmq
會默認安裝到/usr/lib/rabbitmq/lib
路徑下。
進入ebin
目錄,修改配置文件rabbit.app
。
[root@localhost ebin]# pwd
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
vim rabbit.app
找到下面這個地方
修改為
RabbitMQ
安裝就完成了。
3,命令行與管控台-基礎操作
rabbitmqctl stop_app 關閉應用
rabbitmqctl start_app 開啟應用
rabbitmqctl status 節點狀態
rabbitmqctl add_user username password 添加用戶
rabbitmqctl list_users 列出所有用戶
rabbitmqctl delete_user usernmae 刪除用戶
rabbitmqctl clear_permissions -p vhostpath username 清除用戶權限
rabbitmqctl add_vhost vhostpath 創建虛擬主機
rabbitmqctl list_vhosts 列出所有虛擬主機
rabbitmqctl list_permissions -p vhostpath 列出虛擬主機上所有權限
rabbitmqctl delete_vhost vhostpath 刪除虛擬主機
rabbitmqctl list_queues 查看所有隊列信息
rabbitmqctl -p vhostpath purge_queue blue 清除隊列里的消息
4,命令行與管控台-高級操作
rabbitmqctl reset 移除所有數據,要在rabbitmqctl stop_app 之后使用
rabbitmqctl join_cluster
[--ram] 組成集群命令 rabbitmqctl cluster_status 查看集群狀態
5,來些基本操作
首先啟動rabbitmq
服務
[root@localhost software]# rabbitmqctl start_app
異常:Error: unable to connect to node rabbit@localhost: nodedown
如果出現上面的異常的話,解決方式如下:
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
rabbitmqctl start_app
## 成功啟動rabbitmq
## 啟動neutrou-server,openstack恢復正常
systemctl start neutron-server.service
基本操作
[root@localhost software]# lsof -i:5672 查看rabbitmq是否啟動(此為已啟動的狀態)
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
beam 6172 rabbitmq 48u IPv6 73786 0t0 TCP *:amqp (LISTEN)
[root@localhost software]# rabbitmqctl list_queues 查看消息隊列(空,因為剛安裝的)
Listing queues ...
[root@localhost software]# rabbitmqctl list_vhosts 查看虛擬主機(空)
Listing vhosts ...
/
[root@localhost software]#
6,管控台
啥是管控台?
說白了,命令行是命令的方式進行管理,而管控台則是可視化視圖的方式進行管理,當然管控台更方便了。只要是rabbitmqctl
命令行有的,管控台全都有!
進入/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/sbin
的目錄
[root@localhost sbin]# ls
rabbitmqctl rabbitmq-defaults rabbitmq-env rabbitmq-plugins rabbitmq-server
運行命令查看已安裝的插件列表
[root@localhost sbin]# rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@localhost
|/
[ ] amqp_client 3.6.5
[ ] cowboy 1.0.3
[ ] cowlib 1.0.1
[ ] mochiweb 2.13.1
[ ] rabbitmq_amqp1_0 3.6.5
[ ] rabbitmq_auth_backend_ldap 3.6.5
運行命令啟動RabbitMQ
的管控台
[root@localhost sbin]# rabbitmq-plugins enable rabbitmq_management
然后管控台就啟動了,在瀏覽器的url地址欄輸入虛擬機的 ip 和端口號為15672,就進入了管控台。
默認端口號就是15672。
賬號密碼默認為guest
,登錄
7,消息隊列的說明
下圖是一個隊列。
上圖隊列說明:
1,第一個msg
是要發送的消息;
2,exchange.direct
是交換機,direct
模式的交換機,它會規定一個Routing Key
,如果隊列的Routing Key
和交換機指定的Routing Key
一樣,那么會把消息發送給該隊列,對不上號的肯定就不會發送了。
3,exchange.fanout
也是交換機,fanout
模式的交換機,它不會規定Routing Key
,類似廣播模式,該交換機將發送消息到每一個隊列中去。
4,中間綠色的當然是四個隊列了。
5,exchange.topic
負責從隊列中取信息,它和隊列的關系和交換機和隊列的關系很像,都是相互綁定,但是不同的是,交換機是發送,而它是接收。
China.#和*.news是模糊匹配原則,綁定可以模糊匹配,可以對應多個隊列。
三,springboot集成rabbitmq
前提是要在管控台添加消息隊列完成之后才行。
1,管控台添加消息隊列
第一步,先添加一個隊列
第二步,添加一個交換機
第三步,添加一個消息接收者(topic
模式的交換機)
第四步,綁定direct
交換機和隊列
在管控台點擊該交換機,進入然后綁定隊列。
第五步,綁定topic
(接收者)交換機和隊列,同上。
測試上面的消息隊列是否創建成功?
點擊交換機,發送消息
然后回到隊列,查看消息
證明隊列已經創建成功!
然后就可以落實到項目中去了!
2,springboot發送消息到隊列
新建一個springboot
項目,導入坐標
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
在application.yml
配置文件中配置rabbitmq
的主機地址
spring:
rabbitmq:
host: 192.168.186.128
在啟動類上加上RabbitMQ
的注解@EnableRabbit
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
//第一個參數:交換機名字 第二個參數:Routing Key的值 第三個參數:傳遞的消息對象
rabbitTemplate.convertAndSend("test.direct","test","springboot發來的消息");
}
運行測試類,回到rabbitmq
的管控台,查看名為test
的隊列收到消息。
還可以發送一個對象,前提是該對象必須序列化,不然會拋異常。可是雖然能夠發送成功,但拿到的是字節流,因此發送對象可以采用json
串的方式。
對象中必須加入無參構造才能保證序列化成功,否則會序列化失敗。
先寫一個配置類,該配置類是規定使用json
格式發送。
@Configuration
public class TestConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
User
實體類
@Data
public class User implements Serializable {
private String name;
private String address;
public User(String name, String address) {
this.name = name;
this.address = address;
}
public User() {
}
}
發送消息測試
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
//第一個參數:交換機名字 第二個參數:Routing Key的值 第三個參數:傳遞的消息對象
rabbitTemplate.convertAndSend("test.direct","test",new User("樊江鋒","河南鄭州"));
}
查看管控台,已經拿到了該json
格式的對象
3,springboot從隊列中接收消息
@Test
void receive(){
//指定隊列名
Object test = rabbitTemplate.receiveAndConvert("test");
System.err.println(test);
}
4,springboot監聽接收到的消息
當監聽到隊列中收到消息時,然后可以做一些處理。下面的監聽在項目啟動之后會開啟,前提是啟動類要加@EnableRabbit
注解。
//監聽接收到的數據(請求體)
@RabbitListener(queues = "test")
public void receive(User user){
System.out.println("收到消息:"+user);
}
//監聽接收到的請求頭
@RabbitListener(queues = "test")
public void receive2(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
測試一下監聽,給test
隊列發送一條消息。
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
//第一個參數:交換機名字 第二個參數:Routing Key的值 第三個參數:傳遞的消息對象
rabbitTemplate.convertAndSend("test.direct","test",new User("樊江鋒","河南鄭州"));
}
然后看控制台,會打印
收到消息:User{name='樊江鋒', address='河南鄭州'}
而且控制台隊列顯示未讀消息是0,顯然已經刪除了。
5,amqp管理組件
amqp
是用來管理組件的,上面的組件(交換機,隊列,以及交換機和隊列之間的綁定)都是在管控台操作的,這里使用amqp
來管理。
@Autowired
AmqpAdmin amqpAdmin;
void create(){
amqpAdmin.declareExchange(new DirectExchange("test"));//創建一個名為test的direct模式的交換機
amqpAdmin.declareQueue(new Queue("test",true));//創建一個名為test的隊列
//第一個參數:綁定的隊列名
//第二個參數:綁定的類型(選擇隊列)
//第三個參數:交換機名
//第四個參數:Routing Key的值
//第五個參數:需要傳的參數,這里不需要
amqpAdmin.declareBinding(new Binding("test",Binding.DestinationType.QUEUE,"topic_test","routingkey",null));
amqpAdmin.deleteExchange("test");//刪除交換機(參數:交換機名)
amqpAdmin.deleteQueue("test");//刪除隊列(參數:隊列名)
}
到這里結束!