redis隊列


1、lpush+rpop

  采用rpop需要不停調用rpop方法查看list中是否有待處理消息。每調用一次都會發起一次連接,造成不必要浪費

          

  代碼:

      producer:       

package com.eval.mind.service.redis;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;
public class Producer_Lpush extends Thread{
    
    public static final String MESSAGE_KEY = "message:queue";
    private Jedis jedis;
    private String producerName;
    private volatile int count;
 
    public Producer_Lpush(String name) {
        this.producerName = name;
        init();
    }
 
    private void init() {
        jedis=new Jedis("192.168.80.4",6379);
    }
    
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 當前未被處理消息條數為:" + size);
        count++;
    }
 
    public int getCount() {
        return count;
    }
    
    @Override
    public void run() {
        try {
            while (true) {
                putMessage(UUID.randomUUID().toString());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws InterruptedException{
        Producer_Lpush producer = new Producer_Lpush("myProducer");
        producer.start();
 
        for(; ;) {
            System.out.println("main : 已存儲消息條數:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}
View Code

    

         consumer:

               

package com.eval.mind.service.redis;

import redis.clients.jedis.Jedis;

/*
 * rpop從redis隊列中pop處一個String類型元素,String mes=jedis.rpop(key)
 * 缺點是消費端需要不停的調用rpop方法查看list是否有待處理消息。每調用依次都會發起依次連接,這會造成不必要浪費;為解決此參考Consumer_Brpop.java方法
 */
public class Consumer_Rpop extends Thread {
    private String customerName;
    private volatile int count;
    private Jedis jedis;

    public Consumer_Rpop(String name) {
            this.customerName = name;
            init();
        }

    private void init() {
        jedis = new Jedis("192.168.80.4", 6379);
    }

    public void processMessage() {
        String message = jedis.rpop(Producer_Lpush.MESSAGE_KEY); //如果redis隊列中沒有數據,也會一直調用
        if (message != null) {
            count++;
            handle(message);
        }
    }

    public void handle(String message) {
        System.out.println(customerName + " 正在處理消息,消息內容是: " + message + " 這是第" + count + "條");
    }

    @Override
    public void run() {
        while (true) {
            processMessage();
        }
    }

    public static void main(String[] args) {
        Consumer_Rpop customer = new Consumer_Rpop("yamikaze");
        customer.start();
    }
}
View Code

 

2、lpush+brpop

  brpop:blocking rpop,采用brpop時,如果redis隊列中不存在數據則調用List<String> messages=jedis.brpop(int i,key1,key2)時會阻塞,不會往下執行,一直等待redis隊列中再次push進數據后繼續執行pop操作

      * brpop支持多個列表(隊列)

   * brpop指令是支持隊列優先級的,比如這個例子中MESSAGE_KEY的優先級大於testKey(依據brpop中順序決定)。
   * 如果兩個列表中都有元素,會優先返回優先級高的列表中的元素,所以這兒優先返回MESSAGE_KEY
   * 0表示不限制等待,會一直阻塞在這兒

  代碼:

      producer:                              

package com.eval.mind.service.redis;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;
public class Producer_Lpush extends Thread{
    
    public static final String MESSAGE_KEY = "message:queue";
    private Jedis jedis;
    private String producerName;
    private volatile int count;
 
    public Producer_Lpush(String name) {
        this.producerName = name;
        init();
    }
 
    private void init() {
        jedis=new Jedis("192.168.80.4",6379);
    }
    
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 當前未被處理消息條數為:" + size);
        count++;
    }
 
    public int getCount() {
        return count;
    }
    
    @Override
    public void run() {
        try {
            while (true) {
                putMessage(UUID.randomUUID().toString());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws InterruptedException{
        Producer_Lpush producer = new Producer_Lpush("myProducer");
        producer.start();
 
        for(; ;) {
            System.out.println("main : 已存儲消息條數:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}
View Code

 

        consumer:   

package com.eval.mind.service.redis;

import java.util.List;

import org.springframework.util.CollectionUtils;

import redis.clients.jedis.Jedis;

/*
 * brpop:blocking rpop,這個指令只有在有元素時才返回,沒有則會阻塞到超時返回null;
 * brpop支持多個列表(隊列),支持隊列優先級,比如messagekey優先級大於testkey,如果兩個列表中都有元素,會優先返回messagekey列表中元素
 * 返回List<String>類型  List<String> messages=jedis.brpop(key);
 */
public class Consumer_Brpop extends Thread{
    private volatile int count;
    String name;
    private Jedis jedis;
    public Consumer_Brpop(String name) {
        this.name=name;
        init();
    }
    private void init() {
        jedis = new Jedis("192.168.80.4", 6379);
    }
    
    private void processMessage() {
        List<String> messages=jedis.brpop(0, "message:queue","message:tmp"); //如果redis隊列中沒有數據,此處會阻塞不會往下執行,直到redis隊列又push進數據
        if(!CollectionUtils.isEmpty(messages)) {
            String keyName=messages.get(0);
            String messageValue=messages.get(1);
            count++;
            handle(messageValue);
        }
    }
    
    public void handle(String message) {
        System.out.println(name + " 正在處理消息,消息內容是: " + message + " 這是第" + count + "條");
    }
    
    @Override
    public void run() {
        while (true) {
            processMessage();
        }
    }
    
    public static void main(String[] args) {
        Consumer_Brpop customer = new Consumer_Brpop("yamikaze");
        customer.start();
    }
}
View Code

 

 

  

3、publish+subscribe

   redis除了對消息隊列提供支持外,還提供一組命令用於支持發布/訂閱模式

   發布:

      publish指令可用於發布一條消息:publish channel message

   訂閱:

      subscribe指令用於接收一條消息:subscribe channel

      可以看到使用subscribe指令進入訂閱模式后,並沒有接收到publish發送的消息,這是因為只有在消息發送出去前才會收到,也就是說訂閱subscribe啟動要在          publish之前執行

   訂閱發布模式和消息隊列模式區別:消息隊列模式是通過key方式實現,取出就刪除了,其他進程取不到。訂閱發布可以支持多客戶端獲取同一個頻道                                (channel)發布的消息

    

      代碼:

    publish:     

package com.eval.mind.service.redis;

import org.apache.commons.lang3.StringUtils;

import redis.clients.jedis.Jedis;

public class Producer_Publish {
    public static final String Channel_key="channel:message";
    private Jedis jedis;
    
    public Producer_Publish() {
        jedis=new Jedis("192.168.80.4",6379);
    }
    
    public void publishMessage(String message) {
        if(StringUtils.isNoneBlank(message)) {
            return;
        }
        jedis.publish(Channel_key, message); //publish方法 發布消息
    }
    
    public static void main(String[] args) {
        Producer_Publish publisher=new Producer_Publish();
        publisher.publishMessage("Hello Publish Redis");
    }
}
View Code

 

    subscribe:   

package com.eval.mind.service.redis;

import org.apache.commons.lang3.StringUtils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class Consumer_Subscribe {

    private Jedis jedis;
    private static final String EXIT_COMMAND = "exit";

    public Consumer_Subscribe() {
        jedis = new Jedis("192.168.80.4", 6379);
    }

    public void subscribe(String channel) { // subscribe方法訂閱消息
        if (StringUtils.isBlank(channel)) {
            return;
        }
        // JedisPubSub類是一個沒有抽象方法的抽象類,里邊方法時一些空實現,可以選擇需要的方法覆蓋,這里使用的是subscribe指令,所以覆蓋了onMessage
        // 如果使用pubsubscribe指令則覆蓋onPmessage方法

        JedisPubSub jps = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                while (true) {       //相對於redis隊列只能消費一次,此處會對channel_key一直訂閱,
                    if (Producer_Publish.Channel_key.equals(channel)) {
                        System.out.println("接收到消息: channel : " + message);
                        // 接收到exit消息后退出
                        if (EXIT_COMMAND.equals(message)) {
                            System.exit(0);
                        }
                    }
                }
            }

            /**
             * 訂閱時
             */
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                if (Producer_Publish.Channel_key.equals(channel)) {
                    System.out.println("訂閱了頻道:" + channel);
                }
            }
        };
        jedis.subscribe(jps, channel);
    }

    public static void main(String[] args) {
        Consumer_Subscribe client = new Consumer_Subscribe();
        client.subscribe(Producer_Publish.Channel_key);
    }
}
View Code

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM