
實體對象:
import java.io.Serializable;
public class TranslatorData implements Serializable {
private static final long serialVersionUID = 8763561286199081881L;
private String id;
private String name;
private String message; //傳輸消息體內容
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
import com.bfxy.codec.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class NettyServer {
public NettyServer() {
//1. 創建兩個工作線程組: 一個用於接受網絡請求的線程組. 另一個用於實際處理業務的線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
//2 輔助類
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//表示緩存區動態調配(自適應)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
//緩存區 池化操作
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//向管道中添加攔截器
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHandler());
}
});
//綁定端口,同步等等請求連接
ChannelFuture cf = serverBootstrap.bind(8765).sync();
System.err.println("Server Startup...");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//優雅停機
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
System.err.println("Sever ShutDown...");
}
}
}
import com.bfxy.disruptor.MessageProducer;
import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
import com.bfxy.entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/**
TranslatorData request = (TranslatorData)msg;
System.err.println("Sever端: id= " + request.getId()
+ ", name= " + request.getName()
+ ", message= " + request.getMessage());
//數據庫持久化操作 IO讀寫 ---> 交給一個線程池 去異步的調用執行
TranslatorData response = new TranslatorData();
response.setId("resp: " + request.getId());
response.setName("resp: " + request.getName());
response.setMessage("resp: " + request.getMessage());
//寫出response響應信息:
ctx.writeAndFlush(response);
*/
TranslatorData request = (TranslatorData)msg;
//自已的應用服務應該有一個ID生成規則
String producerId = "code:sessionId:001";
MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
messageProducer.onData(request, ctx);
}
}
import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
public class MessageConsumerImpl4Server extends MessageConsumer {
public MessageConsumerImpl4Server(String consumerId) {
super(consumerId);
}
public void onEvent(TranslatorDataWapper event) throws Exception {
TranslatorData request = event.getData();
ChannelHandlerContext ctx = event.getCtx();
//1.業務處理邏輯:
System.err.println("Sever端: id= " + request.getId()
+ ", name= " + request.getName()
+ ", message= " + request.getMessage());
//2.回送響應信息:
TranslatorData response = new TranslatorData();
response.setId("resp: " + request.getId());
response.setName("resp: " + request.getName());
response.setMessage("resp: " + request.getMessage());
//寫出response響應信息:
ctx.writeAndFlush(response);
}
}
客戶端:
import com.bfxy.codec.MarshallingCodeCFactory;
import com.bfxy.entity.TranslatorData;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class NettyClient {
public static final String HOST = "127.0.0.1";
public static final int PORT = 8765;
//擴展 完善 池化: ConcurrentHashMap<KEY -> String, Value -> Channel>
private Channel channel;
//1. 創建工作線程組: 用於實際處理業務的線程組
private EventLoopGroup workGroup = new NioEventLoopGroup();
private ChannelFuture cf;
public NettyClient() {
this.connect(HOST, PORT);
}
private void connect(String host, int port) {
//2 輔助類(注意Client 和 Server 不一樣)
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
//表示緩存區動態調配(自適應)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
//緩存區 池化操作
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClientHandler());
}
});
//綁定端口,同步等等請求連接
this.cf = bootstrap.connect(host, port).sync();
System.err.println("Client connected...");
//接下來就進行數據的發送, 但是首先我們要獲取channel:
this.channel = cf.channel();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//發送數據
public void sendData(){
for(int i =0; i <10; i++){
TranslatorData request = new TranslatorData();
request.setId("" + i);
request.setName("請求消息名稱 " + i);
request.setMessage("請求消息內容 " + i);
this.channel.writeAndFlush(request);
}
}
public void close() throws Exception {
cf.channel().closeFuture().sync();
//優雅停機
workGroup.shutdownGracefully();
System.err.println("Sever ShutDown...");
}
}
vimport com.bfxy.disruptor.MessageProducer;
import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
import com.bfxy.entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class ClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/**
try {
TranslatorData response = (TranslatorData)msg;
System.err.println("Client端: id= " + response.getId()
+ ", name= " + response.getName()
+ ", message= " + response.getMessage());
} finally {
//一定要注意 用完了緩存 要進行釋放
ReferenceCountUtil.release(msg);
}
*/
TranslatorData response = (TranslatorData)msg;
String producerId = "code:seesionId:002";
MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
messageProducer.onData(response, ctx);
}
}
import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class MessageConsumerImpl4Client extends MessageConsumer {
public MessageConsumerImpl4Client(String consumerId) {
super(consumerId);
}
public void onEvent(TranslatorDataWapper event) throws Exception {
TranslatorData response = event.getData();
ChannelHandlerContext ctx = event.getCtx();
//業務邏輯處理:
try {
System.err.println("Client端: id= " + response.getId()
+ ", name= " + response.getName()
+ ", message= " + response.getMessage());
} finally {
ReferenceCountUtil.release(response);
}
}
}


工廠類的封裝:
import java.io.Serializable;
public class TranslatorData implements Serializable {
private static final long serialVersionUID = 8763561286199081881L;
private String id;
private String name;
private String message; //傳輸消息體內容
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
import io.netty.channel.ChannelHandlerContext;
//dis內部需要傳輸的對象
public class TranslatorDataWapper {
//實際的數據
private TranslatorData data;
//ctx對象
private ChannelHandlerContext ctx;
public TranslatorData getData() {
return data;
}
public void setData(TranslatorData data) {
this.data = data;
}
public ChannelHandlerContext getCtx() {
return ctx;
}
public void setCtx(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
}
import com.bfxy.entity.TranslatorDataWapper;
import com.lmax.disruptor.WorkHandler;
//抽象的讓子類實現
public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> {
protected String consumerId;
public MessageConsumer(String consumerId) {
this.consumerId = consumerId;
}
public String getConsumerId() {
return consumerId;
}
public void setConsumerId(String consumerId) {
this.consumerId = consumerId;
}
}
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import com.lmax.disruptor.RingBuffer;
import io.netty.channel.ChannelHandlerContext;
//生產者
public class MessageProducer {
private String producerId;
private RingBuffer<TranslatorDataWapper> ringBuffer;
public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) {
this.producerId = producerId;
this.ringBuffer = ringBuffer;
}
//發送實際的對象和ctx
public void onData(TranslatorData data, ChannelHandlerContext ctx) {
long sequence = ringBuffer.next();
try {
TranslatorDataWapper wapper = ringBuffer.get(sequence);
wapper.setData(data);
wapper.setCtx(ctx);
} finally {
ringBuffer.publish(sequence);
}
}
}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import com.bfxy.entity.TranslatorDataWapper;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.ProducerType;
//環形緩存工作池子工廠
public class RingBufferWorkerPoolFactory {
//靜態內部類的單例模式
private static class SingletonHolder {
static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
}
//對外不能暴露的接口
private RingBufferWorkerPoolFactory(){
}
//對外創建
public static RingBufferWorkerPoolFactory getInstance() {
return SingletonHolder.instance;
}
//生產者池
private static Map<String, MessageProducer> producers = new ConcurrentHashMap<String, MessageProducer>();
//消費者池
private static Map<String, MessageConsumer> consumers = new ConcurrentHashMap<String, MessageConsumer>();
private RingBuffer<TranslatorDataWapper> ringBuffer;
private SequenceBarrier sequenceBarrier;
private WorkerPool<TranslatorDataWapper> workerPool;
//ProducerType 生產者類型是多生產還是單生產
public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
//1. 構建ringBuffer對象
this.ringBuffer = RingBuffer.create(type,
new EventFactory<TranslatorDataWapper>() {
public TranslatorDataWapper newInstance() {
return new TranslatorDataWapper();
}
},
bufferSize,
waitStrategy);
//2.設置序號柵欄
this.sequenceBarrier = this.ringBuffer.newBarrier();
//3.設置工作池
this.workerPool = new WorkerPool<TranslatorDataWapper>(
this.ringBuffer,
this.sequenceBarrier,
new EventExceptionHandler(), messageConsumers);
//4 把所構建的消費者置入池中
for(MessageConsumer mc : messageConsumers){
this.consumers.put(mc.getConsumerId(), mc);
}
//5 添加我們的sequences
this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
//6 啟動我們的工作池
this.workerPool.start(
Executors.newFixedThreadPool
(Runtime.getRuntime().availableProcessors()/2));
}
//生產者
public MessageProducer getMessageProducer(String producerId){
MessageProducer messageProducer = this.producers.get(
producerId);
if(null == messageProducer) {
messageProducer = new MessageProducer(producerId, this.ringBuffer);
this.producers.put(producerId, messageProducer);
}
return messageProducer;
}
/**
* 異常靜態類
*/
static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> {
public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) {
}
public void handleOnStartException(Throwable ex) {
}
public void handleOnShutdownException(Throwable ex) {
}
}
}
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
* Marshalling工廠
*/
public final class MarshallingCodeCFactory {
/**
* 創建Jboss Marshalling解碼器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識創建的是java序列化工廠對象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//創建了MarshallingConfiguration對象,配置了版本號為5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根據marshallerFactory和configuration創建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//構建Netty的MarshallingDecoder對象,倆個參數分別為provider和單個消息序列化后的最大長度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
/**
* 創建Jboss Marshalling編碼器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化為二進制數組
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
生產者的邏輯:
import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
public class MessageConsumerImpl4Server extends MessageConsumer {
public MessageConsumerImpl4Server(String consumerId) {
super(consumerId);
}
public void onEvent(TranslatorDataWapper event) throws Exception {
TranslatorData request = event.getData();
ChannelHandlerContext ctx = event.getCtx();
//1.業務處理邏輯:
System.err.println("Sever端: id= " + request.getId()
+ ", name= " + request.getName()
+ ", message= " + request.getMessage());
//2.回送響應信息:
TranslatorData response = new TranslatorData();
response.setId("resp: " + request.getId());
response.setName("resp: " + request.getName());
response.setMessage("resp: " + request.getMessage());
//寫出response響應信息:
ctx.writeAndFlush(response);
}
}
消費者的邏輯:
import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class MessageConsumerImpl4Client extends MessageConsumer {
public MessageConsumerImpl4Client(String consumerId) {
super(consumerId);
}
public void onEvent(TranslatorDataWapper event) throws Exception {
TranslatorData response = event.getData();
ChannelHandlerContext ctx = event.getCtx();
//業務邏輯處理:
try {
System.err.println("Client端: id= " + response.getId()
+ ", name= " + response.getName()
+ ", message= " + response.getMessage());
} finally {
ReferenceCountUtil.release(response);
}
}
}
import com.lmax.disruptor.dsl.ProducerType;
@SpringBootApplication
public class NettyServerApplication {
public static void main(String[] args) {
SpringApplication.run(NettyServerApplication.class, args);
MessageConsumer[] conusmers = new MessageConsumer[4];
for(int i =0; i < conusmers.length; i++) {
MessageConsumer messageConsumer = new MessageConsumerImpl4Server("code:serverId:" + i);
conusmers[i] = messageConsumer;
}
RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
1024*1024,
//new YieldingWaitStrategy(),
new BlockingWaitStrategy(),
conusmers);
new NettyServer();
}
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.bfxy.client.MessageConsumerImpl4Client;
import com.bfxy.client.NettyClient;
import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
@SpringBootApplication
public class NettyClientApplication {
public static void main(String[] args) {
SpringApplication.run(NettyClientApplication.class, args);
MessageConsumer[] conusmers = new MessageConsumer[4];
for(int i =0; i < conusmers.length; i++) {
MessageConsumer messageConsumer = new MessageConsumerImpl4Client("code:clientId:" + i);
conusmers[i] = messageConsumer;
}
RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
1024*1024,
//new YieldingWaitStrategy(),
new BlockingWaitStrategy(),
conusmers);
//建立連接 並發送消息
new NettyClient().sendData();
}
}
