Reference article
SpringBoot(9): Asynchronous operations with a Redis-based message queue
https://blog.csdn.net/wilsonsong1024/article/details/80573611
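Improvements made
- The referenced blog works with jedis directly. With Spring Boot we no longer need to write our own Redis utility class.
- This version uses RedisTemplate directly.
- The handler logic has to be adapted to your business requirements.
- A test section has been added.
Planned improvements
- When consuming from Redis, check whether a blocking strategy is available (my code polls continuously, which wastes CPU and Redis round trips).
- The consumer thread is created directly with new Thread, which is hard to manage (to be replaced with a thread pool later).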
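Both planned improvements can be sketched with JDK classes alone. The sketch below is not the author's code and makes several assumptions: a LinkedBlockingDeque stands in for the Redis list, pollLast with a timeout stands in for a blocking right pop (Spring Data Redis provides ListOperations.rightPop(key, timeout, unit) for that purpose), and a single-thread ExecutorService replaces new Thread. The class name BlockingConsumerSketch and all of its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: an in-memory deque stands in for the Redis list.
class BlockingConsumerSketch {
    private final BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    private final List<String> consumed = new CopyOnWriteArrayList<>();
    // A managed single worker instead of a bare "new Thread".
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    // Producer side: push on the left, as leftPush does on a Redis list.
    void fire(String json) {
        queue.offerFirst(json);
    }

    // Consumer side: wait up to one second per pop instead of spinning.
    void start() {
        pool.submit(() -> {
            while (running) {
                try {
                    String msg = queue.pollLast(1, TimeUnit.SECONDS);
                    if (msg != null) {
                        consumed.add(msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    // Stops the loop and waits briefly for the worker to exit.
    void stop() {
        running = false;
        pool.shutdownNow();
        try {
            pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> consumed() {
        return new ArrayList<>(consumed);
    }

    // Fires three messages, waits until they are consumed, returns them in order.
    static List<String> demo() {
        BlockingConsumerSketch sketch = new BlockingConsumerSketch();
        sketch.start();
        sketch.fire("a");
        sketch.fire("b");
        sketch.fire("c");
        long deadline = System.currentTimeMillis() + 3000;
        try {
            while (sketch.consumed().size() < 3 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sketch.stop();
        return sketch.consumed();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a real Redis list the timeout also matters for shutdown: a pop that blocks forever cannot notice that the application wants to stop, so a bounded wait is usually the safer choice.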
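Spring and Spring Boot topics involved
- RedisTemplate serializer configuration and the related API usage
- Using fastjson's JSONObject and related classes
- Using InitializingBean and ApplicationContextAware, and what implementing each interface does
- Thread pool topics
- The abstraction power of the template pattern
Code
pom.xml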
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.pig</groupId>
<artifactId>about-redis</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>about-redis</name>
<description>Demo project for Spring Boot</description>

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
AboutRedisApplication (with this serializer configuration, the stored values are readable as strings in a Redis client)
package com.pig.aboutredis;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@SpringBootApplication
public class AboutRedisApplication {

    public static void main(String[] args) {
        try {
            SpringApplication.run(AboutRedisApplication.class, args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // Values are stored as JSON so they stay readable in a Redis client.
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // enableDefaultTyping(...) is deprecated since Jackson 2.10; this call is its replacement.
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(om);

        template.setValueSerializer(serializer);
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // Without this, hash values of a custom type fail with a serialization error.
        template.setHashValueSerializer(serializer);
        template.afterPropertiesSet();
        return template;
    }
}
EventModel (the setters return this, a pattern worth studying)
package com.pig.aboutredis.messagequeue.common;

import java.util.HashMap;
import java.util.Map;

public class EventModel {
    private EventType type;
    private int actorId;
    private int entityType;
    private int entityId;
    private int entityOwerId;
    // Extra key/value attributes attached to the event
    public Map<String, String> exts = new HashMap<>();

    public EventModel() {
    }

    public EventModel(EventType type) {
        this.type = type;
    }

    public EventType getType() {
        return type;
    }

    // Every setter returns this so that calls can be chained.
    public EventModel setType(EventType type) {
        this.type = type;
        return this;
    }

    public int getActorId() {
        return actorId;
    }

    public EventModel setActorId(int actorId) {
        this.actorId = actorId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public EventModel setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public EventModel setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityOwerId() {
        return entityOwerId;
    }

    public EventModel setEntityOwerId(int entityOwerId) {
        this.entityOwerId = entityOwerId;
        return this;
    }

    public String getExts(String key) {
        return exts.get(key);
    }

    public EventModel setExts(String key, String value) {
        exts.put(key, value);
        return this;
    }

    @Override
    public String toString() {
        return "EventModel{" +
                "type=" + type +
                ", actorId=" + actorId +
                ", entityType=" + entityType +
                ", entityId=" + entityId +
                ", entityOwerId=" + entityOwerId +
                '}';
    }
}
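To show what the "return this" style buys you, here is a minimal sketch with a trimmed-down model. FluentEvent and its describe method are hypothetical names used only for illustration; they are not part of the project.

```java
// Hypothetical, trimmed-down model with the same "return this" setter style.
class FluentEvent {
    private String type;
    private int actorId;
    private int entityId;

    FluentEvent setType(String type) {
        this.type = type;
        return this; // returning this is what makes chaining possible
    }

    FluentEvent setActorId(int actorId) {
        this.actorId = actorId;
        return this;
    }

    FluentEvent setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    String describe() {
        return type + ":" + actorId + ":" + entityId;
    }

    public static void main(String[] args) {
        // One expression builds the whole event.
        FluentEvent event = new FluentEvent().setType("LIKE").setActorId(11).setEntityId(42);
        System.out.println(event.describe());
    }
}
```

One trade-off to keep in mind: tools that follow the JavaBeans convention strictly expect setters to return void, so it is worth checking that the JSON library in use handles fluent setters.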
EventType
package com.pig.aboutredis.messagequeue.common;

// The supported event types
public enum EventType {
    LIKE(0), COMMENT(1), LOGIN(2), MAIL(3);

    private final int value;

    EventType(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}
EventProducer
package com.pig.aboutredis.messagequeue.producer;

import com.alibaba.fastjson.JSONObject;
import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class EventProducer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Dispatch the event, i.e. store it as JSON in a Redis list.
    public boolean fireEvent(EventModel model) {
        try {
            String jsonString = JSONObject.toJSONString(model);
            String queueKey = RedisKeyUtil.getEventQueueKey();
            redisTemplate.opsForList().leftPush(queueKey, jsonString);
            return true;
        } catch (Exception e) {
            log.error("Failed to push event to the queue", e);
            return false;
        }
    }
}
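The producer pushes on the left of the list and the consumer (shown further down) pops from the right, which is what turns a Redis list into a first-in, first-out queue. A minimal in-memory sketch of that ordering, using an ArrayDeque as a stand-in for the Redis list (ListQueueSketch and its method names are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory stand-in for a Redis list used as a queue.
class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    // Mirrors pushing on the left end of the list.
    void leftPush(String value) {
        list.addFirst(value);
    }

    // Mirrors popping from the right end; returns null when the list is empty.
    String rightPop() {
        return list.pollLast();
    }

    // Pushes three values and drains the queue, returning the pop order.
    static List<String> demo() {
        ListQueueSketch queue = new ListQueueSketch();
        queue.leftPush("actor-11");
        queue.leftPush("actor-22");
        queue.leftPush("actor-33");
        List<String> popped = new ArrayList<>();
        String value;
        while ((value = queue.rightPop()) != null) {
            popped.add(value);
        }
        return popped;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Pushing and popping on the same end would give a stack instead, so both sides have to agree on the direction as well as on the key.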
RedisKeyUtil (this class adds little in my code; the producer and the consumer only need to share one key)
package com.pig.aboutredis.messagequeue.utils;

public class RedisKeyUtil {
    private static final String SPLIT = ":";
    private static final String BIZ_LIKE = "LIKE";
    private static final String BIZ_DISLIKE = "DISLIKE";
    private static final String BIZ_EVENTQUEUE = "EVENTQUEUE";

    public static String getLikeKey(int entityType, int entityId) {
        return BIZ_LIKE + SPLIT + entityType + SPLIT + entityId;
    }

    public static String getDisLikeKey(int entityType, int entityId) {
        return BIZ_DISLIKE + SPLIT + entityType + SPLIT + entityId;
    }

    // The single key shared by the producer and the consumer
    public static String getEventQueueKey() {
        return BIZ_EVENTQUEUE;
    }
}
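EventHandler (this is where the template-style abstraction comes in: every handler implements this interface and declares which event types it supports)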
package com.pig.aboutredis.messagequeue.consumer;

import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;

import java.util.List;

// Template-style abstraction: one interface, many handlers
public interface EventHandler {
    // Process a single event.
    void doHandler(EventModel model);

    // The event types this handler is able to process.
    List<EventType> getSupportEventTypes();
}
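A note on terminology, offered as a sketch, not as a correction of the project: an interface with interchangeable implementations is arguably closer to the strategy pattern, while the classic template method pattern fixes the order of steps in an abstract class and leaves one step to subclasses. The hypothetical classes below (TemplateHandlerSketch and LikeTemplateHandler are not in the project) show that variant, mirroring the start / consume / end steps that LikeHandler logs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: the skeleton of the algorithm lives here.
abstract class TemplateHandlerSketch {
    protected final List<String> steps = new ArrayList<>();

    // Template method: final, so subclasses cannot change the order of steps.
    final List<String> handle(String event) {
        steps.add("start");
        process(event);
        steps.add("end");
        return steps;
    }

    // The only step a concrete handler has to supply.
    protected abstract void process(String event);
}

// Hypothetical concrete handler that fills in the variable step.
class LikeTemplateHandler extends TemplateHandlerSketch {
    @Override
    protected void process(String event) {
        steps.add("like:" + event);
    }
}

class TemplateHandlerDemo {
    public static void main(String[] args) {
        System.out.println(new LikeTemplateHandler().handle("11"));
    }
}
```

With this shape, shared concerns such as logging or error handling are written once in the base class, not repeated in every handler.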
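EventConsumer (as an alternative to the context lookup used in the class, the following field also gets all beans of type EventHandler injected automatically, keyed by bean name)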
@Autowired
private Map<String,EventHandler> beans;
package com.pig.aboutredis.messagequeue.consumer;

import com.alibaba.fastjson.JSON;
import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;
import com.pig.aboutredis.messagequeue.utils.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
@Slf4j
public class EventConsumer implements InitializingBean, ApplicationContextAware {

    // Maps each event type to the handlers that can process it
    private Map<EventType, List<EventHandler>> config = new HashMap<>();

    // The Spring application context, used to look up handler beans
    private ApplicationContext context;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String, EventHandler> beans = context.getBeansOfType(EventHandler.class);
        if (!CollectionUtils.isEmpty(beans)) {
            for (Map.Entry<String, EventHandler> entry : beans.entrySet()) {
                // The types this handler can process,
                // e.g. LikeHandler can process EventType.LIKE
                List<EventType> eventTypes = entry.getValue().getSupportEventTypes();
                // Initialize config
                // key: an event type such as EventType.LIKE; value: the handlers for that type
                for (EventType type : eventTypes) {
                    // Create the list on first use, then add the handler to it
                    if (!config.containsKey(type)) {
                        config.put(type, new ArrayList<EventHandler>());
                    }
                    config.get(type).add(entry.getValue());
                }
            }
        }

        // Consumption strategy
        // To be replaced with a thread pool when time allows.
        // A real production system would not create threads like this:
        // it is hard to control, not highly available, and awkward to manage.
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    String key = RedisKeyUtil.getEventQueueKey();
                    // The producer pushes on the left, the consumer pops from the right
                    String rightPop = (String) redisTemplate.opsForList().rightPop(key);
                    // To be optimized: with a blocking pop, this loop would no longer
                    // spin on an empty queue
                    if (StringUtils.isEmpty(rightPop)) {
                        continue;
                    }
                    EventModel eventModel = JSON.parseObject(rightPop, EventModel.class);
                    if (!config.containsKey(eventModel.getType())) {
                        log.error("Unrecognized event type");
                        continue;
                    }
                    // One event type can be consumed by several handlers.
                    // The dispatch strategy here depends on the business case.
                    for (EventHandler handler : config.get(eventModel.getType())) {
                        handler.doHandler(eventModel);
                    }
                }
            }
        });
        thread.start();
    }
}
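The registration and dispatch logic in afterPropertiesSet can be tried out without Spring or Redis. The sketch below is a hypothetical, self-contained rewrite of just that part: HandlerRegistrySketch, its nested Type enum and Handler interface, and the handler names are illustrative stand-ins for EventConsumer, EventType and EventHandler. It uses Map.computeIfAbsent in place of the containsKey/put pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the registry part of EventConsumer.
class HandlerRegistrySketch {
    enum Type { LIKE, COMMENT, LOGIN, MAIL }

    interface Handler {
        String handle(String payload);
        List<Type> supportedTypes();
    }

    private final Map<Type, List<Handler>> config = new EnumMap<>(Type.class);

    // Adds the handler under every type it declares support for.
    void register(Handler handler) {
        for (Type type : handler.supportedTypes()) {
            config.computeIfAbsent(type, k -> new ArrayList<>()).add(handler);
        }
    }

    // Runs every handler registered for the type; unknown types yield no results.
    List<String> dispatch(Type type, String payload) {
        List<String> results = new ArrayList<>();
        for (Handler handler : config.getOrDefault(type, Collections.emptyList())) {
            results.add(handler.handle(payload));
        }
        return results;
    }

    // Small factory for test handlers that tag the payload with their name.
    static Handler named(String name, Type... types) {
        return new Handler() {
            @Override
            public String handle(String payload) {
                return name + ":" + payload;
            }

            @Override
            public List<Type> supportedTypes() {
                return Arrays.asList(types);
            }
        };
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register(named("like", Type.LIKE));
        registry.register(named("audit", Type.LIKE, Type.COMMENT));
        System.out.println(registry.dispatch(Type.LIKE, "11"));
        System.out.println(registry.dispatch(Type.MAIL, "11"));
    }
}
```

Keeping the registry separate from the polling loop also makes it easy to unit test the dispatch rules on their own.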
LikeHandler
package com.pig.aboutredis.messagequeue.consumer.handler;

import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;
import com.pig.aboutredis.messagequeue.consumer.EventHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

@Component
@Slf4j
public class LikeHandler implements EventHandler {

    @Override
    public void doHandler(EventModel model) {
        // The three messages read: "LikeHandler consumed your data",
        // followed by "start", the event itself, and "end".
        log.info("LikeHandler 消費了你的數據,開始消費");
        log.info("LikeHandler 消費了你的數據,消費{}", model);
        log.info("LikeHandler 消費了你的數據,結束消費");
    }

    @Override
    public List<EventType> getSupportEventTypes() {
        return Arrays.asList(EventType.LIKE);
    }
}
Test
ProducerController
package com.pig.aboutredis.messagequeue.controller;

import com.pig.aboutredis.messagequeue.common.EventModel;
import com.pig.aboutredis.messagequeue.common.EventType;
import com.pig.aboutredis.messagequeue.producer.EventProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private EventProducer producer;

    // Fires five LIKE events with different actor ids.
    // The same model is reused; each call serializes its current state.
    @GetMapping("/queue")
    public String redisQueue() {
        EventModel model = new EventModel();
        model.setType(EventType.LIKE);
        model.setActorId(11);
        producer.fireEvent(model);
        model.setActorId(22);
        producer.fireEvent(model);
        model.setActorId(33);
        producer.fireEvent(model);
        model.setActorId(44);
        producer.fireEvent(model);
        model.setActorId(55);
        producer.fireEvent(model);
        return "i am your father";
    }
}
http://localhost:8080/aboutredis/queue
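Log output (each event produces three lines: "LikeHandler consumed your data" followed by start, the event itself, and end)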
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,開始消費
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,消費EventModel{type=LIKE, actorId=11, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.254 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,結束消費
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,開始消費
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,消費EventModel{type=LIKE, actorId=22, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.351 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,結束消費
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,開始消費
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,消費EventModel{type=LIKE, actorId=33, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.454 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,結束消費
2020-04-02 22:28:53.538 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,開始消費
2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,消費EventModel{type=LIKE, actorId=44, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.539 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,結束消費
2020-04-02 22:28:53.623 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,開始消費
2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,消費EventModel{type=LIKE, actorId=55, entityType=0, entityId=0, entityOwerId=0}
2020-04-02 22:28:53.626 INFO 7872 --- [ Thread-10] c.p.a.m.consumer.handler.LikeHandler : LikeHandler 消費了你的數據,結束消費
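Purpose
The purpose of this post is to get familiar with Redis and Spring Boot.
Getting familiar with them should not stop at calling their APIs.
The goal is to use them fluently, so that anything Redis and Spring Boot can do, I can write without much effort.
In one sentence: use them to express what I have in mind.
Topics I hope to study later (the keywords Baidu suggests when searching for "redis實現", i.e. "implementing ... with redis"):
- Session sharing with Redis
- How distributed locks built on Redis work
- Distributed transactions with Redis
- Bloom filters with Redis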
