在Redis中,有個Pub/Sub,他的主要的工作流程如:
redis訂閱一個模式頻道如:chat_*,然后由小a想找人聊天了,就發送一個消息“現在有人聊天嗎?chat_a”,末尾的chat_a為標識,表示你要在chat_* 這個圈子里面說。這個時候,chat_*這個圈子的管理員,就會對所有加入這個圈子的人發送一條消息。消息內容就是小a說的話。說白了,就是有個大喇叭,你說話聲音不夠大,但是你想讓所有人都聽到你的消息,那么你就要先對喇叭說話,然后喇叭把你的話擴散。。。。
還是根據代碼說,直接描述比抽象函數還要抽象。
首先我們先在配置文件里面配置下訂閱的頻道對應的監聽:
1 <!--chat--> 2 <bean id="msgListener" class="com.anhoo.util.MyMsgListener"/> 3 4 <bean id="listenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"> 5 <property name="connectionFactory" ref="jedisConnFactory"/> 6 <property name="messageListeners"> 7 <map> 8 <entry key-ref="msgListener" value-ref="patternTopic"/> 9 </map> 10 </property> 11 </bean> 12 13 <bean id="patternTopic" class="org.springframework.data.redis.listener.PatternTopic"> 14 <constructor-arg value="chat_*"/> 15 </bean>
2行是根據監聽消息的接口寫的監聽類,當監聽到有消息的時候,就會調用onMessage類
public class MyMsgListener implements MessageListener { @Autowired SubService subService;
@Override public void onMessage(Message message, byte[] bytes) { subService.isCall(message); // System.out.println(SerializeUtil.unserialize(bytes).toString()); System.out.println("當前的Message值為:" + message.toString()); } }
4-11行 是pub/sub監聽配置,redis的配置文件是 jedisConnFactory,監聽的頻道模式為 patternTopic,監聽到的消息處理類為 msgListener
13-15行 是監聽頻道的設置
消息的entity為:
public class MessageEntity implements Serializable { private String user; private String content; //省略get set }
Controller層代碼,主要有三個方法,sendMessage 是在頁面中發送發送消息,(前台有一個ajax方法一直在請求)callMsg 當監聽到新的消息的時候,返回監聽到的消息,addChatUser 當有新的用戶加入的時候,做記錄,1是方便以后根據用戶返回數據,2是防止重復的用戶名。
@Controller @RequestMapping("/back") public class ChatController { @Autowired PubService pubService; @Autowired SubService subService; @Autowired ChatService chatService; @ResponseBody @RequestMapping("/send") public ResultMsg sendMessage(MessageEntity messageEntity) { ResultMsg resultMsg = new ResultMsg(); if (messageEntity != null && !messageEntity.getUser().equals("") && !messageEntity.getContent().equals("")) { pubService.sendMessage(messageEntity); resultMsg.setMsg("發送成功!"); resultMsg.setCode(ResultCode.SUCCESS); return resultMsg; } resultMsg.setMsg("輸入信息有誤!"); resultMsg.setCode(ResultCode.FAIL); return resultMsg; } @ResponseBody @RequestMapping("/callBack") public ResultMsg callMsg(String user) throws InterruptedException { ResultMsg resultMsg = new ResultMsg(); Logger logger = LogManager.getLogger(ChatController.class); MessageEntity message; message = subService.callBack(user); if (message != null) { resultMsg.setCode(ResultCode.SUCCESS); resultMsg.setContent(message); return resultMsg; } else { resultMsg.setCode(ResultCode.FAIL); return resultMsg; } } @ResponseBody @RequestMapping("/join") public ResultMsg addChatUser(String user) { ResultMsg resultMsg = new ResultMsg(); if (chatService.addUser(user) > 0) { resultMsg.setCode(ResultCode.SUCCESS); } else { resultMsg.setCode(ResultCode.FAIL); resultMsg.setMsg("昵稱已經存在,請重新輸入!"); } return resultMsg; } }
Service 有三個 接口類為:
ChatService int addUser(String user); PubService void sendMessage(MessageEntity messageEntity); SubService void isCall(Message message); MessageEntity callBack(String user) throws InterruptedException;
ChatService接口的具體方法代碼
@Service public class ChatServiceImpl implements ChatService { @Autowired StringRedisTemplate stringRedisTemplate; @Override public int addUser(String user) { //判斷userList 已經其中的用戶是否已經存在 if (stringRedisTemplate.hasKey("userList") && stringRedisTemplate.opsForZSet().score("userList", user) != null) { return 0; } else { //增加新的用戶,但是要判斷下,是否是第一次剛啟動的時候 int currentInidex; if (stringRedisTemplate.hasKey("msgList")) { currentInidex = (int) (-1 - stringRedisTemplate.opsForList().size("msgList")); } else { currentInidex = -1; } stringRedisTemplate.opsForZSet().add("userList", user, currentInidex); return 1; } } }
PubService接口的具體方法代碼:
@Service public class PubServiceImpl implements PubService { @Autowired StringRedisTemplate stringRedisTemplate; @Override public void sendMessage(MessageEntity messageEntity) { //消息的頻道為chat_* String channel = "chat_"; String content = messageEntity.getContent(); //使得發送消息的 頻道為chat_用戶名 例如chat_jack 為了后面能根據這個得到 jack用戶 stringRedisTemplate.convertAndSend(channel + messageEntity.getUser(), content); } }
SubService接口的具體方法代碼:
@Service public class SubServiceImpl implements SubService { @Autowired StringRedisTemplate stringRedisTemplate; @Autowired JedisConnectionFactory jedisConnFactory; @Override public void isCall(Message message) { MessageEntity messageEntity = new MessageEntity(); //請參考配置文件,本例中key,value的序列化方式均為string。 //其中key必須為stringSerializer。和redisTemplate.convertAndSend對應 messageEntity.setUser(stringRedisTemplate.getStringSerializer().deserialize(message.getChannel()).split("_")[1]); messageEntity.setContent(stringRedisTemplate.getValueSerializer().deserialize(message.getBody()).toString()); stringRedisTemplate.opsForList().leftPush("msgList", JSON.toJSONString(messageEntity)); // Jedis jedis = (Jedis) jedisConnFactory.getConnection().getNativeConnection(); // stringRedisTemplate.opsForValue().set("broadcast",jedis.pubsubNumPat().toString() ); // System.out.println(jedis.pubsubNumPat().toString()); // jedis.close(); } @Override public MessageEntity callBack(String user) throws InterruptedException { //模擬1s 查看一次 不至於一直在連接redis 低於1s的頻率連接redis會報錯 Thread.sleep(1000); // String msgTxt = stringRedisTemplate.opsForList().rightPop("msgList"); //獲取當前user 對應的消息 坐標值 Double index = stringRedisTemplate.opsForZSet().score("userList", user); long l = new Double(index).longValue(); if (stringRedisTemplate.hasKey("msgList")) { String msgTxt = stringRedisTemplate.opsForList().index("msgList", l); //只有當msgList 有新的消息的時候,才會獲取消息 if (msgTxt != null && msgTxt != "") { // list.remove(user); MessageEntity messageEntity = JSON.parseObject(msgTxt, MessageEntity.class); //消息坐標加-1 stringRedisTemplate.opsForZSet().incrementScore("userList", user, -1); return messageEntity; } } return null; } }
service層用到的redis主要是 list和zset,當有用戶發送消息的時候,就把消息放到list中,獲取第一條可以是: opsForList().index("msgList", -1) ,第二條為:opsForList().index("msgList", -2),第三台為opsForList().index("msgList", -3)……以此類推,又因為我們前端有個ajax一直發送請求,按道理是只要我們list中有消息,我們就把他拿出來,在頁面展示。但是這里又不能實時的判斷當前是不是所有的用戶都獲取過一次,而且僅僅只能為一次。這個時候就根據list的長度以及zset的score來判斷了。過程為:當有用戶加入的時候, 如果是第一個用戶,那么就把他的zset的score設為-1,此時list中的消息為空,只有當我們發送一條消息的時候,onMessage做出響應,再把發送的消息存到list中,這個時候,一直發送請求的ajax發現,此時消息的長度為了1(可以通過 opsForList().index("msgList", -1)得到),而且當前用戶的score標志為-1,正好他們一致。那么就把這個消息取出來,在前台頁面展示,然后把score自自增-1,等待list里面再次有消息放入(長度為2,可以通過opsForList().index("msgList", -2)獲取)的時候才滿足取出消息的條件。。以此循環;如果不是第一個加入的,就把現在消息的長度放到score中,只有當接受下一條數據的時候才展示。
前台代碼。js部分:
$(function () { $("#user").focus(); }); function loading(user) { eoooxy.ajax("post", "/back/callBack", {"user":user}, function (r) { if (!eoooxy.isEmpty(r) && r.code == '100') { var o = r.content; var h = "<div style='margin: 10px 20px 10px 20px;'><label>" + o.user + "</label><br><label>" + o.content + "</label></div>"; $("#chatSpace").append(h); $("#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight; //$("#content").focus(); loading(user); } else { console.log("當前沒有消息,繼續請求……"); loading(user); } }, "json"/*, function (XMLHttpRequest, status) { if (status == 'timeout') {//超時,status還有success,error等值的情況 loading(); } }, 3000*/) } function chatting() { if (eoooxy.isEmpty($("#user").val())) { alert("必須先輸入昵稱,然后點擊開始聊天!"); return false; } var data = {"user": $("#user").val()}; eoooxy.ajax("post", "/back/join", data, function (r) { if (!eoooxy.isEmpty(r) && r.code == '100') { $("#user").attr("disabled", "disabled"); loading($("#user").val()); } else { alert(r.msg); } }, "json") } function sendInfo() { var message = $("#content"); var data = {"user": $("#user").val(), "content": $("#content").val()}; // var data = {"user": "jack", "content": $("#content").val()}; eoooxy.ajax("post", "/back/send", data, function (r) { if (!eoooxy.isEmpty(r) && r.code == '100') { message.val(''); $("#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight; message.focus(); } else { alert(r.msg); } }) }
jsp部分:
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %> <html> <head> <%@include file="/WEB-INF/view/common/meta.jsp" %> <title>聊天室</title> ${f:addJs("resources/js/back/chat.js")} </head> <body> <div style="width: auto;" align="center"> <h1>多人聊天室</h1> <span> <input id="user" style="resize: none;outline: none;" placeholder="昵稱,必須輸入"> <button onclick="chatting()">開始聊天</button> </span> <div id="chatSpace" style="width: 600px;height: 500px; border: solid 1px #CCCCCC; overflow-y: auto;text-align: left"> </div> <div style="width: 600px; height: 100px;border: solid 1px #CCCCCC; margin-top: 20px;"> <textarea id="content" style="width: 590px;height: 60px; resize: none;border: 0px;outline: none;margin: 5px;"></textarea> <button style="float: right;margin-right: 10px;" id="btn_send" onclick="sendInfo()">確定</button> </div> </div> </body> </html>
以上就是根據pub/sub 以及ajax長連接寫的一個在線實時聊天系統(實際上延遲1s),如果有錯,請指出,謝謝!
因為這邊采用的是ajax長連接(就是一直問:有沒有消息啊,有沒有消息啊,有的話拿走然后繼續問。。),所以會占用資源。如果我們能更好的優化他,我們可以使用H5的新的特性WebSocket來構建實時的聊天系統,具體這邊我就不介紹了,因為我也還沒搞透徹,沒有調查沒有發言權。。。
示例代碼git連接:https://github.com/eoooxy/anhoo