ActiveMQ實現延遲消息隊列


https://my.oschina.net/u/3081871/blog/903920

 https://blog.csdn.net/yuyecsdn/article/details/90746833

 

####延遲消息使用場景 在實際業務中,比如說一些定時任務,超時處理等,在我們公司的業務中,訂單未支付超時關閉就是最典型的使用延遲消息隊列的場景。
####ActiveMQ如何實現延遲消息隊列
1.第一步需要修改activemq.xml配置文件,開啟延時發送

<broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" > ... </broker> 
  1. 第二步消息生產者在發送消息的時候需進行設置,上代碼
TextMessage message = session.createTextMessage("這是一條延遲消息”); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 200000L);//設置延遲時間 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 3000L);//設置重復投遞間隔(非必要,根據實際情況) message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 5L);//重復投遞次數(非必要,根據實際情況) messageProducer.send(message); 
    1. 發送消息 查看ActiveMQ Web管理端
      輸入圖片說明
      在管理端可以看到發送的延遲消息。
      ActiveMQ 發送延遲消息還支持 linux中corntab中的表達式。
    2. JAVA實現redis超時失效key 的監聽觸發

      1. 過期事件通過Redis的訂閱與發布功能(pub/sub)來進行分發。

        而對超時的監聽呢,並不需要自己發布,只有修改配置文件redis.conf中的:notify-keyspace-events Ex,默認為notify-keyspace-events "" 

        復制代碼
         1 # K    鍵空間通知,以__keyspace@<db>__為前綴  
         2 # E    鍵事件通知,以__keysevent@<db>__為前綴  
         3 # g    del , expipre , rename 等類型無關的通用命令的通知, ...  
         4 # $    String命令  
         5 # l    List命令  
         6 # s    Set命令  
         7 # h    Hash命令  
         8 # z    有序集合命令  
         9 # x    過期事件(每次key過期時生成)  
        10 # e    驅逐事件(當key在內存滿了被清除時生成)  
        11 # A    g$lshzxe的別名,因此”AKE”意味着所有的事件  
        復制代碼

        修改好配置文件后,redis會對設置了expire的數據進行監聽,當數據過期時便會將其從redis中刪除:

        1.先寫一個監聽器:

        復制代碼
         1 public class KeyExpiredListener extends JedisPubSub {  
         2   
         3     @Override  
         4     public void onPSubscribe(String pattern, int subscribedChannels) {  
         5         System.out.println("onPSubscribe "  
         6                 + pattern + " " + subscribedChannels);  
         7     }  
         8   
         9     @Override  
        10     public void onPMessage(String pattern, String channel, String message) {  
        11   
        12         System.out.println("onPMessage pattern "  
        13                         + pattern + " " + channel + " " + message);  
        14     }  
        15   
        16   
        17   
        18 }  
        復制代碼

         

        2.訂閱者:

        復制代碼
         1 public class Subscriber {  
         2   
         3     public static void main(String[] args) {  
         4         JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");  
         5   
         6         Jedis jedis = pool.getResource();  
         7         jedis.psubscribe(new KeyExpiredListener(), "__key*__:*");  
         8   
         9     }  
        10   
        11 }  
        復制代碼

         

        3.測試類:

        復制代碼
        public class TestJedis {  
          
            public static void main(String[] args) {  
                JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");  
          
                Jedis jedis = pool.getResource();  
                jedis.set("notify", "你還在嗎");  
                jedis.expire("notify", 10);  
          
            }  
        }  
        復制代碼

        4.結果:

        先啟動訂閱者,然后執行測試類,然后等待10秒之后再監聽類的方法中就可以獲得回調。非常需要主要的時,過期監聽的管道默認是__keyevent@0__:expired,艾特后面的0表示第幾個是數據庫,redis默認的數據庫是0~15一共16個數據庫。所以如果你存入的數據庫是2,那么數據接收的管道就是__keyevent@2__:expired


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM