Redis實現聊天功能


  在學習了Redis做為消息隊列之后研究 了redis聊天的功能。

  其實用關系型數據庫也可以實現消息功能,自己就曾經用mysql寫過一個簡單的消息的功能。RDB中思路如下:

**
在實際中可以完全借助mysql數據庫實現聊天功能,建立一個表,保存接收人的username、message、isConsumed等信息,用戶登錄之后采用心跳機制不停的檢測數據庫並消費消息。
心跳可以做好多事,比如檢測檢測當前用戶是否已經登錄,如果已經登錄剔除之前已經登錄的用戶,實現一個用戶一次登錄的功能。
心跳可以采用JS的周期函數不停的向后台發起異步請求,后台查詢未消息的消息
**

 

 

1.Redis實現一對一的聊天功能(基於lpush和brpop實現)

  簡單的實現一個用戶向另一個用戶發送多條信息,實現的思路是:

一對一聊天的思路:(采用Lpush和Brpop實現)
1.消息生產者生產消息到redis中:生產消息的時候根據接收人的userName與消息的類型發送到對應的key,采用lpush發送消息(根據userName生成key)
2.消息的消費者根據userName,從userName的key中消費對應的消息。如果有必要可以將消息寫到RDB中避免數據的丟失。(根據userName生成key的規則獲取用戶對應的消息)
3.消息的內容頭部加入發送者,例如原來消息內容是:hello,為了知道消息的發送者可以改為:張三*-*hello(為了獲取消息的發送者)

 

下面直接上代碼:

User.java(只有一個userName有用)

package cn.xm.jwxt.bean.system;

import java.util.List;
import java.util.Set;

public class User {
  
    private String username;//用戶姓名
public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username == null ? null : username.trim();
    }
}

 

 

 

redis-chat.properties

redis.url=127.0.0.1
redis.port=6379
redis.maxIdle=30
redis.minIdle=10
redis.maxTotal=100
redis.maxWait=20000

 

 

Jedis工具類:(返回Jedis連接)

package cn.xm.redisChat.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:32 2018/10/9
 */
public class JedisPoolUtils {

    private static JedisPool pool = null;

    static {

        //加載配置文件
        InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis-chat.properties");
        Properties pro = new Properties();
        try {
            pro.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }

        //獲得池子對象
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大閑置個數
        poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大閑置個數
        poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小閑置個數
        poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大連接數
        pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString()));
    }

    //獲得jedis資源的方法
    public static Jedis getJedis() {
        return pool.getResource();
    }
}

 

 

 

消息生產者:(處理消息頭部加上消息的發送者,並且根據接受者的userName生成key)

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;
import cn.xm.redisChat.util.JedisPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

/**
 * @Author: qlq
 * @Description 消息生產者(根據消息的)
 * @Date: 23:02 2018/10/13
 */
public class RedisMessageProducer {
    private static final Logger log = LoggerFactory.getLogger(RedisMessageProducer.class);

    /**
     * 發送消息的方法
     *
     * @param sendUser   發送消息的用戶
     * @param sendToUser 接收消息的用戶
     * @param messages   可變參數返送多條消息
     * @return
     */
    public static boolean sendMessage(User sendUser, User sendToUser, String... messages) {
        Jedis jedis = JedisPoolUtils.getJedis();
        try {
            String key = sendToUser.getUsername() + ":msg";
            //將消息的內容加上消息的發送人以 *-* 分割,不能用增強for循環
            for (int i = 0, length_1 = messages.length; i < length_1; i++) {
                messages[i] = sendUser.getUsername() + "*-*" + messages[i];
            }
            Long lpush = jedis.lpush(key, messages);//返回值是還有多少消息未消費
            log.debug("user {} send message [{}] to {}", sendUser.getUsername(), messages, sendToUser.getUsername());
            log.debug("user {} has {} messages ", sendToUser.getUsername(), lpush);
        } catch (Exception e) {
            log.error("sendMessage error", e);
        } finally {
            jedis.close();
        }
        return true;
    }
}

 

 

消息的消費者:(采用線程池獲取消息,根據接收消息的userName從對應的key中獲取對應的消息,並解析消息的key和發送者和內容)

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;
import cn.xm.redisChat.util.JedisPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @Author: qlq
 * @Description 消息的消費者
 * @Date: 23:44 2018/10/13
 */
public class RedisMessageConsumer {
    private static final Logger log = LoggerFactory.getLogger(RedisMessageConsumer.class);

    /**
     * 參數是初始化線程池子的大小
     */
    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

    /**
     * 消費消息
     *
     * @param consumerUser 接收消息的用戶
     */
    public static void consumerMessage(final User consumerUser) {
        final Jedis jedis = JedisPoolUtils.getJedis();

        //新建一個線程,線程池獲取消息
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (true){
                    List<String> messages = jedis.brpop(0, consumerUser.getUsername() + ":msg");//0是timeout,返回的是一個集合,第一個是消息的key,第二個是消息的內容
                    String key = messages.get(0);//第一個是key
                    String message = messages.get(1);//第二個是消息
                    String sendUserName = message.substring(0, message.indexOf("*-*"));//獲取消息的發送者
                    message = message.substring(message.indexOf("*-*")+3);//獲取消息內容
                    log.debug("ThreadName is {},user {} consumer message {} ,sended by {}", Thread.currentThread().getName(),consumerUser.getUsername(), message, sendUserName);
                }
            }
        };
        //線程池中獲取消息
        //第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位
        batchTaskPool.scheduleWithFixedDelay(runnable, 3,5, TimeUnit.SECONDS);
    }
}

 

 

 

 

測試類:(lisi和wangwu消費消息)

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;

/**
 * @Author: qlq
 * @Description 消息消息
 * @Date: 0:04 2018/10/14
 */
public class ConsumerMessageApp {

    public static void main(String[] args) {
        User sndToUser = new User();
        sndToUser.setUsername("lisi");

        User sndToUser2 = new User();
        sndToUser2.setUsername("wangwu");

        RedisMessageConsumer.consumerMessage(sndToUser);
        RedisMessageConsumer.consumerMessage(sndToUser2);
    }
}

 

 

zhangsan給lisi和wangwu發送消息

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;

/**
 * @Author: qlq
 * @Description 生產消息測試
 * @Date: 23:59 2018/10/13
 */

public class ProducerMessageApp {
    public static void main(String[] args) {
        User sndUser = new User();
        sndUser.setUsername("zhangsan");

        User sndToUser = new User();
        sndToUser.setUsername("lisi");

        User sndToUser2 = new User();
        sndToUser2.setUsername("wangwu");

        RedisMessageProducer.sendMessage(sndUser, sndToUser, "給李四的消息一", "給李四的消息二");
        RedisMessageProducer.sendMessage(sndUser, sndToUser2, "給王五的消息一", "給王五的消息二");
    }
}

 

 

1.先啟動消費者

2.啟動消費者之后

消費者控制台如下:

 

生產者控制台如下:

 

 3.再次啟動消費者之后

消費者控制台:

 

 

 

生產者控制台:

 

 

至此實現了簡單的一對一聊天,實際上就是簡單的一個用戶給另一個用戶發送消息。上面采用這種方式實現的即使用戶上線也會接受之前未接受的消息。只有BRPOP之后消息才會消失。

實際中可以根據需求進行實際的開發,實際中有消息類型、內容等。

有時間的話可以用kindeditor實現一個簡單的一對一web聊天系統,這個功能待完成。==============

 

2.群聊功能(基於發布/訂閱實現)

群聊的思路:采用發布訂閱模式實現(publish和subscribe實現)
1.每個channel都有一個頻道,每一個channel代表一個群,用戶每次訂閱這個房間都代表進入這個群,可以發送與接收消息
2.發送者每次發送消息都需要先進入房間,也就是訂閱channel,之后可以向該頻道發送消息
3.接收者需要先進入房間,也就是訂閱channel,然后接收消息

也就是不管發送消息與接收消息,都需要訂閱channel進入房間。用戶進入某個房間可以存入到zSet,每次進入某個房間和發送消息先判斷是否已經進入某個房間。比如:
房間room1,則channel就是room1,保存其成員的set就是room1members。

 

總的來說:

    每個房間都是一個channel,進入房間的成員訂閱該channel。每個房間的成員保存在一個zset中,key可以定義為roomName+"users"。

    用戶退出房間的時候需要退出該房間,也就是退訂該channel,同時在zset中移除該成員。(退訂只能調用JedisPubSub的unsubscribe實現)。

 

也就是一個房間對應的信息如下:

  一個channel,名稱為  roomName    發送的群消息到這個channel

  一個zset,對應的key為roomName+"users",保存的是進入該房間的用戶,用戶退出房間需要借助退出房間的消息以及退訂實現

  發送群聊消息直接publish消息到該roomName即可。

 

 

 

下面上代碼:

RoomUtil   用戶進入某個房間與退出某個房間。退出房間需要發出退出房間的信號(也就是發送一條退訂的消息)

package cn.xm.redisChat.groupChat;

import cn.xm.jwxt.bean.system.User;
import cn.xm.redisChat.util.JedisPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

/**
 * @Author: qlq
 * @Description 進入房間的工具類(訂閱某個channel,代表進入房間)
 * @Date: 16:15 2018/10/14
 */
public class RoomUtil {

    private static final Logger log = LoggerFactory.getLogger(RoomUtil.class);

    /**
     * 進入房間
     *
     * @param user     用戶
     * @param roomName
     */
    public static void enterRoom(User user, String roomName) {
        Jedis jedis = JedisPoolUtils.getJedis();
        String username = user.getUsername();
        Boolean sismember = jedis.sismember(roomName + "users", username);
        if (!sismember) {
            jedis.sadd(roomName + "users", username);
            log.info("{} 已經成功進入房間  {}!", username, roomName);
        } else {
            log.info("{} 已經進入房間,不能重復進入!", username);
        }
    }

    /**
     * 退出房間
     *
     * @param user     用戶
     * @param roomName 房間名稱
     */
    public static void exitRoom(User user, String roomName) {
        Jedis jedis = JedisPoolUtils.getJedis();
        String username = user.getUsername();
        Boolean sismember = jedis.sismember(roomName + "users", username);
        if (sismember) {
            //從成員組中移除
            jedis.srem(roomName + "users", username);
            //發送退訂信號(房間內的成員收到該信號后退訂)
            String exitSignal = user.getUsername() + ":exit:" + roomName;
            jedis.publish(roomName, exitSignal);
            log.info("{} 已經發出移除房間  {}的信號!", username, roomName);
        } else {
            log.info("{} 已經不在房間內!", username);
        }
    }

    /**
     * 判斷用戶是否在某個房間
     *
     * @param user
     * @param roomName
     * @return
     */
    public static boolean userIsInRoom(User user, String roomName) {
        Jedis jedis = JedisPoolUtils.getJedis();
        return jedis.sismember(roomName + "users", user.getUsername());
    }
}

 

 

消息生產者:(publish發布消息到指定的channe)

package cn.xm.redisChat.groupChat;

import cn.xm.jwxt.bean.system.User;
import cn.xm.redisChat.util.JedisPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

/**
 * @Author: qlq
 * @Description 消息生產者
 * @Date: 18:09 2018/10/14
 */
public class MessageProducer {
    private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);

    public static void produceMsg(final User sendUser, String roomName, String... messages) {
        //發送消息
        Jedis jedis = JedisPoolUtils.getJedis();
        for (int i = 0, length_1 = messages.length; i < length_1; i++) {
            String msg = sendUser.getUsername() + "*-*" + messages[i];
            log.debug(msg);
            jedis.publish(roomName, msg);//發送消息
        }
    }
}

 

 

消息消費者:

  開啟線程獲取消息,如果收到的是自己退訂的信號則自己退出房間(取消訂閱該channel),訂閱之后線程會一直阻塞,退訂之后才會結束線程,也就是局部線程t會一直阻塞,直到收到退訂信號之后才會結束線程(也就不再獲取消息)

package cn.xm.redisChat.groupChat;

import cn.xm.jwxt.bean.system.User;
import cn.xm.redisChat.util.JedisPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * @Author: qlq
 * @Description 消息消費者(訂閱頻道消費消息)
 * @Date: 16:12 2018/10/14
 */
public class MessageConsumer {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);

    public static void consumerMsg(final User user, final String roomName) {
        if (!RoomUtil.userIsInRoom(user, roomName)) {
            RoomUtil.enterRoom(user, roomName);
        }

        final Jedis jedis = JedisPoolUtils.getJedis();
        //新建一個線程,線程池獲取消息
        Thread t = new Thread() {
            @Override
            public void run() {
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        //如果收到退訂信號就退訂頻道
                        String exitSignal = user.getUsername() + ":exit:" + roomName;
                        if (exitSignal.equals(message)) {
                            unsubscribe(channel);
                            log.info("============" + exitSignal + "============,channel->{}", channel);
                        } else if (!message.contains(":exit:")) {
                            log.info("{} consume msg:{},room is->{}", user.getUsername(), message, channel);
                        }
                    }

                    @Override
                    public void unsubscribe(String... channels) {
                        log.info("==============unsubscribe {}========", channels);
                        super.unsubscribe(channels);
                    }
                }, roomName);
            }
        };
        t.start();
    }
}

 

 

 

測試類:

package cn.xm.redisChat.groupChat;

import cn.xm.jwxt.bean.system.User;
import org.junit.Test;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:46 2018/10/14
 */
public class MsgProduceApp {
    public static void main(String[] args) {
        String roomName1 = "room1", roomName2 = "room2";
        User sendUser = new User();
        sendUser.setUsername("zhangsan");

        MessageProducer.produceMsg(sendUser, roomName1, "消息一", "消息二");
        MessageProducer.produceMsg(sendUser, roomName2, "消息一一", "消息二二");
    }

    /**
     * 將用戶lisi移除房間2
     */
    @Test
    public void fun2() {
        User user = new User();
        user.setUsername("lisi");
        RoomUtil.exitRoom(user, "room2");
    }
}

 

 

package cn.xm.redisChat.groupChat;

import cn.xm.jwxt.bean.system.User;

/**
 * @Author: qlq
 * @Description
 * @Date: 18:19 2018/10/14
 */
public class MsgConsumeApp {

    public static void main(String[] args) {
        String roomName1 = "room1", roomName2 = "room2";
        User sendUser = new User();
        sendUser.setUsername("lisi");
        User sendUser2 = new User();
        sendUser2.setUsername("wangwu");

        MessageConsumer.consumerMsg(sendUser, roomName1);
        MessageConsumer.consumerMsg(sendUser, roomName2);

        MessageConsumer.consumerMsg(sendUser2, roomName2);
    }
}

 

 

 

 

測試過程如下:

1.先調用MsgConsumeApp訂閱房間

2.調用MsgProduceApp生產消息

生產者控制台:

 

消費者控制台:

 

 3.調用fun2 退出房間:

消費者控制台:

 

 4.再次調用生產者生產消息:(lisi不接收房間2的消息)

 

   至此完成了群聊功能,實際上群聊還可以用線程池處理接收消息的線程,暫時用遠程的Thread處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM