Jedis實現發布訂閱功能


Redis為我們提供了publish/subscribe(發布/訂閱)功能。我們可以對某個channel(頻道)進行subscribe(訂閱),當有人在這個channel上publish(發布)消息時,redis就會通知我們,這樣我們可以收到別人發布的消息。
作為Java的redis客戶端,Jedis提供了publish/subscribe的接口。本文講述如何使用Jedis來實現redis的publish/subscribe。

定義Subscriber類

Jedis定義了抽象類JedisPubSub,在這個類中,定義publish/subsribe的回調方法。通過繼承JedisPubSub類並重新實現這些回調方法,當publish/subsribe事件發生時,我們可以定制自己的處理邏輯。

在以下例子中,我們定義了Subscriber類,這個類繼承了JedisPubSub類,並重新實現了其中的回調方法。

import redis.clients.jedis.JedisPubSub;


public class Subscriber extends JedisPubSub {
    public Subscriber() {
    }

    public void onMessage(String channel, String message) {
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }

    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", 
                channel, subscribedChannels));
    }

    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", 
                channel, subscribedChannels));

    }
}
 

定義SubThread線程類

由於Jedis的subscribe操作是阻塞的,因此,我們另起一個線程來進行subscribe操作。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;


public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();

    private final String channel = "mychannel";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

 

在上面的代碼中,我們從JedisPool獲取一個Jedis實例,並使用這個Jedis實例進行subscribe的操作。
Jedissubscribe的聲明如下:

public void subscribe(final JedisPubSub jedisPubSub, final String… channels)

第一個參數接受一個JedisPubSub對象,第二個參數指定對哪個頻道進行訂閱。上例中,我們把我們定義的Subscriber對象傳給subscribe方法。
當publish/subscribe的事件發生時,會自動調用我們Subscriber的方法。

定義Publisher類

Publisher類接受用戶的輸入,並將輸入發布到channel。當用戶輸入”quit”后,輸入結束。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Publisher {
    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public void start() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
         }
    }
}

 

定義入口代碼

如下是我們的程序入口代碼。

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class PubSubDemo 
{
    public static void main( String[] args )
    {
        // 替換成你的reids地址和端口
        String redisIp = "192.168.229.154";
        int reidsPort = 6379;
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort);
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort));

        SubThread subThread = new SubThread(jedisPool);
        subThread.start();

        Publisher publisher = new Publisher(jedisPool);
        publisher.start();
    }
}

 

在上面的代碼中,我們首先生成了一個JedisPool的redis連接池,這是由於Jedis不是線程安全的,JedisPool是線程安全的。而我們的程序在主線程和訂閱線程(SubThread)均需要使用Jedis,故在程序中我們使用JedisPool。具體也可以參考在多線程環境中使用Jedis
由於Jedis的subcribe操作是阻塞的,故我們另起了一個線程來進行subcribe操作。
通過調用Publisher::start()方法,接受用戶的輸入,並publish到指定的channel。

輸出

redis pool is starting, redis ip 192.168.229.154, redis port 6379
subscribe redis, channel mychannel, thread will be blocked
subscribe redis channel success, channel mychannel, subscribedChannels 1

這時輸入

hello

控制窗口中輸出

receive redis published message, channel mychannel, message hello

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM