RabbitMQ的安裝與使用(Centos7,linux版本)


1、主流的消息中間件簡單介紹哦。

  1)、ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線,並且它一個完全支持jms(java message service)規范的消息中間件。其豐富的api,多種集群構建模式使得他成為業界老牌消息中間件,在中小企業中應用廣泛。
如果不是高並發的系統,對於ActiveMQ,是一個不錯的選擇的,豐富的api,讓你開發的很愉快喲。
注意:MQ衡量指標:服務性能,數據存儲,集群架構。

  2)、kafka是LinkedIn開源的分布式發布/訂閱消息系統,目前歸屬於Apache頂級項目。kafka主要特點是基於Pull的模式來處理消息消費,最求高吞吐量,一開始的目的就是用於日志收集和傳輸,0.8版本開始支持復制,不支持事務,對消息的重復,丟失,錯誤沒有嚴格要求,適量產生大量數據的互聯網服務的數據收集業務。

  3)、RocketMQ是阿里開源的消息中間件,目前也已經孵化為了Apache頂級項目,它是純java開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。RocketMQ思路起源於kafka,它對消息的可靠傳輸以及事務性做了優化,目前在阿里集團被廣泛用於交易,充值,流計算、消息推送、日志流式處理,binglog分發等場景。

  4)、RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現的。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱模式)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。

2、RabbitMQ的簡單介紹。

  RabbitMQ是一個開源的消息代理和隊列服務器,用來通過普通協議在完全不同的應用之間共享數據(即RabbitMQ可以實現跨語言、跨平台操作),RabbitMQ是使用Erlang語言來編寫的,並且RabbitMQ是基於AMQP協議的。

3、RabbitMQ高性能的原因所在是什么呢?
  答:RabbitMQ所使用的開發語言是ErLang語言,ErLang其最初在於交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數據交互的性能是非常優秀的。Erlang的優點,Erlang有着和原生Socket一樣的延遲。性能十分優越。

4、AMQP高級消息隊列協議是什么?
  答:AMQP全稱是Advanced Message Queuing Protocol(高級消息隊列協議)。AMQP定義是具有現代特征的二進制協議。是一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。

5、AMQP協議模型。

詳細介紹如下所示:

1)、Server,又稱為Broker,接受客戶端的連接,實現AMQP實體服務。
2)、Connection,連接,應用程序與Broker的網絡連接。
3)、Channel,網絡信道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道。客戶端可建立多個Channel,每個Channel代表一個會話任務。
4)、Message,消息,服務器和應用程序之間傳送的數據,由Properties和Body組成。Propertie可以對消息進行修飾,比如消息的優先級,延遲等高級特性,Body則就是消息體內容。
5)、Virtual Host,虛擬地址,用於進行邏輯隔離,最上層的消息路由。一個Virtual Host里面可以有若干個Exchange和Queue,同一個Virtual Host里面不能有相同名稱的Exchange或者Queue。
6)、Exchange,交換機,接受消息,根據路由鍵轉發消息到綁定的隊列。
7)、Binding,Exchange和Queue之間的虛擬連接,binding中可以包含routing key。
8)、Routing key,一個路由規則,虛擬機可以用它來確定如何路由一個特定消息。
9)、Queue,也稱為Message Queue,消息隊列,保存消息並將它們轉發給消費者。 

6、RabbitMQ的架構設計如下所示:

7、RabbitMQ的安裝。RabbitMQ的官方網址:https://www.rabbitmq.com/

可以選擇自己RabbitMQ的版本,以及對應的Erlang的版本。這里使用rabbitmq-server-3.6.5-1.noarch.rpm一鍵安裝方式進行安裝RabbitMQ的方式。一定要注意RabbitMQ的版本和Erlang的版本對應哦。點進去Erlang version可以自己對照版本。

搭建RabbitMQ所需包:

a)、erlang-18.3-1.el7.centos.x86_64.rpm這個是erlang語言基礎安裝包。

b)、rabbitmq-server-3.6.5-1.noarch.rpm這個是rabbitmq服務端安裝包。

c)、socat-1.7.3.2-1.1.el7.x86_64.rpm這個是socat密鑰。

可以下載安裝包,然后進行安裝即可:

1 wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
2 wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
3 wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

 首先安裝Erlang的語言基礎安裝包,安裝過程如下所示:

1 [root@slaver4 package]# ls
2 erlang-18.3-1.el7.centos.x86_64.rpm  haproxy-1.6.5.tar.gz  keepalived-1.2.18.tar.gz  rabbitmq_delayed_message_exchange-0.0.1.ez  rabbitmq-server-3.6.5-1.noarch.rpm  socat-1.7.3.2-1.1.el7.x86_64.rpm
3 [root@slaver4 package]# rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 
4 Preparing...                          ################################# [100%]
5 Updating / installing...
6    1:erlang-18.3-1.el7.centos         ################################# [100%]
7 [root@slaver4 package]#

開始安裝密鑰包,如下所示:

1 [root@slaver4 package]# rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm 
2 warning: socat-1.7.3.2-1.1.el7.x86_64.rpm: Header V4 RSA/SHA1 Signature, key ID 87e360b8: NOKEY
3 Preparing...                          ################################# [100%]
4 Updating / installing...
5    1:socat-1.7.3.2-1.1.el7            ################################# [100%]
6 [root@slaver4 package]#

開始安裝rabbitmq服務器端,如下所示:

1 [root@slaver4 package]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 
2 warning: rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA1 Signature, key ID 6026dfca: NOKEY
3 Preparing...                          ################################# [100%]
4 Updating / installing...
5    1:rabbitmq-server-3.6.5-1          ################################# [100%]
6 [root@slaver4 package]#

8、rpm安裝方式已經幫助你配置好了環境這些東西,比解壓縮安裝好點,因為解壓縮安裝還需要手動配置環境變量的。接下來,配置一下RabbitMQ。配置如下所示:

 1 [root@slaver4 package]# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/
 2 [root@slaver4 ebin]# ls
 3 background_gc.beam                   rabbit_epmd_monitor.beam               rabbit_plugins_main.beam
 4 delegate.beam                        rabbit_error_logger.beam               rabbit_plugins_usage.beam
 5 delegate_sup.beam                    rabbit_error_logger_file_h.beam        rabbit_policies.beam
 6 dtree.beam                           rabbit_exchange.beam                   rabbit_policy.beam
 7 file_handle_cache.beam               rabbit_exchange_parameters.beam        rabbit_prelaunch.beam
 8 file_handle_cache_stats.beam         rabbit_exchange_type_direct.beam       rabbit_prequeue.beam
 9 gatherer.beam                        rabbit_exchange_type_fanout.beam       rabbit_priority_queue.beam
10 gm.beam                              rabbit_exchange_type_headers.beam      rabbit_queue_consumers.beam
11 lqueue.beam                          rabbit_exchange_type_invalid.beam      rabbit_queue_index.beam
12 mirrored_supervisor_sups.beam        rabbit_exchange_type_topic.beam        rabbit_queue_location_client_local.beam
13 mnesia_sync.beam                     rabbit_file.beam                       rabbit_queue_location_min_masters.beam
14 mochinum.beam                        rabbit_framing.beam                    rabbit_queue_location_random.beam
15 pg2_fixed.beam                       rabbit_guid.beam                       rabbit_queue_location_validator.beam
16 pg_local.beam                        rabbit_hipe.beam                       rabbit_queue_master_location_misc.beam
17 rabbit_access_control.beam           rabbit_limiter.beam                    rabbit_recovery_terms.beam
18 rabbit_alarm.beam                    rabbit_log.beam                        rabbit_registry.beam
19 rabbit_amqqueue_process.beam         rabbit_memory_monitor.beam             rabbit_resource_monitor_misc.beam
20 rabbit_amqqueue_sup.beam             rabbit_mirror_queue_coordinator.beam   rabbit_restartable_sup.beam
21 rabbit_amqqueue_sup_sup.beam         rabbit_mirror_queue_master.beam        rabbit_router.beam
22 rabbit.app                           rabbit_mirror_queue_misc.beam          rabbit_runtime_parameters.beam
23 rabbit_auth_mechanism_amqplain.beam  rabbit_mirror_queue_mode_all.beam      rabbit_sasl_report_file_h.beam
24 rabbit_auth_mechanism_cr_demo.beam   rabbit_mirror_queue_mode.beam          rabbit_ssl.beam
25 rabbit_auth_mechanism_plain.beam     rabbit_mirror_queue_mode_exactly.beam  rabbit_sup.beam
26 rabbit_autoheal.beam                 rabbit_mirror_queue_mode_nodes.beam    rabbit_table.beam
27 rabbit.beam                          rabbit_mirror_queue_slave.beam         rabbit_trace.beam
28 rabbit_binding.beam                  rabbit_mirror_queue_sync.beam          rabbit_upgrade.beam
29 rabbit_boot_steps.beam               rabbit_mnesia.beam                     rabbit_upgrade_functions.beam
30 rabbit_channel_sup.beam              rabbit_mnesia_rename.beam              rabbit_variable_queue.beam
31 rabbit_channel_sup_sup.beam          rabbit_msg_file.beam                   rabbit_version.beam
32 rabbit_cli.beam                      rabbit_msg_store.beam                  rabbit_vhost.beam
33 rabbit_client_sup.beam               rabbit_msg_store_ets_index.beam        rabbit_vm.beam
34 rabbit_connection_helper_sup.beam    rabbit_msg_store_gc.beam               supervised_lifecycle.beam
35 rabbit_connection_sup.beam           rabbit_node_monitor.beam               tcp_listener.beam
36 rabbit_control_main.beam             rabbit_parameter_validation.beam       tcp_listener_sup.beam
37 rabbit_ctl_usage.beam                rabbit_password.beam                   truncate.beam
38 rabbit_dead_letter.beam              rabbit_password_hashing_md5.beam       vm_memory_monitor.beam
39 rabbit_diagnostics.beam              rabbit_password_hashing_sha256.beam    worker_pool.beam
40 rabbit_direct.beam                   rabbit_password_hashing_sha512.beam    worker_pool_sup.beam
41 rabbit_disk_monitor.beam             rabbit_plugins.beam                    worker_pool_worker.beam
42 [root@slaver4 ebin]# vim rabbit.app

修改內容如是:{loopback_users, <<"guest">>},修改為{loopback_users, [guest]}。這個是用戶的設置。必須修改的。

9、RabbitMQ安裝成功以后,就可以進行RabbitMQ的服務啟動和停止。

 1 [root@slaver4 ~]# rabbitmq-server start &
 2 [1] 14092
 3 [root@slaver4 ~]# 
 4               RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
 5   ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
 6   ##  ##
 7   ##########  Logs: /var/log/rabbitmq/rabbit@slaver4.log
 8   ######  ##        /var/log/rabbitmq/rabbit@slaver4-sasl.log
 9   ##########
10               Starting broker...
11  completed with 0 plugins.
12 
13 [root@slaver4 ~]#

啟動完成以后,如何驗證啟動是否正常呢,使用如下命令可以查看RabbitMQ啟動是否正常。可以看到RabbitMQ的進程號,以及協議名稱等等。

1 [root@slaver4 ~]# lsof -i:5672
2 COMMAND   PID     USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
3 beam    14206 rabbitmq   48u  IPv6  70172      0t0  TCP *:amqp (LISTEN)
4 [root@slaver4 ~]# 

如何停止RabbitMQ呢,可以使用如下所示停止方式,如下所示:

1 [root@slaver4 ~]# rabbitmqctl stop
2 Stopping and halting node rabbit@slaver4 ...
3 Gracefully halting Erlang VM

可以使用[root@slaver4 ~]# rabbitmq-plugins list命令查看默認提供了什么樣的插件。

 1 [root@slaver4 ~]# rabbitmq-plugins list
 2  Configured: E = explicitly enabled; e = implicitly enabled
 3  | Status:   * = running on rabbit@slaver4
 4  |/
 5 [  ] amqp_client                       3.6.5
 6 [  ] cowboy                            1.0.3
 7 [  ] cowlib                            1.0.1
 8 [  ] mochiweb                          2.13.1
 9 [  ] rabbitmq_amqp1_0                  3.6.5
10 [  ] rabbitmq_auth_backend_ldap        3.6.5
11 [  ] rabbitmq_auth_mechanism_ssl       3.6.5
12 [  ] rabbitmq_consistent_hash_exchange 3.6.5
13 [  ] rabbitmq_event_exchange           3.6.5
14 [  ] rabbitmq_federation               3.6.5
15 [  ] rabbitmq_federation_management    3.6.5
16 [  ] rabbitmq_jms_topic_exchange       3.6.5
17 [  ] rabbitmq_management               3.6.5
18 [  ] rabbitmq_management_agent         3.6.5
19 [  ] rabbitmq_management_visualiser    3.6.5
20 [  ] rabbitmq_mqtt                     3.6.5
21 [  ] rabbitmq_recent_history_exchange  1.2.1
22 [  ] rabbitmq_sharding                 0.1.0
23 [  ] rabbitmq_shovel                   3.6.5
24 [  ] rabbitmq_shovel_management        3.6.5
25 [  ] rabbitmq_stomp                    3.6.5
26 [  ] rabbitmq_top                      3.6.5
27 [  ] rabbitmq_tracing                  3.6.5
28 [  ] rabbitmq_trust_store              3.6.5
29 [  ] rabbitmq_web_dispatch             3.6.5
30 [  ] rabbitmq_web_stomp                3.6.5
31 [  ] rabbitmq_web_stomp_examples       3.6.5
32 [  ] sockjs                            0.3.4
33 [  ] webmachine                        1.10.3

那么安裝RabbitMQ成功以后,如何安裝管理台或者管控台的插件呢,如下所示操作:

 1 [root@slaver4 ~]# rabbitmq-plugins enable rabbitmq_management
 2 The following plugins have been enabled:
 3   mochiweb
 4   webmachine
 5   rabbitmq_web_dispatch
 6   amqp_client
 7   rabbitmq_management_agent
 8   rabbitmq_management
 9 
10 Applying plugin configuration to rabbit@slaver4... started 6 plugins.
11 [root@slaver4 ~]# 

安裝好管控台插件以后就可以使用瀏覽器進行驗證(管控台的默認端口號是15672,5672是java端通信的端口號,25672是集群進行通信的端口號),訪問地址如是:http://192.168.110.133:15672/。賬號和密碼默認就是guest喲。

10、命令行和管控台的基本操作。

 1 常用命令如下所示:
 2 # 關閉應用
 3 [root@slaver4 ~]# rabbitmqctl stop_app
 4 # 啟動應用
 5 [root@slaver4 ~]# rabbitmqctl start_app
 6 # 節點狀態,查看集群節點狀態是什么樣子的
 7 [root@slaver4 ~]# rabbitmqctl status
 8 # 添加用戶
 9 [root@slaver4 ~]# rabbitmqctl add_user username password
10 # 列出所有用戶
11 [root@slaver4 ~]# rabbitmqctl list_users
12 # 刪除用戶
13 [root@slaver4 ~]# rabbitmqctl delete_user username
14 # 清除用戶權限
15 [root@slaver4 ~]# rabbitmqctl clear_permissions -p vhostpath username
16 # 列出用戶權限
17 [root@slaver4 ~]# rabbitmqctl list_user_permissions username
18 # 修改用戶密碼
19 [root@slaver4 ~]# rabbitmqctl change_password username newpassword
20 # 設置用戶權限
21 [root@slaver4 ~]# rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
22 
23 RabbitMQ支持對虛擬主機,交換機,隊列這些進行操作。常用命令如下所示:
24 # 創建虛擬主機
25 [root@slaver4 ~]# rabbitmqctl add_vhost vhostpath
26 # 列出所有虛擬主機
27 [root@slaver4 ~]# rabbitmqctl list_vhosts
28 # 列出虛擬主機上所有權限
29 [root@slaver4 ~]# rabbitmqctl list_permissions -p vhostpath
30 # 刪除虛擬主機
31 [root@slaver4 ~]# rabbitmqctl delete_vhosts vhostpath
32 # 列出所有隊列信息
33 [root@slaver4 ~]# rabbitmqctl list_queues
34 # 清除隊列里的信息
35 [root@slaver4 ~]# rabbitmqctl -p vhostpath purge_queue blue
36 
37 命令行和管控台的高級操作。
38 # 移除所有數據,要在rabbitmqctl stop_app之后使用
39 [root@slaver4 ~]# rabbitmqctl reset
40 # 組成集群命令,ram是加入節點的時候可以指定存儲模式。
41 [root@slaver4 ~]# rabbitmqctl join_cluster <clusternode> [--ram]
42 # 查看集群的狀態
43 [root@slaver4 ~]# rabbitmqctl cluster_status
44 # 修改集群節點的存儲形式
45 [root@slaver4 ~]# rabbitmqctl change_cluster_node_type disc | ram
46 # 忘記節點(摘除節點)
47 [root@slaver4 ~]# rabbitmqctl forget_cluster_node [--offline]
48 # 修改節點名稱
49 [root@slaver4 ~]# rabbitmqctl rename_cluster_node oladnode1 newnode1 [oldnode2] [newnode2...]

命令行可以操作的命令,在管控台也可以進行響應的操作,下面是管控台的菜單欄介紹:

11、RabbitMQ的消息生產和消費。生產者Producer發送一條消息,將消息投遞到Rabbitmq的集群中即Broker中。消費端進行監聽,監聽Rabbitmq隊列,獲取到數據進行消費。
  1)、ConnectionFactory,獲取連接工廠,需要配置相關信息ip地址、端口號port,虛擬主機vhost。
  2)、Connection,通過連接工廠獲取到一個連接。
  3)、Channel,通過連接創建一個Channel,網絡通信信道,可以發送和接收消息。Channel是Rabbitmq所有進行數據交互的關鍵。
  4)、Queue,創建一個隊列,具體的消息存儲隊列。真正的物理的隊列,存在於RabbitMQ的Broker上面。進行存儲消息的功能。
  5)、Producer生產者,生產者生產消息和Consumer消費者,消費者消費消息。

方式一,由於使用的maven構建的springboot2.x版本的項目,引入的依賴包如下所示:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
 6     <modelVersion>4.0.0</modelVersion>
 7     <parent>
 8         <groupId>org.springframework.boot</groupId>
 9         <artifactId>spring-boot-starter-parent</artifactId>
10         <version>2.1.1.RELEASE</version>
11         <relativePath /> <!-- lookup parent from repository -->
12     </parent>
13     <groupId>com.bie</groupId>
14     <artifactId>rabbitmq</artifactId>
15     <version>0.0.1-SNAPSHOT</version>
16     <name>rabbitmq</name>
17     <description>Demo project for Spring Boot</description>
18 
19     <properties>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter</artifactId>
27         </dependency>
28         <dependency>
29             <groupId>org.springframework.boot</groupId>
30             <artifactId>spring-boot-starter-web</artifactId>
31         </dependency>
32         <dependency>
33             <groupId>org.springframework.boot</groupId>
34             <artifactId>spring-boot-starter-test</artifactId>
35             <scope>test</scope>
36         </dependency>
37         <dependency>
38             <groupId>org.springframework.boot</groupId>
39             <artifactId>spring-boot-starter-amqp</artifactId>
40         </dependency>
41     </dependencies>
42 
43     <build>
44         <plugins>
45             <plugin>
46                 <groupId>org.springframework.boot</groupId>
47                 <artifactId>spring-boot-maven-plugin</artifactId>
48             </plugin>
49         </plugins>
50     </build>
51 
52 </project>

配置application.properties的配置文件,將rabbitmq所在的服務器地址,端口號,賬號,密碼,以及隊列的名稱。

 1 # 給當前項目起名稱.
 2 spring.application.name=rabbitmq
 3 
 4 # 配置rabbitmq的參數.
 5 # rabbitmq服務器的ip地址.
 6 spring.rabbitmq.host=192.168.110.133
 7 # rabbitmq的端口號5672,區別於瀏覽器訪問界面的15672端口號.
 8 spring.rabbitmq.port=5672
 9 # rabbitmq的賬號.
10 spring.rabbitmq.username=guest
11 # rabbitmq的密碼.
12 spring.rabbitmq.password=guest
13 
14 # 隊列的名稱
15 rabbitmq.queue=queue001

首先創建一個隊列,在項目啟動的時候,就進行加載,方便生產者生產的消息保存到隊列里面。

 1 package com.example.bie.config;
 2 
 3 import org.springframework.amqp.core.Queue;
 4 import org.springframework.beans.factory.annotation.Value;
 5 import org.springframework.context.annotation.Bean;
 6 import org.springframework.context.annotation.Configuration;
 7 
 8 /**
 9  * 
10  * @author biehl
11  * 
12  * @Configuration項目啟動加載本類
13  * 
14  */
15 @Configuration
16 public class RabbitMqQueueConfig {
17 
18     @Value("${rabbitmq.queue}")
19     private String queueName;
20 
21     /**
22      * 創建一個隊列
23      * 
24      * @return
25      */
26     @Bean
27     public Queue createQueue() {
28         return new Queue(this.queueName);
29     }
30 
31 }

然后,創建好生產者和消費者以后,可以使用web項目的請求,創建一個控制類,來發送消息,觸發生產者生產消息,觸發消費者消費消息。

 1 package com.example.bie.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.example.bie.rabbitmq.producer.RabbitmqProducer;
 9 
10 /**
11  * 
12  * @author biehl
13  *
14  */
15 @Controller
16 public class RabbitmqController {
17 
18     @Autowired
19     private RabbitmqProducer rabbitmqProducer;
20 
21     @RequestMapping(value = "/sendMessage")
22     @ResponseBody
23     public void rabbitmqSendMessage() {
24         String msg = "消息產===>生者<===消息message: ";
25         for (int i = 0; i < 10000; i++) {
26             rabbitmqProducer.producer(msg + i);
27         }
28     }
29 
30 }

生產者生產消息的,實現類,如下所示:

 1 package com.example.bie.rabbitmq.producer;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * 
10  * @author biehl
11  *
12  *         RabbitmqProducer消息發送者
13  *
14  * @Component加入到容器中.
15  * 
16  */
17 @Component
18 public class RabbitmqProducer {
19 
20     @Autowired
21     private AmqpTemplate rabbitmqAmqpTemplate;
22 
23     @Value("${rabbitmq.queue}")
24     private String queueName;
25 
26     /**
27      * 發送消息的方法
28      */
29     public void producer(String msg) {
30         // 向消息隊列中發送消息
31         // 參數1,隊列的名稱
32         // 參數2,發送的消息
33         this.rabbitmqAmqpTemplate.convertAndSend(this.queueName, msg);
34     }
35 
36 }

消費者消費消息的實現類,如下所示:

 1 package com.example.bie.rabbitmq.consumer;
 2 
 3 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 4 import org.springframework.beans.factory.annotation.Value;
 5 import org.springframework.stereotype.Component;
 6 
 7 /**
 8  * 
 9  * @author biehl
10  *
11  *         RabbitmqConsumer消息消費者
12  * 
13  *         消費者是根據消息隊列的監聽器,進行消息的接收和消費。
14  * 
15  *         消息隊列發生變化,消息事件就會產生,觸發方法進行消息的接收。
16  * 
17  */
18 @Component
19 public class RabbitmqConsumer {
20 
21     @Value("${rabbitmq.queue}")
22     private String queueName;
23 
24     /**
25      * 消費者消費消息,接受消息的方法,采用消息隊列監聽機制.
26      * 
27      * @RabbitListener
28      * 
29      *                 意思是當隊列發生變化,消息事件產生了或者生產者發送消息了。
30      * 
31      *                 馬上就會觸發這個方法,進行消息的消費。
32      */
33     @RabbitListener(queues = "queue001")
34     public void consumer(String msg) {
35         // 打印消息
36         System.out.println("消費者===>消費<===消息message: " + msg);
37     }
38 
39 }

springboot2.x版本的主啟動類,如下所示:

 1 package com.example;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqApplication.class, args);
11     }
12 
13 }

效果如下所示:

方式二,或者使用下面這種方式,直接進行生產者生產消息和消費者消費消息的測試,生產者生產消息的代碼如下所示:

 1 package com.example.bie;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.AMQP.BasicProperties;
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         ConnectionFactory,獲取連接工廠。
16  * 
17  *         Connection,一個連接。
18  * 
19  *         Channel,數據通信信道,可以發送和接受消息。
20  * 
21  *         Queue,具體的消息存儲隊列。
22  * 
23  *         Producer和Consumer,生產和消費者。
24  */
25 public class RabbitMqProducer {
26 
27     public static void main(String[] args) {
28         try {
29             // 1、創建一個ConnectionFactory
30             ConnectionFactory connectionFactory = new ConnectionFactory();
31             // 配置服務器ip地址,端口號,虛擬主機
32             connectionFactory.setHost("192.168.110.133");
33             connectionFactory.setPort(5672);
34             connectionFactory.setVirtualHost("/");
35 
36             // 2、創建連接工廠創建連接
37             Connection connection = connectionFactory.newConnection();
38 
39             // 3、通過connection創建一個Channel
40             Channel channel = connection.createChannel();
41 
42             // 4、通過Channel發送數據。消息組成部分就是props(即消息的附加屬性)和body(消息實體)。
43             // 生產者發送消息,只需要指定exchange和routingKey。
44             String exchange = "";// 數據通信信道,交換機,接受消息,根據路由鍵轉發消息到綁定的隊列。
45             // 一個路由規則,虛擬機可以用它來確定如何路由一個特定消息。
46             String routingKey = "queue001";
47             BasicProperties props = null;// 消息的附加屬性
48             // 循環發送消息
49             System.out.println("開始生產消息......");
50             for (int i = 0; i < 100; i++) {
51                 // 消息實體
52                 // String msg = "Hello RabbitMQ!";
53                 byte[] body = (String.valueOf(i) + " hello RabbitMQ!!!").getBytes();
54                 // 如果exchange是空的話,會使用AMQP default這個Exchange。
55                 // 然后會根據routingKey的名稱去隊列里面找到名稱對應的,然后將消息路由過去。
56                 channel.basicPublish(exchange, routingKey, props, body);
57             }
58 
59             // 5、關閉連接,原則,由小到大進行關閉
60             channel.close();
61             connection.close();
62         } catch (IOException e) {
63             e.printStackTrace();
64         } catch (TimeoutException e) {
65             e.printStackTrace();
66         }
67     }
68 
69 }

消費者消費消息的代碼如下所示:

 1 package com.example.bie;
 2 
 3 import java.io.IOException;
 4 import java.util.HashMap;
 5 import java.util.Map;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 import com.rabbitmq.client.AMQP.Queue.DeclareOk;
 9 import com.rabbitmq.client.Channel;
10 import com.rabbitmq.client.Connection;
11 import com.rabbitmq.client.ConnectionFactory;
12 import com.rabbitmq.client.Consumer;
13 import com.rabbitmq.client.ConsumerCancelledException;
14 import com.rabbitmq.client.QueueingConsumer;
15 import com.rabbitmq.client.QueueingConsumer.Delivery;
16 import com.rabbitmq.client.ShutdownSignalException;
17 
18 /**
19  * 
20  * @author biehl
21  *
22  */
23 public class RabbitMqConsumer {
24 
25     public static void main(String[] args) {
26         try {
27             // 1、創建一個ConnectionFactory
28             ConnectionFactory connectionFactory = new ConnectionFactory();
29             // 配置服務器ip地址,端口號,虛擬主機
30             connectionFactory.setHost("192.168.110.133");
31             connectionFactory.setPort(5672);
32             connectionFactory.setVirtualHost("/");
33 
34             // 2、創建連接工廠創建連接
35             Connection connection = connectionFactory.newConnection();
36 
37             // 3、通過connection創建一個Channel
38             Channel channel = connection.createChannel();
39 
40             // 4、創建(聲明)一個隊列
41             String queue = "queue001";// 隊列
42             boolean durable = true;// 是否持久化,true是持久化,false是不持久化
43             // 獨占的方式,只有一個channel可以去監聽,其他channel不能進行監聽。保證了順序消費。
44             boolean exclusive = false;
45             boolean autoDelete = false;// 隊列沒有和Exchange綁定,就進行自動刪除
46             // 擴展參數
47             Map<String, Object> arguments = new HashMap<String, Object>();
48             DeclareOk declareOk = channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
49             System.out.println("consumerCount: " + declareOk.getConsumerCount());
50 
51             // 5、創建消費者,指定參數,消費者建立在那個channel連接之上
52             QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
53 
54             // 6、對channel進行設置。queue是設置要消費的隊列名稱。
55             boolean autoAck = true;// 是否自動簽收。
56             Consumer callback = queueingConsumer;//
57             channel.basicConsume(queue, autoAck, callback);
58 
59             // 7、獲取消息
60             // 消費者創建起來了,消費者監聽的隊列創建起來了。接下來就獲取消息。
61             // delivery是消息封裝的對象
62             System.out.println("等待消費......");
63             while (true) {
64                 // 獲取消息
65                 Delivery delivery = queueingConsumer.nextDelivery();
66                 String body = new String(delivery.getBody());
67                 System.out.println("消費端body: " + body);
68                 System.out.println("envelope" + delivery.getEnvelope().toString());
69             }
70 
71             // 8、關閉連接,原則,由小到大進行關閉
72             // channel.close();
73             // connection.close();
74         } catch (IOException e) {
75             e.printStackTrace();
76         } catch (TimeoutException e) {
77             e.printStackTrace();
78         } catch (ShutdownSignalException e) {
79             e.printStackTrace();
80         } catch (ConsumerCancelledException e) {
81             e.printStackTrace();
82         } catch (InterruptedException e) {
83             e.printStackTrace();
84         }
85     }
86 
87 }

實現的效果,除了控制台的輸出,你也可以在管控台里面查看對應的效果,如連接Connection的個數、Channel的個數、Exchange的個數、Queue的個數、Consumer的個數、以及主頁折線圖展示的最新消息個數、消費速率等等信息。觀察這些變化以達到監控的目的。

12、RabbitMQ的Exchange交換機。Exchange接受消息(即生產者生產的消息,將消息投遞到交換機Exchange上面),並且根據路由鍵轉發消息所綁定的隊列

RabbitMQ架構圖,概述,如下所示:

  1)、藍色的框,主要表示,生產者客戶端將消息投遞(Send Message)到交換機Exchange上面,通過路由關系,將生產者生產的消息路由到指定的隊列里面。
  2)、綠色的框,主要表示,消費者客戶端監聽隊列里面的消息(Receive Message),進行消費。消費者客戶端和隊列建立了監聽,然后接收消息。
  3)、紅色的虛線框,主要表示,RabbitMQ Server服務器。
  4)、黃色的框,主要表示,路由鍵Routing key,一個綁定的關系,即交換機Exchange和隊列Queue建立一個綁定Binding。交換機Exchange上面的消息到達那個隊列Queue的規則主要是由路由鍵Routing key來指定的。

13、交換機的屬性,如下所示:

  1)、Name:交換機的名稱,可以自己指定交換機的名稱。
  2)、Type:交換機的類型direct,topic,fanout,headers。
    a)、Direct Exchange(即直連交換機,路由鍵Routing key必須一致性),所有發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。路由規則:Direct Exchange直連交換機,Routing key的名稱必須完全匹配(即生產者生產消息攜帶的路由鍵和將交換機和隊列綁定的路由鍵必須一致),就會將交換機Exchange上面的消息發送到(路由到)隊列Queue上面。
      注意:Direct模式可以使用RabbitMQ自帶的Exchange,即default Exchange,所以不需要將Exchange進行任何綁定(binding)操作,消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄。

    b)、Topic Exchange(即話題交換機,路由鍵Routing key規則匹配或者成為模糊匹配),所有發送到Topic Exchange的消息被轉發到所有關心RouteKey中指定Topic的Queue上。Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic。注意:可以使用通配符進行模糊匹配。符號,"#"匹配一個或者多個詞,符號"*"匹配不多不少一個詞(即*號僅僅可以匹配一個詞)。路由規則:生產者生產的消息攜帶的路由鍵Routing key,如果交換機與隊列Queue綁定的路由鍵,和生產者生產消息攜帶的路由鍵規則匹配上,就可以將交換機上面的消息發送到該隊列上。

    c)、Fanout Exchange(即廣播交換機,沒有路由鍵Routing key的概念),不處理路由鍵,只需要簡單的將隊列綁定到交換機上面。發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上面(即,一個或者多個隊列綁定交換機,那么交換機會將消息轉發到一個或者多個隊列上面)。Fanout交換機轉發消息是最快的(性能最好,因為廣播交換機,不做匹配,沒有路由規則)。 

    d)、Headers Exchange,根據消息頭進行路由,不是很常用。

  3)、Durability:是否需要持久化,true為持久化,false表示非持久化。
  4)、Auto Delete:當最后一個綁定到Exchange上的隊列刪除后,自動刪除該Exchange。值為true表示自動刪除,值為false表示不進行自動刪除。
  5)、Internal:當前Exchange是否用於RabbitMQ內部使用,默認為false。基本不使用該屬性。
  6)、Arguments:擴展參數,用戶擴展AMQP協議自制定化使用。

14、RabbitMQ的綁定Binding。

  答:Binding綁定,是Exchange和Exchange、Queue之間的連接關系。即交換機和交換機可以綁定,交換機和隊列可以進行綁定。Binding中可以包含Routing key或者參數。

15、RabbitMQ的消息隊列Queue。

  答:消息隊列Queue,實際存儲消息數據,在實際的物理磁盤中有一塊空間創建隊列。包含的屬性有,Durability是否持久化,Durable是持久化,Transient是不進行持久化。Auto delete,如果選擇yes代表當最后一個監聽被移除之后,該Queue會自動被刪除。

16、RabbitMQ的消息Message。

  答:消息Message,服務器和應用程序之間傳送的數據。消息本質就是一段數據,由Properties和Payload(即Body)組成。常用屬性,delivery mode(消息到Broker上,可以做持久化,也可以做內存級別的非持久化),headers(自定義屬性)。content_type,content_encoding(字符集),priority(優先級)。
correlation_id(唯一id),reply_to(消息失敗了返回到那個隊列),expiration(消息的過期時間),message_id(消息的id)。timestamp,type,user_id,app_id,cluster_id。

17、RabbitMQ的虛擬主機Virtual host。

  答:虛擬主機Virtual host,用於進行邏輯隔離,最上層的消息路由。虛擬主機不是物理的概念。一個Virtual Host里面可以有若個干Exchange和Queue。同一個Virtual Host里面不能有相同名稱的Exchange或者Queue。

 

作者:別先生

博客園:https://www.cnblogs.com/biehongli/

如果您想及時得到個人撰寫文章以及著作的消息推送,可以掃描上方二維碼,關注個人公眾號哦。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM