ActiveMQ实现延迟消息队列


https://my.oschina.net/u/3081871/blog/903920

 https://blog.csdn.net/yuyecsdn/article/details/90746833

 

####延迟消息使用场景 在实际业务中,比如说一些定时任务,超时处理等,在我们公司的业务中,订单未支付超时关闭就是最典型的使用延迟消息队列的场景。
####ActiveMQ如何实现延迟消息队列
1.第一步需要修改activemq.xml配置文件,开启延时发送

<broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" > ... </broker> 
  1. 第二步消息生产者在发送消息的时候需进行设置,上代码
TextMessage message = session.createTextMessage("这是一条延迟消息”); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 200000L);//设置延迟时间 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 3000L);//设置重复投递间隔(非必要,根据实际情况) message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 5L);//重复投递次数(非必要,根据实际情况) messageProducer.send(message); 
    1. 发送消息 查看ActiveMQ Web管理端
      输入图片说明
      在管理端可以看到发送的延迟消息。
      ActiveMQ 发送延迟消息还支持 linux中corntab中的表达式。
    2. JAVA实现redis超时失效key 的监听触发

      1. 过期事件通过Redis的订阅与发布功能(pub/sub)来进行分发。

        而对超时的监听呢,并不需要自己发布,只有修改配置文件redis.conf中的:notify-keyspace-events Ex,默认为notify-keyspace-events "" 

        复制代码
         1 # K    键空间通知,以__keyspace@<db>__为前缀  
         2 # E    键事件通知,以__keysevent@<db>__为前缀  
         3 # g    del , expipre , rename 等类型无关的通用命令的通知, ...  
         4 # $    String命令  
         5 # l    List命令  
         6 # s    Set命令  
         7 # h    Hash命令  
         8 # z    有序集合命令  
         9 # x    过期事件(每次key过期时生成)  
        10 # e    驱逐事件(当key在内存满了被清除时生成)  
        11 # A    g$lshzxe的别名,因此”AKE”意味着所有的事件  
        复制代码

        修改好配置文件后,redis会对设置了expire的数据进行监听,当数据过期时便会将其从redis中删除:

        1.先写一个监听器:

        复制代码
         1 public class KeyExpiredListener extends JedisPubSub {  
         2   
         3     @Override  
         4     public void onPSubscribe(String pattern, int subscribedChannels) {  
         5         System.out.println("onPSubscribe "  
         6                 + pattern + " " + subscribedChannels);  
         7     }  
         8   
         9     @Override  
        10     public void onPMessage(String pattern, String channel, String message) {  
        11   
        12         System.out.println("onPMessage pattern "  
        13                         + pattern + " " + channel + " " + message);  
        14     }  
        15   
        16   
        17   
        18 }  
        复制代码

         

        2.订阅者:

        复制代码
         1 public class Subscriber {  
         2   
         3     public static void main(String[] args) {  
         4         JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");  
         5   
         6         Jedis jedis = pool.getResource();  
         7         jedis.psubscribe(new KeyExpiredListener(), "__key*__:*");  
         8   
         9     }  
        10   
        11 }  
        复制代码

         

        3.测试类:

        复制代码
        public class TestJedis {  
          
            public static void main(String[] args) {  
                JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");  
          
                Jedis jedis = pool.getResource();  
                jedis.set("notify", "你还在吗");  
                jedis.expire("notify", 10);  
          
            }  
        }  
        复制代码

        4.结果:

        先启动订阅者,然后执行测试类,然后等待10秒之后再监听类的方法中就可以获得回调。非常需要主要的时,过期监听的管道默认是__keyevent@0__:expired,艾特后面的0表示第几个是数据库,redis默认的数据库是0~15一共16个数据库。所以如果你存入的数据库是2,那么数据接收的管道就是__keyevent@2__:expired


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM