redis 延時隊列


實現思路 

方式一 

1. 使用redis zset 數據結構   2.使用score排序   score為過期時間點   3.啟動線程不斷取出排序第一個  比較score和當前時間點   如果score小於或等於當前時間  說明此數據過期  需要處理  4.處理完畢在zset中移除

public class TestMain {
    private static final String ADDR="39.96.77.182";
    private static final int PORT=6379;
    //初始化jedis
    private static JedisPool jedisPool=new JedisPool(new GenericObjectPoolConfig(),ADDR,PORT,10000);
    public static Jedis getJedis() {
        return jedisPool.getResource();
    }
    //消息入隊
    public void productionDelayMessage(){
        //延遲5秒
        Calendar cal1 = Calendar.getInstance();
        cal1.add(Calendar.SECOND, 5);
        int second3later = (int) (cal1.getTimeInMillis() / 1000);
        Long orderId = TestMain.getJedis().zadd("OrderId", second3later, "OID0000001" );
        System.out.println(new Date()+"ms:redis生成了一個訂單任務:訂單ID為"+"OID0000001"+"==============="+orderId);
    }
    //消費者取訂單
    public void consumerDelayMessage(){
        Jedis jedis = TestMain.getJedis();
        while(true){
            Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);
            if(items == null || items.isEmpty()){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                continue;
            }
            int  score = (int) ((Tuple)items.toArray()[0]).getScore();
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if(nowSecond >= score){
                String orderId = ((Tuple)items.toArray()[0]).getElement();
                Long num = jedis.zrem("OrderId", orderId);
                System.out.println(num);
                if( num != null && num>0){
                    System.out.println(new Date() +"ms:redis消費了一個任務:消費的訂單OrderId為"+orderId);
                }

            }
        }
    }

    

    public static void main(String[] args) {
        TestMain appTest =new TestMain();
        appTest.productionDelayMessage();
        appTest.consumerDelayMessage();
    }

  執行結果

 

方式二:  redis過期回調

修改redis 配置 redis.conf   添加notify-keyspace-events Ex

編寫測試demo 

新建boot工程 加入redis依賴

新建redisconfig  注入 RedisMessageListenerContainer  Bean

@Configuration
public class RedisListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

 編寫redis 監聽類  繼承KeyExpirationEventMessageListener

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        System.out.println("監聽到過期的key為:"+expiredKey);
    }
}

  運行boot工程    使用redis 客戶端 redis desktop manager添加一個key  設置過期時間

 

 延時兩秒  在工程控制太看到輸出

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM