Disruptor VS BlockingQueue的壓測對比:
import java.util.concurrent.ArrayBlockingQueue;
/**
 * Throughput benchmark for {@link ArrayBlockingQueue}.
 *
 * <p>One producer thread puts {@code Constants.EVENT_NUM_OHM} {@code Data}
 * items into the queue while one consumer thread takes the same number of
 * items out. When the consumer has taken the last item it prints the elapsed
 * wall-clock time, measured from just before the two threads are started.
 */
public class ArrayBlockingQueue4Test {

    public static void main(String[] args) {
        // Capacity equals the number of items produced, so put() can never
        // block on a full queue and no unused slots are preallocated.
        final ArrayBlockingQueue<Data> queue =
                new ArrayBlockingQueue<Data>(Constants.EVENT_NUM_OHM);
        final long startTime = System.currentTimeMillis();

        // Producer: add elements to the container.
        new Thread(new Runnable() {
            public void run() {
                for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
                    Data data = new Data(i, "c" + i);
                    try {
                        queue.put(data);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop producing instead of
                        // silently dropping the item and carrying on.
                        Thread.currentThread().interrupt();
                        System.err.println("ArrayBlockingQueue producer interrupted at item " + i);
                        return;
                    }
                }
            }
        }).start();

        // Consumer: drain the same number of elements, then report the elapsed time.
        new Thread(new Runnable() {
            public void run() {
                for (int k = 0; k < Constants.EVENT_NUM_OHM; k++) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        // An interrupted run is not a valid measurement: restore the
                        // flag and bail out without printing a misleading time.
                        Thread.currentThread().interrupt();
                        System.err.println("ArrayBlockingQueue consumer interrupted at item " + k);
                        return;
                    }
                }
                long endTime = System.currentTimeMillis();
                System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");
            }
        }).start();
    }
}
/**
 * Event counts used by the queue benchmarks.
 *
 * <p>NOTE(review): the meaning of the OHM / FM / OM suffixes is not explained
 * in this file; the values below are what the code actually defines.
 */
public interface Constants {

    /** 1,000,000 events. */
    public static final int EVENT_NUM_OHM = 1000 * 1000;

    /** 50,000,000 events. */
    public static final int EVENT_NUM_FM = 50 * 1000 * 1000;

    /** 10,000,000 events. */
    public static final int EVENT_NUM_OM = 10 * 1000 * 1000;
}
import java.util.concurrent.Executors;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
/**
 * Throughput benchmark for a single-producer / single-consumer Disruptor.
 *
 * <p>A {@code DataConsumer} is registered as the only event handler and one
 * producer thread publishes {@code Constants.EVENT_NUM_OHM} events. Once
 * publishing has finished, the disruptor and its executor are shut down so the
 * JVM can exit.
 */
public class DisruptorSingle4Test {

    public static void main(String[] args) {
        int ringBufferSize = 65536;

        // Keep a handle on the executor: its worker thread is non-daemon and
        // would otherwise keep the JVM alive after the benchmark completes.
        final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();

        final Disruptor<Data> disruptor = new Disruptor<Data>(
                new EventFactory<Data>() {
                    public Data newInstance() {
                        return new Data();
                    }
                },
                ringBufferSize,
                executor,
                ProducerType.SINGLE,
                // Other strategies (e.g. BlockingWaitStrategy) can be swapped in
                // here to compare them.
                new YieldingWaitStrategy()
        );

        DataConsumer consumer = new DataConsumer();
        // Consume the data.
        disruptor.handleEventsWith(consumer);
        disruptor.start();

        new Thread(new Runnable() {
            public void run() {
                try {
                    RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
                    for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
                        long seq = ringBuffer.next();
                        try {
                            Data data = ringBuffer.get(seq);
                            data.setId(i);
                            data.setName("c" + i);
                        } finally {
                            // Always publish a claimed sequence, otherwise the
                            // consumer would stall on the unpublished slot.
                            ringBuffer.publish(seq);
                        }
                    }
                } finally {
                    // Publishing has stopped: let the disruptor finish the
                    // remaining events, then release the executor thread.
                    disruptor.shutdown();
                    executor.shutdown();
                }
            }
        }).start();
    }
}
import com.lmax.disruptor.EventHandler;
/**
 * Event handler for the benchmark: counts the events it receives and prints
 * the elapsed time once {@code Constants.EVENT_NUM_OHM} events have arrived.
 * The clock starts when the handler instance is constructed.
 */
public class DataConsumer implements EventHandler<Data> {

    /** Wall-clock time (ms) captured in the constructor. */
    private long createdAtMillis;

    /** Number of events handled so far. */
    private int handled;

    public DataConsumer() {
        createdAtMillis = System.currentTimeMillis();
    }

    /**
     * Counts one event; on the {@code Constants.EVENT_NUM_OHM}-th event prints
     * the time elapsed since construction. The event content is not inspected.
     */
    public void onEvent(Data data, long seq, boolean bool) throws Exception {
        if (++handled != Constants.EVENT_NUM_OHM) {
            return;
        }
        long elapsedMillis = System.currentTimeMillis() - createdAtMillis;
        System.out.println("Disruptor costTime = " + elapsedMillis + "ms");
    }
}
import java.io.Serializable;
/**
 * Mutable payload object used by the benchmarks: an id plus a name.
 */
public class Data implements Serializable {

    private static final long serialVersionUID = 2035546038986494352L;

    private Long id;
    private String name;

    /** Creates an empty instance; both fields start out as {@code null}. */
    public Data() {
    }

    /**
     * Creates a populated instance.
     *
     * @param id   identifier of the item
     * @param name name of the item
     */
    public Data(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the current id, possibly {@code null} */
    public Long getId() {
        return this.id;
    }

    /** @param id the new id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the current name, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }
}
BlockingQueue測試:

1.建立一個工廠Event類,用於創建Event類實例對象
2.需要有一個監聽事件類,用於處理數據(Event類)
3.實例化Disruptor實例,配置一系列參數,編寫Disruptor核心組件
4.編寫生產者組件,向Disruptor容器中投遞數據
pom.xml添加:
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.2</version> </dependency>
/**
 * Event carried through the ring buffer in the quick-start example.
 */
public class OrderEvent {

    /** Price of the order. */
    private long value;

    /** @return the order price currently stored in this event */
    public long getValue() {
        return this.value;
    }

    /** @param value the order price to store in this event */
    public void setValue(long value) {
        this.value = value;
    }
}
import com.lmax.disruptor.EventFactory;
/**
 * Factory that supplies {@link OrderEvent} instances.
 */
public class OrderEventFactory implements EventFactory<OrderEvent> {

    /**
     * Creates a fresh event with no value assigned yet.
     *
     * @return a new, empty {@link OrderEvent}
     */
    public OrderEvent newInstance() {
        OrderEvent blank = new OrderEvent();
        return blank;
    }
}
/**
 * Consumer side of the quick-start example: prints the value of every
 * {@link OrderEvent} it receives to standard error.
 */
public class OrderEventHandler implements EventHandler<OrderEvent> {

    /**
     * Handles one published event.
     *
     * <p>The handler no longer sleeps for {@code Integer.MAX_VALUE} milliseconds
     * before printing. That sleep parked the consumer thread for roughly 24.8
     * days on the first event, so nothing was ever printed and any producer
     * publishing into a small ring buffer would block once it was full.
     *
     * @param event      the event to consume
     * @param sequence   sequence number of the event (unused here)
     * @param endOfBatch end-of-batch flag passed in by the caller (unused here)
     */
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("消費者: " + event.getValue());
    }
}
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
/**
 * Producer side of the quick-start example: copies a long out of a
 * {@link ByteBuffer} into the next free {@link OrderEvent} of the ring buffer
 * and publishes it.
 */
public class OrderEventProducer {

    private final RingBuffer<OrderEvent> ringBuffer;

    /** @param ringBuffer the ring buffer this producer publishes into */
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Publishes one event whose value is the long stored at offset 0 of
     * {@code data}.
     *
     * @param data buffer holding the value to send at absolute index 0
     */
    public void sendData(ByteBuffer data) {
        // Step 1: claim the next usable sequence number from the ring buffer.
        final long slot = ringBuffer.next();
        try {
            // Steps 2 and 3: look up the event stored at that sequence (it has
            // not been given a value for this round yet) and assign the payload.
            ringBuffer.get(slot).setValue(data.getLong(0));
        } finally {
            // Step 4: publish the claimed sequence, even if the assignment failed.
            ringBuffer.publish(slot);
        }
    }
}
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
/**
 * Quick-start wiring: builds a Disruptor for {@link OrderEvent}, attaches an
 * {@link OrderEventHandler}, publishes the values 0..99 through an
 * {@link OrderEventProducer}, then shuts everything down.
 */
public class Main {

    public static void main(String[] args) {
        // Prepare the constructor arguments.
        final OrderEventFactory eventFactory = new OrderEventFactory();
        final int bufferSize = 4;
        final int workerThreads = Runtime.getRuntime().availableProcessors();
        final ExecutorService pool = Executors.newFixedThreadPool(workerThreads);

        /*
         * Disruptor constructor arguments:
         *   1 eventFactory:   factory object for the messages (events)
         *   2 ringBufferSize: length of the container
         *   3 executor:       thread pool (a custom pool, e.g. with its own
         *                     RejectedExecutionHandler, is recommended)
         *   4 ProducerType:   single producer or multiple producers
         *   5 waitStrategy:   wait strategy
         */
        // 1. Instantiate the disruptor.
        final Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
                eventFactory,
                bufferSize,
                pool,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());

        // 2. Register the consumer (associates the disruptor with its handler).
        disruptor.handleEventsWith(new OrderEventHandler());

        // 3. Start the disruptor.
        disruptor.start();

        // 4. Obtain the container that actually stores the data and publish into it.
        final OrderEventProducer producer = new OrderEventProducer(disruptor.getRingBuffer());
        final ByteBuffer payload = ByteBuffer.allocate(8);
        long next = 0;
        while (next < 100) {
            payload.putLong(0, next);
            producer.sendData(payload);
            next++;
        }

        disruptor.shutdown();
        pool.shutdown();
    }
}

















/**
 * Wait strategy that blocks on a lock/condition pair until the cursor has
 * reached the requested sequence, then spins until the dependent sequence has
 * reached it as well.
 */
public final class BlockingWaitStrategy implements WaitStrategy
{
// Lock guarding the condition that waiters block on.
private final Lock lock = new ReentrantLock();
// Signalled by signalAllWhenBlocking() to wake every blocked waiter.
private final Condition processorNotifyCondition = lock.newCondition();
/**
 * Waits until {@code dependentSequence} is at least {@code sequence}.
 *
 * @param sequence the sequence number to wait for
 * @param cursorSequence sequence that is awaited under the lock (phase 1)
 * @param dependentSequence sequence that is spun on afterwards (phase 2)
 * @param barrier checked via {@code checkAlert()} on every loop iteration
 * @return the last value read from {@code dependentSequence}, which is
 *         greater than or equal to {@code sequence}
 * @throws AlertException declared by this method; presumably raised by
 *         {@code barrier.checkAlert()} - confirm against SequenceBarrier
 * @throws InterruptedException if interrupted while awaiting the condition
 */
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// Cheap check without the lock first; only take the lock when we actually
// have to wait for the cursor.
if ((availableSequence = cursorSequence.get()) < sequence)
{
lock.lock();
try
{
// Re-check under the lock on every wake-up before awaiting again.
while ((availableSequence = cursorSequence.get()) < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// Phase 2: spin (no blocking) until the dependent sequence has caught up.
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
/**
 * Wakes every thread currently blocked in {@link #waitFor} on the condition.
 */
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
}
/**
 * Wait strategy that backs off in three stages while the dependent sequence is
 * behind the requested one: plain spinning first, then {@link Thread#yield()},
 * and finally {@code LockSupport.parkNanos(1L)} once the retry budget is used up.
 */
public final class SleepingWaitStrategy implements WaitStrategy
{
    private static final int DEFAULT_RETRIES = 200;

    /** Retry budget each {@link #waitFor} call starts with. */
    private final int retries;

    /** Creates a strategy with the default budget of 200 retries. */
    public SleepingWaitStrategy()
    {
        this(DEFAULT_RETRIES);
    }

    /** @param retries retry budget each wait starts with */
    public SleepingWaitStrategy(int retries)
    {
        this.retries = retries;
    }

    /**
     * Loops until {@code dependentSequence} is at least {@code sequence},
     * applying one back-off step per iteration.
     *
     * @return the last value read from {@code dependentSequence}
     */
    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        int remaining = retries;
        long available = dependentSequence.get();
        while (available < sequence)
        {
            remaining = applyWaitMethod(barrier, remaining);
            available = dependentSequence.get();
        }
        return available;
    }

    /** No-op: this strategy never blocks on a lock or condition. */
    @Override
    public void signalAllWhenBlocking()
    {
    }

    /**
     * Performs one back-off step and returns the updated retry budget.
     * Above 100 the budget is only decremented; from 100 down to 1 it is
     * decremented and the thread yields; at 0 or below the thread parks for
     * one nanosecond and the budget is left unchanged.
     */
    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        if (counter <= 0)
        {
            LockSupport.parkNanos(1L);
            return counter;
        }
        if (counter <= 100)
        {
            Thread.yield();
        }
        return counter - 1;
    }
}
/**
 * Wait strategy that spins for a fixed number of iterations and then calls
 * {@link Thread#yield()} on every further iteration while the dependent
 * sequence is behind the requested one.
 */
public final class YieldingWaitStrategy implements WaitStrategy
{
    /** Number of plain spin iterations before yielding starts. */
    private static final int SPIN_TRIES = 100;

    /**
     * Loops until {@code dependentSequence} is at least {@code sequence},
     * applying one spin-or-yield step per iteration.
     *
     * @return the last value read from {@code dependentSequence}
     */
    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        int spinsLeft = SPIN_TRIES;
        long available = dependentSequence.get();
        while (available < sequence)
        {
            spinsLeft = applyWaitMethod(barrier, spinsLeft);
            available = dependentSequence.get();
        }
        return available;
    }

    /** No-op: this strategy never blocks on a lock or condition. */
    @Override
    public void signalAllWhenBlocking()
    {
    }

    /**
     * Performs one wait step and returns the updated spin budget: while the
     * budget is non-zero it is decremented; once it is zero the thread yields.
     */
    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        if (counter != 0)
        {
            return counter - 1;
        }
        Thread.yield();
        return 0;
    }
}





