springboot 集成netty-socket


1. 添加依賴

<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.18</version>
</dependency>

2. 添加YML配置

# SocketIO配置
socket:
  # SocketIO端口
  port: 9090
  # 連接數大小
  workCount: 100
  # 允許客戶請求
  allowCustomRequests: true
  # 協議升級超時時間(毫秒),默認10秒,HTTP握手升級為ws協議超時時間
  upgradeTimeout: 10000
  # Ping消息超時時間(毫秒),默認60秒,這個時間間隔內沒有接收到心跳消息就會發送超時事件
  pingTimeout: 60000
  # Ping消息間隔(毫秒),默認25秒。客戶端向服務器發送一條心跳消息間隔
  pingInterval: 25000
  # 設置HTTP交互最大內容長度
  maxHttpContentLength: 1048576
  # 設置最大每幀處理數據的長度,防止他人利用大數據來攻擊服務器
  maxFramePayloadLength: 1048576

3. 實現Spring配置類

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Socket 配置類
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 15:22
 */
@Data
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {

    private Integer port;

    private Integer workCount;

    private Boolean allowCustomRequests;

    private Integer upgradeTimeout;

    private Integer pingTimeout;

    private Integer pingInterval;

    private Integer maxFramePayloadLength;

    private Integer maxHttpContentLength;


}
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.nuorui.common.config.properties.SocketProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * Socket 配置類
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 15:23
 */
@Configuration
@EnableConfigurationProperties(SocketProperties.class)
public class SocketConfig {

    @Resource
    private SocketProperties properties;

    @Bean
    public SocketIOServer socketIOServer() {
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setPort(properties.getPort());

        com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
        socketConfig.setReuseAddress(true);
        config.setSocketConfig(socketConfig);
        config.setWorkerThreads(properties.getWorkCount());
        config.setAllowCustomRequests(properties.getAllowCustomRequests());
        config.setUpgradeTimeout(properties.getUpgradeTimeout());
        config.setPingTimeout(properties.getPingTimeout());
        config.setPingInterval(properties.getPingInterval());
        config.setMaxHttpContentLength(properties.getMaxHttpContentLength());
        config.setMaxFramePayloadLength(properties.getMaxFramePayloadLength());

        return new SocketIOServer(config);
    }

    /**
     * 開啟SocketIOServer注解支持
     *
     * @param socketServer
     * @return
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

4. 實現服務端

import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.google.common.collect.Maps;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * 客戶端緩存
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 16:01
 */
@Component
public class ClientCache {

    /**
     * 本地緩存
     */
    private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = Maps.newConcurrentMap();

    /**
     * 存入本地緩存
     *
     * @param mmsi         船舶MMSI
     * @param sessionId      頁面sessionID
     * @param socketIOClient 頁面對應的通道連接信息
     */
    public void saveClient(String mmsi, UUID sessionId, SocketIOClient socketIOClient) {
        if (StrUtil.isNotBlank(mmsi)) {
            HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(mmsi);
            if (sessionIdClientCache == null) {
                sessionIdClientCache = new HashMap<>();
            }
            sessionIdClientCache.put(sessionId, socketIOClient);
            concurrentHashMap.put(mmsi, sessionIdClientCache);
        }
    }

    /**
     * 根據用戶ID獲取所有通道信息
     *
     * @param mmsi
     * @return
     */
    public HashMap<UUID, SocketIOClient> getMmsiClient(String mmsi) {
        return concurrentHashMap.get(mmsi);
    }

    /**
     * 根據用戶ID及頁面sessionID刪除頁面鏈接信息
     *
     * @param mmsi
     * @param sessionId
     */
    public void deleteSessionClient(String mmsi, UUID sessionId) {
        concurrentHashMap.get(mmsi).remove(sessionId);
    }
}
import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 類描述
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 16:33
 */
@Slf4j
@Component
public class SocketConnection {

    @Resource
    private ClientCache clientCache;

    /**
     * 客戶端連接
     *
     * @param client
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        String mmsi = client.getHandshakeData().getSingleUrlParam("mmsi");
        UUID sessionId = client.getSessionId();
        clientCache.saveClient(mmsi, sessionId, client);

        log.info("客戶端:" + mmsi + "|" + sessionId + "已連接");
    }

    /**
     * 客戶端斷開
     *
     * @param client
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        String mmsi = client.getHandshakeData().getSingleUrlParam("mmsi");
        if (StrUtil.isNotBlank(mmsi)) {
            UUID sessionId = client.getSessionId();
            clientCache.deleteSessionClient(mmsi, sessionId);

            log.info("客戶端:" + mmsi + "|" + client.getSessionId() + "已離線");
        }
    }
}
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * Socket 服務器
 *
 * @author: Fred
 * @email 453086@qq.com
 * @create: 2021-07-20 15:43
 */
@Slf4j
@Component
@Order(1)
public class SocketServer implements CommandLineRunner {

    /**
     * socketIOServer
     */
    private final SocketIOServer socketIOServer;

    @Autowired
    public SocketServer(SocketIOServer socketIOServer) {
        this.socketIOServer = socketIOServer;
    }

    @Override
    public void run(String... args) {
        socketIOServer.start();
    }
}

6. 服務端發消息給客戶端

public class SocketController {

    @Resource
    private ClientCache clientCache;

    @PostMapping("/test1")
    public void test() {
        HashMap<UUID, SocketIOClient> userClient = clientCache.getMmsiClient("2222");
        userClient.forEach((uuid, socketIOClient) -> {
            //向客戶端推送消息
            socketIOClient.sendEvent("event", "服務端推送消息");
        });
    }

}

 

————————————————————————————————————————————————————————————————————————————————————————

客戶端

1. 添加依賴

<dependency>
    <groupId>io.socket</groupId>
    <artifactId>socket.io-client</artifactId>
    <version>1.0.0</version>
</dependency>

2. 客戶端監聽

String url = "http://127.0.0.1:9093";
        try {
            IO.Options options = new IO.Options();
            options.transports           = new String[]{"websocket"};
            options.reconnectionAttempts = 2;
            // 失敗重連的時間間隔
            options.reconnectionDelay = 1000;
            // 連接超時時間(ms)
            options.timeout = 500;
            // mmsi: 唯一標識 傳給服務端存儲
            final Socket socket = IO.socket(url + "?mmsi=2222", options);

            socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));

            // 自定義事件`connected` -> 接收服務端成功連接消息
            socket.on("connected", objects -> log.debug("服務端:" + objects[0].toString()));

            // 自定義事件`push_data_event` -> 接收服務端消息
            socket.on("push_data_event", objects -> log.debug("服務端:" + objects[0].toString()));

            // 自定義事件`myBroadcast` -> 接收服務端廣播消息
            socket.on("myBroadcast", objects -> log.debug("服務端:" + objects[0].toString()));

            socket.connect();

            socket.emit("push_data_event", "發送數據 " + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM