Java網絡編程6:Netty的基本使用


一、Netty開發環境搭建

開發工具:idea2017

首先在Netty官網(http://netty.io )上下載最新的jar包 ,然后解壓,找到 netty-all-版本號.Final.jar,將其復制到,idea新建的javase項目的lib文件夾下,並右鍵添加到library。

然后就可以在本機上開發客戶端和服務端程序。主要的功能就是,客戶端訪問服務器,服務器返回當前時間。

二、服務端開發

兩個類,TimeServer是配置、運行的,TimeServerHandler是用來處理數據的讀寫邏輯。

TimeServer類:

package src.com.sj.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by Administrator on 2017/5/16.
 */
public class TimeServer {
    public static void main(String[] args) {
        int port = 8088;
        new TimeServer().bind(port);
    }

    public void bind(int port) {//bind用來配置Netty

        EventLoopGroup bossGroup = new NioEventLoopGroup();//線程組(池),用來服務端接受客戶端連接
        EventLoopGroup workerGroup = new NioEventLoopGroup();//線程組(池),用來SocketChannel 網絡讀寫
        try {
            ServerBootstrap b = new ServerBootstrap();// ServerBootstrap 是 Netty 用於啟動 NIO 服務端的輔助啟動類,用於降低開發難度
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)//創建channel
                    .option(ChannelOption.SO_BACKLOG, 1024)//配置TCP參數
                    .childHandler(new ChildChannelHandler());//綁定處理類

            //服務器啟動輔助類配置完成后,調用 bind 方法綁定監聽端口,調用 sync 方法同步等待綁定操作完成
            ChannelFuture f = b.bind(port).sync();

            System.out.println(Thread.currentThread().getName() + ",服務器開始監聽端口,等待客戶端連接.........");

            //下面會進行阻塞,等待服務器連接關閉之后 main 方法退出,程序結束
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            arg0.pipeline().addLast(new TimeServerHandler());//把處理類添加到pipeline通道中
        }
    }
}

TimeServerHandler:

package src.com.sj.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {//需要繼承ChannelInboundHandlerAdapter,用來處理數據

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //將 msg 轉為 Netty 的 ByteBuf 對象,類似 JDK 中的 java.nio.ByteBuffer,不過 ButeBuf 功能更強,更靈活
        ByteBuf buf = (ByteBuf) msg;

        //Encode過程
        byte[] reg = new byte[buf.readableBytes()];
        buf.readBytes(reg);

        //Decode過程
        String body = new String(reg, "UTF-8");
        System.out.println(Thread.currentThread().getName() + ",The server receive  order : " + body);

        /**回復消息
         * copiedBuffer:創建一個新的緩沖區,內容為里面的參數
         * 通過 ChannelHandlerContext 的 write 方法將消息異步發送給客戶端
         * */
        String respMsg = "I am Server,消息接收 success!";
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        ctx.write(respByteBuf);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        /**flush:將消息發送隊列中的消息寫入到 SocketChannel 中發送給對方,為了頻繁的喚醒 Selector 進行消息發送
         * Netty 的 write 方法並不直接將消息寫如 SocketChannel 中,調用 write 只是把待發送的消息放到發送緩存數組中,再通過調用 flush
         * 方法,將發送緩沖區的消息全部寫入到 SocketChannel 中
         * */
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**當發生異常時,關閉 ChannelHandlerContext,釋放和它相關聯的句柄等資源 */
        ctx.close();
    }
}

 

三、客戶端

同服務器類似,也是兩個類,TimeClient是用來配置、運行的,TimeClientHandler是用來處理數據的讀寫邏輯的。

TimeClient:

package src.com.sj.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {

    /**
     * 使用 3 個線程模擬三個客戶端
     */
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(new MyThread()).start();
        }
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            connect("127.0.0.1", 8088);
        }

        public void connect(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();//線程組(池),用來連接服務端
            try {
                /**Bootstrap 與 ServerBootstrap 都繼承(extends)於 AbstractBootstrap
                 * 創建客戶端輔助啟動類,並對其配置,與服務器稍微不同,這里的 Channel 設置為 NioSocketChannel
                 * 然后為其添加 Handler,這里直接使用匿名內部類,實現 initChannel 方法
                 * 作用是當創建 NioSocketChannel 成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡I/O事件*/
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });

                //connect:發起異步連接操作,調用同步方法 sync 等待連接成功
                ChannelFuture channelFuture = b.connect(host, port).sync();
                System.out.println(Thread.currentThread().getName() + ",客戶端發起異步連接..........");

                //等待客戶端鏈路關閉
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //優雅退出,釋放NIO線程組
                group.shutdownGracefully();
            }
        }
    }
}

TimeClientHandler:

package src.com.sj.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.logging.Logger;


public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());

    /**
     * 當客戶端和服務端 TCP 鏈路建立成功之后,Netty 的 NIO 線程會調用 channelActive 方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String reqMsg = "我是客戶端 " + Thread.currentThread().getName();
        byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
        ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
        /**
         * writeBytes:將指定的源數組的數據傳輸到緩沖區
         * 調用 ChannelHandlerContext 的 writeAndFlush 方法將消息發送給服務器
         */
        reqByteBuf.writeBytes(reqMsgByte);
        ctx.writeAndFlush(reqByteBuf);
    }

    /**
     * 當服務端返回應答消息時,channelRead 方法被調用,從 Netty 的 ByteBuf 中讀取並打印應答消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println(Thread.currentThread().getName() + ",Server return Message:" + body);
        ctx.close();
    }

    /**
     * 當發生異常時,打印異常 日志,釋放客戶端資源
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**釋放資源*/
        logger.warning("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM