SpringBoot中使用消息中間件Kafka實現Websocket的集群


1、在實際項目中,由於數據量的增大及並發數的增多,我們不可能只用一台Websocket服務,這個時候就需要用到Webscoket的集群。但是Websocket集群會遇到一些問題。首先我們肯定會想到直接將Websocket的Session放到Redis等緩存服務器中,然后用的時候直接在Redis中獲取。但是Webscoket的Session比較特殊,它不能被序列化,因為 WebSocket的session是有狀態的,還有就是 WebSocket的session是有時效性的,只要連接一斷開,該Session就會失效。

2、解決Websocket集群的三種方法

  2.1、通過相應的算法,將有關聯的用戶(即有可能發生聊天的對象)全部指定到一台Webscoket服務。這樣就不會存在聊天對象收不到消息的情況。但是這種方法有局限性,就是用戶只能和有關聯的用戶聊天,不能和其他未建立關聯的用戶聊天。

  2.2、使用Redis的消息訂閱功能來實現WebSocket集群。大致思路如下圖。

 

  2.3、使用Kafka等消息中間件來實現Webscoket集群。這也是目前我選用的方式。其實該方法和Redis的消息訂閱大致思路差不多。但是Redis我們只把他作為緩存使用,不想Redis涉及太多的業務處理,因此就選用了Kafka。

     2.3.1、Kafka安裝。(百度上有)

     2.3.2、Kafka實現集群的大致思路,如下圖(如果一個groupId下有多個消費者,則只會有一個消費者能獲取到消息,所以為了保證Websocket集群都能收到消息,則需要不同的groupId。我使用的是服務器的IP來作為groupId)

 

     2.3.3、在項目的pom文件中添加Kafka依賴(注:Kafka依賴的版本必須和服務器上安裝的版本一致

        <!-- kafka依賴 -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>

     2.3.4、建立Kafka的生產者Bean

 

package com.yxl.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: yxl
 * @Description: Kafka生產者(消息發送者)
 * @DATE: Created in 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    public Map<String, Object> producerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka集群IP1:9092,kafka集群IP2:9092");
        properties.put("acks", "all");//ack是判別請求是否為完整的條件(就是是判斷是不是成功發送了)。我們指定了“all”將會阻塞消息,這種設置性能最低,但是是最可靠的。
        properties.put("retries", 0);//如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重復消息的可能性。
        properties.put("batch.size", 16384);//producer(生產者)緩存每個分區未發送消息。緩存的大小是通過 batch.size 配置指定的
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

}

 

    2.3.4、建立Kafka的消費者Bean以及消費者監聽

package com.yxl.configuration;

import com.yxl.myListener.MyKafkaListener;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @Author: yxl
 * @Description: Kafka消費者
 * @DATE: Created in 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka集群IP1:9092,kafka集群IP2:9092");
        properties.put("group.id", getIPAddress()); //獲取服務器Ip作為groupId
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    public String getIPAddress() {
        try {
            InetAddress address = InetAddress.getLocalHost();
            if (address != null && StringUtils.isNotBlank(address.getHostAddress())) {
                return address.getHostAddress();
            }
        }catch (UnknownHostException e) {
            return UUID.randomUUID().toString().replace("-","");
        }
        return UUID.randomUUID().toString().replace("-","");
    }

    /**
      * 自定義監聽
      */
    @Bean
    public MyKafkaListener listener() {
        return new MyKafkaListener();
    }
}

    2.3.4、消費者監聽

package com.yxl.myListener;

import com.yxl.websocket.ChatWebsocket;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @Author: yxl
 * @Description:
 * @DATE: Created in 2018/11/14
 */
public class MyKafkaListener {

    Logger logger = Logger.getLogger(MyKafkaListener.class);

    /**
     * 發送聊天消息時的監聽
     * @param record
     */
    @KafkaListener(topics = {"chatMessage"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("chatMessage發送聊天消息監聽:"+record.value().toString());
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaReceiveMsg(record.value().toString());
    }

    /**
     * 關閉連接時的監聽
     * @param record
     */
    @KafkaListener(topics = {"closeWebsocket"})
    private void closeListener(ConsumerRecord<?, ?> record) {
        logger.info("closeWebsocket關閉websocket連接監聽:"+record.value().toString());
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaCloseWebsocket(record.value().toString());
    }

}

    2.3.6、Websocket集群java代碼

package com.kk.server.chat.websocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Websocket集群
 * Created by yxl on 2018-11-17.
 */
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatWebsocket {

    private Logger logger = Logger.getLogger(ChatWebsocket.class);

    private static ApplicationContext applicationContext;

    private KafkaTemplate kafkaTemplate;

    //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。
    private static int onlineCount = 0;
    //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。若要實現服務端與單一客戶端通信的話,可以使用Map來存放,其中Key可以為用戶標識
    private static Map<String, Session> drWebSocketSet = new ConcurrentHashMap<>(); //醫生web


    /**
     * 連接建立成功調用的方法
     *
     * @param userId     用戶標識
     */
    @OnOpen
    public void onOpen(@PathParam("userId") String userId, Session session) {

        if (kafkaTemplate == null) {
            kafkaTemplate = applicationContext.getBean(KafkaTemplate.class); //獲取kafka的Bean實例
        }

        drWebSocketSet.put(userId, session);
    }


    /**
     * s
     * 收到客戶端消息后調用的方法
     *
     * @param message 客戶端發送過來的消息
     * @param session 可選的參數
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        if ("ping".equals(message)) {
            session.getBasicRemote().sendText("pong"); //心跳
        } else {
            sendMessage(message, session); //調用Kafka進行消息分發
        }


    }

    /**
     * 發送消息
     *
     * @param message
     * @param session
     * @throws IOException
     */
    public void sendMessage(String message, Session session) throws IOException {
        if (StringUtils.isNotBlank(message)) {

            JSONObject jsonObject = JSONObject.parseObject(message);

            String sender_id = jsonObject.getString("sender_id"); //發送者ID
            String receiver_id = jsonObject.getString("receiver_id"); //接受者ID

            //TODO 這里可以進行優化。可以首先根據接收方的userId,即receiver_id判斷接收方是否在當前服務器,若在,直接獲取session發送即可就不需要走Kafka了,節約資源
            kafkaTemplate.send("chatMessage", message);
        }
    }

    /**
     * 連接關閉調用的方法
     */
    @OnClose
    public void onClose(Session session) {
        Map<String, String> pathParameters = session.getPathParameters();
        String userId = pathParameters.get("userId"); //從session中獲取userId
        Map<String, String> map = new HashMap<>();
        map.put("username", userId);
        kafkaTemplate.send("closeWebsocket", JSON.toJSONString(map));
    }
}

    /**
     * 關閉連接
     *
     * @param map 當前登錄客戶端的map
     */
    private void close(Map<String, Session> map, String username) {
        if (StringUtils.isNotBlank(username)) {
            logger.info("關閉websocket鏈接,關閉客戶端username:" + username);
            if (map.get(username) != null) {
                map.remove(username);
            }
        }
    }

    /**
     * kafka發送消息監聽事件,有消息分發
     *
     * @param message
     * @author yxl
     */
    public void kafkaReceiveMsg(String message) {
        JSONObject jsonObject = JSONObject.parseObject(message);

        String receiver_id = jsonObject.getString("receiver_id"); //接受者ID

        if (drWebSocketSet.get(receiver_id) != null) {
            drWebSocketSet.get(receiver_id).getBasicRemote.sendText(message); //進行消息發送
        }
    }

    /**
     * kafka監聽關閉websocket連接
     *
     * @param closeMessage
     */
    public void kafkaCloseWebsocket(String closeMessage) {
        JSONObject jsonObject = JSONObject.parseObject(closeMessage);
        String userId = jsonObject.getString("userId");
        drWebSocketSet.remove(userId);
    }



    /**
     * 發生錯誤時調用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("webscoket發生錯誤!關閉websocket鏈接");
        //onClose(session);
        error.printStackTrace();
        logger.info("webscoket發生錯誤!" + error.getMessage());
    }

}

websocket中不能直接注入相應的Bean實例,這個時候可以看我的另一篇博客https://www.cnblogs.com/Amaris-Lin/p/9038813.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM