轉http://www.jianshu.com/p/048e954dab40
概念:
分布式消息隊列
‘分布式消息隊列’包含兩個概念
一是‘消息隊列’,二是‘分布式’
那么就先看下消息隊列的概念,和為什么需要分布式
消息隊列的定義
“消息”指進程間傳送的數據
“隊列”是在消息的傳輸過程中保存消息的容器
消息被發送到隊列中,消息隊列充當中間人,將消息從源發送給目標
當系統中出現“生產“和“消費“的速度或穩定性等因素不一致時,就需要消息隊列,作為抽象層,彌合雙方的差異
例如
(1)服務員點菜快,廚師做菜慢,服務員只需要下單給廚師,然后就可以繼續去服務顧客,不需要等待廚師把菜做完
點菜單就相當於消息,放單子的位置就相當於隊列
(2)業務系統需要發短信,但短信發送模塊速度跟不上,業務系統就可以把發送短信的相關信息封裝為一個消息,放入隊列,短信發送模塊從隊列中獲取消息進行處理
消息隊列的好處
(1)提高系統響應速度
使用了消息隊列,生產者一方,把消息往隊列里一扔,就可以立馬返回響應用戶了,無需等待處理結果
(2)保證消息的傳遞
如果發送消息時接收者不可用,消息隊列會保留消息,直到成功地傳遞它
(3)解耦
只要消息格式不變,即使接收者的接口、位置、或者配置改變,也不會給發送者帶來任何改變
消息發送者無需知道消息接收者是誰,使得系統設計更清晰
為什么需要分布式消息隊列
(1)多系統協作需要分布式
例如消息隊列中的數據需要在多個系統間共享,所以需要提供分布式通信機制、協同機制
(2)可靠
消息會被持久化到分布式存儲中,這樣避免了單台機器存儲的消息由於機器問題導致消息的丟失
(3)可擴展
分布式消息隊列,會隨着訪問量的增加而方便的增加處理服務器
2. 消息隊列技術是分布式應用間交換信息的一種技術
現在電商網站某個搶購活動,並發怎么辦?消息隊列
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。有玩RabbitMq的哥們,多多交流!
用戶請求會post到后台一些信息如(用戶信息,商品信息)到消息隊列中。
消息隊列通過Windows服務去處理解析過來的信息,單線程處理(多線程可能會出現問題,你懂得)!
成功的話,插入到本次活動的成功記錄表里面;失敗的話,插入到有意購買表(方便業務人員銷售)!
怎樣通知用戶?
1,ajax異步去請求(隔兩分鍾去請求一次成功記錄,如果不請求庫的話我們會用Redis緩存)
2,長連接的方式(websocket,signalr之類的)
在整套的微服務架構中, 消息隊列是不可或缺的部分, 它能夠起到線程內同步或者異步調用無法達到的作用,優缺點分別是:
優點:
- 解耦
i. 只依賴消息的格式, 而不依賴發送者的ip和端口
ii. 多消費者的情況下, 發送者不需要關注消費者的任何信息 - 路由
不能互相訪問的網絡之間可以消息隊列實現訪問, 可以減少對現有網絡的修改。 - 消息可靠性
當消費者發生故障時, 消息可以被有效保存下來, 等待恢復后繼續訪問. - 異步調用
發送者異步發送消息, 不等待消息ack,不會對發送者本身產生響應速度的影響, 當然異步調用也是可以實現的。 - 方便擴展
集群部署消息隊列, 當流量增大和減小是可以通過調整部署來實現和發送方, 消費方無關。
缺點:
多出一個環節,需要保證消息隊列的可用性。
二. 常用消息隊列
目前常用的消息隊列大概有三種類型,RabbitMQ等AMQP系列, Kafka, Redis等kev value系列,它們的使用場景分別是:
1.RabbitMQ: 相對重量級高並發的情況,比如數據的異步處理 任務的串行執行等.
2.Kafka: 基於Pull的模式來處理,具體很高的吞吐量,一般用來進行 日志的存儲和收集.
3.Redis: 輕量級高並發,實時性要求高的情況,比如緩存,秒殺,及時的數據分析(ELK日志分析框架,使用的就是Redis).
三. SpingBoot 集成消息隊列
在SpingBoot中對這個三種都有支持
-
Redis
請參照 其他作者的 文章 http://www.jianshu.com/p/a2ab17707eff -
RabbitMQ
i. 配置<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
ii.實現方式 RabbitMQDemoConfigration.java
import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQDemo * * @author: sunjie * @date: 16/01/10 */ @Configuration @SuppressWarnings("SpringJavaAutowiringInspection") public class RabbitMQDemoConfigration { @Bean public CachingConnectionFactory myConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setHost("192.168.1.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/myHost"); return connectionFactory; } // 生成CachingConnectionFactory 也可以使用下面的方式,在application.properties // 中定義好屬性即可 // @Autowired // ConnectionFactory connectionFactory; @Bean public DirectExchange myExchange() { return new DirectExchange("myExchangeDemo", true, false); } @Bean public Queue myQueue() { return new Queue("myQueueDemo", true); } @Bean public Binding myExchangeBinding(@Qualifier("myExchange") DirectExchange directExchange, @Qualifier("myQueue") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("routeDemo"); } @Bean public RabbitTemplate myExchangeTemlate() { RabbitTemplate r = new RabbitTemplate(myConnectionFactory()); r.setExchange("myExchangeDemo"); r.setRoutingKey("routeDemo"); return r; } /** * 發送消息,工業使用需要自己做個性化實現 */ @Bean public void sendMessage(RabbitTemplate myExchangeTemlate) { String string = "Hello RabbitmQ"; myExchangeTemlate.convertAndSend(string); } /** * 接受消息,工業使用時需要在監聽類中實現process邏輯 */ @RabbitListener(queues = "myQueueDemo") public void process(Message message) { System.out.println("__________" + message.getBody().toString() + "__________"); try { this.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } }
-
Kafka 使用
i. pom.xml 配置<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> </dependency>
ii. spring-integration-kafka.xml 也可以研究通過bean方式來實現
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <int:channel id="inputToKafka"> <int:queue/> </int:channel> <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="kafkaProducerContext" channel="inputToKafka"> <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/> </int-kafka:outbound-channel-adapter> <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="50000"/> <int-kafka:producer-context id="kafkaProducerContext"> <int-kafka:producer-configurations> <int-kafka:producer-configuration broker-list="192.168.2.2:9092" key-class-type="java.lang.String" value-class-type="java.lang.String" topic="1_service" value-encoder="kafkaEncoder" key-encoder="kafkaEncoder"/> </int-kafka:producer-configurations> </int-kafka:producer-context> <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/> </beans>
iii. 客戶端實現代碼:
@Autowired MessageChannel inputToKafka; String value = "Hello Kafka"; Message<String> mess = MessageBuilder.withPayload(value) .setHeader(KafkaHeaders.TOPIC, appSetting.getStrValue("topic")).build(); inputToKafka.send(mess);
四. 后記
消息中間件有很多,實際使用中往往是根據架構師的技術棧相關,做到了解使用場景和基本原理,在項目中提升自己細節能力,做到個人和公司共同成長,才是好的方式.