Java實現Redis消息隊列


這里我使用Redis的發布、訂閱功能實現簡單的消息隊列,基本的命令有publish、subscribe等。

在Jedis中,有對應的java方法,但是只能發布字符串消息。為了傳輸對象,需要將對象進行序列化,並封裝成字符串進行處理。

使用Redis實現消息隊列


 

封裝一個消息對象

public class Message implements Serializable{

private static final long serialVersionUID = 1L;

private String titile;
private String info;

public Message(String titile,String info){
this.titile=titile;
this.info=info;
}

public String getTitile() {
return titile;
}
public void setTitile(String titile) {
this.titile = titile;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
}

  

為這個消息對象提供序列化方法

public class MessageUtil {

//convert To String
public static String convertToString(Object obj,String charset) throws IOException{

ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return str;	
}

//convert To Message
public static Object convertToMessage(byte[] bytes) throws Exception{
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream sIn = new ObjectInputStream(in);
return sIn.readObject();

}
}

  

從Jedis連接池中獲取連接

public class RedisUtil {

/**
* Jedis connection pool
* @Title: config
*/
public static JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle("redis");
String host=bundle.getString("host");
int port=Integer.valueOf(bundle.getString("port"));
int timeout=Integer.valueOf(bundle.getString("timeout"));
//	String password=bundle.getString("password");

JedisPoolConfig config=new JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString("maxActive")));
config.setMaxWait(Integer.valueOf(bundle.getString("maxWait")));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString("testOnBorrow")));
config.setTestOnReturn(Boolean.valueOf(bundle.getString("testOnReturn")));

JedisPool pool=new JedisPool(config, host, port, timeout);

return pool;
}
}

  

創建Provider類

public class Producer {

private Jedis jedis;
private JedisPool pool;

public Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource(); 
}


public void provide(String channel,Message message) throws IOException{
String str1=MessageUtil.convertToString(channel,"UTF-8");
String str2=MessageUtil.convertToString(message,"UTF-8");
jedis.publish(str1, str2);
}

//close the connection
public void close() throws IOException {
//將Jedis對象歸還給連接池,關閉連接
pool.returnResource(jedis); 
}
}

  

創建Consumer類

public class Consumer {

private Jedis jedis;
private JedisPool pool;

public Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource(); 
}


public void consum(String channel) throws IOException{
JedisPubSub jedisPubSub = new JedisPubSub() {
// 取得訂閱的消息后的處理 
public void onMessage(String channel, String message) { 
System.out.println("Channel:"+channel); 
System.out.println("Message:"+message.toString());
} 

// 初始化訂閱時候的處理 
public void onSubscribe(String channel, int subscribedChannels) { 
System.out.println("onSubscribe:"+channel); 
} 

// 取消訂閱時候的處理 
public void onUnsubscribe(String channel, int subscribedChannels) { 
System.out.println("onUnsubscribe:"+channel); 
} 

// 初始化按表達式的方式訂閱時候的處理 
public void onPSubscribe(String pattern, int subscribedChannels) { 
// System.out.println(pattern + "=" + subscribedChannels); 
} 

// 取消按表達式的方式訂閱時候的處理 
public void onPUnsubscribe(String pattern, int subscribedChannels) { 
// System.out.println(pattern + "=" + subscribedChannels); 
} 

// 取得按表達式的方式訂閱的消息后的處理 
public void onPMessage(String pattern, String channel, String message) { 
System.out.println(pattern + "=" + channel + "=" + message); 
} 
};

jedis.subscribe(jedisPubSub, channel);
}

//close the connection
public void close() throws IOException {
//將Jedis對象歸還給連接池
pool.returnResource(jedis); 
}
}

  

測試方法

public static void main(String[] args){

Message msg=new Message("hello!", "this is the first message!");

Producer producer=new Producer();
Consumer consumer=new Consumer();
try {
producer.provide("chn1",msg);
consumer.consum("chn1");
} catch (IOException e) {
e.printStackTrace();
}
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM