服務端:
package com.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * netty5版本服務端 */ public class Server { public static void main(String[] args) { //服務類 ServerBootstrap bootstrap = new ServerBootstrap(); //boss和worker, netty5不是線程池,而是事件循環組,里面包含線程池。 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { //設置線程池 bootstrap.group(boss, worker);//boss用來監聽端口的 //設置socket工廠、 bootstrap.channel(NioServerSocketChannel.class); //設置管道工廠 bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); //netty3中對應設置如下 //bootstrap.setOption("backlog", 1024); //bootstrap.setOption("tcpNoDelay", true); //bootstrap.setOption("keepAlive", true); //設置參數,TCP參數 bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的設置,鏈接緩沖池的大小。tcp的服務端是有隊列的。隊列保存2048個客戶端。2048后面的連接是拒絕的。 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的設置,維持鏈接的活躍,清除死鏈接 bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的設置,關閉延遲發送。發一包並不是馬上發出去,而是積累到一定之后再發出去。 //綁定端口 ChannelFuture future = bootstrap.bind(10101); System.out.println("start"); //等待服務端關閉 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //釋放資源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
package com.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 服務端消息處理 */ public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); ctx.channel().writeAndFlush("hi"); ctx.writeAndFlush("hi"); } /** * 新客戶端接入 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } /** * 客戶端斷開 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } /** * 異常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
客戶端:
package com.client; import java.io.BufferedReader; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * netty5版本的客戶端 */ public class Client { public static void main(String[] args) { //服務類 Bootstrap bootstrap = new Bootstrap(); //worker EventLoopGroup worker = new NioEventLoopGroup();//boss用來監聽端口,這里只創建worker try { //設置線程池 bootstrap.group(worker); //設置socket工廠、 bootstrap.channel(NioSocketChannel.class); //設置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ System.out.println("請輸入:"); String msg = bufferedReader.readLine(); connect.channel().writeAndFlush(msg); } } catch (Exception e) { e.printStackTrace(); } finally{ worker.shutdownGracefully(); } } }
package com.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 客戶端消息處理 */ public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("客戶端收到消息:"+msg); } }
一個客戶端啟動多個連接:
package com.client; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** 多連接客戶端,客戶端保持一個連接不夠,要保持多個連接。 線程池是有多個線程,每個線程里面有一個任務隊列,線程run的時候會從任務隊列取一個任務出來,執行任務的run方法, 隊列里面沒有任務就阻塞等待新的任務進來。 一個thread + 隊列 == 一個單線程線程池 =====> 線程安全的,任務是線性串行執行的 對象池:首先初始化n個對象,把這些對象放入一個隊列里面,需要對象的時候會出棧一個對象,有對象就出棧,使用完了歸還對象池里面。 沒有對象會阻塞等待有可用的對象。或者創建一個新的對象使用完之后歸還線程池,歸還的時候如果池子滿了就銷毀。 比如數據庫連接池:使用完后要釋放資源,就是把連接放回連接池里面。 對象組:首先初始化n個對象,把這些對象放入一個數組里面。使用的時候獲取一個對象不移除,使用完之后不用歸還。需要對象有並發的能力。 對象組:線程安全,不會產生阻塞效應 對象池:線程不安全,會產生阻塞效應 */ public class MultClient { /** * 服務類 */ private Bootstrap bootstrap = new Bootstrap(); /** * 會話,多個channel, */ private List<Channel> channels = new ArrayList<>(); /** * 引用計數 */ private final AtomicInteger index = new AtomicInteger(); /** * 初始化 * @param count */ public void init(int count){ //worker EventLoopGroup worker = new NioEventLoopGroup(); //設置線程池 bootstrap.group(worker); //設置socket工廠、 bootstrap.channel(NioSocketChannel.class); //設置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); for(int i=1; i<=count; i++){ ChannelFuture future = bootstrap.connect("192.168.0.103", 10101); channels.add(future.channel()); } } /** * 獲取會話 */ public Channel nextChannel(){ return getFirstActiveChannel(0); } private Channel getFirstActiveChannel(int count){ Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size())); if(!channel.isActive()){ //重連 reconnect(channel); if(count >= channels.size()){ throw new RuntimeException("no can use channel"); } return getFirstActiveChannel(count + 1); } return channel; } /** * 重連 * @param channel */ private void reconnect(Channel channel){ synchronized(channel){ if(channels.indexOf(channel) == -1){//已經重連過了 return ; } Channel newChannel = bootstrap.connect("192.168.0.103", 10101).channel(); channels.set(channels.indexOf(channel), newChannel); } } }
package com.client; import java.io.BufferedReader; import java.io.InputStreamReader; /** * 啟動類 */ public class Start { public static void main(String[] args) { MultClient client = new MultClient(); client.init(5);//初始化5個連接 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ try { System.out.println("請輸入:"); String msg = bufferedReader.readLine(); client.nextChannel().writeAndFlush(msg); } catch (Exception e) { e.printStackTrace(); } } } }