現在大多數項目都是基於spring boot進行開發,所以我們以spring boot作為開發框架來使用netty。使用spring boot的一個好處就是能給將netty的業務拆分出來,並通過spring cloud整合到項目中。
我們以一個簡單的客戶端發送消息到服務的場景編寫一個實例。
一、服務端模塊
netty中服務端一般分為兩個類,一個是啟動配置類,另一個是消息的邏輯處理類,但是首先我們要配置spring boot的啟動類,啟動netty
@SpringBootApplication public class DemoApplication implements CommandLineRunner { @Autowired NettyServer nettyServer; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Override public void run(String... args) throws Exception { nettyServer.startServer(); } }
1.啟動配置類
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; /** * Netty * 服務端 */ @Configuration public class NettyServer { //四個處理請求的邏輯類 @Autowired ServerInboundHandler serverInboundHandler; @Autowired ServerInboundGetTimeHandler serverInboundGetTimeHandler; @Autowired ServerLastOutboundHandler serverLastOutboundHandler; @Autowired ServerOutboundHandler serverOutboundHandler; public void startServer() { System.out.println("服務端啟動成功"); //創建兩個線程組,用於接收客戶端的請求任務,創建兩個線程組是因為netty采用的是反應器設計模式 //反應器設計模式中bossGroup線程組用於接收 EventLoopGroup bossGroup = new NioEventLoopGroup(); //workerGroup線程組用於處理任務 EventLoopGroup workerGroup = new NioEventLoopGroup(); //創建netty的啟動類 ServerBootstrap bootstrap = new ServerBootstrap(); //創建一個通道 ChannelFuture f = null; try { bootstrap.group(bossGroup, workerGroup) //設置線程組 .channel(NioServerSocketChannel.class) //設置通道為非阻塞IO .option(ChannelOption.SO_BACKLOG, 128) //設置日志 .option(ChannelOption.SO_RCVBUF, 32 * 1024) //接收緩存 .childOption(ChannelOption.SO_KEEPALIVE, true)//是否保持連接 .childHandler(new ChannelInitializer<SocketChannel>() { //設置處理請求的邏輯處理類 @Override protected void initChannel(SocketChannel ch) throws Exception { //ChannelPipeline是handler的任務組,里面有多個handler ChannelPipeline pipeline = ch.pipeline(); //邏輯處理類 pipeline.addLast(serverLastOutboundHandler); pipeline.addLast(serverOutboundHandler); pipeline.addLast(serverInboundHandler); pipeline.addLast(serverInboundGetTimeHandler); } }); f = bootstrap.bind(84).sync();//阻塞端口號,以及同步策略 f.channel().closeFuture().sync();//關閉通道 } catch (InterruptedException e) { e.printStackTrace(); } finally { //優雅退出 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
2.啟動配置類中的各個組件
1)EventLoop 與 EventLoopGroup
EventLoop 好比一個線程,1個EventLoop 可以服務多個channel,而一個channel只會有一個EventLoop 。EventLoop 在netty中就是負責整個IO操作,包括從消息的讀取、編碼以及后續 ChannelHandler 的執行,這樣做的好處就是避免了線程中的上下文切換時,大量浪費資源情況。
EventLoopGroup 是負責分配EventLoop到新創建的channel,EventLoopGroup 就好比線程池,它里面包含多個EventLoop。
2)BootStrap
BootStrap 是netty中的引導啟動類也就是一個工廠配置類,可以通過它來完成 Netty 的客戶端或服務器端的 Netty 初始化,所以我們主要來看它的幾個常用的配置方法。
① gruop() 方法
gruop()方法用於配置netty中的線程組,也就是我們的EventLoopGroup ,在服務端中需要配置兩個線程組,這是因為netty中采用的是反應器設計模式(reactor ),我們知道反應器設計模式中是需要兩個線程組,一個用於接收用戶的請求,另一個用於處理請求的內容。
② channel() 方法
channel()方法用於配置通道的IO類型,IO類型有兩個:阻塞IO(BIO)OioServerSocketChannel;非阻塞IO(NIO)NioServerSocketChannel。
③ childHandler () 方法
用於設置處理請求的適配器,這個在下面詳細介紹。
④ childOption() 方法
給每條child channel連接設置一些TCP底層相關的屬性,比如上面,我們設置了兩種TCP屬性,其中 ChannelOption.SO_KEEPALIVE表示是否開啟TCP底層心跳機制,true為開
⑤ option
給每條parent channel 連接設置一些TCP底層相關的屬性。
關於option的屬性有:
SO_RCVBUF ,SO_SNDBUF:用於設置TCP連接中使用的兩個緩存區。
TCP_NODELAY:立即發送數據,采用的是Nagle算法。Nagle算法是當小數據過多時,就會將這些小數據碎片連接成更大的報文,從而保證發送的報文數量最小。所以如果數據量小就要禁用這個算法,netty默認是禁用的值為true。
通俗地說,如果要求高實時性,有數據發送時就馬上發送,就關閉,如果需要減少發送次數減少網絡交互,就開啟。
SO_KEEPALIVE:底層TCP協議的心跳機制。Socket參數,連接保活,默認值為False。啟用該功能時,TCP會主動探測空閑連接的有效性。
SO_REUSEADDR:Socket參數,地址復用,默認值False
SO_LINGER:Socket參數,關閉Socket的延遲時間,默認值為-1,表示禁用該功能。
SO_BACKLOG:Socket參數,服務端接受連接的隊列長度,如果隊列已滿,客戶端連接將被拒絕。默認值,Windows為200,其他為128。
SO_BROADCAST:Socket參數,設置廣播模式。
3)ChannelFuture
我們知道netty中的所有IO操作都是異步的,這意味着任何IO調用都會立即返回,不管結果如果狀態如果。而ChannelFuture 的存在就是為了解決這一問題,它會提供IO操作中有關的信息、結果或狀態。
ChannelFuture 一共有兩個狀態:
未完成狀態:當IO操作開始時,將創建一個新的ChannelFuture 對象,此時這個對象既沒有操作成功也沒有失敗,那么就說這個對象就是未完成的狀態。簡單來說未完成指創建了對象且沒有完成IO操作。
已完成狀態:當IO操作完成后,不管操作是成功還是失敗,future都是標記已完成的,失敗時也會有對應的具體失敗信息。
3.消息邏輯處理類
可以看到我一共在pipeline里面配置了4個handler,這是為了查看inboundhandler和outboundhandler的數據傳遞方式,以及每個handler的執行順序
ServerInboundGetTimeHandler:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.springframework.context.annotation.Configuration; import java.text.SimpleDateFormat; import java.util.Date; /** * Inbound處理類 * 給客戶端返回一個時間戳 */ @Configuration public class ServerInboundGetTimeHandler extends ChannelInboundHandlerAdapter { /** * 獲取客戶端的內容類 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //將傳遞過來的內容轉換為ByteBuf對象 ByteBuf buf = (ByteBuf) msg; //和文件IO一樣,用一個字節數組讀數據 byte[] reg = new byte[buf.readableBytes()]; buf.readBytes(reg); //將讀取的數據轉換為字符串 String body = new String(reg, "UTF-8"); //給客戶端傳遞的內容,同樣也要轉換成ByteBuf對象 Date dNow = new Date( ); SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd hh:mm:ss"); String respMsg = body+ft.format(dNow); System.out.println("服務器當前時間是:"+ft.format(dNow)); ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes()); //調用write方法,通知並將數據傳給outboundHand ctx.write(respByteBuf); } /** * 刷新后才將數據發出到SocketChannel * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * 關閉 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ServerInboundHandler:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.springframework.context.annotation.Configuration; /** * Inbound處理類,是用來處理客戶端發送過來的信息 * Sharable 所有通道都能使用的handler */ @Configuration @ChannelHandler.Sharable public class ServerInboundHandler extends ChannelInboundHandlerAdapter { /** * 獲取客戶端的內容類 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //將傳遞過來的內容轉換為ByteBuf對象 ByteBuf buf = (ByteBuf) msg; //和文件IO一樣,用一個字節數組讀數據 byte[] reg = new byte[buf.readableBytes()]; buf.readBytes(reg); //將讀取的數據轉換為字符串 String body = new String(reg, "UTF-8"); System.out.println( "服務端接收的信息是: " + body); //給客戶端傳遞的內容,同樣也要轉換成ByteBuf對象 String respMsg = "你好我是服務端,當前時間是:"; ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes()); //調用fireChannelRead方法,通知並將數據傳給下一個handler ctx.fireChannelRead(respByteBuf); } /** * 刷新后才將數據發出到SocketChannel * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * 關閉 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ServerLastOutboundHandler:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import org.springframework.context.annotation.Configuration; /** * Outbound表示服務器發送的handler */ @Configuration public class ServerLastOutboundHandler extends ChannelOutboundHandlerAdapter { /** * 服務端要傳遞消息的方法 * @param ctx * @param msg * @param promise * @throws Exception */ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { //將傳遞過來的內容轉換為ByteBuf對象 ByteBuf buf = (ByteBuf) msg; //和文件IO一樣,用一個字節數組讀數據 byte[] reg = new byte[buf.readableBytes()]; buf.readBytes(reg); String body=new String(reg,"UTF-8"); String respMsg = body+"\n1.吃飯 2.睡覺"; System.out.println("服務端要發送的消息是:\n"+respMsg); ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes()); ctx.write(respByteBuf); ctx.flush(); //ctx.write()方法執行后,需要調用flush()方法才能令它立即執行 } }
ServerOutboundHandler:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import org.springframework.context.annotation.Configuration; /** * Outbound表示服務器發送的handler */ @Configuration public class ServerOutboundHandler extends ChannelOutboundHandlerAdapter{ /** * 服務端要傳遞消息的方法 * @param ctx * @param msg * @param promise * @throws Exception */ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { //將傳遞過來的內容轉換為ByteBuf對象 ByteBuf buf = (ByteBuf) msg; //和文件IO一樣,用一個字節數組讀數據 byte[] reg = new byte[buf.readableBytes()]; buf.readBytes(reg); String body=new String(reg,"UTF-8"); System.out.println("serverOutbound的內容:\n"+body); String respMsg = body+"\n請問你需要操作什么任務"; ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes()); ctx.write(respByteBuf); ctx.flush(); //ctx.write()方法執行后,需要調用flush()方法才能令它立即執行 } }
4.channelHandler中的各個組件
1)channel
channel的本質就是一個socket連接,是服務端與客戶端連接的通道。channel除了連接客戶端與服務端外,還能監控通道的狀態,如:什么時候傳輸、傳輸完成情況都能監控到。
channel的一個有四個狀態:
channelReistered:channel注冊到一個EventLoop,此時為注冊狀態
channelUnregistered:channel已經創建好了還未進行注冊,此時為未注冊狀態
channelActive:客戶端與服務端連接后,channel會變為活躍狀態,此時可以接收和發送數據
channelInactive:非活躍狀態,沒有連接遠程主機的時候。
channel的生命周期狀態變化大致如圖:
2)channelHandler
channelHandler就是我們處理數據邏輯的地方,它一共分為兩大類:InboundHandler和嘔Outboundhandler。InboundHandler用於處理輸入的數據和改變channel狀態類型,OutboundHandler用於回寫給外界的數據。
channelHandler的執行順序:
InboundHandler:順序執行
OutboundHandler:逆序執行
在channelHandler的執行過程中,InboundHandler會覆蓋后面的OutboundHandler,所以在開發中應該先執行OutboundHandler再執行InboundHandler
3)channelPipeline
管理channelHandler的有序容器,它里面可以有多個channelHandler。
channel、channelHandler、channelPipeline三者的關系:
一個channel有一個容器channelPipeline,容器中有多個channelHandler。創建channel時會自動創建一個channelPipeline,每個channel都有一個管理它的channelPipeline,這個關聯是永久的。
二、客戶端代碼
netty中客戶端的各個組件都是和服務端一樣的,所以不用再介紹客戶端的組件
1.配置類代碼
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.springframework.context.annotation.Configuration; import java.net.InetSocketAddress; /** * netty 客戶端類 */ @Configuration public class NettyClient { public static void main(String[] args) { //客戶端只需要創建一個線程就足夠了 EventLoopGroup group = new NioEventLoopGroup(); try { //客戶端啟動類 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group)//設置線程組 .channel(NioSocketChannel.class)//設置通道類型 .remoteAddress(new InetSocketAddress("127.0.0.1", 84))//設置IP和端口 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); //阻塞通道 ChannelFuture channelFuture = bootstrap.connect().sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { } finally { group.shutdownGracefully(); } } }
2.邏輯處理類代碼
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * 客戶端邏輯處理類 */ public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 發送給服務器消息的方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是客戶端", CharsetUtil.UTF_8)); } /** * 回調方法,接收服務器發送的消息 * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println( msg.toString(CharsetUtil.UTF_8)); } /** * 在處理過程中引發異常時被調用 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
測試結果,先啟動服務端:
然后啟動客戶端:
最后再來看看服務端: