Redis為我們提供了publish/subscribe(發布/訂閱)功能。我們可以對某個channel(頻道)進行subscribe(訂閱),當有人在這個channel上publish(發布)消息時,redis就會通知我們,這樣我們可以收到別人發布的消息。
作為Java的redis客戶端,Jedis提供了publish/subscribe的接口。本文講述如何使用Jedis來實現redis的publish/subscribe。
定義Subscriber類
Jedis定義了抽象類JedisPubSub
,在這個類中,定義publish/subsribe的回調方法。通過繼承JedisPubSub
類並重新實現這些回調方法,當publish/subsribe事件發生時,我們可以定制自己的處理邏輯。
在以下例子中,我們定義了Subscriber
類,這個類繼承了JedisPubSub
類,並重新實現了其中的回調方法。
import redis.clients.jedis.JedisPubSub; public class Subscriber extends JedisPubSub { public Subscriber() { } public void onMessage(String channel, String message) { System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message)); } public void onSubscribe(String channel, int subscribedChannels) { System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", channel, subscribedChannels)); } public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", channel, subscribedChannels)); } }
定義SubThread線程類
由於Jedis的subscribe
操作是阻塞的,因此,我們另起一個線程來進行subscribe操作。
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); private final String channel = "mychannel"; public SubThread(JedisPool jedisPool) { super("SubThread"); this.jedisPool = jedisPool; } @Override public void run() { System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel)); Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.subscribe(subscriber, channel); } catch (Exception e) { System.out.println(String.format("subsrcibe channel error, %s", e)); } finally { if (jedis != null) { jedis.close(); } } } }
在上面的代碼中,我們從JedisPool
獲取一個Jedis
實例,並使用這個Jedis
實例進行subscribe
的操作。Jedis
的subscribe
的聲明如下:
public void subscribe(final JedisPubSub jedisPubSub, final String… channels)
第一個參數接受一個JedisPubSub
對象,第二個參數指定對哪個頻道進行訂閱。上例中,我們把我們定義的Subscriber
對象傳給subscribe
方法。
當publish/subscribe的事件發生時,會自動調用我們Subscriber
的方法。
定義Publisher類
Publisher
類接受用戶的輸入,並將輸入發布到channel。當用戶輸入”quit”后,輸入結束。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class Publisher { private final JedisPool jedisPool; public Publisher(JedisPool jedisPool) { this.jedisPool = jedisPool; } public void start() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); Jedis jedis = jedisPool.getResource(); while (true) { String line = null; try { line = reader.readLine(); if (!"quit".equals(line)) { jedis.publish("mychannel", line); } else { break; } } catch (IOException e) { e.printStackTrace(); } } } }
定義入口代碼
如下是我們的程序入口代碼。
import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class PubSubDemo { public static void main( String[] args ) { // 替換成你的reids地址和端口 String redisIp = "192.168.229.154"; int reidsPort = 6379; JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort); System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort)); SubThread subThread = new SubThread(jedisPool); subThread.start(); Publisher publisher = new Publisher(jedisPool); publisher.start(); } }
在上面的代碼中,我們首先生成了一個JedisPool
的redis連接池,這是由於Jedis
不是線程安全的,JedisPool
是線程安全的。而我們的程序在主線程和訂閱線程(SubThread)均需要使用Jedis
,故在程序中我們使用JedisPool
。具體也可以參考在多線程環境中使用Jedis。
由於Jedis
的subcribe操作是阻塞的,故我們另起了一個線程來進行subcribe操作。
通過調用Publisher::start()
方法,接受用戶的輸入,並publish到指定的channel。
輸出
redis pool is starting, redis ip 192.168.229.154, redis port 6379
subscribe redis, channel mychannel, thread will be blocked
subscribe redis channel success, channel mychannel, subscribedChannels 1
這時輸入
hello
控制窗口中輸出
receive redis published message, channel mychannel, message hello