基于Redis消息队列-实现短信服务化
1.Redis实现消息队列原理
常用的消息队列有RabbitMQ,ActiveMQ,个人觉得这种消息队列太大太重,本文介绍下基于redis的轻量级消息队列服务。
一般来说,消息队列有两种模式,一种是发布者订阅模式,另外一种是生产者和消费者模式。Redis的消息队列,也是基于这2种原理的实现。
发布者和订阅者模式:发布者发送消息到队列,每个订阅者都能收到一样的消息。
生产者和消费者模式:生产者将消息放入队列,多个消费者共同监听,谁先抢到资源,谁就从队列中取走消息去处理。注意,每个消息只能最多被一个消费者接收。
2.Redis消息队列使用场景
在我们的项目中,使用消息队列来实现短信的服务化,任何需要发送短信的模块,都可以直接调用短信服务来完成短信的发送。比如用户系统登录注册短信,订单系统的下单成功的短信等。
3.SpringMVC中实现Redis消息队列
因为我们短信只需要发送一次,所以我们使用的是消息队列的生产者和消费者模式。
3.1引入Maven依赖
引入Redis相应的maven依赖,这里需要spring-data-redis和jedis
1 //pom.xml 2 <dependency> 3 <groupId>org.springframework.data</groupId> 4 <artifactId>spring-data-redis</artifactId> 5 <version>1.6.0.RELEASE</version> 6 </dependency> 7 8 <!-- jedis --> 9 <dependency> 10 <groupId>redis.clients</groupId> 11 <artifactId>jedis</artifactId> 12 <version>2.5.1</version> 13 </dependency>
3.2配置redis生成者消费者模式
1 //applicationContext-redis.xml 2 <?xml version="1.0" encoding="UTF-8"?> 3 <beans xmlns="http://www.springframework.org/schema/beans" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 5 xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" 6 xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache" 7 xmlns:redis="http://www.springframework.org/schema/redis" 8 xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 9 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd 10 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd 12 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd 13 http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd 14 http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"> 15 <description>spring-data-redis配置</description> 16 17 <bean id="redisConnectionFactory" 18 class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> 19 <property name="hostName" value="${redis.host}"></property> 20 <property name="port" value="${redis.port}"></property> 21 <property name="usePool" value="true"></property> 22 </bean> 23 24 <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> 25 <property name="connectionFactory" ref="redisConnectionFactory"></property> 26 </bean> 27 28 <bean id="jdkSerializer" 29 class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" /> 30 31 <bean id="smsMessageListener" 32 class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"> 33 <property name="delegate" ref="smsMessageDelegateListener" /> 34 <property name="serializer" ref="jdkSerializer" /> 35 </bean> 36 37 <bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage"> 38 <property name="redisTemplate" ref="redisTemplate"/> 39 </bean> 40 41 <redis:listener-container> 42 <redis:listener ref="smsMessageListener" method="handleMessage" 43 serializer="jdkSerializer" topic="sms_queue_web_online" /> 44 </redis:listener-container> 45 46 <!-- jedis --> 47 <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> 48 <property name="maxIdle" value="300" /> <!-- 最大能够保持idel状态的对象数 --> 49 <property name="maxTotal" value="60000" /> <!-- 最大分配的对象数 --> 50 <property name="testOnBorrow" value="true" /> <!-- 当调用borrow Object方法时,是否进行有效性检查 --> 51 </bean> 52 53 <bean id="jedisPool" class="redis.clients.jedis.JedisPool"> 54 <constructor-arg index="0" ref="jedisPoolConfig" /> 55 <constructor-arg index="1" value="${redis.host}" /> 56 <constructor-arg index="2" value="${redis.port}" type="int" /> 57 </bean> 58 59 </beans>
主要的配置说明:
1.序列化:一般我们向Redis发送一个消息定义的Java对象,这个对象需要序列化。这里使用JdkSerializationRedisSerializer:
1 <bean id="jdkSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
2.发送者:
1 <bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage"> 2 <property name="redisTemplate" ref="redisTemplate"/> 3</bean>
3.监听者:
1 <bean id="smsMessageListener" 2 class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"> 3 <property name="delegate" ref="smsMessageDelegateListener" /> 4 <property name="serializer" ref="jdkSerializer" /> 5 </bean> 6 <redis:listener-container> 7 <redis:listener ref="smsMessageListener" method="handleMessage" 8 serializer="jdkSerializer" topic="sms_queue_web_online" /> 9 </redis:listener-container>
smsMessageListener:消息监听器
redis:listener-Container:定义消息监听,method:监听消息执行的方法,serializer:序列化,topic:监听主题(可以理解为队列名称)
3.3代码实现
1.定义短信消息对象SmsMessageVo
1 public class SmsMessageVo implements Serializable { 2 //id 3 private Integer smsId; 4 5 //手机号 6 private String mobile; 7 8 //类型,1:验证码 2:订单通知 9 private Byte type; 10 11 //短信创建时间 12 private Date createDate; 13 14 //短信消息处理时间 15 private Date processTime; 16 17 //短信状态,1:未发送 2:发送成功 3:发送失败 18 private Byte status; 19 20 //短信内容 21 private String content; 22 23 //省略setter和getter方法 24 ...
2.定义消息队列发送对象SendMessage
1 //SendMessage.java 2 public class SendMessage { 3 4 private RedisTemplate<String, Object> redisTemplate; 5 6 7 public RedisTemplate<String, Object> getRedisTemplate() { 8 return redisTemplate; 9 } 10 11 12 13 public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) { 14 this.redisTemplate = redisTemplate; 15 } 16 17 18 19 public void sendMessage(String channel, Serializable message) { 20 redisTemplate.convertAndSend(channel, message); 21 } 22 }
3.发送消息
1 String smsContent = templateToContent(template.getContent(), 2 regMsgCode); 3 SmsMessageVo smsMessageVo = new SmsMessageVo(); 4 smsMessageVo.setMobile(mobile); 5 smsMessageVo.setType((byte) SmsType.VERIFICATION.getType()); 6 smsMessageVo.setChannelId(1); 7 smsMessageVo.setContent(smsContent); 8 smsMessageVo.setCreateDate(new Date()); 9 smsMessageVo.setStatus((byte) SmsSendStatus.TO_SEND.getType()); 10 smsMessageVo.setTemplateId(1); 11 12 //先把待发送的短信存入数据库 13 SmsQueue smsQueue = new SmsQueue(); 14 BeanUtils.copyProperties(smsQueue, smsMessageVo); 15 smsQueueService.addSmsQueue(smsQueue); 16 17 //异步发送短信到redis队列 18 sendMessage.sendMessage(Constants.REDIS_QUEUE_SMS_WEB, smsMessageVo); 19 //Constants.REDIS_QUEUE_SMS_WEB = "sms_queue_web_online",和applicationContext-redis中topic配置一样
4.监听消息
1 //SmsMessageDelegateListener.java 2 @Component("smsMessageDelegateListener") 3 public class SmsMessageDelegateListener { 4 5 @Autowired 6 private SmsQueueService smsQueueService; 7 8 //监听Redis消息 9 public void handleMessage(Serializable message){ 10 if(message instanceof SmsMessageVo){ 11 SmsMessageVo messageVo = (SmsMessageVo) message; 12 13 //发送短信 14 SmsSender smsSender = SmsSenderFactory.buildEMaySender(); 15 smsSender.setMobile(messageVo.getMobile()); 16 smsSender.setContent(messageVo.getContent()); 17 boolean sendSucc = false; 18 //判断短信类型 19 //验证码短信 20 if(messageVo.getType() == (byte)SmsType.VERIFICATION.getType()){ 21 sendSucc = smsSender.send(); 22 } 23 24 25 if(!sendSucc){ 26 return; 27 } 28 29 // 异步更新短信表状态为发送成功 30 final Integer smsId = messageVo.getSmsId(); 31 Executor executor = Executors.newSingleThreadExecutor(); 32 executor.execute(new Runnable() { 33 public void run() { 34 SmsQueue smsQueue = new SmsQueue(); 35 smsQueue.setSmsId(smsId); 36 smsQueue.setStatus((byte)SmsSendStatus.SEND.getType()); 37 smsQueue.setProcessTime(new Date()); 38 smsQueueService.updateSmsQueue(smsQueue); 39 } 40 }); 41 42 } 43 } 44 }
4.总结
下面使用一张流程图,来总结Redis消息队列和短信服务。