一、創建event類 Order
public class Order { private String id; private String name; private double price; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
二、創建消費者類 Consumer
import com.lmax.disruptor.WorkHandler; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; public class Consumer implements WorkHandler<Order> { private String consumerId; private static AtomicInteger count = new AtomicInteger(0); private Random random = new Random(); public Consumer(String consumerId) { this.consumerId = consumerId; } @Override public void onEvent(Order event) throws Exception { Thread.sleep(1 * random.nextInt(5)); System.out.println("當前消費者:" + this.consumerId + ",消費信息ID:"+event.getId()); count.incrementAndGet(); } public int getCount() { return count.get(); } }
三、創建生產者類 Producer
import com.lmax.disruptor.RingBuffer; public class Producer { private RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(String data) { long sequnce = ringBuffer.next(); try { Order order = ringBuffer.get(sequnce); order.setId(data); } finally { ringBuffer.publish(sequnce); } }
四、創建測試類
import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.ProducerType; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; public class TestMain { public static void main(String[] args) throws Exception{ //1 創建ringbuffer RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Order>() { @Override public Order newInstance() { return new Order(); } }, 1024 * 1024, new YieldingWaitStrategy()); //2 通過ringbuffer 創建一個屏障 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //3 創建多個消費者 Consumer[] consumers = new Consumer[10]; for (int i = 0; i < consumers.length; i++) { consumers[i] = new Consumer("C" + i); } //4 構建多消費者工作池 WorkerPool<Order> workerPool = new WorkerPool<Order>( ringBuffer, sequenceBarrier, new EventExceptionHandler(), consumers); //5 設置多個消費者的sequence 序號用於單獨統計消費進度,並且設置到ringbuffer中 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); //6 啟動workPool workerPool.start(Executors.newFixedThreadPool(10)); //設置異步生產 100個生產者 CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 100; i++) { Producer producer = new Producer(ringBuffer); new Thread(new Runnable() { @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 0; j < 100; j++) { producer.sendData(UUID.randomUUID().toString()); } } }).start(); } Thread.sleep(2000); System.out.println("--------線程創建完畢,開始生產數據----------"); latch.countDown(); Thread.sleep(10000); System.out.println("消費者處理的任務總數:" + consumers[0].getCount()); } //創建exception類 static class EventExceptionHandler implements ExceptionHandler<Order> { @Override public void handleEventException(Throwable ex, long sequence, Order event) { } @Override public void handleOnStartException(Throwable ex) { } @Override public void handleOnShutdownException(Throwable ex) { } } }