我在生產項目里是如何使用Redis發布訂閱的?(二)Java版代碼實現(含源碼)


上篇文章講了在實際項目里的哪些業務場景用到Redis發布訂閱,這篇文章就講一下,在Java中如何實現的。

 

圖解代碼結構

發布訂閱的理論以及使用場景大家都已經有了大致了解了,但是怎么用代碼實現發布訂閱呢?在這里給大家分享一下實現方式。

 

我們以上篇文章的第三種使用場景為例,先來看一下整體實現類圖吧。

 

解釋一下,這里我們首先定義一個統一接口`ICacheUpdate`,只有一個`update`方法,我們令`Service`層實現這個方法,執行具體的更新操作。

 

我們再來看`RedisMsgPubSub`,它繼承`redis.clients.jedis.JedisPubSub`,主要重寫其`onMessage()`方法(訂閱的頻道有消息到來時會觸發這個方法),我們在這個方法里調用`RedisMsgPubSub`的`update`方法執行更新操作。

 

當我們有多個`Service`實現`ICacheUpdate`時,我們就非常迫切地需要一個管理器來集中管理這些`Service`,並且當觸發onMessage方法時要告訴onMessage方法具體調用哪個`ICacheUpdate`的實現類,所以我們有了`PubSubManager`。並且我們單獨開啟一個線程來維護發布訂閱,所以管理器繼承了`Thread`類。

 

代碼實現

具體代碼:

統一接口

public interface ICacheUpdate {
    public void update(); }

 

Service層

實現ICacheUpdate的update方法,執行具體的更新操作

public class InfoService implements ICacheUpdate {
  private static Logger logger = LoggerFactory.getLogger(InfoService.class); @Autowired private RedisCache redisCache; @Autowired private InfoMapper infoMapper; /** * 按信息類型分類查詢信息 * @return */ public Map<String, List<Map<String, Object>>> selectAllInfo(){ Map<String, List<Map<String, Object>>> resultMap = new HashMap<String, List<Map<String, Object>>>(); List<String> infoTypeList = infoMapper.selectInfoType();//信息表中所有涉及的信息類型 logger.info("-------按信息類型查找公共信息開始----"+infoTypeList); if(infoTypeList!=null && infoTypeList.size()>0) { for (String infoType : infoTypeList) { List<Map<String, Object>> result = infoMapper.selectByInfoType(infoType); resultMap.put(infoType, result); } } return resultMap; } @Override public void update() { //緩存首頁信息 logger.info("InfoService selectAllInfo 刷新緩存"); Map<String, List<Map<String, Object>>> resultMap = this.selectAllInfo(); Set<String> keySet = resultMap.keySet(); for(String key:keySet){ List<Map<String, Object>> value = resultMap.get(key); redisCache.putObject(GlobalSt.PUBLIC_INFO_ALL+key, value); } } }

Redis發布訂閱的擴展類

 作用:

 1、統一管理ICacheUpdate,把所有實現ICacheUpdate接口的類添加到updates容器

 2、重寫onMessage方法,訂閱到消息后進行刷新緩存的操作

public class RedisMsgPubSub extends JedisPubSub {
    private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class); private Map<String , ICacheUpdate> updates = new HashMap<String , ICacheUpdate>(); //1、由updates統一管理ICacheUpdate public boolean addListener(String key , ICacheUpdate update) { if(update == null) return false;     updates.put(key, update);     return true; } /** * 2、重寫onMessage方法,訂閱到消息后進行刷新緩存的操作 * 訂閱頻道收到的消息 */ @Override public void onMessage(String channel, String message) { logger.info("RedisMsgPubSub onMessage channel:{},message :{}" ,channel, message); ICacheUpdate updater = null; if(StringUtil.isNotEmpty(message)) updater = updates.get(message); if(updater!=null) updater.update(); } //other code... }

發布訂閱的管理器

執行的操作:

1、將所有需要刷新加載的Service類(實現ICacheUpdate接口)添加到RedisMsgPubSub的updates中

2、啟動線程訂閱pubsub_config頻道,收到消息后的五秒后再次訂閱(避免訂閱到一次消息后結束訂閱)

public class PubSubManager extends Thread{
    private static Logger logger = LoggerFactory.getLogger(PubSubManager.class); public static Jedis jedis; RedisMsgPubSub msgPubSub = new RedisMsgPubSub(); //頻道 public static final String PUNSUB_CONFIG = "pubsub_config"; //1.將所有需要刷新加載的Service類(實現ICacheUpdate接口)添加到RedisMsgPubSub的updates中 public boolean addListener(String key, ICacheUpdate listener){ return msgPubSub.addListener(key,listener); } @Override public void run(){ while (true){ try { JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class); if(jedisPool!=null){ jedis = jedisPool.getResource(); if(jedis!=null){ //2.啟動線程訂閱pubsub_config頻道 阻塞  jedis.subscribe(msgPubSub,PUNSUB_CONFIG); } } } catch (Exception e) { logger.error("redis connect error!"); } finally { if(jedis!=null) jedis.close(); } try { //3.收到消息后的五秒后再次訂閱(避免訂閱到一次消息后結束訂閱) Thread.sleep(5000); } catch (InterruptedException e) { logger.error("InterruptedException in redis sleep!"); } } } }

到此,Redis的發布訂閱大致已經實現。我們什么時候啟用呢?我們可以選擇在啟動項目時完成訂閱和基礎數據的加載,所以我們通過實現`javax.servlet.SevletContextListener`來完成這一操作。然后將監聽器添加到`web.xml`。

CacheInitListener.java

/**
 * 加載系統參數
 */
public class CacheInitListener implements ServletContextListener{
    private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class); @Override public void contextDestroyed(ServletContextEvent arg0) { } @Override public void contextInitialized(ServletContextEvent arg0) { logger.info("---CacheListener初始化開始---"); init(); logger.info("---CacheListener初始化結束---"); } public void init() { try { //獲得管理器 PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class); InfoService infoService = SpringTools.getBean("infoService", InfoService.class); //添加到管理器 pubSubManager.addListener("infoService", infoService); //other service... //啟動線程執行訂閱操作  pubSubManager.start(); //初始化加載  loadParamToRedis(); } catch (Exception e) { logger.info(e.getMessage(), e); } } private void loadParamToRedis() { InfoService infoService = SpringTools.getBean("infoService", InfoService.class); infoService.update(); //other service...  } }

web.xml

<listener>
  <listener-class>com.xxx.listener.CacheInitListener</listener-class>
</listener>

【end】

上篇文章講了在實際項目里的哪些業務場景用到Redis發布訂閱,這篇文章就講一下,在Java中如何實現的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM