websocket redis實現集群即時消息聊天


websocket與redismq實現集群消息聊天

1.application.properties

server.port=8081

#thymeleaf配置
#是否啟用模板緩存。
spring.thymeleaf.cache=false
#是否為Web框架啟用Thymeleaf視圖解析。
spring.thymeleaf.enabled=true
#在SpringEL表達式中啟用SpringEL編譯器。
spring.thymeleaf.enable-spring-el-compiler=true
#模板文件編碼。
spring.thymeleaf.encoding=UTF-8
#要應用於模板的模板模式。另請參見Thymeleaf的TemplateMode枚舉。
spring.thymeleaf.mode=HTML5
#在構建URL時添加前綴以查看名稱的前綴。
spring.thymeleaf.prefix=classpath:/templates/
#Content-Type寫入HTTP響應的值。
spring.thymeleaf.servlet.content-type=text/html
#在構建URL時附加到視圖名稱的后綴。
spring.thymeleaf.suffix=.html

##單服務器
spring.redis.host=192.168.159.129
##單端口
spring.redis.port=6379
## 連接池最大連接數(使用負值表示沒有限制) 
spring.redis.pool.max-active=300
## Redis數據庫索引(默認為0) 
spring.redis.database=0
## 連接池最大阻塞等待時間(使用負值表示沒有限制) 
spring.redis.pool.max-wait=-1
## 連接池中的最大空閑連接 
spring.redis.pool.max-idle=100
## 連接池中的最小空閑連接 
spring.redis.pool.min-idle=20
## 連接超時時間(毫秒) 
spring.redis.timeout=60000

 

2.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.szw.learn</groupId>
    <artifactId>websocket_redis_mq_01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>websocket_redis_mq_01</name>
    
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.16.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.test.skip>true</maven.test.skip>
        <skipTests>true</skipTests>
        <thymeleaf.version>3.0.7.RELEASE</thymeleaf.version>
        <thymeleaf-layout-dialect.version>2.1.2</thymeleaf-layout-dialect.version>
        <start-class>com.szw.learn.WsMqApplication</start-class>
    </properties>

    <dependencies>
        <!-- 使用web啟動器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 測試 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 模板引擎 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        
        <!-- redis id與1.5之前的變了 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        
        <!-- websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <!-- 要將源碼放上去,需要加入這個插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打包 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

3.SpringUtils.java

package com.szw.learn.util; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.stereotype.Repository; @Repository public final class SpringUtils implements BeanFactoryPostProcessor { private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環境
 @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } public static ConfigurableListableBeanFactory getBeanFactory() { return beanFactory; } /** * 獲取對象 * * @param name * @return Object 一個以所給名字注冊的bean的實例 * @throws org.springframework.beans.BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) getBeanFactory().getBean(name); } /** * 獲取類型為requiredType的對象 * * @param clz * @return * @throws org.springframework.beans.BeansException * */
    public static <T> T getBean(Class<T> clz) throws BeansException { T result = (T) getBeanFactory().getBean(clz); return result; } /** * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true * * @param name * @return boolean */
    public static boolean containsBean(String name) { return getBeanFactory().containsBean(name); } /** * 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return getBeanFactory().isSingleton(name); } /** * @param name * @return Class 注冊對象的類型 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return getBeanFactory().getType(name); } /** * 如果給定的bean名字在bean定義中有別名,則返回這些別名 * * @param name * @return * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return getBeanFactory().getAliases(name); } }

 

4.redis

發布service:

package com.szw.learn.redismq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; /** * @author 七脈 描述:發布service */ @Component public class PublishService { @Autowired StringRedisTemplate redisTemplate; /** * @author 七脈 描述:發布方法 * @param channel 消息發布訂閱 主題 * @param message 消息信息 */
    public void publish(String channel, Object message) { // 該方法封裝的 connection.publish(rawChannel, rawMessage);
 redisTemplate.convertAndSend(channel, message); } }

訂閱監聽類:

package com.szw.learn.redismq; import java.io.IOException; import javax.websocket.Session; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.StringRedisTemplate; /** * @author 七脈 描述:訂閱監聽類 */
public class SubscribeListener implements MessageListener { private StringRedisTemplate stringRedisTemplate; private Session session; /** * 訂閱接收發布者的消息 */ @Override public void onMessage(Message message, byte[] pattern) { String msg = new String(message.getBody()); System.out.println(new String(pattern) + "主題發布:" + msg); if(null!=session){ try { session.getBasicRemote().sendText(msg); } catch (IOException e) { e.printStackTrace(); } } } public StringRedisTemplate getStringRedisTemplate() { return stringRedisTemplate; } public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public Session getSession() { return session; } public void setSession(Session session) { this.session = session; } }

注冊redis監聽容器:

package com.szw.learn.redismq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @Configuration public class RedisConfig { @Autowired private JedisConnectionFactory jedisConnectionFactory; /** * @author 七脈 描述:需要手動注冊RedisMessageListenerContainer加入IOC容器 * @return
     */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory); return container; } }

 

5.websocket

websocket注冊:

package com.szw.learn.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebsocketConfig { /** * <br>描 述: @Endpoint注解的websocket交給ServerEndpointExporter自動注冊管理 * @return
     */ @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }

websocket端點:

package com.szw.learn.websocket; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import com.szw.learn.redismq.PublishService; import com.szw.learn.redismq.SubscribeListener; import com.szw.learn.util.SpringUtils; /** *@ServerEndpoint(value="/websocket")value值必須以/開路 *備注:@ServerEndpoint注解類不支持使用@Autowire *{topic}指:向哪個頻道主題里發消息 *{myname}指:這個消息是誰的。真實環境里可以使用當前登錄用戶信息 */ @Component @ServerEndpoint(value="/websocket/{topic}/{myname}") public class WebsocketEndpoint { /** * 因為@ServerEndpoint不支持注入,所以使用SpringUtils獲取IOC實例 */
    private StringRedisTemplate redisTampate = SpringUtils.getBean(StringRedisTemplate.class); private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class); //存放該服務器該ws的所有連接。用處:比如向所有連接該ws的用戶發送通知消息。
    private static CopyOnWriteArraySet<WebsocketEndpoint> sessions = new CopyOnWriteArraySet<>(); private Session session; @OnOpen public void onOpen(Session session,@PathParam("topic")String topic){ System.out.println("java websocket:打開連接"); this.session = session; sessions.add(this); SubscribeListener subscribeListener = new SubscribeListener(); subscribeListener.setSession(session); subscribeListener.setStringRedisTemplate(redisTampate); //設置訂閱topic
        redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic(topic)); } @OnClose public void onClose(Session session){ System.out.println("java websocket:關閉連接"); sessions.remove(this); } @OnMessage public void onMessage(Session session,String message,@PathParam("topic")String topic,@PathParam("myname")String myname) throws IOException{ message = myname+":"+message; System.out.println("java websocket 收到消息=="+message); PublishService publishService = SpringUtils.getBean(PublishService.class); publishService.publish(topic, message); } @OnError public void onError(Session session,Throwable error){ System.out.println("java websocket 出現錯誤"); } public Session getSession() { return session; } public void setSession(Session session) { this.session = session; } }

測試controller

package com.szw.learn.websocket; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.servlet.ModelAndView; @Controller @RequestMapping("websocket") public class WebsocketController { @Value("${server.port}") private String port; public static final String INDEX = "websocket/index"; /** * @author 七脈 * 描述:聊天頁 * @param topic 發布訂閱的頻道主題 * @param myname 發布者的顯示名稱 * @return
     */ @RequestMapping("index/{topic}/{myname}") public ModelAndView index(@PathVariable("topic")String topic,@PathVariable("myname")String myname){ ModelAndView mav = new ModelAndView(INDEX); mav.addObject("port", port); mav.addObject("topic",topic); mav.addObject("myname",myname); return mav; } }

 

6.啟動類

package com.szw.learn; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class WsMqApplication { public static void main(String[] args) { System.setProperty("spring.devtools.restart.enabled", "false"); SpringApplication.run(WsMqApplication.class, args); } }

 

7.測試頁面

<!doctype html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="utf-8"></meta>
<title>websocket集群</title>
</head>
<body> 本服務端口號:[[${port}]],使用redismq實現websocket集群<br/> [[${topic}]] 頻道 聊天中。。。<br/>
    <input id="input_id" type="text" /><button onclick="sendMessage()">發送</button>    <button onclick="closeWebsocket()">關閉</button>
    <div id="message_id"></div>
</body>
<script type="text/javascript"> document.getElementById('input_id').focus(); var websocket = null; //當前瀏覽前是否支持websocket
    if("WebSocket" in window){ var url = "ws://127.0.0.1:[[${port}]]/websocket/[[${topic}]]/[[${myname}]]"; websocket = new WebSocket(url); }else{ alert("瀏覽器不支持websocket"); } websocket.onopen = function(event){ setMessage("打開連接"); } websocket.onclose = function(event){ setMessage("關閉連接"); } websocket.onmessage = function(event){ setMessage(event.data); } websocket.onerror = function(event){ setMessage("連接異常"); } //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。
 window.onbeforeunload = function(){ closeWebsocket(); } //關閉websocket
    function closeWebsocket(){ //3代表已經關閉
        if(3!=websocket.readyState){ websocket.close(); }else{ alert("websocket之前已經關閉"); } } //將消息顯示在網頁上
    function setMessage(message){ document.getElementById('message_id').innerHTML += message + '<br/>'; } //發送消息
    function sendMessage(){ //1代表正在連接
        if(1==websocket.readyState){ var message = document.getElementById('input_id').value; //setMessage(message);
 websocket.send(message); }else{ alert("websocket未連接"); } document.getElementById('input_id').value=""; document.getElementById('input_id').focus(); } </script>
</html>

 

8.測試

  啟動兩個服務,端口號分別8081、8082(可以+)

  模擬兩個端口的地址:

    http://localhost:8081/websocket/index/like/董志峰

    http://localhost:8082/websocket/index/like/史振偉

  如圖

  

 

源碼下載:https://pan.baidu.com/s/1VMQJgXe5vX7uwsyRV57gIw


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM