基於Redis消息隊列-實現短信服務化
1.Redis實現消息隊列原理
常用的消息隊列有RabbitMQ,ActiveMQ,個人覺得這種消息隊列太大太重,本文介紹下基於redis的輕量級消息隊列服務。
一般來說,消息隊列有兩種模式,一種是發布者訂閱模式,另外一種是生產者和消費者模式。Redis的消息隊列,也是基於這2種原理的實現。
發布者和訂閱者模式:發布者發送消息到隊列,每個訂閱者都能收到一樣的消息。
生產者和消費者模式:生產者將消息放入隊列,多個消費者共同監聽,誰先搶到資源,誰就從隊列中取走消息去處理。注意,每個消息只能最多被一個消費者接收。
2.Redis消息隊列使用場景
在我們的項目中,使用消息隊列來實現短信的服務化,任何需要發送短信的模塊,都可以直接調用短信服務來完成短信的發送。比如用戶系統登錄注冊短信,訂單系統的下單成功的短信等。
3.SpringMVC中實現Redis消息隊列
因為我們短信只需要發送一次,所以我們使用的是消息隊列的生產者和消費者模式。
3.1引入Maven依賴
引入Redis相應的maven依賴,這里需要spring-data-redis和jedis
1 //pom.xml 2 <dependency> 3 <groupId>org.springframework.data</groupId> 4 <artifactId>spring-data-redis</artifactId> 5 <version>1.6.0.RELEASE</version> 6 </dependency> 7 8 <!-- jedis --> 9 <dependency> 10 <groupId>redis.clients</groupId> 11 <artifactId>jedis</artifactId> 12 <version>2.5.1</version> 13 </dependency>
3.2配置redis生成者消費者模式
1 //applicationContext-redis.xml 2 <?xml version="1.0" encoding="UTF-8"?> 3 <beans xmlns="http://www.springframework.org/schema/beans" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 5 xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" 6 xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache" 7 xmlns:redis="http://www.springframework.org/schema/redis" 8 xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 9 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd 10 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd 12 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd 13 http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd 14 http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"> 15 <description>spring-data-redis配置</description> 16 17 <bean id="redisConnectionFactory" 18 class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> 19 <property name="hostName" value="${redis.host}"></property> 20 <property name="port" value="${redis.port}"></property> 21 <property name="usePool" value="true"></property> 22 </bean> 23 24 <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> 25 <property name="connectionFactory" ref="redisConnectionFactory"></property> 26 </bean> 27 28 <bean id="jdkSerializer" 29 class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" /> 30 31 <bean id="smsMessageListener" 32 class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"> 33 <property name="delegate" ref="smsMessageDelegateListener" /> 34 <property name="serializer" ref="jdkSerializer" /> 35 </bean> 36 37 <bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage"> 38 <property name="redisTemplate" ref="redisTemplate"/> 39 </bean> 40 41 <redis:listener-container> 42 <redis:listener ref="smsMessageListener" method="handleMessage" 43 serializer="jdkSerializer" topic="sms_queue_web_online" /> 44 </redis:listener-container> 45 46 <!-- jedis --> 47 <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> 48 <property name="maxIdle" value="300" /> <!-- 最大能夠保持idel狀態的對象數 --> 49 <property name="maxTotal" value="60000" /> <!-- 最大分配的對象數 --> 50 <property name="testOnBorrow" value="true" /> <!-- 當調用borrow Object方法時,是否進行有效性檢查 --> 51 </bean> 52 53 <bean id="jedisPool" class="redis.clients.jedis.JedisPool"> 54 <constructor-arg index="0" ref="jedisPoolConfig" /> 55 <constructor-arg index="1" value="${redis.host}" /> 56 <constructor-arg index="2" value="${redis.port}" type="int" /> 57 </bean> 58 59 </beans>
主要的配置說明:
1.序列化:一般我們向Redis發送一個消息定義的Java對象,這個對象需要序列化。這里使用JdkSerializationRedisSerializer:
1 <bean id="jdkSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
2.發送者:
1 <bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage"> 2 <property name="redisTemplate" ref="redisTemplate"/> 3</bean>
3.監聽者:
1 <bean id="smsMessageListener" 2 class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"> 3 <property name="delegate" ref="smsMessageDelegateListener" /> 4 <property name="serializer" ref="jdkSerializer" /> 5 </bean> 6 <redis:listener-container> 7 <redis:listener ref="smsMessageListener" method="handleMessage" 8 serializer="jdkSerializer" topic="sms_queue_web_online" /> 9 </redis:listener-container>
smsMessageListener:消息監聽器
redis:listener-Container:定義消息監聽,method:監聽消息執行的方法,serializer:序列化,topic:監聽主題(可以理解為隊列名稱)
3.3代碼實現
1.定義短信消息對象SmsMessageVo
1 public class SmsMessageVo implements Serializable { 2 //id 3 private Integer smsId; 4 5 //手機號 6 private String mobile; 7 8 //類型,1:驗證碼 2:訂單通知 9 private Byte type; 10 11 //短信創建時間 12 private Date createDate; 13 14 //短信消息處理時間 15 private Date processTime; 16 17 //短信狀態,1:未發送 2:發送成功 3:發送失敗 18 private Byte status; 19 20 //短信內容 21 private String content; 22 23 //省略setter和getter方法 24 ...
2.定義消息隊列發送對象SendMessage
1 //SendMessage.java 2 public class SendMessage { 3 4 private RedisTemplate<String, Object> redisTemplate; 5 6 7 public RedisTemplate<String, Object> getRedisTemplate() { 8 return redisTemplate; 9 } 10 11 12 13 public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) { 14 this.redisTemplate = redisTemplate; 15 } 16 17 18 19 public void sendMessage(String channel, Serializable message) { 20 redisTemplate.convertAndSend(channel, message); 21 } 22 }
3.發送消息
1 String smsContent = templateToContent(template.getContent(), 2 regMsgCode); 3 SmsMessageVo smsMessageVo = new SmsMessageVo(); 4 smsMessageVo.setMobile(mobile); 5 smsMessageVo.setType((byte) SmsType.VERIFICATION.getType()); 6 smsMessageVo.setChannelId(1); 7 smsMessageVo.setContent(smsContent); 8 smsMessageVo.setCreateDate(new Date()); 9 smsMessageVo.setStatus((byte) SmsSendStatus.TO_SEND.getType()); 10 smsMessageVo.setTemplateId(1); 11 12 //先把待發送的短信存入數據庫 13 SmsQueue smsQueue = new SmsQueue(); 14 BeanUtils.copyProperties(smsQueue, smsMessageVo); 15 smsQueueService.addSmsQueue(smsQueue); 16 17 //異步發送短信到redis隊列 18 sendMessage.sendMessage(Constants.REDIS_QUEUE_SMS_WEB, smsMessageVo); 19 //Constants.REDIS_QUEUE_SMS_WEB = "sms_queue_web_online",和applicationContext-redis中topic配置一樣
4.監聽消息
1 //SmsMessageDelegateListener.java 2 @Component("smsMessageDelegateListener") 3 public class SmsMessageDelegateListener { 4 5 @Autowired 6 private SmsQueueService smsQueueService; 7 8 //監聽Redis消息 9 public void handleMessage(Serializable message){ 10 if(message instanceof SmsMessageVo){ 11 SmsMessageVo messageVo = (SmsMessageVo) message; 12 13 //發送短信 14 SmsSender smsSender = SmsSenderFactory.buildEMaySender(); 15 smsSender.setMobile(messageVo.getMobile()); 16 smsSender.setContent(messageVo.getContent()); 17 boolean sendSucc = false; 18 //判斷短信類型 19 //驗證碼短信 20 if(messageVo.getType() == (byte)SmsType.VERIFICATION.getType()){ 21 sendSucc = smsSender.send(); 22 } 23 24 25 if(!sendSucc){ 26 return; 27 } 28 29 // 異步更新短信表狀態為發送成功 30 final Integer smsId = messageVo.getSmsId(); 31 Executor executor = Executors.newSingleThreadExecutor(); 32 executor.execute(new Runnable() { 33 public void run() { 34 SmsQueue smsQueue = new SmsQueue(); 35 smsQueue.setSmsId(smsId); 36 smsQueue.setStatus((byte)SmsSendStatus.SEND.getType()); 37 smsQueue.setProcessTime(new Date()); 38 smsQueueService.updateSmsQueue(smsQueue); 39 } 40 }); 41 42 } 43 } 44 }
4.總結
下面使用一張流程圖,來總結Redis消息隊列和短信服務。