SpringBoot整合Netty實現socket通訊簡單demo


 

 

依賴

 <!-- Netty (all-in-one artifact): supplies the io.netty.* classes used by the socket server and client below -->
 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

 

還用到了

   <!-- Lombok: provides the @Data, @Accessors and @Slf4j annotations used by the classes below -->
   <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <!-- fastjson: used to convert ChatDto to and from the JSON strings sent over the socket -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

 

ChatDto.java
import lombok.Data;
import lombok.experimental.Accessors;

/**
 * Transfer object exchanged between the Netty client and server.
 *
 * <p>Lombok's {@code @Data} generates the accessors; {@code @Accessors(chain = true)}
 * makes each setter return {@code this} so calls can be chained.
 */
@Data
@Accessors(chain = true)
public class ChatDto {

    /**
     * Client ID; unique per client.
     */
    private String clientId;

    /**
     * The message being sent.
     */
    private String msg;
}

 

 

NettyChannelMap.java

import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Registry of connected client channels, keyed by client ID.
 *
 * <p>Backed by a {@link ConcurrentHashMap}, so the static methods can be called from
 * several threads without additional locking.
 */
public class NettyChannelMap {
    private static final Map<String, SocketChannel> map = new ConcurrentHashMap<>();

    /**
     * Registers a channel under the given client ID, replacing any channel that was
     * previously stored under the same ID.
     *
     * @param clientId      client ID used as the key; must not be {@code null}
     * @param socketChannel channel to register; must not be {@code null}
     */
    public static void add(String clientId, SocketChannel socketChannel) {
        map.put(clientId, socketChannel);
    }

    /**
     * Looks up the channel registered for a client.
     *
     * @param clientId client ID to look up; must not be {@code null}
     * @return the registered channel, or {@code null} when none is registered
     */
    public static Channel get(String clientId) {
        return map.get(clientId);
    }

    /**
     * Removes every registration that points at the given channel instance.
     *
     * @param socketChannel channel whose registrations should be dropped
     */
    public static void remove(SocketChannel socketChannel) {
        // Identity comparison on purpose: only mappings to this exact channel object are removed.
        map.values().removeIf(registered -> registered == socketChannel);
    }
}

 

NettyTcpServerBootstrap.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty TCP server launcher.
 *
 * <p>Binds a {@link ServerBootstrap} to the configured port and routes every accepted
 * connection through an object encoder/decoder pair followed by a
 * {@link NettyServerHandler}.
 */
public class NettyTcpServerBootstrap {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * Creates a launcher for the given port; nothing is bound until {@link #start()} runs.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException not thrown by this constructor; the clause is kept so
     *                              existing callers continue to compile
     */
    public NettyTcpServerBootstrap(int port) throws InterruptedException {
        this.port = port;
    }

    /**
     * Binds the server and blocks the calling thread until the server channel is closed,
     * then shuts both event-loop groups down.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     *                              or for the server channel to close
     */
    public void start() throws InterruptedException {
        // bossGroup only accepts incoming connections; the accepted channels are handed
        // to workerGroup, which performs the actual I/O for each client.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Server-side bootstrap object that carries all configuration.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    // NIO-based server socket channel implementation.
                    .channel(NioServerSocketChannel.class)
                    // Maximum queue length for pending incoming connections.
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Keep accepted connections alive.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // Disable Nagle's algorithm so small messages are sent immediately.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // Logging handler for the server (boss) channel.
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Pipeline set-up for every accepted (worker) channel.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new ObjectEncoder());
                            // SECURITY NOTE: ObjectDecoder performs Java-native deserialization of
                            // whatever bytes the peer sends. Only expose this port to trusted
                            // clients, or move both sides to a text/JSON codec.
                            p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                            p.addLast(new NettyServerHandler());
                        }
                    });

            // Bind and wait for the bind to complete; sync() rethrows the cause on failure.
            ChannelFuture cf = bootstrap.bind(port).sync();
            if (cf.isSuccess()) {
                System.out.println("socket server start---------------");
            }

            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Reached on normal close as well as on failure: release all event-loop threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

 

 

 

 

NettyServerHandler.java

import com.alibaba.fastjson.JSON;
import com.example.netty.dto.ChatDto;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {


    /**
     * 定義一個channel組管理所有channel
     * GlobalEventExecutor.INSTANCE 是一個全局事件執行器 是一個單例
     */
    private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 標識 channel處於活動狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    /**
     * 表示連接建立 第一個被執行
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channelGroup.add(ctx.channel());

        /**
         *  該方法會將 channelGroup 中所有的channel 遍歷一遍然后發送消息 不用我們自己遍歷
         *   這里只是做個說明 不用
         */
      //  channelGroup.writeAndFlush("發送所有給所有channel");
    }

    /**
     * 斷開連接
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }

    /**
     * 標識channel處於非活動狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyChannelMap.remove((SocketChannel) ctx.channel());
    }


    /**
     * 服務端 接收到 客戶端 發的數據
     * @param context
     * @param obj
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception {

        log.info(">>>>>>>>>>>服務端接收到客戶端的消息:{}",obj);

        SocketChannel socketChannel = (SocketChannel) context.channel();
        ChatDto dto = JSON.parseObject(obj.toString(), ChatDto.class);
        /**
         * 客戶端ID
         */
        String clientId = dto.getClientId();

        if (clientId == null) {
            /**
             * 心跳包處理
             */
            ChatDto pingDto=new ChatDto();
            pingDto.setMsg("服務端收到心跳包,返回響應");
            socketChannel.writeAndFlush(JSON.toJSONString(pingDto));
            return;
        }

        Channel channel = NettyChannelMap.get(clientId);

        if (channel==null){
            /**
             * 存放所有連接客戶端
             */
            NettyChannelMap.add(clientId, socketChannel);
            channel=socketChannel;
        }


        /**
         * 服務器返回客戶端消息
         */
        ChatDto returnDto=new ChatDto();
        returnDto.setClientId(clientId).setMsg("我是服務端,收到你的消息了");

        channel.writeAndFlush(JSON.toJSONString(returnDto));


        /**
         * 在這里可以設置異步執行 提交任務到該channel的taskQueue 中
         */

        context.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10*1000);
                    log.info(">>>>>>>>>休眠十秒");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        /**
         * 可以設置多個異步任務
         * 但是這個會在上面異步任務執行完之后才執行
         */
        context.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10*1000);
                    log.info(">>>>>>>>>休眠二十秒");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });


        ReferenceCountUtil.release(obj);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

 

 

 

客戶端

NettyClientBootstrap.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;


/**
 * Netty client launcher.
 *
 * <p>Connects to the server in the constructor and exposes the connected channel through
 * {@link #socketChannel}. The constructor returns as soon as the connection is established;
 * the client's event loop keeps running in the background and is shut down automatically
 * when the channel closes.
 */
public class NettyClientBootstrap {
    private final int port;
    private final String host;

    /** The connected channel; assigned once the connection has been established. */
    public SocketChannel socketChannel;

    /**
     * Connects to the given server.
     *
     * <p>If the connection attempt fails, the failure cause is rethrown by the underlying
     * {@code ChannelFuture.sync()} call and the event loop is shut down.
     *
     * @param port server port
     * @param host server host name or IP address
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    public NettyClientBootstrap(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }

    /**
     * Builds the client pipeline and connects, without blocking on the channel's close future.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        boolean connected = false;
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .remoteAddress(host, port)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // IdleStateHandler(readerIdle, writerIdle, allIdle) in seconds; 0 disables.
                            //  - reader idle: no read for 20s fires an idle event
                            //  - writer idle: no write for 10s fires an idle event
                            //    (the client handler uses it to send a heartbeat)
                            //  - all idle: disabled
                            // Idle events are delivered to userEventTriggered().
                            socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0));
                            socketChannel.pipeline().addLast(new ObjectEncoder());
                            // SECURITY NOTE: ObjectDecoder performs Java-native deserialization of
                            // whatever the peer sends; only connect to trusted servers.
                            socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            // sync() waits for the connect to finish and rethrows the cause if it failed,
            // so reaching the next line means the channel is connected.
            ChannelFuture future = bootstrap.connect().sync();
            socketChannel = (SocketChannel) future.channel();
            System.out.println("connect server  成功---------");

            // Do not block the caller until the channel closes; release the event-loop
            // threads asynchronously once it does.
            future.channel().closeFuture().addListener(closed -> eventLoopGroup.shutdownGracefully());
            connected = true;
        } finally {
            if (!connected) {
                // Connect failed or was interrupted: no channel will ever use this group.
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

}

 

 

NettyClientHandler.java

import com.alibaba.fastjson.JSON;
import com.example.netty.dto.ChatDto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>連接");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>退出");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    /**
                     *  利用寫空閑發送心跳檢測消息
                     */
                    ChatDto pingDto=new ChatDto();
                    pingDto.setMsg("我是心跳包");
                    ctx.writeAndFlush(JSON.toJSONString(pingDto));
                    log.info("send ping to server----------");
                    break;
                default:
                    break;
            }
        }
    }


    /**
     * 客戶端接收到服務端發的數據
     * @param channelHandlerContext
     * @param obj
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj)  {

        log.info(">>>>>>>>>>>>>客戶端接收到消息:{}", obj);


        ReferenceCountUtil.release(obj);
    }


    /**
     * socket通道處於活動狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>>>socket建立了");
        super.channelActive(ctx);
    }


    /**
     * socket通道不活動了
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>>>socket關閉了");
        super.channelInactive(ctx);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

 

 

SpringBoot啟動類添加服務端啟動代碼

@SpringBootApplication
public class NettyApplication {

    /** Port the Netty socket server is started on. */
    private static final int SOCKET_SERVER_PORT = 9999;

    /**
     * Starts the Spring application and then the Netty socket server.
     *
     * @param args command-line arguments handed to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(NettyApplication.class, args);

        // The socket server is started only after the Spring context is up.
        try {
            new NettyTcpServerBootstrap(SOCKET_SERVER_PORT).start();
        } catch (Exception startFailure) {
            startFailure.printStackTrace();
            System.out.println("server socket 啟動失敗");
        }
    }

}

 

ChatController.java

import com.alibaba.fastjson.JSON;
import com.example.netty.dto.ChatDto;
import com.example.netty.socket.NettyClientBootstrap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * 客戶端消息發送控制器
 */
@RestController
@Slf4j
public class ChatController {

    private static String clientId=UUID.randomUUID().toString();


    public static NettyClientBootstrap bootstrap;

    /**
     * 發送消息demo
     * @param msg
     */
    @PostMapping(value = "/send")
    public void send(String msg) {
        if (bootstrap == null) {
            try {
                /**
                 * 連接 輸入服務器的端口和ip
                 */
                bootstrap = new NettyClientBootstrap(9999, "localhost");
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.error(">>>>>>>>> server socket 連接失敗");
            }
        }
        /**
         *   發送消息
         */
        ChatDto dto=new ChatDto();
        dto.setClientId(clientId).setMsg(msg);
        /**
         * json字符串發送
         */
        bootstrap.socketChannel.writeAndFlush(JSON.toJSONString(dto));

    }
}

 

訪問

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM