[經驗] Java 使用 netty 框架, 向 Unity 客戶端的 C# 實現通信 [1]


這是一個較為立體的思路吧

首先是技術選型:

前端    : HTML5 + jQuery ,簡單暴力, 不解釋

服務端 : Spring Boot + Netty + Redis/Cache

客戶端 : Unity3D + C#

所要實現的效果為:

服務端啟動后, 開啟端口監聽, 然后客戶端啟動, 連接上服務端, 再由前端將數據請求發送到服務端, 服務端再發送到客戶端

為了方便(懶), 所以使用 netty 4.x 作為主要的通訊框架, 由於 5.X好像已經被官方放棄了, 所以我就使用最新版的 

在 pom.xml 處添加 netty4.x 的依賴

        <!-- netty 通信框架 https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.39.Final</version>
        </dependency>

        <!-- netty websocket 通訊框架依賴 -->
        <dependency>
            <groupId>org.yeauty</groupId>
            <artifactId>netty-websocket-spring-boot-starter</artifactId>
            <version>0.8.0</version>
        </dependency

老規矩, 從服務端開始, 先創建 netty 的服務端程序

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.http.client.utils.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/*
 *@Description //TODO NIO 服務端$
 *@Author 吾王劍鋒所指 吾等心之所向
 *@Date 2019/8/27 19:18
 */
public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);//默認端口
    private Integer defaultPort = 5566;
    public void bind(Integer port) throws Exception {
        //配置服務端的NIO線程組
        EventLoopGroup master = new NioEventLoopGroup();
        EventLoopGroup servant = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(master, servant).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將響應請求消息解碼為 HTTP 消息
                            socketChannel.pipeline().addLast("http-codec", new HttpServerCodec());
                            //將HTTP消息的多個部分構建成一條完整的 HTTP 消息
                            socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(2048));
                            //向客戶端發送 HTML5 文件
                            socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            //設置心跳檢測
                            socketChannel.pipeline().addLast(new IdleStateHandler(60, 30, 60*30, TimeUnit.SECONDS));
                            //配置通道, 進行業務處理
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_KEEPALIVE, true) // 2小時無數據激活心跳機制
                    .childHandler(new ServerChannelInitializer());


            if(null==port) port=this.defaultPort; // 服務器異步創建綁定

            ChannelFuture future = bootstrap.bind(port).sync();
            logger.info("服務啟動:"+ DateUtils.formatDate(new Date()));
            future.channel().closeFuture().sync();  // 關閉服務器通道
        } finally {
            logger.info("服務停止:"+ DateUtils.formatDate(new Date()));
            // 釋放線程池資源
            master.shutdownGracefully();
            servant.shutdownGracefully();
        }
    }
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/*
 *@Description //TODO nio 服務端實現$
 *@Author 吾王劍鋒所指 吾等心之所向
 *@Date 2019/8/27 19:20
 */
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new LineBasedFrameDecoder(10010));
        pipeline.addLast( new StringDecoder());
        pipeline.addLast( new StringEncoder());
        pipeline.addLast("handler", new NettyServerHandler());
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;

import org.apache.http.client.utils.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/*
 *@Description //TODO 服務業務實現$
 *@Author 吾王劍鋒所指 吾等心之所向三
 *@Date 2019/8/28 9:50
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
    private static Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);

    private static final String URI = "websocket";
    private WebSocketServerHandshaker handshaker;

    /**
     * 讀取客戶端發來的數據
     *
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String[] data = msg.toString().split("id=");
        if(data != null && data.length > 1) {
            String[] data1 = data[1].split(";");
            String id = data1[0];
            if (NettyServer.map.get(id) != null && NettyServer.map.get(id).equals(ctx)) { //不是第一次連接
                LOGGER.info("接收數據成功!" + DateUtils.formatDate(new Date()));
            } else { //如果map中沒有此ctx 將連接存入map中
                NettyServer.map.put(id, ctx);
                LOGGER.info("連接成功,加入map管理連接!"+"mn:" +id+" : "+ctx+""+ DateUtils.formatDate(new Date()));
            }
        }else{
            LOGGER.info("不是監測數據"+ msg.toString()+" : "+ DateUtils.formatDate(new Date()));
        }
        ctx.writeAndFlush("Received your message : " + msg.toString());
    }


    /**
     *  讀取完畢客戶端發送過來的數據之后的操作
     * */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("服務端接收數據完畢..");
        ctx.channel().write("call ------"); //向客戶端發送一條信息
        ctx.channel().flush();
    }

    /**
     * 客戶端主動斷開服務端的鏈接,關閉流
     * */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().localAddress().toString() + " 通道不活躍!");
        removeChannelMap(ctx);
        ctx.close(); // 關閉流
    }

    /**
     * 客戶端主動連接服務端 連接成功時向客戶端發送一條信息
     *
     * */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("RemoteAddress"+ ctx.channel().remoteAddress() + " active !");
        LOGGER.info("msg send active !"+ctx.channel().writeAndFlush("123456"));
        ctx.writeAndFlush("啦啦啦!");super.channelActive(ctx);
    }

    /**
     * 異常處理
     *
     * */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        LOGGER.error("連接異常,連接異常:"+ DateUtils.formatDate(new Date())+cause.getMessage(), cause);
        ctx.fireExceptionCaught(cause);
        removeChannelMap(ctx);
        ctx.close();
    }

    /**
     *刪除map中ChannelHandlerContext
     *
     *  */
    private void removeChannelMap(ChannelHandlerContext ctx){
        for( String key :NettyServer.map.keySet()){
            if( NettyServer.map.get(key)!=null &&  NettyServer.map.get(key).equals( ctx)){
                NettyServer.map.remove(key);
            }
        }
    }

    /**
     * 收發消息處理
     *
     * */
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception{
        if(msg instanceof HttpRequest){
            doHandlerHttpRequest(ctx, (HttpRequest) msg);
        }else if(msg instanceof HttpRequest){
            doHandlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 進行心跳檢測, 保證用戶在線
     *
     *
     * */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            PingWebSocketFrame ping = new PingWebSocketFrame();
            switch (stateEvent.state()){
                case READER_IDLE: //讀空閑 服務器端
                    LOGGER.info("{["+ctx.channel().remoteAddress()+"]--->(服務端 read 空閑)}");
                    ctx.writeAndFlush(ping);
                    break;
                case WRITER_IDLE: //寫空閑 服務器端
                    LOGGER.info("{["+ctx.channel().remoteAddress()+"]--->(服務端 write 空閑)}");
                    ctx.writeAndFlush(ping);
                    break;
                case ALL_IDLE: //讀寫空閑 服務器端
                    LOGGER.info("{["+ctx.channel().remoteAddress()+"]--->(服務端 讀寫 空閑)}");
            }
        }
    }

    /**
     *  websocket 消息處理
     *
     * */
    protected void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg){
        if(msg instanceof CloseWebSocketFrame){ //判斷 msg 是哪一種類型, 分別作出不同的反應
            LOGGER.info("[{---關閉---}]");
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg);
            return;
        }

        if(msg instanceof PingWebSocketFrame){
            LOGGER.info("[{---ping}]");
            PongWebSocketFrame pong = new PongWebSocketFrame(msg.content().retain());
            ctx.channel().writeAndFlush(pong);
            return;
        }

        if(!(msg instanceof TextWebSocketFrame)){
            LOGGER.info("[{!!----不支持二進制-----!!}]");
        }
    }

    /**
     * websocket 第一次握手
     *
     * */
    public void doHandlerHttpRequest(ChannelHandlerContext ctx, HttpRequest msg){
        //http 解碼失敗
        if(!msg.getDecoderResult().isSuccess() || (!"websocket".equals(msg.headers().get("Upgrade")))){
            sendHttpResponse(ctx, (FullHttpRequest) msg,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                            HttpResponseStatus.BAD_REQUEST));
        }

        //可以獲取 msg 的URI來判斷
        String uri = msg.getUri();
        if(!uri.substring(1).equals(URI)){
            ctx.close();
        }
        ctx.attr(AttributeKey.valueOf("type")).set(uri);

        //通過 URI 獲取其他參數驗證
        WebSocketServerHandshakerFactory factory =
                new WebSocketServerHandshakerFactory(
                        "ws://"+msg.headers().get("Host")+"/"+URI+"",
                        null,
                        false);
        handshaker = factory.newHandshaker(msg);
        if(handshaker == null){
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        }

        //進行連接
        handshaker.handshake(ctx.channel(), (FullHttpRequest) msg);

    }

    /**
     * 返回應答給客戶端
     *
     * */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){
        if(res.getStatus().code() != 200){
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }

        // 如果是非 keep-alive , 關閉連接
        ChannelFuture cf = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            cf.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * 斷開連接
     *
     * */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        LOGGER.info("handlerRemoved ---->"+ctx.channel());
    }

}

然后再在系統啟動文件的地方開啟 啟動netty 服務的 線程就可以

import cn.gzserver.basics.network.netty.NettyServer;
import cn.gzserver.basics.network.socket.SocketServer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;

/*@ComponentScan*/
@EnableScheduling
@SpringBootApplication
@EnableDiscoveryClient
public class GzserverApplication {

    public static void main(String[] args) {
        SpringApplication.run(GzserverApplication.class, args);

        //啟動 socket 服務, 接收客戶端發送連接請求, 並返回數據
        /*SocketServer socketServer = new SocketServer();
        socketServer.start();*/

        //開啟 netty 服務
        new Thread(() -> {
            try {
                new NettyServer().bind(5566);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }


}

然后呢, 客戶端的配置基本上沒有改變, 可以參考我前面寫的一篇博客作為參考就行

https://www.cnblogs.com/unityworld/p/11345431.html

但是,還有一些問題, 會在下一篇文章中說明


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM