Java 並發框架Disruptor(七)


Disruptor VS BlockingQueue的壓測對比:

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueue4Test {

    public static void main(String[] args) {
        final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);
        final long startTime = System.currentTimeMillis();
        //向容器中添加元素
        new Thread(new Runnable() {

            public void run() {
                long i = 0;
                while (i < Constants.EVENT_NUM_OHM) {
                	Data data = new Data(i, "c" + i);
                    try {
                        queue.put(data);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        }).start();

        new Thread(new Runnable() {
            public void run() {
                int k = 0;
                while (k < Constants.EVENT_NUM_OHM) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    k++;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");
            }
        }).start();
    }
}

  

public interface Constants {

	int EVENT_NUM_OHM = 1000000;
	
	int EVENT_NUM_FM = 50000000;
	
	int EVENT_NUM_OM = 10000000;
	
}

  

import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class DisruptorSingle4Test {

    public static void main(String[] args) {
        int ringBufferSize = 65536;
        final Disruptor<Data> disruptor = new Disruptor<Data>(
                 new EventFactory<Data>() {
					public Data newInstance() {
						return new Data();
					}
				},
                ringBufferSize,
                Executors.newSingleThreadExecutor(),
                ProducerType.SINGLE, 
                //new BlockingWaitStrategy()
                new YieldingWaitStrategy()
        		);

        DataConsumer consumer = new DataConsumer();
        //消費數據
        disruptor.handleEventsWith(consumer);
        disruptor.start();
        new Thread(new Runnable() {

            public void run() {
                RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
                for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
                    long seq = ringBuffer.next();
                    Data data = ringBuffer.get(seq);
                    data.setId(i);
                    data.setName("c" + i);
                    ringBuffer.publish(seq);
                }
            }
        }).start();
    }
}

  

import com.lmax.disruptor.EventHandler;

public class DataConsumer implements EventHandler<Data> {

    private long startTime;
    private int i;

    public DataConsumer() {
        this.startTime = System.currentTimeMillis();
    }

    public void onEvent(Data data, long seq, boolean bool)
            throws Exception {
        i++;
        if (i == Constants.EVENT_NUM_OHM) {
            long endTime = System.currentTimeMillis();
            System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
        }
    }

}

  

import java.io.Serializable;

public class Data implements Serializable {

	private static final long serialVersionUID = 2035546038986494352L;
	private Long id ;
	private String name;
	
	public Data() {
	}
	public Data(Long id, String name) {
		super();
		this.id = id;
		this.name = name;
	}

	public Long getId() {
		return id;
	}
	public void setId(Long id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
}

  

 

BlockingQueue測試:

1.建立一個工廠Event類,用於創建Event類實例對象

2.需要有一個jian監聽事件類,用於處理數據(Event類)

3.實例化Disruptor實例,配置一系列參數,編寫DisDisruptor核心組件

4.編寫生產者組件,向Disruptor容器中投遞數據

pom.xml添加:

<dependency>
	<groupId>com.lmax</groupId>
	<artifactId>disruptor</artifactId>
	<scope>3.3.2</scope>
</dependency>

  

public class OrderEvent {

	private long value; //訂單的價格

	public long getValue() {
		return value;
	}

	public void setValue(long value) {
		this.value = value;
	}
}

  

import com.lmax.disruptor.EventFactory;

public class OrderEventFactory implements EventFactory<OrderEvent>{

	public OrderEvent newInstance() {
		return new OrderEvent();		//這個方法就是為了返回空的數據對象(Event)
	}
}

  

public class OrderEventHandler implements EventHandler<OrderEvent>{

	public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
		Thread.sleep(Integer.MAX_VALUE);
		System.err.println("消費者: " + event.getValue());
	}
}

  

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class OrderEventProducer {

	private RingBuffer<OrderEvent> ringBuffer;
	
	public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}
	
	public void sendData(ByteBuffer data) {
		//1 在生產者發送消息的時候, 首先 需要從我們的ringBuffer里面 獲取一個可用的序號
		long sequence = ringBuffer.next();	//0	
		try {
			//2 根據這個序號, 找到具體的 "OrderEvent" 元素 注意:此時獲取的OrderEvent對象是一個沒有被賦值的"空對象"
			OrderEvent event = ringBuffer.get(sequence);
			//3 進行實際的賦值處理
			event.setValue(data.getLong(0));			
		} finally {
			//4 提交發布操作
			ringBuffer.publish(sequence);			
		}
	}
}

  

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
	public static void main(String[] args) {
		// 參數准備工作
		OrderEventFactory orderEventFactory = new OrderEventFactory();
		int ringBufferSize = 4;
		ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		/**
		 * 1 eventFactory: 消息(event)工廠對象
		 * 2 ringBufferSize: 容器的長度
		 * 3 executor: 線程池(建議使用自定義線程池) RejectedExecutionHandler
		 * 4 ProducerType: 單生產者 還是 多生產者
		 * 5 waitStrategy: 等待策略
		 */
		//1. 實例化disruptor對象
		Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
				ringBufferSize,
				executor,
				ProducerType.SINGLE,
				new BlockingWaitStrategy());
		
		//2. 添加消費者的監聽 (構建disruptor 與 消費者的一個關聯關系)
		disruptor.handleEventsWith(new OrderEventHandler());
		
		//3. 啟動disruptor
		disruptor.start();
		
		//4. 獲取實際存儲數據的容器: RingBuffer
		RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
		
		OrderEventProducer producer = new OrderEventProducer(ringBuffer);
		
		ByteBuffer bb = ByteBuffer.allocate(8);
		
		for(long i = 0 ; i < 100; i ++){
			bb.putLong(0, i);
			producer.sendData(bb);
		}
		
		disruptor.shutdown();
		executor.shutdown();
		
	}
}

  

 

 

 

 

public final class BlockingWaitStrategy implements WaitStrategy
{
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if ((availableSequence = cursorSequence.get()) < sequence)
        {
            lock.lock();
            try
            {
                while ((availableSequence = cursorSequence.get()) < sequence)
                {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();
        }
        finally
        {
            lock.unlock();
        }
    }
}

  

public final class SleepingWaitStrategy implements WaitStrategy
{
    private static final int DEFAULT_RETRIES = 200;

    private final int retries;

    public SleepingWaitStrategy()
    {
        this(DEFAULT_RETRIES);
    }

    public SleepingWaitStrategy(int retries)
    {
        this.retries = retries;
    }

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        int counter = retries;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        if (counter > 100)
        {
            --counter;
        }
        else if (counter > 0)
        {
            --counter;
            Thread.yield();
        }
        else
        {
            LockSupport.parkNanos(1L);
        }

        return counter;
    }
}

  

public final class YieldingWaitStrategy implements WaitStrategy
{
    private static final int SPIN_TRIES = 100;

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        int counter = SPIN_TRIES;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        if (0 == counter)
        {
            Thread.yield();
        }
        else
        {
            --counter;
        }

        return counter;
    }
}

  

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM