Redis實現消息隊列


基於Redis消息隊列-實現短信服務化

1.Redis實現消息隊列原理

常用的消息隊列有RabbitMQ,ActiveMQ,個人覺得這種消息隊列太大太重,本文介紹下基於redis的輕量級消息隊列服務。 
一般來說,消息隊列有兩種模式,一種是發布者訂閱模式,另外一種是生產者和消費者模式。Redis的消息隊列,也是基於這2種原理的實現。 
發布者和訂閱者模式:發布者發送消息到隊列,每個訂閱者都能收到一樣的消息。 
生產者和消費者模式:生產者將消息放入隊列,多個消費者共同監聽,誰先搶到資源,誰就從隊列中取走消息去處理。注意,每個消息只能最多被一個消費者接收。

2.Redis消息隊列使用場景

在我們的項目中,使用消息隊列來實現短信的服務化,任何需要發送短信的模塊,都可以直接調用短信服務來完成短信的發送。比如用戶系統登錄注冊短信,訂單系統的下單成功的短信等。

3.SpringMVC中實現Redis消息隊列

因為我們短信只需要發送一次,所以我們使用的是消息隊列的生產者和消費者模式。  

3.1引入Maven依賴

引入Redis相應的maven依賴,這里需要spring-data-redis和jedis

 1  //pom.xml
 2     <dependency>
 3         <groupId>org.springframework.data</groupId>
 4         <artifactId>spring-data-redis</artifactId>
 5         <version>1.6.0.RELEASE</version>
 6     </dependency>
 7 
 8       <!-- jedis -->
 9       <dependency>
10             <groupId>redis.clients</groupId>
11             <artifactId>jedis</artifactId>
12              <version>2.5.1</version>
13         </dependency>

3.2配置redis生成者消費者模式

 1   //applicationContext-redis.xml
 2     <?xml version="1.0" encoding="UTF-8"?>
 3 <beans xmlns="http://www.springframework.org/schema/beans"
 4     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
 5     xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
 6     xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache"
 7     xmlns:redis="http://www.springframework.org/schema/redis"
 8     xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
 9                             http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd     
10                             http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
11                             http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
12                             http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop.xsd
13                             http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
14                             http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
15     <description>spring-data-redis配置</description>
16 
17     <bean id="redisConnectionFactory"
18         class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
19         <property name="hostName" value="${redis.host}"></property>
20         <property name="port" value="${redis.port}"></property>
21         <property name="usePool" value="true"></property>
22     </bean>
23 
24     <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
25         <property name="connectionFactory" ref="redisConnectionFactory"></property>
26     </bean>
27 
28     <bean id="jdkSerializer"
29         class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
30 
31     <bean id="smsMessageListener"
32         class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
33         <property name="delegate" ref="smsMessageDelegateListener" />
34         <property name="serializer" ref="jdkSerializer" />
35     </bean>
36 
37     <bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage">
38         <property name="redisTemplate" ref="redisTemplate"/>
39     </bean>
40 
41     <redis:listener-container>
42         <redis:listener ref="smsMessageListener" method="handleMessage"
43             serializer="jdkSerializer" topic="sms_queue_web_online" />
44     </redis:listener-container>
45 
46     <!-- jedis -->
47     <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
48         <property name="maxIdle" value="300" /> <!-- 最大能夠保持idel狀態的對象數  -->
49         <property name="maxTotal" value="60000" /> <!-- 最大分配的對象數 -->
50         <property name="testOnBorrow" value="true" /> <!-- 當調用borrow Object方法時,是否進行有效性檢查 -->
51     </bean>
52 
53     <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
54         <constructor-arg index="0" ref="jedisPoolConfig" />
55         <constructor-arg index="1" value="${redis.host}" />
56         <constructor-arg index="2" value="${redis.port}" type="int" />
57     </bean>
58 
59 </beans>

主要的配置說明: 
1.序列化:一般我們向Redis發送一個消息定義的Java對象,這個對象需要序列化。這里使用JdkSerializationRedisSerializer:

1 <bean id="jdkSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />

2.發送者:

1 <bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage">
2       <property name="redisTemplate" ref="redisTemplate"/>
3</bean>

3.監聽者:

1 <bean id="smsMessageListener"
2         class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
3         <property name="delegate" ref="smsMessageDelegateListener" />
4         <property name="serializer" ref="jdkSerializer" />
5     </bean>
6     <redis:listener-container>
7         <redis:listener ref="smsMessageListener" method="handleMessage"
8             serializer="jdkSerializer" topic="sms_queue_web_online" />
9     </redis:listener-container>

smsMessageListener:消息監聽器 
redis:listener-Container:定義消息監聽,method:監聽消息執行的方法,serializer:序列化,topic:監聽主題(可以理解為隊列名稱)

3.3代碼實現

1.定義短信消息對象SmsMessageVo

 1 public class SmsMessageVo implements Serializable {
 2     //id
 3     private Integer smsId;
 4 
 5     //手機號
 6     private String mobile;
 7 
 8     //類型,1:驗證碼 2:訂單通知
 9     private Byte type;
10 
11     //短信創建時間
12     private Date createDate;
13 
14     //短信消息處理時間
15     private Date processTime;
16 
17     //短信狀態,1:未發送 2:發送成功 3:發送失敗
18     private Byte status;
19 
20     //短信內容
21     private String content;
22 
23     //省略setter和getter方法
24     ...

2.定義消息隊列發送對象SendMessage

 1 //SendMessage.java
 2 public class SendMessage {
 3 
 4     private RedisTemplate<String, Object> redisTemplate;
 5 
 6 
 7     public RedisTemplate<String, Object> getRedisTemplate() {
 8         return redisTemplate;
 9     }
10 
11 
12 
13     public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
14         this.redisTemplate = redisTemplate;
15     }
16 
17 
18 
19     public void sendMessage(String channel, Serializable message) {
20         redisTemplate.convertAndSend(channel, message);
21     }
22 }

3.發送消息

 1   String smsContent = templateToContent(template.getContent(),
 2                     regMsgCode);
 3     SmsMessageVo smsMessageVo = new SmsMessageVo();
 4     smsMessageVo.setMobile(mobile);
 5     smsMessageVo.setType((byte) SmsType.VERIFICATION.getType());
 6     smsMessageVo.setChannelId(1);
 7     smsMessageVo.setContent(smsContent);
 8     smsMessageVo.setCreateDate(new Date());
 9     smsMessageVo.setStatus((byte) SmsSendStatus.TO_SEND.getType());
10     smsMessageVo.setTemplateId(1);
11 
12     //先把待發送的短信存入數據庫
13     SmsQueue smsQueue = new SmsQueue();
14     BeanUtils.copyProperties(smsQueue, smsMessageVo);
15     smsQueueService.addSmsQueue(smsQueue);
16 
17     //異步發送短信到redis隊列
18     sendMessage.sendMessage(Constants.REDIS_QUEUE_SMS_WEB, smsMessageVo);
19     //Constants.REDIS_QUEUE_SMS_WEB = "sms_queue_web_online",和applicationContext-redis中topic配置一樣

4.監聽消息

 1 //SmsMessageDelegateListener.java
 2 @Component("smsMessageDelegateListener")
 3 public class SmsMessageDelegateListener {
 4 
 5     @Autowired
 6     private SmsQueueService smsQueueService;
 7 
 8     //監聽Redis消息
 9     public void handleMessage(Serializable message){
10         if(message instanceof SmsMessageVo){
11             SmsMessageVo messageVo = (SmsMessageVo) message;
12 
13             //發送短信
14             SmsSender smsSender = SmsSenderFactory.buildEMaySender();
15             smsSender.setMobile(messageVo.getMobile());
16             smsSender.setContent(messageVo.getContent());
17             boolean sendSucc = false;
18             //判斷短信類型
19             //驗證碼短信
20             if(messageVo.getType() == (byte)SmsType.VERIFICATION.getType()){
21                 sendSucc = smsSender.send();
22             }
23 
24 
25             if(!sendSucc){
26                 return;
27             }
28 
29             // 異步更新短信表狀態為發送成功
30             final Integer smsId = messageVo.getSmsId();
31             Executor executor = Executors.newSingleThreadExecutor();
32             executor.execute(new Runnable() {
33                 public void run() {
34                     SmsQueue smsQueue = new SmsQueue();
35                     smsQueue.setSmsId(smsId);
36                     smsQueue.setStatus((byte)SmsSendStatus.SEND.getType());
37                     smsQueue.setProcessTime(new Date());
38                     smsQueueService.updateSmsQueue(smsQueue);
39                 }
40             });
41 
42         }
43     }
44 }

4.總結

下面使用一張流程圖,來總結Redis消息隊列和短信服務。 

這里寫圖片描述

 

 

 

 

 

 

 

 

 
        

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM