實現Netty分布式解決思路


netty分布式服務如何實現

在說nettty分布式之前,首先說下,netty是基於nio編程的,如果大家對nio不熟悉還是先看下nio相關的知識。

netty的線程模型和核心組件

1:netty的線程模型
netty通過Reactor模型基於多路復用器接收並處理用戶請求(能講就多講一點),內部實現了兩個線程池,boss線程池和work線程池,其中boss線程池的線程負責處理請求的accept事件,當接收到accept事件的請求時,把對應的socket封裝到一個NioSocketChannel中,並交給work線程池,其中work線程池負責請求的read和write事件
在這里插入圖片描述

2.Netty核心組件
Bootstrap和ServerBootstrap:Netty應用程序通過設置bootstrap引導類來完成,該類提供了一個用於應用程序網絡層配置的容器。Bootstrap服務端的是ServerBootstrap,客戶端的是Bootstrap。
Channel:Netty 中的接口 Channel 定義了與 socket 豐富交互的操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。
ChannelHandler:ChannelHandler 支持很多協議,並且提供用於數據處理的容器,ChannelHandler由特定事件觸發, 常用的一個接口是ChannelInboundHandler,該類型處理入站讀數據(socket讀事件)。
ChannelPipeline:ChannelPipeline 提供了一個容器給 ChannelHandler 鏈並提供了一個API 用於管理沿着鏈入站和出站事件的流動。每個 Channel 都有自己的ChannelPipeline,當 Channel 創建時自動創建的。 下圖說明了ChannelHandler和ChannelPipeline二者的關系:
在這里插入圖片描述

EventLoop:EventLoop 用於處理 Channel 的 I/O 操作。一個單一的 EventLoop通常會處理多個 Channel 事件。一個 EventLoopGroup 可以含有多於一個的 EventLoop 和 提供了一種迭代用於檢索清單中的下一個。
ChannelFuture:Netty 所有的 I/O 操作都是異步。因為一個操作可能無法立即返回,我們需要有一種方法在以后獲取它的結果。出於這個目的,Netty 提供了接口 ChannelFuture,它的 addListener 方法
Netty 是一個非阻塞、事件驅動的網絡框架。Netty 實際上是使用 Threads( 多線程) 處理 I/O事件的,對於熟悉多線程編程的讀者可能會需要關注同步代碼。這樣的方式不好,因為同步會影響程序的性能,Netty 的設計保證程序處理事件不會有同步。因為某個Channel事件是被添加到一個EventLoop中的,以后該Channel事件都是由該EventLoop來處理的,而EventLoop是一個線程來處理的,也就是說Netty不需要同步IO操作,EventLoop與EventLoopGroup的關系可以理解為線程與線程池的關系一樣。

單機版netty

服務端代碼

public class NettySever { public static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void main(String[] args) throws Exception{ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //獲取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解碼器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入編碼器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的業務處理handler pipeline.addLast(new MyHandler()); }} ) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture sync = serverBootstrap.bind(8888).sync(); ChannelFuture channelFuture = sync.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } 

客戶端代碼

public class NettyClient { public static void main(String[] args) throws Exception{ NioEventLoopGroup eventLoopGroup=new NioEventLoopGroup(); Bootstrap bootstrap =new Bootstrap(); try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相關handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //加入自定義的handler pipeline.addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }); } }); ChannelFuture sync = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888)).sync(); Channel channel = sync.channel(); Scanner sc =new Scanner(System.in); while (sc.hasNextLine()){ // System.out.println(sc.next()); channel.writeAndFlush(sc.nextLine()); } }finally { eventLoopGroup.shutdownGracefully(); } } } 

自定義handle

public class MyhandlerAdapter extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new MyHandler()); } } 
public class MyHandler extends SimpleChannelInboundHandler<String> { // 每一個服務端都可以維護注冊在自己上面的channel,當然有些需要自己去維護,比如上線的時候新增,下線刪除。也可以自定義一個。 public static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 給其他服務端說,某某某上線了 Channel channel = ctx.channel(); ChannelId id = channel.id(); channelGroup.add(channel); System.out.println(channel.remoteAddress()+"上線了"); channelGroup.forEach(c -> { if(channel==c){ c.writeAndFlush("我自己上線了"); }else { c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"上線了"); } }); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // 給其他服務端說,某某某上線了 Channel channel = ctx.channel(); channelGroup.forEach(c -> { if(channel==c){ c.writeAndFlush("我自己下線了"); }else { c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"下線了"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); Channel channel = channelHandlerContext.channel(); channelGroup.forEach(c -> { if(channel==c){ c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"我自己說 " + s); }else { c.writeAndFlush(channel.remoteAddress()+"在"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"說 " + s); } }); } }

上面的代碼基本是就是一個單機版本的netty,但是如果並發量大,用戶多的情況下,單機肯定不滿足,那么如何做到分布式呢,分布式的難點又在哪呢?

分布式netty的解決思路

1 : 假如有兩台服務器,服務器A,服務器B,小王登錄到了服務器A上,小李登錄到了服務器B上,那么小王想跟小李聊天,怎么樣才行呢,我想大家最先想到的是每台服務器都可以維護注冊在自己服務器上的SorketChannel,想讓所有的服務器共享的話,怎么才能實現呢?
2:那么我們直接用第三方的redis,可以實現數據共享,但是SorketChannel是不能被序列化的,所以這種借助第三方的的是不可行的。
3 :借助路由的形式,比如小王發消息給小李,我們可以直接給小李所在的服務器B發消息,這樣消息肯定能送達,但是小王怎么知道小李在哪一台服務器上呢?這個時候其實在登錄的時候我們會去選擇一台服務器,我們可以在redis里面存儲起來,那么不論誰給誰發消息都是可以實現的

// 用戶登錄的時候保存用戶和自己被分配的NioSocketChannel,后面可以根據用戶id查到相關的NioSocketChannel,發送消息 if (msg.getType() == Constants.CommandType.LOGIN) { //保存客戶端與 Channel 之間的關系 SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel()); SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg()); LOGGER.info("客戶端[{}]上線成功", msg.getReqMsg()); }
 @Override public void saveRouteInfo(LoginReqVO loginReqVO, String msg) throws Exception { // key是一個固定前綴加上用戶id,msg就是登錄服務器的地址 String key = ROUTE_PREFIX + loginReqVO.getUserId(); redisTemplate.opsForValue().set(key, msg); }

講到這里你應該有些思路了吧,但是真正的想要做一個im及時通訊,還有好多地方要考慮,服務器A突然掛掉,那么小王怎么辦,要是小王自己沒網了,那么服務器A還要保存小王的NioSocketChannel嗎?
當然真正的應該是服務器A掛了,應該讓小王重新注冊到B上,如果小王自己網絡不好,應該先重連,重連到給定的最大次數,應該就踢掉小王,避免不必要的資源浪費。

當然這個可以用心跳機制去完成。netty本身就有一個IdleStateHandler,我們可以ping-pong去看服務器和客戶端是否正常連接。但是什么時候去ping,不可能我們正常聊天的時候去觸發這個事件吧,我們可以在寫空閑的時候去發送ping,客戶端如果不是正常退出的話會觸發

@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (shutDownMsg == null){ shutDownMsg = SpringBeanFactory.getBean(ShutDownMsg.class) ; } //用戶主動退出,不執行重連邏輯 if (shutDownMsg.checkStatus()){ return; } if (scheduledExecutorService == null){ scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ; } LOGGER.info("客戶端斷開了,重新連接!"); // 開啟一個線程去重連,重連 scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;

重連就是先看自己是不是好了,還沒好的話就下線,清楚自己的路由,重新選擇一個服務器(這個要可以自己實現算法,hash,隨機,權重)

 public void reconnect() throws Exception { if (channel != null && channel.isActive()) { return; } //首先清除路由信息,下線 routeRequest.offLine(); LOGGER.info("reconnect...."); start(); LOGGER.info("reconnect success"); }

轉自https://blog.csdn.net/wzx7612302/article/details/104794550


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM