第一步:創建一個發布者
package work;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Publisher: reads lines from standard input and publishes each one to a Redis channel.
 *
 * <p>Reading stops when the user types {@code quit}, when standard input reaches
 * end-of-stream, or when reading fails with an {@link IOException}.
 *
 * author: songyan
 * date: 2019/10/17
 */
public class Publisher extends Thread {

    /** Pool the publishing connection is borrowed from. */
    private final JedisPool jedisPool;

    /** Name of the channel that messages are published to. */
    private final String chanelName;

    /**
     * Creates a publisher and prints the input prompt.
     *
     * @param jedisPool  pool used to obtain the Redis connection in {@link #run()}
     * @param chanelName channel that every entered line is published to
     */
    public Publisher(JedisPool jedisPool, String chanelName) {
        this.jedisPool = jedisPool;
        this.chanelName = chanelName;
        System.out.println("【發布者\""+chanelName+"\"初始化成功】");
        System.out.println("請輸入要發布的消息:");
    }

    /**
     * Publishes each line read from standard input until {@code quit} or end of input.
     */
    @Override
    public void run() {
        // System.in is intentionally not closed: it is owned by the process, not by this thread.
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String line;
            // readLine() returns null at end of input; treat that like "quit" instead of
            // handing a null message to publish().
            while ((line = reader.readLine()) != null && !"quit".equals(line)) {
                jedis.publish(chanelName, line);
                // Report success only after publish() has returned without throwing.
                System.out.println(chanelName+"發布消息成功");
            }
        } catch (IOException e) {
            // A failed read will not recover by retrying; stop instead of looping forever.
            e.printStackTrace();
        } finally {
            // Release the borrowed connection, same as Subscriber does.
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
第二步:創建一個訂閱者的監聽器
package work; import redis.clients.jedis.JedisPubSub; /** * 監聽 * author:songyan * date: 2019/10/17 **/ public class SubscriberListener extends JedisPubSub { private String subName; public SubscriberListener(String subName) { this.subName = subName; } // 取得訂閱的消息后的處理 public void onMessage(String channel, String message) { System.out.println(String.format("【"+subName + "接收到消息】頻道:%s;消息:%s。" , channel , message)); } // 初始化訂閱時候的處理 public void onSubscribe(String channel, int subscribedChannels) { System.out.println(String.format( "【"+subName + "訂閱頻道成功】頻道:%s;頻道數:%d。" , channel , subscribedChannels)); } // 取消訂閱時候的處理 public void onUnsubscribe(String channelName, int subscribedChannels) { System.out.println(String.format( "【"+subName + "取消訂閱】頻道:%s;頻道數:%d。",channelName , subscribedChannels)); } }
第三步:創建一個訂閱者
package work; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * 訂閱 * author:songyan * date: 2019/10/17 **/ public class Subscriber extends Thread { //jedis連接池 private final JedisPool jedisPool; private final SubscriberListener subscriberListener; private String channelName; public Subscriber(JedisPool jedisPool, SubscriberListener subscriberListener, String channelName) { super(); this.jedisPool = jedisPool; this.subscriberListener = subscriberListener; this.channelName = channelName; } @Override public void run() { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.subscribe(subscriberListener,channelName);// 通過jedis.subscribe()方法去訂閱,入參是1.訂閱者、2.頻道名稱 } catch (Exception e) { e.printStackTrace(); System.out.println(String.format("頻道訂閱失敗:%s",e)); } finally { if (null != jedis) jedis.close(); } } }
第四步:測試(編寫客戶端)
(1)發布者客戶端
package work.test;

import redis.clients.jedis.JedisPool;
import work.Publisher;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Publish test: starts one {@link Publisher} on a thread pool.
 *
 * author: songyan
 * date: 2019/10/17
 */
public class PublishClient {

    /**
     * Creates the connection pool and a five-thread executor, submits a publisher
     * for channel "發布者1", then waits up to ten minutes for it to finish.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the executor
     */
    public static void main(String[] args) throws InterruptedException {
        // Connection pool
        JedisPool pool = new JedisPool("192.168.159.133");
        // Thread pool with a fixed number of threads
        ExecutorService workers = Executors.newFixedThreadPool(5);

        // Create one publisher and hand it to the thread pool
        workers.submit(new Publisher(pool, "發布者1"));

        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.MINUTES);
    }
}
執行main方法,創建一個發布者。
(2)訂閱者客戶端
package work.test;

import redis.clients.jedis.JedisPool;
import work.Subscriber;
import work.SubscriberListener;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Subscriber client: subscribes one listener to a channel, then unsubscribes after 30 seconds.
 *
 * author: songyan
 * date: 2019/10/17
 */
public class SubscriberClient {

    /** Channel to subscribe to; the same name the publisher client publishes on. */
    private static final String CHANNEL = "發布者1";

    /**
     * Starts a subscriber on a thread pool, cancels the subscription after
     * 30 seconds, waits for the subscriber thread to finish and closes the pool.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while sleeping or waiting for the executor
     */
    public static void main(String[] args) throws InterruptedException {
        // Redis connection pool
        JedisPool jedisPool = new JedisPool("192.168.159.133");
        // Thread pool
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // Listener that receives the subscription callbacks
        SubscriberListener subscriberListener = new SubscriberListener("訂閱者一號");

        // Subscribe to the channel on a worker thread; subscribe() blocks that thread
        executorService.submit(new Subscriber(jedisPool, subscriberListener, CHANNEL));
        executorService.shutdown();

        try {
            // Unsubscribe after 30 seconds
            TimeUnit.SECONDS.sleep(30);
            // Calling onUnsubscribe() directly would only print the message.
            // unsubscribe() sends the request to Redis, which lets the blocked
            // subscribe() call return. Skip it if the subscription never succeeded.
            if (subscriberListener.isSubscribed()) {
                subscriberListener.unsubscribe(CHANNEL);
            }
            executorService.awaitTermination(60, TimeUnit.SECONDS);
        } finally {
            jedisPool.close();
        }
    }
}
執行main方法,創建一個訂閱者(訂閱上面發布者的頻道)。
發布者發布信息:
訂閱者接收到訂閱信息:
訂閱者取消訂閱: