前言
Rabbitmq是一個開源的消息代理軟件,是AMQP協議的實現。核心作用就是創建消息隊列,異步發送和接收消息。通常用來在高並發中處理削峰填谷、延遲處理、解耦系統之間的強耦合、處理秒殺訂單。 入門rabbitmq之前主要是想了解下秒殺排隊訂單入庫后,異步通知客戶端秒殺結果。
基礎知識
1、基本概念(角色)
了解rabbitmq之前先要了解3個基本概念:生產者、消費者、代理(隊列)。 rabbitmq在生產者和代理中間做了一層抽象。這樣消息生產者和隊列就沒有直接聯系,在中間加入了一層交換器(Exchange)。這樣消息生產者把消息交給交換器,交換器根據路由策略再把消息轉發給對應隊列。
2、消息發送原理
首先要發送消息必須先連接到rabbitmq-server。那怎么連接和發送消息呢?首先你的應用程序和rabbitmq會創建一個TCP鏈接。一旦TCP鏈接並通過認證。認證就是你試圖連接rabbitmq時服務器的用戶名和密碼。認證通過,你的應用程序和rabbitmq之間就創建了一條AMQP信道(Channel),后續所有的消息都是基於這個信道完成。
3、為什么不直接通過TCP直接發送消息
對於操作系統來說創建和銷毀TCP連接是非常昂貴的開銷,而在並發高峰期時再去處理TCP創建與銷毀顯然是不合適的。這就造成了TCP的巨大浪費,而且操作系統每秒創建TCP的能力也是有限的,因此直接通過TCP發送消息會很快遇到瓶頸。
交換器(Exchange)
前面提到rabbitmq在你的應用程序和隊列中間增加了一層代理,代理根據路由策略把消息路由到指定隊列。 交換器分為4類:
- direct(默認)
- headers
- fanout
- topic
1、direct。是交換器的默認實現,根據路由規則匹配上就會把消息投遞到對應隊列。
2、headers。 是一個自定義匹配規則類型,在隊列和交換器綁定時,會設置一組鍵值對,消息中也包含一組鍵值對,當這些鍵值對匹配上,則會投遞消息到對應隊列。
3、fanout。是一種發布訂閱模式,當你發送一條消息時,交換器會把消息廣播到所有附加到這個交換器的隊列上。 對於fanout來說routingkey是無效的。
4、topic。可以更靈活的匹配自己想訂閱的消息。也是routingkey用處最大的一種。類似我們配置request mapping中的通配符。
安裝rabbitmq
我是本機ubuntu16中安裝,沒有配置軟件源,安裝速度倒還能接收。rabbitmq是erlang開發,所以先安裝erlang,再安裝rabbitmq-server
sudo apt-get install erlang
sudo apt-get install rabbitmq-server
安裝完成后查看運行狀態 systemctl status rabbitmq-server
啟動 service rabbitmq-server start
停止 serivce rabbitmq-server stop
重啟 service rabbitmq-server restart
安裝好rabbitmq后,啟用web客戶端 rabbitmq-plugines enable rabbitmq_management。
啟動后默認使用guest/guest訪問,僅支持localhost訪問: http://localhost:15672。 web站點默認端口15672。 rabbitmq默認端口5672
在SpringBoot中集成rabbitmq
1、配置信息
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
2、默認交換器(direct)實現
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectConfig { @Bean public Queue directQueue(){ return new Queue("direct",false); //隊列名字,是否持久化 } @Bean public DirectExchange directExchange(){ return new DirectExchange("direct",false,false);//交換器名稱、是否持久化、是否自動刪除 } @Bean Binding binding(Queue queue, DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("direct"); } }
消息生產者(發送者)
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息發送--生產消息 */ @Component public class Sender { @Autowired AmqpTemplate rabbitmqTemplate; public void send(String message){ System.out.println("發送消息:"+message); rabbitmqTemplate.convertAndSend("direct",message); } }
消息消費者(接收者)
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "direct") public class Receiver { @RabbitHandler public void handler(String message){ System.out.println("接收消息:"+message); } }
OK,來測試下,默認情況下,只能本機訪問,我本地是在ubuntu虛擬機中,我在虛擬機中運行demo
import com.zhangfei.mq.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping("/rabbitmq") public class MyRabbitmqController { @Autowired Sender sender; @RequestMapping("/sender") @ResponseBody public String sender(){ System.out.println("send string:hello world"); sender.send("hello world"); return "sending..."; } }
運行結果
參考資料
https://www.cnblogs.com/vipstone/p/9950434.html 【推薦。作者:王磊】這里基礎概念里有示意圖,可以對rabbit涉及到的基礎概念和流程有一個直觀的認識