本文章為作者原創,有問題的地方請提出指正。
1、類繼承Diagram
2、定義EndPoint類
目前僅僅定義了2個方法,分別用來獲取本地或遠程服務器的地址。
package netty; import java.net.InetSocketAddress; /** * @author xfyou * @date 2019/8/28 */ public interface EndPoint { /** * Return the local Inet address * * @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code> * if this <code>EndPoint</code> does not represent a network connection. */ InetSocketAddress getLocalAddress(); /** * Return the remote Inet address * * @return The remote Inet address to which this <code>EndPoint</code> is bound, or <code>null</code> * if this <code>EndPoint</code> does not represent a network connection. */ InetSocketAddress getRemoteAddress(); }
2、定義AbstractClass類
主要是定義幾個抽象方法:
- doOpen - 創建引導類Bootstrap;
- doConnect - 創建Channel並連接遠程服務器;
- getChannel - 獲取已創建的Channel
另外,提供了2個公共的方法給外部調用:
- send - 發送消息(OutBound)
- receive - 接收消息 (InBound)
內部私有的write()方法。write方法負責在connect成功后,把消息寫到遠程peer。翻閱源碼,我們可以看到如下的調用棧:
- channel.writeAndFlush
- pipeline.writeAndFlush (pipleline為channel實例所關聯的pipleline實例)
- AbstractChannelHandlerContext.writeAndFlush (每個ChannelHanlder都有一個對應的ChannelHandlerContext,可以從這個ChannelHanlderConext獲取Channel、ChannelHanlder和ChannelPipeline)
- AbstractChannelHandlerContext.write(在這個方法里面有一個executor.inEventLoop()的判斷,這個地方很重要,它主要是判斷當前線程是否是EventLoop分配的線程,如果是則直接使用EventLoop分配的線程執行,否則會將當前要執行的任務封裝成一個Task,然后塞到一個LinkedBlockQueue里面去等待后續的調度執行。這樣做的目的主要是就是把用戶線程的操作封裝成Task放入隊列,統一由I/O線程來處理)
- AbstractChannelHandlerContext.writeAndFlush (每個ChannelHanlder都有一個對應的ChannelHandlerContext,可以從這個ChannelHanlderConext獲取Channel、ChannelHanlder和ChannelPipeline)
- pipeline.writeAndFlush (pipleline為channel實例所關聯的pipleline實例)
package netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.NotImplementedException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @author xfyou * @date 2019/8/29 */ @Slf4j @RequiredArgsConstructor abstract class AbstractClient implements EndPoint { @NonNull private String hostName; @NonNull private int port; @NonNull @Getter(value = AccessLevel.PROTECTED) private int connectionTimeout; protected final CountDownLatch countDownLatch = new CountDownLatch(1); protected String respMsg; @SneakyThrows public void send(Object message) { doOpen(); doConnect(); write(message); } @SneakyThrows public String receive() { boolean b = countDownLatch.await(getConnectionTimeout(), TimeUnit.MILLISECONDS); if (!b) { log.error("Timeout(" + getConnectionTimeout() + "ms) when receiving response message"); } return respMsg; } private void write(Object message) { Channel channel = getChannel(); if (null != channel) { ChannelFuture f = channel.writeAndFlush(byteBufferFrom(message)).syncUninterruptibly(); if (!f.isSuccess()) { log.error("Failed to send message to " + getRemoteAddress() + f.cause().getMessage()); } } } private ByteBuf byteBufferFrom(Object message) { return message instanceof String ? Unpooled.copiedBuffer((String) message, StandardCharsets.UTF_8) : Unpooled.copiedBuffer((byte[]) message); } @Override public InetSocketAddress getRemoteAddress() { return new InetSocketAddress(hostName, port); } @Override public InetSocketAddress getLocalAddress() { throw new NotImplementedException("This method is not need to be implemented"); } /** * Open client. * * @throws Throwable */ protected abstract void doOpen() throws Throwable; /** * Connect to server. * * @throws Throwable */ protected abstract void doConnect() throws Throwable; /** * Get the connected channel. * * @return channel */ protected abstract Channel getChannel(); }
4、定義NettyClient類
NettyClient類繼承了AbstractClient類,主要是實現了doOpen、doConnect、getChannel類;同時實現了一個自定義的ChannelHander用來在ChannelActive時獲取Channel以及有消息返回時讀取消息。
- doOpen方法的實現。創建引導類並在引導類上注冊相關屬性;
- 注冊NioEventLoopGroup,基於java NIO傳輸的一個線程池,線程池的默認大小為:CPU核數*2。當一個新的Channel被創建后,Netty會從這個NioEventLoopGroup中選擇一個線程來為此Channel創建一個關聯的EventLoop(用來監聽關聯Channel的所有的I/O事件,比如連接、斷開連接、讀、寫等);
- 注冊NioSocketChannel類類型,這個類型說明將要創建的Channel的實例的類型,客戶端為:NioSocketChannel,服務器端為:NioServerSocketChannel;Bootstrap會根據這個class來創建一個BootstrapChannelFactory<NioSocketChannel>實例(Channel工廠類,用於將來在connect時創建Channel);
- 設置相關Option選項
- 注冊自定義的ChannelHandler,這些ChannelHandler會被注冊到與Channel相關聯的ChannelPipleline中,用來攔截消息並做相應的處理。
- doConnect方法的實現。通過已創建的Channel來連接到遠程服務器。前面我們已經在Bootstrap中設置的超時時間,所以connect時可以使用忽略線程中斷阻塞的方式去連接,直到超時。connect時會先通過BootstrapChannelFactory<NioSocketChannel>來創建一個NioSocketChannel實例,並把這個NioSocketChannel實例注冊到NioEventGroup中去(從線程池中按某種算法選擇一個EventLoop來和當前的Channel建立對應關系,可以是1:N,即一個EventLoop可以對應多個Channel )。EventLoop同時也是一個EventLoopExecutor,EventLoop和Channel對應起來后就可以處理所有這個Channel的I/O操作了。一句話,某個Channel的所有I/O操作都是線程池(NioEventGroup)中的某個I/O線程(EventLoopExecutor)來異步處理的。
package netty; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; /** * @author xfyou * @date 2019/8/28 */ @Slf4j public class NettyClient extends AbstractClient { private Bootstrap bootstrap; private volatile Channel channel; private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); public NettyClient(String hostName, int port, int connectionTimeout) { super(hostName, port, connectionTimeout); } private class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); channel = ctx.channel(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { try { respMsg = msg.toString(StandardCharsets.UTF_8); } finally { countDownLatch.countDown(); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("An exception was thrown, cause:" + cause.getMessage()); ctx.close(); } } @Override protected void doOpen() throws Throwable { bootstrap = new Bootstrap(); bootstrap .group(NIO_GROUP) .remoteAddress(getRemoteAddress()) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectionTimeout()) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler()); } }); } @Override public void doConnect() { ChannelFuture f = bootstrap.connect().syncUninterruptibly(); if (!f.isSuccess() && null != f.cause()) { log.error("The client failed to connect the server:" + getRemoteAddress() + ",error message is:" + f.cause().getMessage()); } } @Override protected Channel getChannel() { return channel; } }
5、測試類
package netty; import lombok.SneakyThrows; /** * Test * * @author xfyou */ public class Test { @SneakyThrows public static void main(String[] args) { NettyClient client = new NettyClient("127.0.0.1", 8080, 45000); client.send("aaa".getBytes()); // maybe do something else System.out.println(client.receive()); } }