手把手教你用redis實現一個簡單的mq消息隊列(java)


眾所周知,消息隊列是應用系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ.

但是如果你不想為你的系統引入一個重量級(相對 redis 來說)的 mq,但是想要享受解耦、異步消息等特性,通過本文你就 get 到了,通過 redis 實現一個簡單版的 mq。

為什么是 redis

  • redis 通常作為緩存服務引入,因此大部分系統都會有 redis
  • redis 本身的資源消耗是極小的,符合我們的輕量要求
  • redis 速度很快,幾乎不會出現速度瓶頸
  • redis 有持久化方案,調整配置項可以在數據安全和速度間進行取舍(參考這篇)[https://segmentfault.com/a/1190000002906345]

如何實現

利用 redis 的隊列結構來實現消息隊列。redis 單個隊列最多支持 2*32-1 條數據,對於大部分應用是完全夠用的。

簡單來說就是:

  • 每個 topic 對應一條隊列
  • 從隊列一段寫入數據,從另一端讀取數據
  • 消費失敗,重新將消息放入隊列

注意:代碼僅供個人嘗鮮使用,請勿用於真實生產環境

代碼僅可在 springboot 環境中使用

首先定義注解和接口類

注解代碼如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqConsumer {
    /**
     * 隊列主題
     */
    String topic() default "default_es_topic";
}

被該注解修飾的類,將會接收 topic 下的消息。

接口代碼如下:

public interface RedisConsumer {

    /**
     * 功能描述: 消費方法,消費者類必須繼承此方法
     *
     * @param message 數據載體
     * @author 123
     * @date 2020/3/28 22:41
     */
    void deal(String message);
}

本接口用於定於接受消息的處理方法。

掃描注解修飾類

本部分為核心代碼,首先需要獲取代碼中被注解修飾的類,然后建立一個循環從 redis 隊列中取數據,最后調用類對象的 deal 方法消費消息,如果 deal 方法拋出錯誤,認為消費失敗,重新將該數據放入隊列中。

  1. 掃描部分代碼如下:
/**
 *  MqConfiguration.java
 */
@Override
public void run(ApplicationArguments args) {
    Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
    map.values().forEach(item -> {
        if (!(item instanceof RedisConsumer)) {
            log.warn("注意檢測到被@EsConsumer注解的類{}未實現RedisConsumer接口", item.getClass().getCanonicalName());
            return;
        }
        MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
        MqConsumer annotation = annotations[0];
        String topic = annotation.topic();
        if (topicMap.containsKey(topic)) {
            log.error("多個消費者{},消費同一個消息:{},已忽略", item.getClass().getCanonicalName(), topic);
        } else {
            topicMap.put(topic, (RedisConsumer) item);
        }

    });
    log.info("redis訂閱信息匯總完畢!!!!!!");
    //由一個線程始終循環獲取es隊列數據
    threadPoolExecutor.execute(loop());
}

run 方法在 spring 掃描完畢后調用,通過實現ApplicationRunner接口實現,通過 spring 的方法來獲取所有被MqConsumer接口注解的類(否則需要自己寫類加載器)。數據匯總完畢后使用一個線程來進行無線循環從 redis 隊列中取數據。

  1. 執行線程部分代碼如下:
private Runnable loop() {
    return () -> {
        while (true) {
            AtomicInteger count = new AtomicInteger(0);
            topicMap.forEach((k, v) -> {
                try {
                    String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);
                    if (message == null) {
                        count.getAndIncrement();
                    } else {
                        pushTask(v, message, k);
                    }
                } catch (RedisConnectionFailureException connException) {
                    log.error("redis無法連接,10s后重試", connException);
                    sleep(10);
                } catch (Exception e) {
                    log.error("redis消息隊列異常", e);
                }
            });
            if (count.get() == topicMap.keySet().size()) {
                //當所有的隊列都為空時休眠1s
                sleep(1);
            }
        }
    };
}
private void pushTask(RedisConsumer item, String value, String key) {
    threadPoolExecutor.execute(() -> {
        try {
            item.deal(value);
        } catch (Exception e) {
            log.error("執行消費任務出錯", e);
            //非廣播消息進行數據回補
            mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
        }
    });
}

loop 方法無限循環根據 topic 從 redis 中取數據,如果取到數據,調用 pushTask 方法執行,如果執行報錯將會進行數據回補。

完整代碼見本文結尾

測試

運行項目后調用,MainController中的接口即可測試。

完整代碼:github

本文原創發布於:手把手教你用 redis 實現一個簡單的 mq 消息隊列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM