Netty學習第四章 spring boot整合netty的使用


  現在大多數項目都是基於spring boot進行開發,所以我們以spring boot作為開發框架來使用netty。使用spring boot的一個好處就是能給將netty的業務拆分出來,並通過spring cloud整合到項目中。

  我們以一個簡單的客戶端發送消息到服務的場景編寫一個實例。

一、服務端模塊

  netty中服務端一般分為兩個類,一個是啟動配置類,另一個是消息的邏輯處理類,但是首先我們要配置spring boot的啟動類,啟動netty

  

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    @Autowired
    NettyServer nettyServer;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        nettyServer.startServer();
    }
}

 

 

 

  1.啟動配置類

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

/**
 * Netty
 * 服務端
 */
@Configuration
public class NettyServer {

    //四個處理請求的邏輯類
    @Autowired
    ServerInboundHandler serverInboundHandler;

    @Autowired
    ServerInboundGetTimeHandler serverInboundGetTimeHandler;

    @Autowired
    ServerLastOutboundHandler serverLastOutboundHandler;

    @Autowired
    ServerOutboundHandler serverOutboundHandler;

    public void startServer() {
        System.out.println("服務端啟動成功");
        //創建兩個線程組,用於接收客戶端的請求任務,創建兩個線程組是因為netty采用的是反應器設計模式
        //反應器設計模式中bossGroup線程組用於接收
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //workerGroup線程組用於處理任務
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //創建netty的啟動類
        ServerBootstrap bootstrap = new ServerBootstrap();
        //創建一個通道
        ChannelFuture f = null;
        try {
            bootstrap.group(bossGroup, workerGroup) //設置線程組
                    .channel(NioServerSocketChannel.class) //設置通道為非阻塞IO
                    .option(ChannelOption.SO_BACKLOG, 128) //設置日志
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)  //接收緩存
                    .childOption(ChannelOption.SO_KEEPALIVE, true)//是否保持連接
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //設置處理請求的邏輯處理類
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //ChannelPipeline是handler的任務組,里面有多個handler
                            ChannelPipeline pipeline = ch.pipeline();
                            //邏輯處理類
                            pipeline.addLast(serverLastOutboundHandler);
                            pipeline.addLast(serverOutboundHandler);
                            pipeline.addLast(serverInboundHandler);
                            pipeline.addLast(serverInboundGetTimeHandler);
                        }
                    });

            f = bootstrap.bind(84).sync();//阻塞端口號,以及同步策略
            f.channel().closeFuture().sync();//關閉通道
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //優雅退出
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }


}

  2.啟動配置類中的各個組件

  1)EventLoop 與 EventLoopGroup 

  EventLoop 好比一個線程,1個EventLoop 可以服務多個channel,而一個channel只會有一個EventLoop 。EventLoop 在netty中就是負責整個IO操作,包括從消息的讀取、編碼以及后續 ChannelHandler 的執行,這樣做的好處就是避免了線程中的上下文切換時,大量浪費資源情況。

  EventLoopGroup 是負責分配EventLoop到新創建的channel,EventLoopGroup 就好比線程池,它里面包含多個EventLoop。

  2)BootStrap

  BootStrap 是netty中的引導啟動類也就是一個工廠配置類,可以通過它來完成 Netty 的客戶端或服務器端的 Netty 初始化,所以我們主要來看它的幾個常用的配置方法。

  ① gruop() 方法

  gruop()方法用於配置netty中的線程組,也就是我們的EventLoopGroup ,在服務端中需要配置兩個線程組,這是因為netty中采用的是反應器設計模式(reactor ),我們知道反應器設計模式中是需要兩個線程組,一個用於接收用戶的請求,另一個用於處理請求的內容。

  ② channel() 方法

  channel()方法用於配置通道的IO類型,IO類型有兩個:阻塞IO(BIO)OioServerSocketChannel;非阻塞IO(NIO)NioServerSocketChannel。

  ③ childHandler () 方法

  用於設置處理請求的適配器,這個在下面詳細介紹。

  ④ childOption() 方法

  給每條child channel連接設置一些TCP底層相關的屬性,比如上面,我們設置了兩種TCP屬性,其中 ChannelOption.SO_KEEPALIVE表示是否開啟TCP底層心跳機制,true為開

  ⑤ option

  給每條parent channel 連接設置一些TCP底層相關的屬性。

  關於option的屬性有:

  SO_RCVBUF ,SO_SNDBUF:用於設置TCP連接中使用的兩個緩存區。

  TCP_NODELAY:立即發送數據,采用的是Nagle算法。Nagle算法是當小數據過多時,就會將這些小數據碎片連接成更大的報文,從而保證發送的報文數量最小。所以如果數據量小就要禁用這個算法,netty默認是禁用的值為true。

  通俗地說,如果要求高實時性,有數據發送時就馬上發送,就關閉,如果需要減少發送次數減少網絡交互,就開啟。

  SO_KEEPALIVE:底層TCP協議的心跳機制。Socket參數,連接保活,默認值為False。啟用該功能時,TCP會主動探測空閑連接的有效性。

  SO_REUSEADDR:Socket參數,地址復用,默認值False

  SO_LINGER:Socket參數,關閉Socket的延遲時間,默認值為-1,表示禁用該功能。

  SO_BACKLOG:Socket參數,服務端接受連接的隊列長度,如果隊列已滿,客戶端連接將被拒絕。默認值,Windows為200,其他為128。

  SO_BROADCAST:Socket參數,設置廣播模式。

  3)ChannelFuture 

  我們知道netty中的所有IO操作都是異步的,這意味着任何IO調用都會立即返回,不管結果如果狀態如果。而ChannelFuture 的存在就是為了解決這一問題,它會提供IO操作中有關的信息、結果或狀態。

  ChannelFuture 一共有兩個狀態:

  未完成狀態:當IO操作開始時,將創建一個新的ChannelFuture 對象,此時這個對象既沒有操作成功也沒有失敗,那么就說這個對象就是未完成的狀態。簡單來說未完成指創建了對象且沒有完成IO操作。

  已完成狀態:當IO操作完成后,不管操作是成功還是失敗,future都是標記已完成的,失敗時也會有對應的具體失敗信息。

3.消息邏輯處理類  

  可以看到我一共在pipeline里面配置了4個handler,這是為了查看inboundhandler和outboundhandler的數據傳遞方式,以及每個handler的執行順序

   ServerInboundGetTimeHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.context.annotation.Configuration;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Inbound處理類
 * 給客戶端返回一個時間戳
 */
@Configuration
public class ServerInboundGetTimeHandler  extends ChannelInboundHandlerAdapter {


    /**
     * 獲取客戶端的內容類
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //將傳遞過來的內容轉換為ByteBuf對象
        ByteBuf buf = (ByteBuf) msg;
        //和文件IO一樣,用一個字節數組讀數據
        byte[] reg = new byte[buf.readableBytes()];
        buf.readBytes(reg);
        //將讀取的數據轉換為字符串
        String body = new String(reg, "UTF-8");
        //給客戶端傳遞的內容,同樣也要轉換成ByteBuf對象
        Date dNow = new Date( );
        SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd hh:mm:ss");
        String respMsg = body+ft.format(dNow);
        System.out.println("服務器當前時間是:"+ft.format(dNow));
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        //調用write方法,通知並將數據傳給outboundHand
        ctx.write(respByteBuf);

    }

    /**
     * 刷新后才將數據發出到SocketChannel
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * 關閉
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

  ServerInboundHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.context.annotation.Configuration;

/**
 * Inbound處理類,是用來處理客戶端發送過來的信息
 * Sharable 所有通道都能使用的handler
 */
@Configuration
@ChannelHandler.Sharable
public class ServerInboundHandler extends ChannelInboundHandlerAdapter {

    /**
     * 獲取客戶端的內容類
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //將傳遞過來的內容轉換為ByteBuf對象
        ByteBuf buf = (ByteBuf) msg;
        //和文件IO一樣,用一個字節數組讀數據
        byte[] reg = new byte[buf.readableBytes()];
        buf.readBytes(reg);
        //將讀取的數據轉換為字符串
        String body = new String(reg, "UTF-8");
        System.out.println( "服務端接收的信息是: " + body);
        //給客戶端傳遞的內容,同樣也要轉換成ByteBuf對象
        String respMsg = "你好我是服務端,當前時間是:";
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        //調用fireChannelRead方法,通知並將數據傳給下一個handler
        ctx.fireChannelRead(respByteBuf);

    }

    /**
     * 刷新后才將數據發出到SocketChannel
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * 關閉
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

  ServerLastOutboundHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.springframework.context.annotation.Configuration;

/**
 * Outbound表示服務器發送的handler
 */
@Configuration
public class ServerLastOutboundHandler extends ChannelOutboundHandlerAdapter {

    /**
     * 服務端要傳遞消息的方法
     * @param ctx
     * @param msg
     * @param promise
     * @throws Exception
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        //將傳遞過來的內容轉換為ByteBuf對象
        ByteBuf buf = (ByteBuf) msg;
        //和文件IO一樣,用一個字節數組讀數據
        byte[] reg = new byte[buf.readableBytes()];
        buf.readBytes(reg);
        String body=new String(reg,"UTF-8");
        String respMsg = body+"\n1.吃飯 2.睡覺";
        System.out.println("服務端要發送的消息是:\n"+respMsg);
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        ctx.write(respByteBuf);
        ctx.flush(); //ctx.write()方法執行后,需要調用flush()方法才能令它立即執行
    }
}

  ServerOutboundHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.springframework.context.annotation.Configuration;

/**
 * Outbound表示服務器發送的handler
 */
@Configuration
public class ServerOutboundHandler extends ChannelOutboundHandlerAdapter{

    /**
     * 服務端要傳遞消息的方法
     * @param ctx
     * @param msg
     * @param promise
     * @throws Exception
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        //將傳遞過來的內容轉換為ByteBuf對象
        ByteBuf buf = (ByteBuf) msg;
        //和文件IO一樣,用一個字節數組讀數據
        byte[] reg = new byte[buf.readableBytes()];
        buf.readBytes(reg);
        String body=new String(reg,"UTF-8");
        System.out.println("serverOutbound的內容:\n"+body);
        String respMsg = body+"\n請問你需要操作什么任務";
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        ctx.write(respByteBuf);
        ctx.flush(); //ctx.write()方法執行后,需要調用flush()方法才能令它立即執行
    }
}

4.channelHandler中的各個組件

  1)channel

  channel的本質就是一個socket連接,是服務端與客戶端連接的通道。channel除了連接客戶端與服務端外,還能監控通道的狀態,如:什么時候傳輸、傳輸完成情況都能監控到。

  channel的一個有四個狀態:

  channelReistered:channel注冊到一個EventLoop,此時為注冊狀態

  channelUnregistered:channel已經創建好了還未進行注冊,此時為未注冊狀態

  channelActive:客戶端與服務端連接后,channel會變為活躍狀態,此時可以接收和發送數據

  channelInactive:非活躍狀態,沒有連接遠程主機的時候。

  channel的生命周期狀態變化大致如圖:

  

 

 

 

   2)channelHandler

  channelHandler就是我們處理數據邏輯的地方,它一共分為兩大類:InboundHandler和嘔Outboundhandler。InboundHandler用於處理輸入的數據和改變channel狀態類型,OutboundHandler用於回寫給外界的數據。

  channelHandler的執行順序:

  InboundHandler:順序執行

  OutboundHandler:逆序執行

  在channelHandler的執行過程中,InboundHandler會覆蓋后面的OutboundHandler,所以在開發中應該先執行OutboundHandler再執行InboundHandler

  3)channelPipeline

  管理channelHandler的有序容器,它里面可以有多個channelHandler。

  channel、channelHandler、channelPipeline三者的關系:

  一個channel有一個容器channelPipeline,容器中有多個channelHandler。創建channel時會自動創建一個channelPipeline,每個channel都有一個管理它的channelPipeline,這個關聯是永久的。

二、客戶端代碼

  netty中客戶端的各個組件都是和服務端一樣的,所以不用再介紹客戶端的組件

  1.配置類代碼

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * netty 客戶端類
 */
@Configuration
public class NettyClient {


    public static void main(String[] args) {
        //客戶端只需要創建一個線程就足夠了
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //客戶端啟動類
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)//設置線程組
                    .channel(NioSocketChannel.class)//設置通道類型
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 84))//設置IP和端口
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            //阻塞通道
            ChannelFuture channelFuture = bootstrap.connect().sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {

        } finally {
            group.shutdownGracefully();
        }
    }


}

  2.邏輯處理類代碼

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 客戶端邏輯處理類
 */
public class ClientHandler  extends SimpleChannelInboundHandler<ByteBuf> {


    /**
     * 發送給服務器消息的方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是客戶端", CharsetUtil.UTF_8));
    }


    /**
     * 回調方法,接收服務器發送的消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println( msg.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在處理過程中引發異常時被調用
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

測試結果,先啟動服務端:

 

 

 

 然后啟動客戶端:

 

 

 

 最后再來看看服務端:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM