netty分布式服務如何實現
在說nettty分布式之前,首先說下,netty是基於nio編程的,如果大家對nio不熟悉還是先看下nio相關的知識。
netty的線程模型和核心組件
1:netty的線程模型
netty通過Reactor模型基於多路復用器接收並處理用戶請求(能講就多講一點),內部實現了兩個線程池,boss線程池和work線程池,其中boss線程池的線程負責處理請求的accept事件,當接收到accept事件的請求時,把對應的socket封裝到一個NioSocketChannel中,並交給work線程池,其中work線程池負責請求的read和write事件
2.Netty核心組件
Bootstrap和ServerBootstrap:Netty應用程序通過設置bootstrap引導類來完成,該類提供了一個用於應用程序網絡層配置的容器。Bootstrap服務端的是ServerBootstrap,客戶端的是Bootstrap。
Channel:Netty 中的接口 Channel 定義了與 socket 豐富交互的操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。
ChannelHandler:ChannelHandler 支持很多協議,並且提供用於數據處理的容器,ChannelHandler由特定事件觸發, 常用的一個接口是ChannelInboundHandler,該類型處理入站讀數據(socket讀事件)。
ChannelPipeline:ChannelPipeline 提供了一個容器給 ChannelHandler 鏈並提供了一個API 用於管理沿着鏈入站和出站事件的流動。每個 Channel 都有自己的ChannelPipeline,當 Channel 創建時自動創建的。 下圖說明了ChannelHandler和ChannelPipeline二者的關系:
EventLoop:EventLoop 用於處理 Channel 的 I/O 操作。一個單一的 EventLoop通常會處理多個 Channel 事件。一個 EventLoopGroup 可以含有多於一個的 EventLoop 和 提供了一種迭代用於檢索清單中的下一個。
ChannelFuture:Netty 所有的 I/O 操作都是異步。因為一個操作可能無法立即返回,我們需要有一種方法在以后獲取它的結果。出於這個目的,Netty 提供了接口 ChannelFuture,它的 addListener 方法
Netty 是一個非阻塞、事件驅動的網絡框架。Netty 實際上是使用 Threads( 多線程) 處理 I/O事件的,對於熟悉多線程編程的讀者可能會需要關注同步代碼。這樣的方式不好,因為同步會影響程序的性能,Netty 的設計保證程序處理事件不會有同步。因為某個Channel事件是被添加到一個EventLoop中的,以后該Channel事件都是由該EventLoop來處理的,而EventLoop是一個線程來處理的,也就是說Netty不需要同步IO操作,EventLoop與EventLoopGroup的關系可以理解為線程與線程池的關系一樣。
單機版netty
服務端代碼
public class NettySever { public static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void main(String[] args) throws Exception{ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //獲取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解碼器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入編碼器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的業務處理handler pipeline.addLast(new MyHandler()); }} ) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture sync = serverBootstrap.bind(8888).sync(); ChannelFuture channelFuture = sync.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
客戶端代碼
public class NettyClient { public static void main(String[] args) throws Exception{ NioEventLoopGroup eventLoopGroup=new NioEventLoopGroup(); Bootstrap bootstrap =new Bootstrap(); try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相關handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //加入自定義的handler pipeline.addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }); } }); ChannelFuture sync = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888)).sync(); Channel channel = sync.channel(); Scanner sc =new Scanner(System.in); while (sc.hasNextLine()){ // System.out.println(sc.next()); channel.writeAndFlush(sc.nextLine()); } }finally { eventLoopGroup.shutdownGracefully(); } } }
自定義handle
public class MyhandlerAdapter extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new MyHandler()); } }
public class MyHandler extends SimpleChannelInboundHandler<String> { // 每一個服務端都可以維護注冊在自己上面的channel,當然有些需要自己去維護,比如上線的時候新增,下線刪除。也可以自定義一個。 public static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 給其他服務端說,某某某上線了 Channel channel = ctx.channel(); ChannelId id = channel.id(); channelGroup.add(channel); System.out.println(channel.remoteAddress()+"上線了"); channelGroup.forEach(c -> { if(channel==c){ c.writeAndFlush("我自己上線了"); }else { c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"上線了"); } }); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // 給其他服務端說,某某某上線了 Channel channel = ctx.channel(); channelGroup.forEach(c -> { if(channel==c){ c.writeAndFlush("我自己下線了"); }else { c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"下線了"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); Channel channel = channelHandlerContext.channel(); channelGroup.forEach(c -> { if(channel==c){ c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+