Netty 之 Socket通信


1.導入pom依賴

  <dependencies>

    ......

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.39.Final</version>
    </dependency>
  </dependencies>

 

 

2.服務端

  a.創建服務端啟動類

public class StartServer {

    //端口號
    private int port;

    public StartServer(int port) {
        this.port = port;
    }

    //啟動方法
    public void start() throws Exception {
        //負責接收客戶端的連接的線程。線程數設置為1即可,netty處理鏈接事件默認為單線程,過度設置反而浪費cpu資源
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //負責處理數據傳輸的工作線程。線程數默認為CPU核心數乘以2
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            //在ServerChannelInitializer中初始化ChannelPipeline責任鏈,並添加到serverBootstrap中
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) {
                    //添加編解碼
                    channel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                    channel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                    channel.pipeline().addLast("socketHandler", new SocketHandler());
                }
            });
            //標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            //是否啟用心跳保活機制
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            //綁定端口后,開啟監聽
            ChannelFuture future = bootstrap.bind(port).sync();
            //等待服務監聽端口關閉
            future.channel().closeFuture().sync();
        } finally {
            //釋放資源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    //測試代碼
    public static void main(String[] args) {
        try {
            int port = 8080;
            new StartServer(port).start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 

  b.創建Socket處理器

public class SocketHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 客戶端發消息會觸發
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("[" + this.getIP(ctx) + "]收到消息:" + msg);
        ClientManager.getInstance().handleMsg(this.getIP(ctx), "This is response");
    }

    /**
     * 客戶端連接會觸發
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //添加channel信息
        ClientManager.getInstance().putChannel(this.getIP(ctx), ctx.channel());
        System.out.println("[" + this.getIP(ctx) + "]已連接。。。");
    }

    /**
     * 客戶端斷開連接會觸發
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //刪除失效的channel
        ClientManager.getInstance().removeChannel(getIP(ctx));
        ctx.close();
        System.out.println("[" + this.getIP(ctx) + "]已斷開。。。");
    }

    /**
     * 發生異常觸發
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        System.out.println("[" + this.getIP(ctx) + "]發生異常:" + t);
        ctx.close();
    }

    /**
     * 獲取IP地址
     */
    private String getIP(ChannelHandlerContext ctx) {
        String socketString = ctx.channel().remoteAddress().toString();
        int index = socketString.indexOf(":");
        String ip = socketString.substring(1, index);
        return ip;
    }

}

 

  c.創建客戶端管理類

public class ClientManager {

    private static ClientManager instance = new ClientManager();
    private ClientManager(){}
    public static ClientManager getInstance(){
        return instance;
    }

    //IP與信道的對應關系
    private Map<String, Channel> channelMap = new ConcurrentHashMap<>();

    //添加信道
    public void putChannel(String ip, Channel channel){
        this.channelMap.put(ip, channel);
    }

    //刪除信道
    public void removeChannel(String ip){
        this.channelMap.remove(ip);
    }

    //發送消息
    public void sendMsg(String ip, String msg){
        Channel channel = this.channelMap.get(ip);
        if(channel != null){
            channel.writeAndFlush(msg);
        }
    }

    //處理消息
    public void handleMsg(String ip, String msg){
        this.sendMsg(ip, msg);
    }

}

 

 

3.客戶端

  a.創建客戶端啟動類

public class StartClient {

    //主機名/IP
    private String host;
    //端口號
    private int port;

    public StartClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    //啟動方法
    public void start() throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline().addLast("decoder", new StringDecoder());
                    channel.pipeline().addLast("encoder", new StringEncoder());
                    channel.pipeline().addLast(new SimpleChannelInboundHandler<String>(){
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                            System.out.println("收到響應:" + msg);
                        }
                    });
                }
            });

            //建立連接
            ChannelFuture future = bootstrap.connect(host, port).sync();
            //發送消息
            future.channel().writeAndFlush("This is Request");
            //等待服務監聽端口關閉
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    //測試代碼
    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 8080;
            new StartClient(host, port).start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 

 

4.參考文章

  https://www.cnblogs.com/happy2010/p/10895209.html

  https://www.jianshu.com/p/b60180a0a0e6


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM