概述
1.大多數應用中,可以通過消息服務中間件來提升系統異步能力和拓展解耦能力。
2.消息服務中的兩個重要概念:消息代理(Message broker)和目的地(destination)
當消息發送者發送消息后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。
3.消息隊列主要有兩種形式的目的地:
-
- 隊列:點對點方式通信(point-to-point)
- 主題:發布/訂閱消息服務
點對點式:消息發送者發送消息后,消息代理將其放入一個隊列中,消息接受者從隊列中讀取數據,接受者接收數據后,將消息移除隊列。
發布訂閱:消息發布者將消息發布到主題中,多個接受者可以訂閱主題,當消息到達時,所有的訂閱者都會接收到消息。
4.JMS(Java Message Service) Java消息服務:基於JVM消息代理的規范。
5.AMQP(Advanced Message Queuing Protocol):它是一個面向消息中間件的開放式標准應用層協議。兼容JMS,RabbitMQ是AMQP的一個實現。
JMS |
AMQP | |
---|---|---|
定義 | Java API | 網絡線級協議 |
跨平台 | 否 | 是 |
跨語言 | 否 | 是 |
Model | (1)、Peer-2-Peer (2)、Pub/Sub |
(1)、direct exchange (2)、fanout exchange (3)、topic change (4)、headers exchange (5)、system exchange 后四種都是pub/sub ,差別路由機制做了更詳細的划分 |
支持消息類型 | TextMessage MapMessage ByteMessage StreamMessage ObjectMessage Message |
byte[]通常需要序列化 |
RabbitMQ
Message:消息頭和消息體組成,消息體是不透明的,而消息頭上則是由一系列的可選屬性組成,屬性:路由鍵【routing-key】,優先級【priority】,指出消息可能需要持久性存儲【delivery-mode】
Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序
Exchange:交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列
- Exchange的4中類型:direct【默認】點對點,fanout,topic和headers【並不多用了】, 發布訂閱,不同類型的Exchange轉發消息的策略有所區別
Queue:消息隊列,用來保存消息直到發送給消費者,它是消息的容器,也是消息的終點,一個消息可投入一個或多個隊列,消息一直在隊列里面,等待消費者連接到這個隊列將數據取走。
Binding:綁定,隊列和交換機之間的關聯,多對多關系
Connection:網絡連接,例如TCP連接
Channel:信道,多路復用連接中的一條獨立的雙向數據流通道,信道是建立在真是的TCP鏈接之內的虛擬連接AMQP命令都是通過信道發送出去的。不管是發布消息,訂閱隊列還是接受消息,都是信道,減少TCP的開銷,復用一條TCP連接。
Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端的 應用程序
VirtualHost:小型的rabbitMQ,相互隔離
Broker:表示消息隊列 服務實體
Exchange的三種方式
- direct:根據路由鍵直接匹配,一對一
- fanout:不經過路由鍵,直接發送到每一個隊列
- topic:類似模糊匹配的根據路由鍵,來分配綁定的隊列。#匹配一個或者多個單詞,*匹配一個單詞
RabbitMQ安裝與使用
在RabbitMQ官網的下載頁面https://www.rabbitmq.com/download.html
中,我們可以獲取到針對各種不同操作系統的安裝包和說明文檔。這里,我們將對幾個常用的平台一一說明。
下面我們采用的Erlang和RabbitMQ Server版本說明:
- Erlang/OTP 19.1
- RabbitMQ Server 3.6.5
Windows安裝
- 安裝Erland,通過官方下載頁面
http://www.erlang.org/downloads
獲取exe安裝包,直接打開並完成安裝。 - 安裝RabbitMQ,通過官方下載頁面
https://www.rabbitmq.com/download.html
獲取exe安裝包。 - 下載完成后,直接運行安裝程序。
- RabbitMQ Server安裝完成之后,會自動的注冊為服務,並以默認配置啟動起來。
Docker安裝
1、打開虛擬機,在docker中安裝RabbitMQ
#1.安裝rabbitmq,使用鏡像加速 docker pull registry.docker-cn.com/library/rabbitmq:3-management [root@node1 ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE registry.docker-cn.com/library/rabbitmq 3-management c51d1c73d028 11 days ago 149 MB #2.運行rabbitmq ##### 端口:5672 客戶端和rabbitmq通信 15672:管理界面的web頁面 docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq c51d1c73d028 #3.查看運行 docker ps
2、打開網頁客戶端並登陸 網址:http://localhost:15672/,賬號【guest】,密碼【guest】,登陸。
3、添加 【direct】【faout】【topic】的綁定關系。
- 1.設置exchange。
- name:名稱
- type:direct(直連型。點對點),topic(模糊發布。例如:*.news/#.news),fanout(廣播模式。速度最快。發布訂閱)
- Durability:是否是持久化。
4、添加消息隊列
5、綁定隊列。點擊任意一個exchange,進行綁定。隊列與剛才新建的相同,routing key(路由件)也可以去隊列名相同。
6、發布信息測試,每一種都進行嘗試 。以點對點為例 (direct)
7、點擊隊列,可以看到點對點發送,路由件完全匹配的情況下已經有一條了。可以嘗試其他情況。點進去后可以查看具體信息。
SpringBoot整合RabbitMQ
(1)Java代碼的方式使用RabbitMQ
1.在pom.xml文件中添加依賴
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.3.7.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.在application.properties
中配置關於RabbitMQ的連接和用戶信息,用戶可以回到上面的安裝內容,在管理頁面中創建用戶。
spring: rabbitmq: host: 192.168.1.125 port: 5672 username: guest password: guest
3.創建消息生產者。通過注入RabbitTemplate
接口的實例來實現消息的發送,RabbitTemplate
接口定義了一套針對AMQP協議的基礎操作。在Spring Boot中會根據配置來注入其具體實現。
@Autowired RabbitTemplate rabbitTemplate; @Test public void contextLoads() { //Message需要自己構建一個;定義消息體內容和消息頭 // rabbitTemplate.send(exchange, routingKey, message); //Object 默認當成消息體,只需要傳入要發送的對象,自動化序列發送給rabbitmq; Map<String,Object> map = new HashMap<>(); map.put("msg", "這是第一個信息"); map.put("data", Arrays.asList("helloWorld",123,true)); //對象被默認序列以后發送出去 exchange 和 routingKey都是在瀏覽器端定義好的。 rabbitTemplate.convertAndSend("exchange.direct","jl",map); }
4.因為發送過來的數據是被編碼的,所以需要對JSON進行序列化。
@Configuration public class MyAMQPConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
5.接收消息(取出隊列中的消息).接收后就會從消息隊列中移除。
@Test public void reciverAndConvert(){ Object o = rabbitTemplate.receiveAndConvert("test.news"); System.out.println(o.getClass()); System.out.println(o); }
6.廣播的形式發送。
@Test public void sendMesg(){ //廣播,就不需要寫rootingKey,因為所有的隊列都會收到 rabbitTemplate.convertAndSend("exchange.fanout","",new Book(1L,"<<Java編程思想>>")); }
(2)注解方式使用RabbitMQ
1.主程序開啟RabbitMQ的注解
@EnableRabbit //開啟基於注解的rabbitmq @SpringBootApplication public class AmqpApplication { public static void main(String[] args) { SpringApplication.run(AmqpApplication.class, args); } }
2.使用注解的方式接收
@Service public class BookService { @RabbitListener(queues = "jl") public void receive(Book book){ System.out.println("接收到消息:"+book); } @RabbitListener(queues = "jl.news") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } }
(3)創建 Exchange(交換器)、Queue(消息隊列)、Bind(綁定規則)--- AmqpAdmin。
1.創建一個Exange。參數中可以有DirectExchage,TopicExchage,fanoutExchage等。
@Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpadmin.direct")); System.out.println("Create Finish"); }
2.創建Queue
@Test public void createQueue(){ //參數1:名字 參數2:是否持久化 amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); System.out.println("Create Queue Finish"); }
3.創建Bind規則
@Test public void createBind(){ //參數1:目的地 參數2:類型(隊列(queue)或者交換器(exchange)) 參數3:exchange的名稱 參數4:路由件 參數5:參數 amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE , "amqpadmin.direct", "amqp.haha", null)); }
4.刪除Queue或者Exchange。只需要傳入名稱即可。
@Test public void deleteExchange(){ amqpAdmin.deleteExchange("amqpadmin.direct"); System.out.println("delete Finish"); }