在學習 Spark 源碼的時候,看到 Spark 在 1.6 之後底層的通信框架有 Akka 和 Netty 兩種方式,默認使用的是 Netty。這裡根據源碼的思路,用 Scala 寫了一個 Demo 級別的 Netty 通信示例。
package com.spark.netty

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder}

/**
 * Created by root on 2016/11/18.
 *
 * Demo-level Netty server: accepts TCP connections and hands every accepted
 * channel to a [[ServerHandler]].
 */
class NettyServer {

  /**
   * Binds the server to `host:port` and blocks the calling thread until the
   * server channel is closed. Both event loop groups are shut down on the way
   * out, whether the method returns normally or throws.
   *
   * @param host local address to bind to
   * @param port local TCP port to bind to
   */
  def bind(host: String, port: Int): Unit = {
    // Event loop group passed as the parent (acceptor) group.
    val bossGroup = new NioEventLoopGroup()
    // Event loop group passed as the child group that serves accepted channels.
    val workerGroup = new NioEventLoopGroup()
    try {
      // ServerBootstrap is Netty's helper for configuring and starting a server.
      val bootstrap = new ServerBootstrap()
      bootstrap.group(bossGroup, workerGroup)
        // Use the NIO server socket channel implementation.
        .channel(classOf[NioServerSocketChannel])
        // Install the I/O handlers on every accepted child channel.
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = {
            ch.pipeline().addLast(
              // Object (de)serialization codecs, disabled while the demo sends raw bytes:
              // new ObjectEncoder,
              // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
              new ServerHandler
            )
          }
        })
      // Bind and wait for the bind operation to complete.
      val channelFuture = bootstrap.bind(host, port).sync()
      // Block until the server channel is closed.
      channelFuture.channel().closeFuture().sync()
    } finally {
      // Graceful exit: release the event loop threads.
      bossGroup.shutdownGracefully()
      workerGroup.shutdownGracefully()
    }
  }
}

object NettyServer {

  /**
   * Entry point. Expects two arguments: `<host> <port>`.
   * Prints a message to stderr and exits with status 1 when the arguments are
   * missing or the port is not an integer.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NettyServer <host> <port>")
      sys.exit(1)
    }
    val host = args(0)
    val port = scala.util.Try(args(1).toInt).getOrElse {
      System.err.println(s"Invalid port: ${args(1)}")
      sys.exit(1)
    }
    val server = new NettyServer
    server.bind(host, port)
  }
}
package com.spark.netty

import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel}
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}

/**
 * Created by root on 2016/11/18.
 *
 * Demo-level Netty client: connects to a server and installs a
 * [[ClientHandler]] on the resulting channel.
 */
class NettyClient {

  /**
   * Connects to `host:port` and blocks the calling thread until the channel
   * is closed. The event loop group is shut down on the way out, whether the
   * method returns normally or throws.
   *
   * @param host remote address to connect to
   * @param port remote TCP port to connect to
   */
  def connect(host: String, port: Int): Unit = {
    // Event loop group used by the client channel.
    val eventGroup = new NioEventLoopGroup
    // Bootstrap is Netty's helper for configuring and starting a client.
    val bootstrap = new Bootstrap
    try {
      bootstrap.group(eventGroup)
        // Use the NIO client socket channel implementation.
        .channel(classOf[NioSocketChannel])
        // Install the I/O handlers on the channel.
        .handler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = {
            ch.pipeline().addLast(
              // Object (de)serialization codecs, disabled while the demo sends raw bytes:
              // new ObjectEncoder,
              // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
              new ClientHandler
            )
          }
        })
      // Start the connect operation and wait for it to complete.
      val channelFuture = bootstrap.connect(host, port).sync()
      // Block until the channel is closed.
      channelFuture.channel().closeFuture().sync()
    } finally {
      // Graceful exit: release the event loop threads.
      eventGroup.shutdownGracefully()
    }
  }
}

object NettyClient {

  /**
   * Entry point. Expects two arguments: `<host> <port>`.
   * Prints a message to stderr and exits with status 1 when the arguments are
   * missing or the port is not an integer.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NettyClient <host> <port>")
      sys.exit(1)
    }
    val host = args(0)
    val port = scala.util.Try(args(1).toInt).getOrElse {
      System.err.println(s"Invalid port: ${args(1)}")
      sys.exit(1)
    }
    val client = new NettyClient
    client.connect(host, port)
  }
}
package com.spark.netty

import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

/**
 * Created by root on 2016/11/18.
 *
 * Server-side inbound handler: decodes each inbound buffer as UTF-8 text,
 * prints it, and queues the fixed reply "good boy!".
 */
class ServerHandler extends ChannelInboundHandlerAdapter {

  /**
   * Invoked once a client connection has become active.
   */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("channelActive invoked")
  }

  /**
   * Handles a message sent by the client.
   *
   * A `ByteBuf` is read in full, printed as UTF-8 text and answered; any other
   * message type is passed on to the next handler in the pipeline.
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("channelRead invoked")
    msg match {
      case byteBuf: ByteBuf =>
        try {
          val bytes = new Array[Byte](byteBuf.readableBytes())
          byteBuf.readBytes(bytes)
          val message = new String(bytes, "UTF-8")
          println(message)
          val back = "good boy!"
          val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
          println(msg)
          // Only queued here; the flush happens in channelReadComplete.
          ctx.write(resp)
        } finally {
          // This handler consumes the message and does not forward it, so it
          // has to release the reference-counted buffer itself.
          byteBuf.release()
        }
      case other =>
        // Not ours to decode: let a later handler deal with it.
        ctx.fireChannelRead(other)
    }
  }

  /**
   * Flushes the writes queued by channelRead so they are sent to the peer.
   */
  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    println("channekReadComplete invoked")
    ctx.flush()
  }

  /**
   * Reports the failure and closes the channel, mirroring the client handler.
   */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    println("exceptionCaught")
    cause.printStackTrace()
    ctx.close()
  }
}
package com.spark.netty

import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter}

/**
 * Created by root on 2016/11/18.
 *
 * Client-side inbound handler: sends "hello server" once the channel is
 * active and prints every reply decoded as UTF-8 text.
 */
class ClientHandler extends ChannelInboundHandlerAdapter {

  /**
   * Sends the greeting as soon as the connection is active.
   */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("channelActive")
    val content = "hello server"
    ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8")))
    // Alternative: send a case class wrapping the string instead of raw bytes
    // (needs the object codecs to be enabled in the pipeline):
    // ctx.writeAndFlush(RegisterMsg("hello server"))
  }

  /**
   * Handles a message sent by the server.
   *
   * A `ByteBuf` is read in full and printed as UTF-8 text; any other message
   * type is passed on to the next handler in the pipeline.
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("channelRead")
    msg match {
      case byteBuf: ByteBuf =>
        try {
          val bytes = new Array[Byte](byteBuf.readableBytes())
          byteBuf.readBytes(bytes)
          val message = new String(bytes, "UTF-8")
          println(message)
        } finally {
          // This handler consumes the message and does not forward it, so it
          // has to release the reference-counted buffer itself.
          byteBuf.release()
        }
      case other =>
        // Not ours to decode: let a later handler deal with it.
        ctx.fireChannelRead(other)
    }
  }

  /**
   * Flushes any pending writes once the current read batch is complete.
   */
  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    println("channeReadComplete")
    ctx.flush()
  }

  /**
   * Reports the failure and closes the channel.
   */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    println("exceptionCaught")
    // Surface the reason; without this the failure is silently discarded.
    cause.printStackTrace()
    ctx.close()
  }
}
package com.spark.netty

/**
 * Created by root on 2016/11/18.
 *
 * Message wrapping a single string. Declared `Serializable` explicitly and
 * kept `final` so it stays a plain immutable value.
 *
 * @param content the text carried by the message
 */
final case class RegisterMsg(content: String) extends Serializable
先啟動 NettyServer,然後再啟動 NettyClient。打印結果:


