在學習了Redis做為消息隊列之后研究 了redis聊天的功能。
其實用關系型數據庫也可以實現消息功能,自己就曾經用mysql寫過一個簡單的消息的功能。RDB中思路如下:
**
在實際中可以完全借助mysql數據庫實現聊天功能,建立一個表,保存接收人的username、message、isConsumed等信息,用戶登錄之后采用心跳機制不停的檢測數據庫並消費消息。
心跳可以做好多事,比如檢測檢測當前用戶是否已經登錄,如果已經登錄剔除之前已經登錄的用戶,實現一個用戶一次登錄的功能。
心跳可以采用JS的周期函數不停的向后台發起異步請求,后台查詢未消息的消息
**
1.Redis實現一對一的聊天功能(基於lpush和brpop實現)
簡單的實現一個用戶向另一個用戶發送多條信息,實現的思路是:
一對一聊天的思路:(采用Lpush和Brpop實現)
1.消息生產者生產消息到redis中:生產消息的時候根據接收人的userName與消息的類型發送到對應的key,采用lpush發送消息(根據userName生成key)
2.消息的消費者根據userName,從userName的key中消費對應的消息。如果有必要可以將消息寫到RDB中避免數據的丟失。(根據userName生成key的規則獲取用戶對應的消息)
3.消息的內容頭部加入發送者,例如原來消息內容是:hello,為了知道消息的發送者可以改為:張三*-*hello(為了獲取消息的發送者)
下面直接上代碼:
User.java(只有一個userName有用)
package cn.xm.jwxt.bean.system; import java.util.List; import java.util.Set; public class User { private String username;//用戶姓名 public String getUsername() { return username; } public void setUsername(String username) { this.username = username == null ? null : username.trim(); } }
redis-chat.properties
redis.url=127.0.0.1 redis.port=6379 redis.maxIdle=30 redis.minIdle=10 redis.maxTotal=100 redis.maxWait=20000
Jedis工具類:(返回Jedis連接)
package cn.xm.redisChat.util; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @Author: qlq * @Description * @Date: 21:32 2018/10/9 */ public class JedisPoolUtils { private static JedisPool pool = null; static { //加載配置文件 InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis-chat.properties"); Properties pro = new Properties(); try { pro.load(in); } catch (IOException e) { e.printStackTrace(); } //獲得池子對象 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大閑置個數 poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大閑置個數 poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小閑置個數 poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大連接數 pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString())); } //獲得jedis資源的方法 public static Jedis getJedis() { return pool.getResource(); } }
消息生產者:(處理消息頭部加上消息的發送者,並且根據接受者的userName生成key)
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; import cn.xm.redisChat.util.JedisPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; /** * @Author: qlq * @Description 消息生產者(根據消息的) * @Date: 23:02 2018/10/13 */ public class RedisMessageProducer { private static final Logger log = LoggerFactory.getLogger(RedisMessageProducer.class); /** * 發送消息的方法 * * @param sendUser 發送消息的用戶 * @param sendToUser 接收消息的用戶 * @param messages 可變參數返送多條消息 * @return */ public static boolean sendMessage(User sendUser, User sendToUser, String... messages) { Jedis jedis = JedisPoolUtils.getJedis(); try { String key = sendToUser.getUsername() + ":msg"; //將消息的內容加上消息的發送人以 *-* 分割,不能用增強for循環 for (int i = 0, length_1 = messages.length; i < length_1; i++) { messages[i] = sendUser.getUsername() + "*-*" + messages[i]; } Long lpush = jedis.lpush(key, messages);//返回值是還有多少消息未消費 log.debug("user {} send message [{}] to {}", sendUser.getUsername(), messages, sendToUser.getUsername()); log.debug("user {} has {} messages ", sendToUser.getUsername(), lpush); } catch (Exception e) { log.error("sendMessage error", e); } finally { jedis.close(); } return true; } }
消息的消費者:(采用線程池獲取消息,根據接收消息的userName從對應的key中獲取對應的消息,並解析消息的key和發送者和內容)
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; import cn.xm.redisChat.util.JedisPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @Author: qlq * @Description 消息的消費者 * @Date: 23:44 2018/10/13 */ public class RedisMessageConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMessageConsumer.class); /** * 參數是初始化線程池子的大小 */ private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2); /** * 消費消息 * * @param consumerUser 接收消息的用戶 */ public static void consumerMessage(final User consumerUser) { final Jedis jedis = JedisPoolUtils.getJedis(); //新建一個線程,線程池獲取消息 Runnable runnable = new Runnable() { @Override public void run() { while (true){ List<String> messages = jedis.brpop(0, consumerUser.getUsername() + ":msg");//0是timeout,返回的是一個集合,第一個是消息的key,第二個是消息的內容 String key = messages.get(0);//第一個是key String message = messages.get(1);//第二個是消息 String sendUserName = message.substring(0, message.indexOf("*-*"));//獲取消息的發送者 message = message.substring(message.indexOf("*-*")+3);//獲取消息內容 log.debug("ThreadName is {},user {} consumer message {} ,sended by {}", Thread.currentThread().getName(),consumerUser.getUsername(), message, sendUserName); } } }; //線程池中獲取消息 //第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位 batchTaskPool.scheduleWithFixedDelay(runnable, 3,5, TimeUnit.SECONDS); } }
測試類:(lisi和wangwu消費消息)
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; /** * @Author: qlq * @Description 消息消息 * @Date: 0:04 2018/10/14 */ public class ConsumerMessageApp { public static void main(String[] args) { User sndToUser = new User(); sndToUser.setUsername("lisi"); User sndToUser2 = new User(); sndToUser2.setUsername("wangwu"); RedisMessageConsumer.consumerMessage(sndToUser); RedisMessageConsumer.consumerMessage(sndToUser2); } }
zhangsan給lisi和wangwu發送消息
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; /** * @Author: qlq * @Description 生產消息測試 * @Date: 23:59 2018/10/13 */ public class ProducerMessageApp { public static void main(String[] args) { User sndUser = new User(); sndUser.setUsername("zhangsan"); User sndToUser = new User(); sndToUser.setUsername("lisi"); User sndToUser2 = new User(); sndToUser2.setUsername("wangwu"); RedisMessageProducer.sendMessage(sndUser, sndToUser, "給李四的消息一", "給李四的消息二"); RedisMessageProducer.sendMessage(sndUser, sndToUser2, "給王五的消息一", "給王五的消息二"); } }
1.先啟動消費者
2.啟動消費者之后
消費者控制台如下:
生產者控制台如下:
3.再次啟動消費者之后
消費者控制台:
生產者控制台:
至此實現了簡單的一對一聊天,實際上就是簡單的一個用戶給另一個用戶發送消息。上面采用這種方式實現的即使用戶上線也會接受之前未接受的消息。只有BRPOP之后消息才會消失。
實際中可以根據需求進行實際的開發,實際中有消息類型、內容等。
有時間的話可以用kindeditor實現一個簡單的一對一web聊天系統,這個功能待完成。==============
2.群聊功能(基於發布/訂閱實現)
群聊的思路:采用發布訂閱模式實現(publish和subscribe實現)
1.每個channel都有一個頻道,每一個channel代表一個群,用戶每次訂閱這個房間都代表進入這個群,可以發送與接收消息
2.發送者每次發送消息都需要先進入房間,也就是訂閱channel,之后可以向該頻道發送消息
3.接收者需要先進入房間,也就是訂閱channel,然后接收消息
也就是不管發送消息與接收消息,都需要訂閱channel進入房間。用戶進入某個房間可以存入到zSet,每次進入某個房間和發送消息先判斷是否已經進入某個房間。比如:
房間room1,則channel就是room1,保存其成員的set就是room1members。
總的來說:
每個房間都是一個channel,進入房間的成員訂閱該channel。每個房間的成員保存在一個zset中,key可以定義為roomName+"users"。
用戶退出房間的時候需要退出該房間,也就是退訂該channel,同時在zset中移除該成員。(退訂只能調用JedisPubSub的unsubscribe實現)。
也就是一個房間對應的信息如下:
一個channel,名稱為 roomName 發送的群消息到這個channel
一個zset,對應的key為roomName+"users",保存的是進入該房間的用戶,用戶退出房間需要借助退出房間的消息以及退訂實現
發送群聊消息直接publish消息到該roomName即可。
下面上代碼:
RoomUtil 用戶進入某個房間與退出某個房間。退出房間需要發出退出房間的信號(也就是發送一條退訂的消息)
package cn.xm.redisChat.groupChat; import cn.xm.jwxt.bean.system.User; import cn.xm.redisChat.util.JedisPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; /** * @Author: qlq * @Description 進入房間的工具類(訂閱某個channel,代表進入房間) * @Date: 16:15 2018/10/14 */ public class RoomUtil { private static final Logger log = LoggerFactory.getLogger(RoomUtil.class); /** * 進入房間 * * @param user 用戶 * @param roomName */ public static void enterRoom(User user, String roomName) { Jedis jedis = JedisPoolUtils.getJedis(); String username = user.getUsername(); Boolean sismember = jedis.sismember(roomName + "users", username); if (!sismember) { jedis.sadd(roomName + "users", username); log.info("{} 已經成功進入房間 {}!", username, roomName); } else { log.info("{} 已經進入房間,不能重復進入!", username); } } /** * 退出房間 * * @param user 用戶 * @param roomName 房間名稱 */ public static void exitRoom(User user, String roomName) { Jedis jedis = JedisPoolUtils.getJedis(); String username = user.getUsername(); Boolean sismember = jedis.sismember(roomName + "users", username); if (sismember) { //從成員組中移除 jedis.srem(roomName + "users", username); //發送退訂信號(房間內的成員收到該信號后退訂) String exitSignal = user.getUsername() + ":exit:" + roomName; jedis.publish(roomName, exitSignal); log.info("{} 已經發出移除房間 {}的信號!", username, roomName); } else { log.info("{} 已經不在房間內!", username); } } /** * 判斷用戶是否在某個房間 * * @param user * @param roomName * @return */ public static boolean userIsInRoom(User user, String roomName) { Jedis jedis = JedisPoolUtils.getJedis(); return jedis.sismember(roomName + "users", user.getUsername()); } }
消息生產者:(publish發布消息到指定的channe)
package cn.xm.redisChat.groupChat; import cn.xm.jwxt.bean.system.User; import cn.xm.redisChat.util.JedisPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; /** * @Author: qlq * @Description 消息生產者 * @Date: 18:09 2018/10/14 */ public class MessageProducer { private static final Logger log = LoggerFactory.getLogger(MessageProducer.class); public static void produceMsg(final User sendUser, String roomName, String... messages) { //發送消息 Jedis jedis = JedisPoolUtils.getJedis(); for (int i = 0, length_1 = messages.length; i < length_1; i++) { String msg = sendUser.getUsername() + "*-*" + messages[i]; log.debug(msg); jedis.publish(roomName, msg);//發送消息 } } }
消息消費者:
開啟線程獲取消息,如果收到的是自己退訂的信號則自己退出房間(取消訂閱該channel),訂閱之后線程會一直阻塞,退訂之后才會結束線程,也就是局部線程t會一直阻塞,直到收到退訂信號之后才會結束線程(也就不再獲取消息)
package cn.xm.redisChat.groupChat; import cn.xm.jwxt.bean.system.User; import cn.xm.redisChat.util.JedisPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; /** * @Author: qlq * @Description 消息消費者(訂閱頻道消費消息) * @Date: 16:12 2018/10/14 */ public class MessageConsumer { private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class); public static void consumerMsg(final User user, final String roomName) { if (!RoomUtil.userIsInRoom(user, roomName)) { RoomUtil.enterRoom(user, roomName); } final Jedis jedis = JedisPoolUtils.getJedis(); //新建一個線程,線程池獲取消息 Thread t = new Thread() { @Override public void run() { jedis.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { //如果收到退訂信號就退訂頻道 String exitSignal = user.getUsername() + ":exit:" + roomName; if (exitSignal.equals(message)) { unsubscribe(channel); log.info("============" + exitSignal + "============,channel->{}", channel); } else if (!message.contains(":exit:")) { log.info("{} consume msg:{},room is->{}", user.getUsername(), message, channel); } } @Override public void unsubscribe(String... channels) { log.info("==============unsubscribe {}========", channels); super.unsubscribe(channels); } }, roomName); } }; t.start(); } }
測試類:
package cn.xm.redisChat.groupChat; import cn.xm.jwxt.bean.system.User; import org.junit.Test; /** * @Author: qlq * @Description * @Date: 21:46 2018/10/14 */ public class MsgProduceApp { public static void main(String[] args) { String roomName1 = "room1", roomName2 = "room2"; User sendUser = new User(); sendUser.setUsername("zhangsan"); MessageProducer.produceMsg(sendUser, roomName1, "消息一", "消息二"); MessageProducer.produceMsg(sendUser, roomName2, "消息一一", "消息二二"); } /** * 將用戶lisi移除房間2 */ @Test public void fun2() { User user = new User(); user.setUsername("lisi"); RoomUtil.exitRoom(user, "room2"); } }
package cn.xm.redisChat.groupChat; import cn.xm.jwxt.bean.system.User; /** * @Author: qlq * @Description * @Date: 18:19 2018/10/14 */ public class MsgConsumeApp { public static void main(String[] args) { String roomName1 = "room1", roomName2 = "room2"; User sendUser = new User(); sendUser.setUsername("lisi"); User sendUser2 = new User(); sendUser2.setUsername("wangwu"); MessageConsumer.consumerMsg(sendUser, roomName1); MessageConsumer.consumerMsg(sendUser, roomName2); MessageConsumer.consumerMsg(sendUser2, roomName2); } }
測試過程如下:
1.先調用MsgConsumeApp訂閱房間
2.調用MsgProduceApp生產消息
生產者控制台:
消費者控制台:
3.調用fun2 退出房間:
消費者控制台:
4.再次調用生產者生產消息:(lisi不接收房間2的消息)
至此完成了群聊功能,實際上群聊還可以用線程池處理接收消息的線程,暫時用遠程的Thread處理。