Disruptor與Netty實現百萬級(十)


實體對象:

import java.io.Serializable;

/**
 * Serializable message object with three string fields: an id, a name and a message body.
 */
public class TranslatorData implements Serializable {

	private static final long serialVersionUID = 8763561286199081881L;

	/** Identifier of the message. */
	private String id;

	/** Name of the message. */
	private String name;

	/** Content of the transmitted message body. */
	private String message;

	/** @return the id, or {@code null} if none has been set */
	public String getId() {
		return this.id;
	}

	/** @return the name, or {@code null} if none has been set */
	public String getName() {
		return this.name;
	}

	/** @return the message body, or {@code null} if none has been set */
	public String getMessage() {
		return this.message;
	}

	/** @param id the new id */
	public void setId(String id) {
		this.id = id;
	}

	/** @param name the new name */
	public void setName(String name) {
		this.name = name;
	}

	/** @param message the new message body */
	public void setMessage(String message) {
		this.message = message;
	}
}

  

import com.bfxy.codec.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty server that listens on port 8765 and passes decoded messages to {@link ServerHandler}.
 *
 * <p>The constructor binds the port and then blocks the calling thread until the server channel
 * is closed; both event loop groups are shut down before it returns.
 */
public class NettyServer {

	/**
	 * Starts the server and blocks until its channel closes or the calling thread is interrupted.
	 */
	public NettyServer() {
		// 1. Two event loop groups: one accepts incoming connections, the other serves the accepted channels.
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workGroup = new NioEventLoopGroup();

		// 2. Bootstrap helper.
		ServerBootstrap serverBootstrap = new ServerBootstrap();
		try {
			serverBootstrap.group(bossGroup, workGroup)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_BACKLOG, 1024)
			// Receive buffer size is adjusted dynamically (adaptive).
			.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
			// Use pooled buffers.
			.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
			// Logging on the server channel.
			.handler(new LoggingHandler(LogLevel.INFO))
			.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel sc) throws Exception {
					// Handlers added to every accepted channel: decoder, encoder, business handler.
					sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
					sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
					sc.pipeline().addLast(new ServerHandler());
				}
			});
			// Bind the port and wait for the bind to complete.
			ChannelFuture cf = serverBootstrap.bind(8765).sync();
			System.err.println("Server Startup...");
			// Block here until the server channel is closed.
			cf.channel().closeFuture().sync();

		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers further up the stack can still observe it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			// Graceful shutdown of both groups.
			bossGroup.shutdownGracefully();
			workGroup.shutdownGracefully();
			System.err.println("Sever ShutDown...");
		}
	}
}

  

import com.bfxy.disruptor.MessageProducer;
import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
import com.bfxy.entity.TranslatorData;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side inbound handler: casts each inbound message to {@link TranslatorData} and hands it,
 * together with the channel context, to a {@code MessageProducer} obtained from
 * {@code RingBufferWorkerPoolFactory}.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

	/**
	 * Publishes the received message through the producer registered under a fixed producer id.
	 *
	 * @param ctx context of the channel the message arrived on
	 * @param msg inbound message; cast to {@link TranslatorData}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		/*
		 * Earlier inline implementation, kept for reference:
		 *
		 * TranslatorData request = (TranslatorData)msg;
		 * System.err.println("Sever端: id= " + request.getId()
		 * 				+ ", name= " + request.getName()
		 * 				+ ", message= " + request.getMessage());
		 * // Database persistence / I/O reads and writes ---> hand over to a thread pool to run asynchronously
		 * TranslatorData response = new TranslatorData();
		 * response.setId("resp: " + request.getId());
		 * response.setName("resp: " + request.getName());
		 * response.setMessage("resp: " + request.getMessage());
		 * // Write out the response:
		 * ctx.writeAndFlush(response);
		 */
		TranslatorData payload = (TranslatorData) msg;
		// A real application should have its own rule for generating this id.
		String producerId = "code:sessionId:001";
		RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
		factory.getMessageProducer(producerId).onData(payload, ctx);
	}
}

  

import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;

import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side consumer: prints each request taken from the event and writes back a response
 * whose fields are the request fields prefixed with "resp: ".
 */
public class MessageConsumerImpl4Server extends MessageConsumer {

	/**
	 * @param consumerId identifier passed to the {@code MessageConsumer} base class
	 */
	public MessageConsumerImpl4Server(String consumerId) {
		super(consumerId);
	}

	/**
	 * Handles one event: logs the request and sends the response through the event's channel context.
	 *
	 * @param event carrier of the request data and the channel context
	 */
	public void onEvent(TranslatorDataWapper event) throws Exception {
		TranslatorData request = event.getData();
		ChannelHandlerContext ctx = event.getCtx();

		// 1. Business logic: print the request.
		String logLine = "Sever端: id= " + request.getId()
				+ ", name= " + request.getName()
				+ ", message= " + request.getMessage();
		System.err.println(logLine);

		// 2. Build the response and write it back.
		ctx.writeAndFlush(buildResponse(request));
	}

	/** Creates the response by prefixing every request field with "resp: ". */
	private static TranslatorData buildResponse(TranslatorData request) {
		TranslatorData reply = new TranslatorData();
		reply.setId("resp: " + request.getId());
		reply.setName("resp: " + request.getName());
		reply.setMessage("resp: " + request.getMessage());
		return reply;
	}
}

  客戶端:

import com.bfxy.codec.MarshallingCodeCFactory;
import com.bfxy.entity.TranslatorData;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty client that connects to {@link #HOST}:{@link #PORT} when constructed and can then send
 * a batch of {@link TranslatorData} requests over the established channel.
 */
public class NettyClient {

	public static final String HOST = "127.0.0.1";
	public static final int PORT = 8765;

	// Possible extension: pool the channels, e.g. ConcurrentHashMap<String, Channel>.
	private Channel channel;

	// 1. Event loop group used by the client channel.
	private EventLoopGroup workGroup = new NioEventLoopGroup();

	private ChannelFuture cf;

	/**
	 * Creates the client and connects to {@link #HOST}:{@link #PORT}.
	 */
	public NettyClient() {
		this.connect(HOST, PORT);
	}

	/**
	 * Configures the bootstrap, connects and stores the resulting channel.
	 *
	 * @param host remote host
	 * @param port remote port
	 */
	private void connect(String host, int port) {
		// 2. Bootstrap helper (the client uses Bootstrap, not ServerBootstrap).
		Bootstrap bootstrap = new Bootstrap();
		try {

			bootstrap.group(workGroup)
			.channel(NioSocketChannel.class)
			// Receive buffer size is adjusted dynamically (adaptive).
			.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
			// Use pooled buffers.
			.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
			// A bootstrap keeps only one handler: a second handler(...) call replaces the first.
			// The logging handler is therefore installed inside the initializer.
			.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel sc) throws Exception {
					sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
					sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
					sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
					sc.pipeline().addLast(new ClientHandler());
				}
			});
			// Connect and wait until the connection attempt has completed.
			this.cf = bootstrap.connect(host, port).sync();
			System.err.println("Client connected...");

			// Keep the channel; it is needed for sending data.
			this.channel = cf.channel();

		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers further up the stack can still observe it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
	}

	/**
	 * Sends ten requests (ids "0" to "9") over the connected channel.
	 *
	 * @throws IllegalStateException if no channel is available because connecting did not complete
	 */
	public void sendData(){
		if (this.channel == null) {
			throw new IllegalStateException("Client is not connected to " + HOST + ":" + PORT);
		}
		for(int i =0; i <10; i++){
			TranslatorData request = new TranslatorData();
			request.setId("" + i);
			request.setName("請求消息名稱 " + i);
			request.setMessage("請求消息內容 " + i);
			this.channel.writeAndFlush(request);
		}
	}

	/**
	 * Waits for the channel to close, then shuts the event loop group down.
	 *
	 * @throws Exception if waiting for the channel to close fails
	 */
	public void close() throws Exception {
		try {
			// cf is null when connecting was interrupted; there is nothing to wait for then.
			if (cf != null) {
				cf.channel().closeFuture().sync();
			}
		} finally {
			// Graceful shutdown, performed even if the wait above fails.
			workGroup.shutdownGracefully();
			System.err.println("Client ShutDown...");
		}
	}
}

  

import com.bfxy.disruptor.MessageProducer;
import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
import com.bfxy.entity.TranslatorData;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side inbound handler: casts each inbound message to {@link TranslatorData} and hands it,
 * together with the channel context, to a {@code MessageProducer} obtained from
 * {@code RingBufferWorkerPoolFactory}.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

	/**
	 * Publishes the received response through the producer registered under a fixed producer id.
	 *
	 * @param ctx context of the channel the message arrived on
	 * @param msg inbound message; cast to {@link TranslatorData}
	 */
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		/*
		 * Earlier inline implementation, kept for reference:
		 *
		 * try {
		 * 	TranslatorData response = (TranslatorData)msg;
		 * 	System.err.println("Client端: id= " + response.getId()
		 * 			+ ", name= " + response.getName()
		 * 			+ ", message= " + response.getMessage());
		 * } finally {
		 * 	// Be sure to release the buffer once it has been used
		 * 	ReferenceCountUtil.release(msg);
		 * }
		 */
		TranslatorData payload = (TranslatorData) msg;
		String producerId = "code:seesionId:002";
		RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
		factory.getMessageProducer(producerId).onData(payload, ctx);
	}
}

  

import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side consumer: prints every response carried by the events it receives.
 */
public class MessageConsumerImpl4Client extends MessageConsumer {

	/**
	 * @param consumerId identifier passed to the {@code MessageConsumer} base class
	 */
	public MessageConsumerImpl4Client(String consumerId) {
		super(consumerId);
	}

	/**
	 * Prints the id, name and message of the response held by the event.
	 *
	 * @param event carrier of the response data
	 */
	public void onEvent(TranslatorDataWapper event) throws Exception {
		TranslatorData response = event.getData();
		// Business logic: print the response.
		// No ReferenceCountUtil.release(...) is needed: the payload is a plain TranslatorData
		// object rather than a reference-counted buffer, so releasing it would do nothing.
		System.err.println("Client端: id= " + response.getId()
				+ ", name= " + response.getName()
				+ ", message= " + response.getMessage());
	}
}

  

 工廠類的封裝:

import java.io.Serializable;

/**
 * Serializable message object with three string fields: an id, a name and a message body.
 */
public class TranslatorData implements Serializable {

	private static final long serialVersionUID = 8763561286199081881L;

	/** Identifier of the message. */
	private String id;

	/** Name of the message. */
	private String name;

	/** Content of the transmitted message body. */
	private String message;

	/** @return the id, or {@code null} if none has been set */
	public String getId() {
		return this.id;
	}

	/** @return the name, or {@code null} if none has been set */
	public String getName() {
		return this.name;
	}

	/** @return the message body, or {@code null} if none has been set */
	public String getMessage() {
		return this.message;
	}

	/** @param id the new id */
	public void setId(String id) {
		this.id = id;
	}

	/** @param name the new name */
	public void setName(String name) {
		this.name = name;
	}

	/** @param message the new message body */
	public void setMessage(String message) {
		this.message = message;
	}
}

  

import io.netty.channel.ChannelHandlerContext;
/**
 * Object passed through the Disruptor: pairs the message data with the channel context
 * it is associated with.
 */
public class TranslatorDataWapper {

	/** The actual message data. */
	private TranslatorData data;

	/** The channel context associated with the data. */
	private ChannelHandlerContext ctx;

	/** @return the message data, or {@code null} if none has been set */
	public TranslatorData getData() {
		return this.data;
	}

	/** @return the channel context, or {@code null} if none has been set */
	public ChannelHandlerContext getCtx() {
		return this.ctx;
	}

	/** @param data the message data to carry */
	public void setData(TranslatorData data) {
		this.data = data;
	}

	/** @param ctx the channel context to carry */
	public void setCtx(ChannelHandlerContext ctx) {
		this.ctx = ctx;
	}
}

  

import com.bfxy.entity.TranslatorDataWapper;
import com.lmax.disruptor.WorkHandler;
/**
 * Abstract base for consumers of {@link TranslatorDataWapper} events. It only stores the
 * consumer id; subclasses supply the event handling required by {@link WorkHandler}.
 */
public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> {

	/** Identifier of this consumer. */
	protected String consumerId;

	/**
	 * @param consumerId identifier of this consumer
	 */
	public MessageConsumer(String consumerId) {
		setConsumerId(consumerId);
	}

	/** @return the identifier of this consumer */
	public String getConsumerId() {
		return this.consumerId;
	}

	/** @param consumerId the new identifier of this consumer */
	public final void setConsumerId(String consumerId) {
		this.consumerId = consumerId;
	}
}

  

import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import com.lmax.disruptor.RingBuffer;

import io.netty.channel.ChannelHandlerContext;

/**
 * Producer that publishes message data together with its channel context into a ring buffer.
 */
public class MessageProducer {

	private String producerId;

	private RingBuffer<TranslatorDataWapper> ringBuffer;

	/**
	 * @param producerId identifier of this producer
	 * @param ringBuffer ring buffer the producer publishes into
	 */
	public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) {
		this.ringBuffer = ringBuffer;
		this.producerId = producerId;
	}

	/**
	 * Claims the next slot, fills it with the data and context, and publishes it.
	 *
	 * @param data message data to publish
	 * @param ctx  channel context to publish alongside the data
	 */
	public void onData(TranslatorData data, ChannelHandlerContext ctx) {
		final long claimed = this.ringBuffer.next();
		try {
			TranslatorDataWapper slot = this.ringBuffer.get(claimed);
			slot.setCtx(ctx);
			slot.setData(data);
		} finally {
			// Publish in all cases so the claimed sequence is never left unpublished.
			this.ringBuffer.publish(claimed);
		}
	}
}

  

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import com.bfxy.entity.TranslatorDataWapper;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * Singleton factory that owns the ring buffer and worker pool and hands out
 * {@link MessageProducer} instances bound to that ring buffer.
 *
 * <p>{@link #initAndStart} must be called before {@link #getMessageProducer}.
 */
public class RingBufferWorkerPoolFactory {

	// Singleton via a static holder class.
	private static class SingletonHolder {
		static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
	}

	// Not instantiable from outside.
	private RingBufferWorkerPoolFactory(){

	}

	/**
	 * @return the single factory instance
	 */
	public static RingBufferWorkerPoolFactory getInstance() {
		return SingletonHolder.instance;
	}

	// Producer pool, keyed by producer id. Declared with the concrete type so that the atomic
	// putIfAbsent operation is available.
	private static final ConcurrentHashMap<String, MessageProducer> producers = new ConcurrentHashMap<String, MessageProducer>();
	// Consumer pool, keyed by consumer id.
	private static final Map<String, MessageConsumer> consumers = new ConcurrentHashMap<String, MessageConsumer>();

	private RingBuffer<TranslatorDataWapper> ringBuffer;

	private SequenceBarrier sequenceBarrier;

	private WorkerPool<TranslatorDataWapper> workerPool;

	/**
	 * Builds the ring buffer and the worker pool and starts one worker per consumer.
	 *
	 * @param type             whether there is a single producer or there are multiple producers
	 * @param bufferSize       size of the ring buffer
	 * @param waitStrategy     wait strategy used by the ring buffer
	 * @param messageConsumers consumers to run; must contain at least one element
	 * @throws IllegalArgumentException if {@code messageConsumers} is {@code null} or empty
	 */
	public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
		if (messageConsumers == null || messageConsumers.length == 0) {
			throw new IllegalArgumentException("At least one MessageConsumer is required");
		}
		// 1. Build the ring buffer.
		this.ringBuffer = RingBuffer.create(type,
				new EventFactory<TranslatorDataWapper>() {
					public TranslatorDataWapper newInstance() {
						return new TranslatorDataWapper();
					}
				},
				bufferSize,
				waitStrategy);
		// 2. Create the sequence barrier.
		this.sequenceBarrier = this.ringBuffer.newBarrier();

		// 3. Create the worker pool.
		this.workerPool = new WorkerPool<TranslatorDataWapper>(
				this.ringBuffer,
				this.sequenceBarrier,
				new EventExceptionHandler(), messageConsumers);

		// 4. Register the consumers in the pool map.
		for(MessageConsumer mc : messageConsumers){
			consumers.put(mc.getConsumerId(), mc);
		}

		// 5. Gate the ring buffer on the workers' sequences.
		this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());

		// 6. Start the worker pool. Every worker occupies a thread for as long as it runs, so the
		// executor needs one thread per consumer. Sizing it from the CPU count could leave some
		// workers unstarted, and yields an illegal pool size of 0 on a single-core machine.
		this.workerPool.start(
				Executors.newFixedThreadPool(messageConsumers.length));
	}

	/**
	 * Returns the producer registered under the given id, creating it on first use.
	 *
	 * @param producerId id of the producer
	 * @return the producer for that id; repeated and concurrent calls yield the same instance
	 * @throws IllegalStateException if {@link #initAndStart} has not been called yet
	 */
	public MessageProducer getMessageProducer(String producerId){
		MessageProducer messageProducer = producers.get(producerId);
		if(null == messageProducer) {
			if (this.ringBuffer == null) {
				// Creating and caching a producer without a ring buffer would leave it broken for good.
				throw new IllegalStateException("initAndStart must be called before getMessageProducer");
			}
			MessageProducer created = new MessageProducer(producerId, this.ringBuffer);
			// putIfAbsent keeps the first registered instance when two threads race here.
			MessageProducer existing = producers.putIfAbsent(producerId, created);
			messageProducer = (existing != null) ? existing : created;
		}
		return messageProducer;
	}

	/**
	 * Exception handler for the worker pool: reports failures on standard error instead of
	 * discarding them, and does not rethrow.
	 */
	static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> {
		public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) {
			System.err.println("Exception while handling event at sequence " + sequence);
			ex.printStackTrace();
		}

		public void handleOnStartException(Throwable ex) {
			System.err.println("Exception while starting a worker");
			ex.printStackTrace();
		}

		public void handleOnShutdownException(Throwable ex) {
			System.err.println("Exception while shutting down a worker");
			ex.printStackTrace();
		}
	}
}

  

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Factory for the JBoss Marshalling decoder and encoder used in the Netty pipelines.
 */
public final class MarshallingCodeCFactory {

	/** Maximum length of a single serialized message accepted by the decoder. */
	private static final int MAX_OBJECT_SIZE = 1024 * 1024 * 1;

	/**
	 * Creates a JBoss Marshalling decoder.
	 *
	 * @return a new MarshallingDecoder
	 */
	public static MarshallingDecoder buildMarshallingDecoder() {
		// The provider is built from the marshaller factory and the configuration.
		UnmarshallerProvider provider =
				new DefaultUnmarshallerProvider(serialMarshallerFactory(), versionFiveConfiguration());
		// The decoder takes the provider and the maximum size of one serialized message.
		return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
	}

	/**
	 * Creates a JBoss Marshalling encoder.
	 *
	 * @return a new MarshallingEncoder
	 */
	public static MarshallingEncoder buildMarshallingEncoder() {
		MarshallerProvider provider =
				new DefaultMarshallerProvider(serialMarshallerFactory(), versionFiveConfiguration());
		// The encoder serializes POJOs that implement Serializable into binary form.
		return new MarshallingEncoder(provider);
	}

	/** Looks up the marshaller factory registered as "serial" (the Java serialization factory). */
	private static MarshallerFactory serialMarshallerFactory() {
		return Marshalling.getProvidedMarshallerFactory("serial");
	}

	/** Creates a fresh marshalling configuration with its version set to 5. */
	private static MarshallingConfiguration versionFiveConfiguration() {
		MarshallingConfiguration config = new MarshallingConfiguration();
		config.setVersion(5);
		return config;
	}
}

  服務端消費者的邏輯:

import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
/**
 * Server-side consumer: prints each request taken from the event and writes back a response
 * whose fields are the request fields prefixed with "resp: ".
 */
public class MessageConsumerImpl4Server extends MessageConsumer {

	/**
	 * @param consumerId identifier passed to the {@code MessageConsumer} base class
	 */
	public MessageConsumerImpl4Server(String consumerId) {
		super(consumerId);
	}

	/**
	 * Handles one event: logs the request and sends the response through the event's channel context.
	 *
	 * @param event carrier of the request data and the channel context
	 */
	public void onEvent(TranslatorDataWapper event) throws Exception {
		TranslatorData request = event.getData();
		ChannelHandlerContext ctx = event.getCtx();

		// 1. Business logic: print the request.
		String logLine = "Sever端: id= " + request.getId()
				+ ", name= " + request.getName()
				+ ", message= " + request.getMessage();
		System.err.println(logLine);

		// 2. Build the response and write it back.
		ctx.writeAndFlush(buildResponse(request));
	}

	/** Creates the response by prefixing every request field with "resp: ". */
	private static TranslatorData buildResponse(TranslatorData request) {
		TranslatorData reply = new TranslatorData();
		reply.setId("resp: " + request.getId());
		reply.setName("resp: " + request.getName());
		reply.setMessage("resp: " + request.getMessage());
		return reply;
	}
}

  客戶端消費者的邏輯:

import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.entity.TranslatorData;
import com.bfxy.entity.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side consumer: prints every response carried by the events it receives.
 */
public class MessageConsumerImpl4Client extends MessageConsumer {

	/**
	 * @param consumerId identifier passed to the {@code MessageConsumer} base class
	 */
	public MessageConsumerImpl4Client(String consumerId) {
		super(consumerId);
	}

	/**
	 * Prints the id, name and message of the response held by the event.
	 *
	 * @param event carrier of the response data
	 */
	public void onEvent(TranslatorDataWapper event) throws Exception {
		TranslatorData response = event.getData();
		// Business logic: print the response.
		// No ReferenceCountUtil.release(...) is needed: the payload is a plain TranslatorData
		// object rather than a reference-counted buffer, so releasing it would do nothing.
		System.err.println("Client端: id= " + response.getId()
				+ ", name= " + response.getName()
				+ ", message= " + response.getMessage());
	}
}

  

import com.lmax.disruptor.dsl.ProducerType;

/**
 * Spring Boot entry point of the server: starts the application context, starts the
 * Disruptor worker pool with four server-side consumers, then starts the Netty server.
 */
@SpringBootApplication
public class NettyServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(NettyServerApplication.class, args);

		// Four consumers with ids "code:serverId:0" to "code:serverId:3".
		MessageConsumer[] serverConsumers = new MessageConsumer[4];
		int index = 0;
		while (index < serverConsumers.length) {
			serverConsumers[index] = new MessageConsumerImpl4Server("code:serverId:" + index);
			index++;
		}

		// Alternative wait strategy that was tried here: new YieldingWaitStrategy()
		RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
		factory.initAndStart(
				ProducerType.MULTI,
				1024*1024,
				new BlockingWaitStrategy(),
				serverConsumers);

		new NettyServer();
	}
}

  

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.bfxy.client.MessageConsumerImpl4Client;
import com.bfxy.client.NettyClient;
import com.bfxy.disruptor.MessageConsumer;
import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * Spring Boot entry point of the client: starts the application context, starts the
 * Disruptor worker pool with four client-side consumers, then connects and sends the requests.
 */
@SpringBootApplication
public class NettyClientApplication {

	public static void main(String[] args) {
		SpringApplication.run(NettyClientApplication.class, args);

		// Four consumers with ids "code:clientId:0" to "code:clientId:3".
		MessageConsumer[] clientConsumers = new MessageConsumer[4];
		int index = 0;
		while (index < clientConsumers.length) {
			clientConsumers[index] = new MessageConsumerImpl4Client("code:clientId:" + index);
			index++;
		}

		// Alternative wait strategy that was tried here: new YieldingWaitStrategy()
		RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
		factory.initAndStart(
				ProducerType.MULTI,
				1024*1024,
				new BlockingWaitStrategy(),
				clientConsumers);

		// Establish the connection and send the messages.
		NettyClient client = new NettyClient();
		client.sendData();
	}
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM