分布式 redis 延時任務 基於 springboot 示例


Lilishop 技術棧

官方公眾號 & 開源不易,如有幫助請點Star

image-20210511171611793

介紹

官網https://pickmall.cn

Lilishop 是一款Java開發,基於SpringBoot研發的B2B2C多用戶商城,前端使用 Vue、uniapp開發 系統全端全部代碼開源

本系統用於教大家如何運用系統中的每一個細節,如:支付、第三方登錄、日志收集、分布式事務、秒殺場景等各個場景學習方案

git地址 https://gitee.com/beijing_hongye_huicheng/lilishop-spring-learning

本文學習 分布式延時任務

延時任務介紹

即指定一個時間,執行提前約定好的任務,例如:定時取消訂單,定時上下架商品,定時開啟活動等。

延時任務與定時任務的區別

延時任務適用於個性化的業務場景,比如某訂單自動取消,某活動自動開啟,某商品自動上下架子。還有一個就是較為精確的,需要實時的事情。

而定時任務適用於全平台的業務,比如計算商品評分統一結算,分銷中的可提現金額批量結算,平台統計/店鋪統計數據生成等。總的來說就是定時掃描,每天,每小時,每分鍾,每個月,不管怎么樣都要執行。比如定時上下架,用定時任務也可以,但是要實現精確的任務調度,創建一個每秒任務,是不太理智的。

兩個場景需要互補,具體應用什么場景,可以再自己斟酌斟酌。

思路介紹

  1. 項目啟動時啟用一個線程,線程用於間隔一定時間去查詢redis的待執行任務。其任務id為對象json格式化之后的字符串,值為要執行的時間。
  2. 查詢到執行的任務時,將其從redis的信息中進行刪除。(刪除成功才執行延時任務,否則不執行,這樣可以避免分布式系統延時任務多次執行。)
  3. 刪除redis中的記錄之后,啟用子線程執行任務。將執行id,也就是json的字符串翻轉回要執行的任務信息,這樣可以得到用什么執行器去執行任務,參數有哪些。
  4. 執行延時任務

實際使用

實際場景中,還會設計延時任務修改,刪除等,這些場景建議在執行任務創建時,redis標記要執行的任務,如果刪除或者修改任務時,修改redis中的標識即可,當然也可以在業務邏輯中做補充的條件判定,都可以。

另外具體執行任務建議使用mq去實現,相當於在執行任務時,線程只是發布一個mq,交給消費者去消費具體的事情。

代碼中的進程掃描5秒,也就代表一個延時任務最多延遲5秒去執行,實戰場景中可以調整至1秒,或者更低,但是不太建議。另外redis的性能杠杠的,不用太擔心redis的連接數導致性能問題。

使用步驟

  1. 啟用redis,可以本地啟動,也可以用ELK中docker-compose啟動。

  2. 啟動springboot應用。

  3. 請求springboot 應用 http://127.0.0.1:8080

  4. 查看控制台輸出內容

    2021-06-09 12:41:33.168 INFO 40730 --- [nio-8888-exec-1] l.t.p.d.AbstractDelayQueueMachineFactory : 增加延時任務, 緩存key test_delay, 等待時間 10
    2021-06-09 12:41:33.168 INFO 40730 --- [nio-8888-exec-1] c.l.t.p.i.impl.RedisTimerTrigger : 定時執行在【2021-06-09 12:41:43】,消費【test params】
    2021-06-09 12:41:44.399 INFO 40730 --- [ Thread-5] l.t.p.d.AbstractDelayQueueMachineFactory : 延時任務開始執行任務:[{"score":1.623213703E9,"value":"{"triggerTime":1623213703,"triggerExecutor":"testTimeTriggerExecutor","param":"test params"}"}]
    2021-06-09 12:41:44.403 INFO 40730 --- [pool-2-thread-2] c.l.t.p.i.e.TestTimeTriggerExecutor : 執行器具執行任務test params

關鍵類介紹

緩存操作類 用於延時任務的核型邏輯,間隔查詢需要執行的延時任務,考的就是redis的Sorted Set屬性來試下排序,執行任務。
/**
 * 向Zset里添加成員
 *
 * @param key   key值
 * @param score 分數,通常用於排序
 * @param value 值
 * @return 增加狀態
 */
@Override
public boolean zAdd(String key, long score, String value) {
    Boolean result = redisTemplate.opsForZSet().add(key, value, score);
    return result;

}


/**
 * 獲取 某key 下 某一分值區間的隊列
 *
 * @param key  緩存key
 * @param from 開始時間
 * @param to   結束時間
 * @return 數據
 */
@Override
public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {
    Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);
    return set;
}

/**
 * 移除 Zset隊列值
 *
 * @param key   key值
 * @param value 刪除的集合
 * @return 刪除數量
 */
@Override
public Long zRemove(String key, String... value) {
    return redisTemplate.opsForZSet().remove(key, value);
}
延時隊列 抽象類,具體延時隊列需繼承
package cn.lili.trigger.plugin.delay;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.cache.Cache;
import cn.lili.trigger.plugin.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 延時隊列工廠
 *
 * @author paulG
 * @since 2020/11/7
 **/
@Slf4j
public abstract class AbstractDelayQueueMachineFactory {

    @Autowired
    private Cache cache;

    /**
     * 插入任務id
     *
     * @param jobId 任務id(隊列內唯一)
     * @param time  延時時間(單位 :秒)
     * @return 是否插入成功
     */
    public boolean addJob(String jobId, Integer time) {
        Calendar instance = Calendar.getInstance();
        instance.add(Calendar.SECOND, time);
        long delaySeconds = instance.getTimeInMillis() / 1000;
        boolean result = cache.zAdd(setDelayQueueName(), delaySeconds, jobId);
        log.info("增加延時任務, 緩存key {}, 等待時間 {}", setDelayQueueName(), time);
        return result;

    }

    /**
     * 延時隊列機器開始運作
     */
    private void startDelayQueueMachine() {
        log.info("延時隊列機器{}開始運作", setDelayQueueName());

        // 監聽redis隊列
        while (true) {
            try {
                // 獲取當前時間的時間戳
                long now = System.currentTimeMillis() / 1000;
                // 獲取當前時間前的任務列表
                Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);

                // 如果任務不為空
                if (!CollectionUtils.isEmpty(tuples)) {
                    log.info("延時任務開始執行任務:{}", JSONUtil.toJsonStr(tuples));

                    for (DefaultTypedTuple tuple : tuples) {
                        String jobId = (String) tuple.getValue();
                        // 移除緩存,如果移除成功則表示當前線程處理了延時任務,則執行延時任務
                        Long num = cache.zRemove(setDelayQueueName(), jobId);
                        // 如果移除成功, 則執行
                        if (num > 0) {
                            ThreadPoolUtil.execute(() -> invoke(jobId));
                        }
                    }
                }

            } catch (Exception e) {
                log.error("處理延時任務發生異常,異常原因為{}", e.getMessage(), e);
            } finally {
                // 間隔5秒鍾搞一次
                try {
                    TimeUnit.SECONDS.sleep(5L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

    }

    /**
     * 最終執行的任務方法
     *
     * @param jobId 任務id
     */
    public abstract void invoke(String jobId);


    /**
     * 要實現延時隊列的名字
     */
    public abstract String setDelayQueueName();


    @PostConstruct
    public void init() {
        new Thread(this::startDelayQueueMachine).start();
    }

}
延時隊列示例實現
package cn.lili.trigger.plugin.delay;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 測試延時隊列
 *
 * @author paulG
 * @version v4.1
 * @date 2020/11/17 7:19 下午
 * @description
 * @since 1
 */
@Component
public class TestDelayQueue extends AbstractDelayQueueMachineFactory {

    @Autowired
    private TimeTrigger timeTrigger;

    @Override
    public void invoke(String jobId) {
        TimeTriggerMsg timeTriggerMsg = JSONUtil.toBean(jobId, TimeTriggerMsg.class);

        TimeTriggerExecutor executor = (TimeTriggerExecutor) SpringContextUtil.getBean(timeTriggerMsg.getTriggerExecutor());
        executor.execute(timeTriggerMsg.getParam());

    }

    @Override
    public String setDelayQueueName() {
        return "test_delay";
    }
}
延時任務接口
package cn.lili.trigger.plugin.interfaces;


import cn.lili.trigger.plugin.model.TimeTriggerMsg;

/**
 * 延時執行接口
 *
 * @author Chopper
 */
public interface TimeTrigger {


    /**
     * 添加延時任務
     *
     * @param timeTriggerMsg 延時任務信息
     */
    void add(TimeTriggerMsg timeTriggerMsg);

}
Redis延時任務實現類
package cn.lili.trigger.plugin.interfaces.impl;

import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.delay.TestDelayQueue;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * redis 延時任務
 *
 * @author Chopper
 * @version v1.0
 * 2021-06-09 11:00
 */
@Component
@Slf4j
public class RedisTimerTrigger implements TimeTrigger {

    @Autowired
    private TestDelayQueue testDelayQueue;

    @Override
    public void add(TimeTriggerMsg timeTriggerMsg) {
        //計算延遲時間 執行時間-當前時間
        Integer delaySecond = Math.toIntExact(timeTriggerMsg.getTriggerTime() - DateUtil.getDateline());
        //設置延時任務
        if (Boolean.TRUE.equals(testDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), delaySecond))) {
            log.info("定時執行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消費【" + timeTriggerMsg.getParam().toString() + "】");
        } else {
            log.error("延時任務添加失敗:{}", timeTriggerMsg);
        }
    }
}
延時任務執行器接口
package cn.lili.trigger.plugin.interfaces;

/**
 * 延時任務執行器接口
 *
 * @author Chopper
 */
public interface TimeTriggerExecutor {


    /**
     * 執行任務
     *
     * @param object 任務參數
     */
    void execute(Object object);

}
延時任務實現
package cn.lili.trigger.plugin.interfaces.execute;

import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * test執行器
 *
 * @author Chopper
 * @version v1.0
 * 2021-06-09 10:49
 */
@Component
@Slf4j
public class TestTimeTriggerExecutor implements TimeTriggerExecutor {

    @Override
    public void execute(Object object) {
        log.info("執行器具執行任務{}", object);
    }
}
延時任務消5息模型
package cn.lili.trigger.plugin.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * 延時任務消息
 *
 * @author Chopper
 * @version v1.0
 * @since 2019-02-12 下午5:46
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TimeTriggerMsg implements Serializable {


    private static final long serialVersionUID = 8897917127201859535L;

    /**
     * 執行器 執行時間
     */
    private Long triggerTime;
    /**
     * 執行器beanId
     */
    private String triggerExecutor;


    /**
     * 執行器參數
     */
    private Object param;


}
控制器
package cn.lili.trigger.controller;

import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class TestController {

    @Autowired
    private TimeTrigger timeTrigger;

    @GetMapping
    public void test(Integer seconds) {
        Long executeTime = DateUtil.getDateline() + 5;
        if (seconds != null) {
            executeTime = DateUtil.getDateline() + seconds;
        }
        TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executeTime, "testTimeTriggerExecutor", "test params");
        timeTrigger.add(timeTriggerMsg);

    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM