手把手教你SpringBoot集成消息服務中間件RabbitMQ


一,消息服務中間件的概述

1,大多應用中,可通過消息服務中間件來提升系統異步通信、擴展解耦能力。

2,消息服務中有兩個概念:消息代理和目的地

當消息發送者發送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。

3,消息隊列主要由兩種形式的目的地。

​ (1)隊列:點對點消息通信(一對一)

​ (2)主題:發布/訂閱消息通信(一對多)

1,消息隊列的應用場景:

一個網站,用戶需要注冊,注冊后還需要發送注冊郵件,還要發送注冊短信,假如每一步都需要花費5秒的話,那么注冊使用同步的方式,需要花費15秒,如果使用異步的方式注冊,讓注冊郵件和注冊短信同時進行,那么花費10秒。

那么問題來了,哪里用到了消息服務中間件呢?

用戶注冊信息之后,花費0.5秒的時間把信息寫入消息中間件,然后提示用戶注冊成功,用戶注冊就只花費了5.5秒的時間。

寫入消息中間件之后就不用管了,剩下的就交給消息隊列自己執行了。

2,消息服務中間件的作用詳述

應用解耦:

訂單系統---------庫存系統

訂單系統每次減少一個庫存,都要訪問庫存系統,告訴他已經賣了一單了,庫存要減少一個。

訂單系統----------消息隊列-----------庫存系統

訂單系統減少一個庫存時,寫入消息隊列,然后庫存系統訂閱該消息隊列,得到訂單系統寫入的信息,然后庫存系統自己減少一個庫存。

流量削峰:

雙十一那天會同時有超級多的用戶同時搶購某一個東西,比如淘寶,那么如果特別多的用戶同時訪問服務器,服務器肯定是受不了的,解決辦法就是把接到的請求訂單信息寫入消息隊列,然后讓服務器訂閱該消息隊列,自己取信息,如果請求數量超過了消息隊列的上限,那么消息隊列會拒收,所以減小了服務器的壓力。

3,消息隊列的模式

點對點式:

消息發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取后被移出隊列。

消息只有唯一的發送者和接受者。

發布訂閱式:

發送者(發布者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那么就會在消息到達時同時收到消息。

JMS,JAVA消息服務,基於JVM消息代理的規范,ActiveMQ,HornetMQ是JMS實現。

AMQP,高級消息隊列協議,也是一個消息代理的規范,兼容JMS,RabbitMQ是AMQP的實現。

二,RabbitMQ的安裝和使用

1,下載和安裝

RabbitMQ官網下載安裝包:https://www.rabbitmq.com/

在安裝RabbitMQ之前,首先需要安裝ErLang包,因為RabbitMQ是基於ErLang語言的。

下載的RabbitMQ安裝包和RabbitMQ安裝包的版本要一致,否則環境會搭建失敗,出現很多問題。

如果查看對應一致的版本?

RabbitMQ官網有和ErLang版本對照表。

在這里插入圖片描述

本次安裝選擇在Linux虛擬機進行安裝,在安裝之前要准備三個安裝包。

ErLang的 rpm包

RabbitMQ是Erlang語言編寫,所以Erang環境必須要有,注:Erlang環境一定要與RabbitMQ版本匹配:https://www.rabbitmq.com/which-erlang.html

Erlang下載地址:https://www.rabbitmq.com/releases/erlang/(根據自身需求及匹配關系,下載對應rpm包)

RabbitMQ的 rpm包

RabbitMQ下載地址:https://www.rabbitmq.com/releases/rabbitmq-server/(根據自身需求及匹配關系,下載對應rpm包)

socat的 rpm包

 rabbitmq安裝依賴於socat,所以需要下載socat。

 socat下載地址:http://repo.iotti.biz/CentOS/6/x86_64/socat-1.7.3.2-1.el6.lux.x86_64.rpm

 根據自身需求下載對應系統socat依賴:(http://repo.iotti.biz/CentOS/)

下載好之后的rpm包

在這里插入圖片描述

打開Linux虛擬機,進入/usr/local/software文件夾,使用xftp工具把rpm安裝包上傳到該文件夾下。

在這里插入圖片描述

安裝順序:1:erlang 2:socat(密鑰) 3:rabbitmq

首先安裝erlangrpm安裝包

[fanjiangfeng@localhost software]$ rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 

注意一點:要切換到root用戶才有權限安裝,否則沒有權限。

在這里插入圖片描述

其次安裝socatrpm安裝包(該密鑰為RabbitMQ服務所依賴,如果不安裝的話,rabbitMQ會安裝失敗)

[root@localhost software]# rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm 

最后安裝RabbitMQrpm安裝包

[root@localhost software]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 

安裝到此結束。

2,修改配置文件

安裝完成后,rabbitmq會默認安裝到/usr/lib/rabbitmq/lib路徑下。

進入ebin目錄,修改配置文件rabbit.app

[root@localhost ebin]# pwd
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
vim rabbit.app

在這里插入圖片描述

找到下面這個地方

在這里插入圖片描述

修改為

在這里插入圖片描述

RabbitMQ安裝就完成了。

3,命令行與管控台-基礎操作

rabbitmqctl stop_app 關閉應用

rabbitmqctl start_app 開啟應用

rabbitmqctl status 節點狀態

rabbitmqctl add_user username password 添加用戶

rabbitmqctl list_users 列出所有用戶

rabbitmqctl delete_user usernmae 刪除用戶

rabbitmqctl clear_permissions -p vhostpath username 清除用戶權限

rabbitmqctl add_vhost vhostpath 創建虛擬主機

rabbitmqctl list_vhosts 列出所有虛擬主機

rabbitmqctl list_permissions -p vhostpath 列出虛擬主機上所有權限

rabbitmqctl delete_vhost vhostpath 刪除虛擬主機

rabbitmqctl list_queues 查看所有隊列信息

rabbitmqctl -p vhostpath purge_queue blue 清除隊列里的消息

4,命令行與管控台-高級操作

rabbitmqctl reset 移除所有數據,要在rabbitmqctl stop_app 之后使用

rabbitmqctl join_cluster [--ram] 組成集群命令

rabbitmqctl cluster_status 查看集群狀態

5,來些基本操作

首先啟動rabbitmq服務

[root@localhost software]# rabbitmqctl start_app

異常:Error: unable to connect to node rabbit@localhost: nodedown

如果出現上面的異常的話,解決方式如下:

systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
rabbitmqctl start_app
## 成功啟動rabbitmq
## 啟動neutrou-server,openstack恢復正常
systemctl start neutron-server.service

基本操作

[root@localhost software]# lsof -i:5672   查看rabbitmq是否啟動(此為已啟動的狀態)
COMMAND  PID     USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
beam    6172 rabbitmq   48u  IPv6  73786      0t0  TCP *:amqp (LISTEN)
[root@localhost software]# rabbitmqctl list_queues  查看消息隊列(空,因為剛安裝的)
Listing queues ...
[root@localhost software]# rabbitmqctl list_vhosts  查看虛擬主機(空)
Listing vhosts ...
/
[root@localhost software]#

6,管控台

啥是管控台?

說白了,命令行是命令的方式進行管理,而管控台則是可視化視圖的方式進行管理,當然管控台更方便了。只要是rabbitmqctl命令行有的,管控台全都有!

進入/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/sbin的目錄

[root@localhost sbin]# ls
rabbitmqctl  rabbitmq-defaults  rabbitmq-env  rabbitmq-plugins  rabbitmq-server

運行命令查看已安裝的插件列表

[root@localhost sbin]# rabbitmq-plugins list
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@localhost
 |/
[  ] amqp_client                       3.6.5
[  ] cowboy                            1.0.3
[  ] cowlib                            1.0.1
[  ] mochiweb                          2.13.1
[  ] rabbitmq_amqp1_0                  3.6.5
[  ] rabbitmq_auth_backend_ldap        3.6.5

運行命令啟動RabbitMQ的管控台

[root@localhost sbin]# rabbitmq-plugins enable rabbitmq_management

然后管控台就啟動了,在瀏覽器的url地址欄輸入虛擬機的 ip 和端口號為15672,就進入了管控台。

默認端口號就是15672。

在這里插入圖片描述

賬號密碼默認為guest,登錄

在這里插入圖片描述

7,消息隊列的說明

下圖是一個隊列。

在這里插入圖片描述

上圖隊列說明:

1,第一個msg是要發送的消息;

2,exchange.direct是交換機,direct模式的交換機,它會規定一個Routing Key,如果隊列的Routing Key和交換機指定的Routing Key一樣,那么會把消息發送給該隊列,對不上號的肯定就不會發送了。

3,exchange.fanout也是交換機,fanout模式的交換機,它不會規定Routing Key,類似廣播模式,該交換機將發送消息到每一個隊列中去。

4,中間綠色的當然是四個隊列了。

5,exchange.topic負責從隊列中取信息,它和隊列的關系和交換機和隊列的關系很像,都是相互綁定,但是不同的是,交換機是發送,而它是接收。

China.#和*.news是模糊匹配原則,綁定可以模糊匹配,可以對應多個隊列。

三,springboot集成rabbitmq

前提是要在管控台添加消息隊列完成之后才行。

1,管控台添加消息隊列

第一步,先添加一個隊列

在這里插入圖片描述

第二步,添加一個交換機

在這里插入圖片描述

第三步,添加一個消息接收者(topic模式的交換機)

在這里插入圖片描述

第四步,綁定direct交換機和隊列

在管控台點擊該交換機,進入然后綁定隊列。

在這里插入圖片描述

第五步,綁定topic(接收者)交換機和隊列,同上。

測試上面的消息隊列是否創建成功?

點擊交換機,發送消息

在這里插入圖片描述

然后回到隊列,查看消息

在這里插入圖片描述

證明隊列已經創建成功!

然后就可以落實到項目中去了!

2,springboot發送消息到隊列

新建一個springboot項目,導入坐標

		<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

application.yml配置文件中配置rabbitmq的主機地址

spring:
  rabbitmq:
    host: 192.168.186.128

在啟動類上加上RabbitMQ的注解@EnableRabbit

 @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
            //第一個參數:交換機名字  第二個參數:Routing Key的值  第三個參數:傳遞的消息對象
       rabbitTemplate.convertAndSend("test.direct","test","springboot發來的消息");
    }

運行測試類,回到rabbitmq的管控台,查看名為test的隊列收到消息。

在這里插入圖片描述

還可以發送一個對象,前提是該對象必須序列化,不然會拋異常。可是雖然能夠發送成功,但拿到的是字節流,因此發送對象可以采用json串的方式。

對象中必須加入無參構造才能保證序列化成功,否則會序列化失敗。

在這里插入圖片描述

先寫一個配置類,該配置類是規定使用json格式發送。

@Configuration
public class TestConfig {
    @Bean
    public Jackson2JsonMessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

User實體類

@Data
public class User implements Serializable {
    private String name;
    private String address;

    public User(String name, String address) {
        this.name = name;
        this.address = address;
    }
     public User() {
    }
}    

發送消息測試

@Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
     //第一個參數:交換機名字  第二個參數:Routing Key的值  第三個參數:傳遞的消息對象
     rabbitTemplate.convertAndSend("test.direct","test",new User("樊江鋒","河南鄭州"));
    }

查看管控台,已經拿到了該json格式的對象

3,springboot從隊列中接收消息

  @Test
    void receive(){
        //指定隊列名
        Object test = rabbitTemplate.receiveAndConvert("test");
        System.err.println(test);
    }

4,springboot監聽接收到的消息

當監聽到隊列中收到消息時,然后可以做一些處理。下面的監聽在項目啟動之后會開啟,前提是啟動類要加@EnableRabbit注解。

 //監聽接收到的數據(請求體)
    @RabbitListener(queues = "test")
    public void receive(User user){
        System.out.println("收到消息:"+user);
    }
    //監聽接收到的請求頭
    @RabbitListener(queues = "test")
    public void receive2(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }

測試一下監聽,給test隊列發送一條消息。

 @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
    //第一個參數:交換機名字  第二個參數:Routing Key的值  第三個參數:傳遞的消息對象
    rabbitTemplate.convertAndSend("test.direct","test",new User("樊江鋒","河南鄭州"));
    }

然后看控制台,會打印

收到消息:User{name='樊江鋒', address='河南鄭州'}

而且控制台隊列顯示未讀消息是0,顯然已經刪除了。

5,amqp管理組件

amqp是用來管理組件的,上面的組件(交換機,隊列,以及交換機和隊列之間的綁定)都是在管控台操作的,這里使用amqp來管理。

 @Autowired
    AmqpAdmin amqpAdmin;

    void create(){
        amqpAdmin.declareExchange(new DirectExchange("test"));//創建一個名為test的direct模式的交換機
        amqpAdmin.declareQueue(new Queue("test",true));//創建一個名為test的隊列

        //第一個參數:綁定的隊列名
        //第二個參數:綁定的類型(選擇隊列)
        //第三個參數:交換機名
        //第四個參數:Routing Key的值
        //第五個參數:需要傳的參數,這里不需要
        amqpAdmin.declareBinding(new Binding("test",Binding.DestinationType.QUEUE,"topic_test","routingkey",null));
        amqpAdmin.deleteExchange("test");//刪除交換機(參數:交換機名)
        amqpAdmin.deleteQueue("test");//刪除隊列(參數:隊列名)
    }

到這里結束!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM