實體對象:
import java.io.Serializable;

/**
 * Serializable bean carrying one message: an id, a name and a message body.
 */
public class TranslatorData implements Serializable {

    private static final long serialVersionUID = 8763561286199081881L;

    private String id;
    private String name;
    /** Body of the transmitted message. */
    private String message;

    /** @return the message id */
    public String getId() {
        return id;
    }

    /** @param id the message id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the message name */
    public String getName() {
        return name;
    }

    /** @param name the message name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message body */
    public String getMessage() {
        return message;
    }

    /** @param message the message body */
    public void setMessage(String message) {
        this.message = message;
    }
}
import com.bfxy.codec.MarshallingCodeCFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class NettyServer { public NettyServer() { //1. 創建兩個工作線程組: 一個用於接受網絡請求的線程組. 另一個用於實際處理業務的線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); //2 輔助類 ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //表示緩存區動態調配(自適應) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) //緩存區 池化操作 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //向管道中添加攔截器 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); //綁定端口,同步等等請求連接 ChannelFuture cf = serverBootstrap.bind(8765).sync(); System.err.println("Server Startup..."); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //優雅停機 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); System.err.println("Sever ShutDown..."); } } }
import com.bfxy.disruptor.MessageProducer; import com.bfxy.disruptor.RingBufferWorkerPoolFactory; import com.bfxy.entity.TranslatorData; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { /** TranslatorData request = (TranslatorData)msg; System.err.println("Sever端: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage()); //數據庫持久化操作 IO讀寫 ---> 交給一個線程池 去異步的調用執行 TranslatorData response = new TranslatorData(); response.setId("resp: " + request.getId()); response.setName("resp: " + request.getName()); response.setMessage("resp: " + request.getMessage()); //寫出response響應信息: ctx.writeAndFlush(response); */ TranslatorData request = (TranslatorData)msg; //自已的應用服務應該有一個ID生成規則 String producerId = "code:sessionId:001"; MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId); messageProducer.onData(request, ctx); } }
import com.bfxy.disruptor.MessageConsumer; import com.bfxy.entity.TranslatorData; import com.bfxy.entity.TranslatorDataWapper; import io.netty.channel.ChannelHandlerContext; public class MessageConsumerImpl4Server extends MessageConsumer { public MessageConsumerImpl4Server(String consumerId) { super(consumerId); } public void onEvent(TranslatorDataWapper event) throws Exception { TranslatorData request = event.getData(); ChannelHandlerContext ctx = event.getCtx(); //1.業務處理邏輯: System.err.println("Sever端: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage()); //2.回送響應信息: TranslatorData response = new TranslatorData(); response.setId("resp: " + request.getId()); response.setName("resp: " + request.getName()); response.setMessage("resp: " + request.getMessage()); //寫出response響應信息: ctx.writeAndFlush(response); } }
客戶端:
import com.bfxy.codec.MarshallingCodeCFactory; import com.bfxy.entity.TranslatorData; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class NettyClient { public static final String HOST = "127.0.0.1"; public static final int PORT = 8765; //擴展 完善 池化: ConcurrentHashMap<KEY -> String, Value -> Channel> private Channel channel; //1. 創建工作線程組: 用於實際處理業務的線程組 private EventLoopGroup workGroup = new NioEventLoopGroup(); private ChannelFuture cf; public NettyClient() { this.connect(HOST, PORT); } private void connect(String host, int port) { //2 輔助類(注意Client 和 Server 不一樣) Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(workGroup) .channel(NioSocketChannel.class) //表示緩存區動態調配(自適應) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) //緩存區 池化操作 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); //綁定端口,同步等等請求連接 this.cf = bootstrap.connect(host, port).sync(); System.err.println("Client connected..."); //接下來就進行數據的發送, 但是首先我們要獲取channel: this.channel = 
cf.channel(); } catch (InterruptedException e) { e.printStackTrace(); } } //發送數據 public void sendData(){ for(int i =0; i <10; i++){ TranslatorData request = new TranslatorData(); request.setId("" + i); request.setName("請求消息名稱 " + i); request.setMessage("請求消息內容 " + i); this.channel.writeAndFlush(request); } } public void close() throws Exception { cf.channel().closeFuture().sync(); //優雅停機 workGroup.shutdownGracefully(); System.err.println("Sever ShutDown..."); } }
vimport com.bfxy.disruptor.MessageProducer; import com.bfxy.disruptor.RingBufferWorkerPoolFactory; import com.bfxy.entity.TranslatorData; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { /** try { TranslatorData response = (TranslatorData)msg; System.err.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage()); } finally { //一定要注意 用完了緩存 要進行釋放 ReferenceCountUtil.release(msg); } */ TranslatorData response = (TranslatorData)msg; String producerId = "code:seesionId:002"; MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId); messageProducer.onData(response, ctx); } }
import com.bfxy.disruptor.MessageConsumer; import com.bfxy.entity.TranslatorData; import com.bfxy.entity.TranslatorDataWapper; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class MessageConsumerImpl4Client extends MessageConsumer { public MessageConsumerImpl4Client(String consumerId) { super(consumerId); } public void onEvent(TranslatorDataWapper event) throws Exception { TranslatorData response = event.getData(); ChannelHandlerContext ctx = event.getCtx(); //業務邏輯處理: try { System.err.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage()); } finally { ReferenceCountUtil.release(response); } } }
工廠類的封裝:
import java.io.Serializable;

/**
 * Serializable bean carrying one message: an id, a name and a message body.
 */
public class TranslatorData implements Serializable {

    private static final long serialVersionUID = 8763561286199081881L;

    private String id;
    private String name;
    /** Body of the transmitted message. */
    private String message;

    /** @return the message id */
    public String getId() {
        return id;
    }

    /** @param id the message id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the message name */
    public String getName() {
        return name;
    }

    /** @param name the message name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message body */
    public String getMessage() {
        return message;
    }

    /** @param message the message body */
    public void setMessage(String message) {
        this.message = message;
    }
}
import io.netty.channel.ChannelHandlerContext; //dis內部需要傳輸的對象 public class TranslatorDataWapper { //實際的數據 private TranslatorData data; //ctx對象 private ChannelHandlerContext ctx; public TranslatorData getData() { return data; } public void setData(TranslatorData data) { this.data = data; } public ChannelHandlerContext getCtx() { return ctx; } public void setCtx(ChannelHandlerContext ctx) { this.ctx = ctx; } }
import com.bfxy.entity.TranslatorDataWapper; import com.lmax.disruptor.WorkHandler; //抽象的讓子類實現 public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> { protected String consumerId; public MessageConsumer(String consumerId) { this.consumerId = consumerId; } public String getConsumerId() { return consumerId; } public void setConsumerId(String consumerId) { this.consumerId = consumerId; } }
import com.bfxy.entity.TranslatorData; import com.bfxy.entity.TranslatorDataWapper; import com.lmax.disruptor.RingBuffer; import io.netty.channel.ChannelHandlerContext; //生產者 public class MessageProducer { private String producerId; private RingBuffer<TranslatorDataWapper> ringBuffer; public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) { this.producerId = producerId; this.ringBuffer = ringBuffer; } //發送實際的對象和ctx public void onData(TranslatorData data, ChannelHandlerContext ctx) { long sequence = ringBuffer.next(); try { TranslatorDataWapper wapper = ringBuffer.get(sequence); wapper.setData(data); wapper.setCtx(ctx); } finally { ringBuffer.publish(sequence); } } }
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import com.bfxy.entity.TranslatorDataWapper; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.WorkerPool; import com.lmax.disruptor.dsl.ProducerType; //環形緩存工作池子工廠 public class RingBufferWorkerPoolFactory { //靜態內部類的單例模式 private static class SingletonHolder { static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory(); } //對外不能暴露的接口 private RingBufferWorkerPoolFactory(){ } //對外創建 public static RingBufferWorkerPoolFactory getInstance() { return SingletonHolder.instance; } //生產者池 private static Map<String, MessageProducer> producers = new ConcurrentHashMap<String, MessageProducer>(); //消費者池 private static Map<String, MessageConsumer> consumers = new ConcurrentHashMap<String, MessageConsumer>(); private RingBuffer<TranslatorDataWapper> ringBuffer; private SequenceBarrier sequenceBarrier; private WorkerPool<TranslatorDataWapper> workerPool; //ProducerType 生產者類型是多生產還是單生產 public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) { //1. 
構建ringBuffer對象 this.ringBuffer = RingBuffer.create(type, new EventFactory<TranslatorDataWapper>() { public TranslatorDataWapper newInstance() { return new TranslatorDataWapper(); } }, bufferSize, waitStrategy); //2.設置序號柵欄 this.sequenceBarrier = this.ringBuffer.newBarrier(); //3.設置工作池 this.workerPool = new WorkerPool<TranslatorDataWapper>( this.ringBuffer, this.sequenceBarrier, new EventExceptionHandler(), messageConsumers); //4 把所構建的消費者置入池中 for(MessageConsumer mc : messageConsumers){ this.consumers.put(mc.getConsumerId(), mc); } //5 添加我們的sequences this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences()); //6 啟動我們的工作池 this.workerPool.start( Executors.newFixedThreadPool (Runtime.getRuntime().availableProcessors()/2)); } //生產者 public MessageProducer getMessageProducer(String producerId){ MessageProducer messageProducer = this.producers.get( producerId); if(null == messageProducer) { messageProducer = new MessageProducer(producerId, this.ringBuffer); this.producers.put(producerId, messageProducer); } return messageProducer; } /** * 異常靜態類 */ static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> { public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) { } public void handleOnStartException(Throwable ex) { } public void handleOnShutdownException(Throwable ex) { } } }
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工廠 */ public final class MarshallingCodeCFactory { /** * 創建Jboss Marshalling解碼器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先通過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識創建的是java序列化工廠對象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //創建了MarshallingConfiguration對象,配置了版本號為5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據marshallerFactory和configuration創建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingDecoder對象,倆個參數分別為provider和單個消息序列化后的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); return decoder; } /** * 創建Jboss Marshalling編碼器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化為二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
服務端消費者的邏輯:
import com.bfxy.disruptor.MessageConsumer; import com.bfxy.entity.TranslatorData; import com.bfxy.entity.TranslatorDataWapper; import io.netty.channel.ChannelHandlerContext; public class MessageConsumerImpl4Server extends MessageConsumer { public MessageConsumerImpl4Server(String consumerId) { super(consumerId); } public void onEvent(TranslatorDataWapper event) throws Exception { TranslatorData request = event.getData(); ChannelHandlerContext ctx = event.getCtx(); //1.業務處理邏輯: System.err.println("Sever端: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage()); //2.回送響應信息: TranslatorData response = new TranslatorData(); response.setId("resp: " + request.getId()); response.setName("resp: " + request.getName()); response.setMessage("resp: " + request.getMessage()); //寫出response響應信息: ctx.writeAndFlush(response); } }
客戶端消費者的邏輯:
import com.bfxy.disruptor.MessageConsumer; import com.bfxy.entity.TranslatorData; import com.bfxy.entity.TranslatorDataWapper; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class MessageConsumerImpl4Client extends MessageConsumer { public MessageConsumerImpl4Client(String consumerId) { super(consumerId); } public void onEvent(TranslatorDataWapper event) throws Exception { TranslatorData response = event.getData(); ChannelHandlerContext ctx = event.getCtx(); //業務邏輯處理: try { System.err.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage()); } finally { ReferenceCountUtil.release(response); } } }
import com.lmax.disruptor.dsl.ProducerType; @SpringBootApplication public class NettyServerApplication { public static void main(String[] args) { SpringApplication.run(NettyServerApplication.class, args); MessageConsumer[] conusmers = new MessageConsumer[4]; for(int i =0; i < conusmers.length; i++) { MessageConsumer messageConsumer = new MessageConsumerImpl4Server("code:serverId:" + i); conusmers[i] = messageConsumer; } RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI, 1024*1024, //new YieldingWaitStrategy(), new BlockingWaitStrategy(), conusmers); new NettyServer(); } }
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import com.bfxy.client.MessageConsumerImpl4Client; import com.bfxy.client.NettyClient; import com.bfxy.disruptor.MessageConsumer; import com.bfxy.disruptor.RingBufferWorkerPoolFactory; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.ProducerType; @SpringBootApplication public class NettyClientApplication { public static void main(String[] args) { SpringApplication.run(NettyClientApplication.class, args); MessageConsumer[] conusmers = new MessageConsumer[4]; for(int i =0; i < conusmers.length; i++) { MessageConsumer messageConsumer = new MessageConsumerImpl4Client("code:clientId:" + i); conusmers[i] = messageConsumer; } RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI, 1024*1024, //new YieldingWaitStrategy(), new BlockingWaitStrategy(), conusmers); //建立連接 並發送消息 new NettyClient().sendData(); } }