用 Scala 實現 Netty 通信


在學習 Spark 源碼的時候,看到 Spark 在 1.6 之後底層的通信框架變成了 Akka 和 Netty 兩種方式,默認使用的是 Netty。下面根據源碼的思路,用 Scala 寫了一個 Demo 級別的 Netty 通信示例。

package com.spark.netty
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder}

/**
  * Created by root on 2016/11/18.
  *
  * Demo Netty server: listens on an address and installs a [[ServerHandler]] on the
  * pipeline of every accepted connection.
  */
class NettyServer {

  /**
    * Binds the server to `host:port` and blocks the calling thread until the server
    * channel is closed. Both event-loop groups are asked to shut down gracefully on
    * the way out, whether the method finishes normally or with an exception.
    *
    * @param host address or host name to listen on
    * @param port TCP port to listen on
    */
  def bind(host: String, port: Int): Unit = {
    // Event loops that accept incoming client connections.
    val acceptGroup = new NioEventLoopGroup()
    // Event loops that perform network reads/writes on the accepted SocketChannels.
    val ioGroup = new NioEventLoopGroup()

    try {
      // Per-connection pipeline setup. The object codecs are left commented out,
      // so the handler exchanges raw ByteBufs:
      //   new ObjectEncoder,
      //   new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
      val pipelineSetup = new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new ServerHandler)
        }
      }

      // ServerBootstrap is Netty's helper for starting an NIO server.
      val serverBootstrap = new ServerBootstrap()
      serverBootstrap
        .group(acceptGroup, ioGroup)
        .channel(classOf[NioServerSocketChannel])
        .childHandler(pipelineSetup)

      // sync() waits for the bind operation to complete.
      val bound = serverBootstrap.bind(host, port).sync()
      // Wait here until the server channel is closed.
      bound.channel().closeFuture().sync()
    } finally {
      // Release the thread-pool resources.
      acceptGroup.shutdownGracefully()
      ioGroup.shutdownGracefully()
    }
  }
}

/**
  * Command-line entry point that starts a [[NettyServer]].
  */
object NettyServer {

  /**
    * Starts the server on the host and port given on the command line and blocks
    * until it stops.
    *
    * @param args `args(0)` is the host, `args(1)` is the port; any further arguments
    *             are ignored
    * @throws IllegalArgumentException if fewer than two arguments are supplied or the
    *                                  port is not an integer
    */
  def main(args: Array[String]): Unit = {
    // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      throw new IllegalArgumentException(
        s"Usage: NettyServer <host> <port> (got ${args.length} argument(s))")
    }
    val host = args(0)
    val port =
      try args(1).toInt
      catch {
        case e: NumberFormatException =>
          throw new IllegalArgumentException(s"Port must be an integer, got '${args(1)}'", e)
      }
    val server = new NettyServer
    server.bind(host, port)
  }
}
package com.spark.netty

import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel}
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}

/**
  * Created by root on 2016/11/18.
  *
  * Demo Netty client: connects to a server and installs a [[ClientHandler]] on the
  * connection's pipeline.
  */
class NettyClient {

  /**
    * Connects to `host:port` and blocks the calling thread until the connection's
    * channel is closed. The event-loop group is asked to shut down gracefully on the
    * way out, whether the method finishes normally or with an exception.
    *
    * @param host server address or host name
    * @param port server TCP port
    */
  def connect(host: String, port: Int): Unit = {
    // Client-side NIO event loops.
    val ioGroup = new NioEventLoopGroup
    // Netty's helper for starting a client.
    val clientBootstrap = new Bootstrap

    try {
      // Pipeline setup for the connection. The object codecs are left commented out,
      // so the handler exchanges raw ByteBufs:
      //   new ObjectEncoder,
      //   new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
      val pipelineSetup = new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new ClientHandler)
        }
      }

      clientBootstrap
        .group(ioGroup)
        .channel(classOf[NioSocketChannel])
        .handler(pipelineSetup)

      // connect() is asynchronous; sync() waits for it to complete.
      val connected = clientBootstrap.connect(host, port).sync()
      // Wait here until the connection is closed.
      connected.channel().closeFuture().sync()
    } finally {
      // Release the thread-pool resources.
      ioGroup.shutdownGracefully()
    }
  }
}

/**
  * Command-line entry point that starts a [[NettyClient]].
  */
object NettyClient {

  /**
    * Connects to the host and port given on the command line and blocks until the
    * connection is closed.
    *
    * @param args `args(0)` is the server host, `args(1)` is the server port; any
    *             further arguments are ignored
    * @throws IllegalArgumentException if fewer than two arguments are supplied or the
    *                                  port is not an integer
    */
  def main(args: Array[String]): Unit = {
    // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      throw new IllegalArgumentException(
        s"Usage: NettyClient <host> <port> (got ${args.length} argument(s))")
    }
    val host = args(0)
    val port =
      try args(1).toInt
      catch {
        case e: NumberFormatException =>
          throw new IllegalArgumentException(s"Port must be an integer, got '${args(1)}'", e)
      }
    val client = new NettyClient
    client.connect(host, port)
  }
}
package com.spark.netty

import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

/**
  * Created by root on 2016/11/18.
  *
  * Server-side inbound handler: prints each received UTF-8 message and answers it
  * with a fixed reply.
  */
class ServerHandler extends ChannelInboundHandlerAdapter {
  /**
    * Called once a client connection has been established.
    */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("channelActive invoked")
  }

  /**
    * Receives a message sent by the client, prints it and queues the reply.
    *
    * The inbound buffer is released here because this handler consumes it and
    * `ChannelInboundHandlerAdapter` does not release messages automatically.
    * Messages that are not a `ByteBuf` are passed on to the next handler.
    *
    * @param ctx context of this handler in the channel pipeline
    * @param msg the inbound message; expected to be a `ByteBuf`
    */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("channelRead invoked")
    msg match {
      case byteBuf: ByteBuf =>
        try {
          val bytes = new Array[Byte](byteBuf.readableBytes())
          byteBuf.readBytes(bytes)
          val message = new String(bytes, "UTF-8")
          println(message)
          val back = "good boy!"
          val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
          println(msg)
          // Only queued here; channelReadComplete flushes it to the socket.
          ctx.write(resp)
        } finally {
          // Drop our reference so the buffer's memory can be reclaimed.
          byteBuf.release()
        }
      case other =>
        // Not ours to decode: hand it to the rest of the pipeline.
        ctx.fireChannelRead(other)
    }
  }

  /**
    * Flushes the data queued by `channelRead` so it is sent to the peer.
    */
  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    println("channekReadComplete invoked")
    ctx.flush()
  }

  /**
    * Reports the failure and closes the connection, as the client handler does.
    */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    cause.printStackTrace()
    ctx.close()
  }

}
package com.spark.netty

import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter}

/**
  * Created by root on 2016/11/18.
  *
  * Client-side inbound handler: sends a greeting once connected and prints every
  * UTF-8 reply it receives.
  */
class ClientHandler extends ChannelInboundHandlerAdapter {
  /**
    * Called once the connection is established; sends the greeting to the server.
    */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("channelActive")
    val content = "hello server"
    ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8")))
    // Alternative: send a case class wrapping the string instead of the raw string.
    //    ctx.writeAndFlush(RegisterMsg("hello server"))
  }

  /**
    * Receives a message sent by the server and prints it.
    *
    * The inbound buffer is released here because this handler consumes it and
    * `ChannelInboundHandlerAdapter` does not release messages automatically.
    * Messages that are not a `ByteBuf` are passed on to the next handler.
    *
    * @param ctx context of this handler in the channel pipeline
    * @param msg the inbound message; expected to be a `ByteBuf`
    */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("channelRead")
    msg match {
      case byteBuf: ByteBuf =>
        try {
          val bytes = new Array[Byte](byteBuf.readableBytes())
          byteBuf.readBytes(bytes)
          val message = new String(bytes, "UTF-8")
          println(message)
        } finally {
          // Drop our reference so the buffer's memory can be reclaimed.
          byteBuf.release()
        }
      case other =>
        // Not ours to decode: hand it to the rest of the pipeline.
        ctx.fireChannelRead(other)
    }
  }

  /**
    * Flushes any pending outbound data once the current read batch is finished.
    */
  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    println("channeReadComplete")
    ctx.flush()
  }

  /**
    * Closes the connection when an exception occurs, after reporting its cause.
    */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    println("exceptionCaught")
    // Print the cause so the reason for the failure is not lost.
    cause.printStackTrace()
    ctx.close()
  }

}
package com.spark.netty

/**
  * Created by root on 2016/11/18.
  *
  * Message wrapping a single string. It is only referenced from a commented-out
  * `writeAndFlush` call in the client handler.
  *
  * @param content the text carried by the message
  */
case class RegisterMsg(
  content: String
) extends Serializable

先啟動 NettyServer,然後再啟動 NettyClient,即可看到打印結果。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM