1.MQ
消息隊列(Message Queue,簡稱MQ)——應用程序和應用程序之間的通信方法
應用:不同進程Process/線程Thread之間通信
比較流行的中間件:
ActiveMQ
RabbitMQ(非常重量級,更適合於企業級的開發)
Kafka(高吞吐量的分布式發布訂閱消息系統)
RocketMQ
在高並發、可靠性、成熟度等方面,RabbitMQ是首選
Kafka的性能(吞吐量、TPS)比RabbitMq要高出來很多,但Kafka主要定位在日志方面,如果業務方面還是建議選擇RabbitMq
2.AMQP
Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計
主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全
3.RabbitMQ
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫
支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX
用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗
(1)安裝
需要先安裝Erlang ,再安裝RabbitMQ
環境:win7
Erlang
下載 :
https://www.erlang-solutions.com/resources/download.html
安裝:
雙擊下載的文件(esl-erlang_22.1~windows_amd64.exe) ,下一步進行安裝
安裝完后開始菜單多了
RabbitMQ
下載 :
https://www.rabbitmq.com/download.html
安裝:
雙擊下載的文件(rabbitmq-server-3.8.1.exe) ,下一步進行安裝
安裝完后開始菜單多了
選擇開始菜單的RabbitMQ Command Prompt(sbin dir)
進入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin輸入命令
rabbitmq-plugins enable rabbitmq_management
啟動了管理工具
服務啟動 net start RabbitMQ
服務停止 net stop RabbitMQ
服務啟動后,瀏覽器打開http://localhost:15672/
使用賬號 guest ,密碼 guest
能夠登錄,安裝成功
(2)用戶管理
Admin選項卡
A.添加用戶
用戶角色:
超級管理員(administrator)
監控者(monitoring)
策略制定者(policymaker)
普通管理者(management)
其他
B.創建Virtual Hosts
C.設置權限
選中Admin用戶,進入權限設置
已添加權限
(3)spring boot整合RabbitMQ
添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.2.1.RELEASE</version> </dependency>
添加配置
#對於rabbitMQ的支持 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.virtual-host=testhost spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true
添加RabbitMQ配置類
package com.example.demo.configure; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { public static final String RABBITMQ_QUEUE_NAME = "Queue1"; public static final String RABBITMQ_ORDER_QUEUE_NAME = "OrderQueue1"; private final static Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class); @Autowired private CachingConnectionFactory cachingConnectionFactory; @Bean public Queue commonQueue() { return new Queue(RabbitMqConfig.RABBITMQ_QUEUE_NAME); } @Bean public Queue orderQueue() { return new Queue(RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME); } @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange"); } @Bean public TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } // 建立Queue與Exchange的綁定關系 @Bean public Binding bindingExchangeMessage(Queue orderQueue, DirectExchange directExchange) { return BindingBuilder.bind(orderQueue).to(directExchange).with( RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME); } @Bean public RabbitTemplate rabbitTemplate() { cachingConnectionFactory.setPublisherConfirms(true); cachingConnectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) logger.info("消息發送成功: correlationData:({}),ack:({ack}),cause:({})", correlationData, ack, cause); else logger.info("消息發送失敗: correlationData:({}),ack:({ack}),cause:({})", correlationData, ack, cause); }); rabbitTemplate.setReturnCallback( (message, replyCode, replyText, exchange, routingKey) -> logger.info( "消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message)); return rabbitTemplate; } }
生產者
package com.example.demo.mq; import com.example.demo.configure.RabbitMqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class OrderMaker { private final static Logger logger = LoggerFactory.getLogger(OrderMaker.class); @Autowired private RabbitTemplate rabbitTemplate; public void send(String content) { this.rabbitTemplate.convertAndSend(RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME,content); } }
測試入口
package com.example.demo.controller; import com.example.demo.mq.OrderMaker; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController public class Demo { @Autowired private OrderMaker orderMaker; @RequestMapping(value = "/testMq",method = RequestMethod.GET,produces = MediaType.ALL_VALUE) public String testMq(String msg) { orderMaker.send(msg); System.out.println(msg); return "Successfully."; } }
使用postman測試http://127.0.0.1:8080/testMq?msg=hahaha,this is a test
在http://localhost:15672中
OrderQueue1隊列有兩條消息
查看消息
消費者
package com.example.demo.mq; import com.example.demo.configure.RabbitMqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME) public class OrderListener { private final static Logger logger = LoggerFactory.getLogger(OrderListener.class); @RabbitHandler public void process(String orderMsg) { logger.info("訂單消費者收到消息:" + orderMsg); } }
重新啟動
log輸出
2019-11-13 14:36:51.500 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO com.example.demo.mq.OrderListener - 訂單消費者收到消息:hahaha,this is a test
2019-11-13 14:36:51.516 [AMQP Connection 127.0.0.1:5672] INFO com.example.demo.configure.RabbitMqConfig - 消息發送成功: correlationData:(null),ack:({ack}),cause:(true)
這樣就實現了簡單的隊列,生產者將消息發送到隊列,消費者從隊列中獲取消息
P:消息的生產者
C:消息的消費者
紅色:隊列