根據redis的pub/sub機制,寫一個即時在線聊天應用


在Redis中,有個Pub/Sub,他的主要的工作流程如:  

redis訂閱一個模式頻道如:chat_*,然后由小a想找人聊天了,就發送一個消息“現在有人聊天嗎?chat_a”,末尾的chat_a為標識,表示你要在chat_* 這個圈子里面說。這個時候,chat_*這個圈子的管理員,就會對所有加入這個圈子的人發送一條消息。消息內容就是小a說的話。說白了,就是有個大喇叭,你說話聲音不夠大,但是你想讓所有人都聽到你的消息,那么你就要先對喇叭說話,然后喇叭把你的話擴散。。。。

還是根據代碼說,直接描述比抽象函數還要抽象。

首先我們先在配置文件里面配置下訂閱的頻道對應的監聽:

 1   <!--chat-->
 2     <bean id="msgListener" class="com.anhoo.util.MyMsgListener"/>
 3 
 4     <bean id="listenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
 5         <property name="connectionFactory" ref="jedisConnFactory"/>
 6         <property name="messageListeners">
 7             <map>
 8                 <entry key-ref="msgListener" value-ref="patternTopic"/>
 9             </map>
10         </property>
11     </bean>
12 
13     <bean id="patternTopic" class="org.springframework.data.redis.listener.PatternTopic">
14         <constructor-arg value="chat_*"/>
15     </bean>

2行是根據監聽消息的接口寫的監聽類,當監聽到有消息的時候,就會調用onMessage類

public class MyMsgListener implements MessageListener {
    @Autowired
    SubService subService;
@Override
public void onMessage(Message message, byte[] bytes) { subService.isCall(message); // System.out.println(SerializeUtil.unserialize(bytes).toString()); System.out.println("當前的Message值為:" + message.toString()); } }

4-11行 是pub/sub監聽配置,redis的配置文件是  jedisConnFactory,監聽的頻道模式為 patternTopic,監聽到的消息處理類為 msgListener

13-15行 是監聽頻道的設置

消息的entity為:

public class MessageEntity implements Serializable {

    private String user;
    private String content;

    //省略get set
}

Controller層代碼,主要有三個方法,sendMessage 是在頁面中發送發送消息,(前台有一個ajax方法一直在請求)callMsg 當監聽到新的消息的時候,返回監聽到的消息,addChatUser 當有新的用戶加入的時候,做記錄,1是方便以后根據用戶返回數據,2是防止重復的用戶名。

@Controller
@RequestMapping("/back")
public class ChatController {

    @Autowired
    PubService pubService;

    @Autowired
    SubService subService;

    @Autowired
    ChatService chatService;

    @ResponseBody
    @RequestMapping("/send")
    public ResultMsg sendMessage(MessageEntity messageEntity) {
        ResultMsg resultMsg = new ResultMsg();
        if (messageEntity != null && !messageEntity.getUser().equals("") && !messageEntity.getContent().equals("")) {
            pubService.sendMessage(messageEntity);
            resultMsg.setMsg("發送成功!");
            resultMsg.setCode(ResultCode.SUCCESS);
            return resultMsg;
        }
        resultMsg.setMsg("輸入信息有誤!");
        resultMsg.setCode(ResultCode.FAIL);
        return resultMsg;
    }

    @ResponseBody
    @RequestMapping("/callBack")
    public ResultMsg callMsg(String user) throws InterruptedException {
        ResultMsg resultMsg = new ResultMsg();
        Logger logger = LogManager.getLogger(ChatController.class);
        MessageEntity message;
        message = subService.callBack(user);
        if (message != null) {
            resultMsg.setCode(ResultCode.SUCCESS);
            resultMsg.setContent(message);
            return resultMsg;
        } else {
            resultMsg.setCode(ResultCode.FAIL);
            return resultMsg;
        }
    }

    @ResponseBody
    @RequestMapping("/join")
    public ResultMsg addChatUser(String user) {
        ResultMsg resultMsg = new ResultMsg();
        if (chatService.addUser(user) > 0) {
            resultMsg.setCode(ResultCode.SUCCESS);
        } else {
            resultMsg.setCode(ResultCode.FAIL);
            resultMsg.setMsg("昵稱已經存在,請重新輸入!");
        }
        return resultMsg;
    }
}

Service 有三個 接口類為:

ChatService
int addUser(String user); 

PubService
void sendMessage(MessageEntity messageEntity);

SubService
void isCall(Message message);
MessageEntity callBack(String user) throws InterruptedException;

ChatService接口的具體方法代碼

@Service
public class ChatServiceImpl implements ChatService {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    public int addUser(String user) {
        //判斷userList 已經其中的用戶是否已經存在
        if (stringRedisTemplate.hasKey("userList") && stringRedisTemplate.opsForZSet().score("userList", user) != null) {
            return 0;
        } else {
            //增加新的用戶,但是要判斷下,是否是第一次剛啟動的時候
            int currentInidex;
            if (stringRedisTemplate.hasKey("msgList")) {
                currentInidex = (int) (-1 - stringRedisTemplate.opsForList().size("msgList"));
            } else {
                currentInidex = -1;
            }
            stringRedisTemplate.opsForZSet().add("userList", user, currentInidex);
            return 1;
        }
    }
}

PubService接口的具體方法代碼:

@Service
public class PubServiceImpl implements PubService {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    public void sendMessage(MessageEntity messageEntity) {
        //消息的頻道為chat_*
        String channel = "chat_";
        String content = messageEntity.getContent();
        //使得發送消息的 頻道為chat_用戶名  例如chat_jack 為了后面能根據這個得到 jack用戶
        stringRedisTemplate.convertAndSend(channel + messageEntity.getUser(), content);
    }
}

SubService接口的具體方法代碼:

@Service
public class SubServiceImpl implements SubService {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Autowired
    JedisConnectionFactory jedisConnFactory;

    @Override
    public void isCall(Message message) {
        MessageEntity messageEntity = new MessageEntity();
        //請參考配置文件,本例中key,value的序列化方式均為string。
        //其中key必須為stringSerializer。和redisTemplate.convertAndSend對應
        messageEntity.setUser(stringRedisTemplate.getStringSerializer().deserialize(message.getChannel()).split("_")[1]);
        messageEntity.setContent(stringRedisTemplate.getValueSerializer().deserialize(message.getBody()).toString());
        stringRedisTemplate.opsForList().leftPush("msgList", JSON.toJSONString(messageEntity));

//        Jedis jedis = (Jedis) jedisConnFactory.getConnection().getNativeConnection();
//        stringRedisTemplate.opsForValue().set("broadcast",jedis.pubsubNumPat().toString() );
//        System.out.println(jedis.pubsubNumPat().toString());
//        jedis.close();

    }

    @Override
    public MessageEntity callBack(String user) throws InterruptedException {

        //模擬1s 查看一次 不至於一直在連接redis 低於1s的頻率連接redis會報錯
        Thread.sleep(1000);
//            String msgTxt = stringRedisTemplate.opsForList().rightPop("msgList");
        //獲取當前user 對應的消息 坐標值
        Double index = stringRedisTemplate.opsForZSet().score("userList", user);

        long l = new Double(index).longValue();
        if (stringRedisTemplate.hasKey("msgList")) {
            String msgTxt = stringRedisTemplate.opsForList().index("msgList", l);

            //只有當msgList 有新的消息的時候,才會獲取消息
            if (msgTxt != null && msgTxt != "") {
//                list.remove(user);
                MessageEntity messageEntity = JSON.parseObject(msgTxt, MessageEntity.class);

                //消息坐標加-1
                stringRedisTemplate.opsForZSet().incrementScore("userList", user, -1);
                return messageEntity;
            }
        }
        return null;

    }
}

service層用到的redis主要是 list和zset,當有用戶發送消息的時候,就把消息放到list中,獲取第一條可以是: opsForList().index("msgList", -1) ,第二條為:opsForList().index("msgList", -2),第三台為opsForList().index("msgList", -3)……以此類推,又因為我們前端有個ajax一直發送請求,按道理是只要我們list中有消息,我們就把他拿出來,在頁面展示。但是這里又不能實時的判斷當前是不是所有的用戶都獲取過一次,而且僅僅只能為一次。這個時候就根據list的長度以及zset的score來判斷了。過程為:當有用戶加入的時候, 如果是第一個用戶,那么就把他的zset的score設為-1,此時list中的消息為空,只有當我們發送一條消息的時候,onMessage做出響應,再把發送的消息存到list中,這個時候,一直發送請求的ajax發現,此時消息的長度為了1(可以通過 opsForList().index("msgList", -1)得到),而且當前用戶的score標志為-1,正好他們一致。那么就把這個消息取出來,在前台頁面展示,然后把score自自增-1,等待list里面再次有消息放入(長度為2,可以通過opsForList().index("msgList", -2)獲取)的時候才滿足取出消息的條件。。以此循環;如果不是第一個加入的,就把現在消息的長度放到score中,只有當接受下一條數據的時候才展示。

前台代碼。js部分:

$(function () {
    $("#user").focus();
});

function loading(user) {
    eoooxy.ajax("post", "/back/callBack", {"user":user}, function (r) {
        if (!eoooxy.isEmpty(r) && r.code == '100') {
            var o = r.content;
            var h = "<div style='margin: 10px 20px 10px 20px;'><label>" + o.user + "</label><br><label>" + o.content + "</label></div>";
            $("#chatSpace").append(h);
            $("#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight;
            //$("#content").focus();
            loading(user);
        } else {
            console.log("當前沒有消息,繼續請求……");
            loading(user);
        }
    }, "json"/*, function (XMLHttpRequest, status) {
        if (status == 'timeout') {//超時,status還有success,error等值的情況
            loading();
        }
    }, 3000*/)
}

function chatting() {
    if (eoooxy.isEmpty($("#user").val())) {
        alert("必須先輸入昵稱,然后點擊開始聊天!");
        return false;
    }
    var data = {"user": $("#user").val()};
    eoooxy.ajax("post", "/back/join", data, function (r) {
        if (!eoooxy.isEmpty(r) && r.code == '100') {
            $("#user").attr("disabled", "disabled");
            loading($("#user").val());
        } else {
            alert(r.msg);
        }
    }, "json")
}

function sendInfo() {
    var message = $("#content");
    var data = {"user": $("#user").val(), "content": $("#content").val()};
    // var data = {"user": "jack", "content": $("#content").val()};
    eoooxy.ajax("post", "/back/send", data, function (r) {
        if (!eoooxy.isEmpty(r) && r.code == '100') {
            message.val('');
            $("#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight;
            message.focus();
        } else {
            alert(r.msg);
        }

    })
}

jsp部分:

<%@ page language="java" contentType="text/html; charset=UTF-8"
         pageEncoding="UTF-8" %>
<html>
<head>
    <%@include file="/WEB-INF/view/common/meta.jsp" %>
    <title>聊天室</title>
    ${f:addJs("resources/js/back/chat.js")}
</head>
<body>
<div style="width: auto;" align="center">
    <h1>多人聊天室</h1>
    <span>
        <input id="user" style="resize: none;outline: none;" placeholder="昵稱,必須輸入">
        <button onclick="chatting()">開始聊天</button>
    </span>
    <div id="chatSpace"
         style="width: 600px;height: 500px; border: solid 1px #CCCCCC; overflow-y: auto;text-align: left">

    </div>
    <div style="width: 600px; height: 100px;border: solid 1px #CCCCCC; margin-top: 20px;">
        <textarea id="content"
                  style="width: 590px;height: 60px; resize: none;border: 0px;outline: none;margin: 5px;"></textarea>
        <button style="float: right;margin-right: 10px;" id="btn_send" onclick="sendInfo()">確定</button>
    </div>
</div>
</body>
</html>

 

以上就是根據pub/sub 以及ajax長連接寫的一個在線實時聊天系統(實際上延遲1s),如果有錯,請指出,謝謝!

因為這邊采用的是ajax長連接(就是一直問:有沒有消息啊,有沒有消息啊,有的話拿走然后繼續問。。),所以會占用資源。如果我們能更好的優化他,我們可以使用H5的新的特性WebSocket來構建實時的聊天系統,具體這邊我就不介紹了,因為我也還沒搞透徹,沒有調查沒有發言權。。。

示例代碼git連接:https://github.com/eoooxy/anhoo

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM