同時各開10個線程分別向隊列存入和取出數據(每個存入線程寫入100萬條),結論如下(按耗時由少到多排列,越靠左越快):
DoubleBufferedQueue < ConcurrentLinkedQueue < ArrayBlockingQueue < LinkedBlockingQueue
執行結果如下(時間單位為毫秒,數值為10個線程各自耗時的總和):
100萬 DoubleBufferedQueue入隊時間:9510 出隊時間:10771
100萬 DoubleBufferedQueue入隊時間:8169 出隊時間:9789
1000萬 DoubleBufferedQueue入隊時間:98285 出隊時間:101088
1000萬 DoubleBufferedQueue入隊時間:101859 出隊時間:105964
100萬 ConcurrentLinkedQueue入隊時間:10557 出隊時間:13716
100萬 ConcurrentLinkedQueue入隊時間:25298 出隊時間:25332
1000萬 ConcurrentLinkedQueue入隊時間:121868 出隊時間:136116
1000萬 ConcurrentLinkedQueue入隊時間:134306 出隊時間:147893
100萬 ArrayBlockingQueue入隊時間:21080 出隊時間:22025
100萬 ArrayBlockingQueue入隊時間:17689 出隊時間:19654
1000萬 ArrayBlockingQueue入隊時間:194400 出隊時間:205968
1000萬 ArrayBlockingQueue入隊時間:192268 出隊時間:197982
100萬 LinkedBlockingQueue入隊時間:38236 出隊時間:52555
100萬 LinkedBlockingQueue入隊時間:30646 出隊時間:38573
1000萬 LinkedBlockingQueue入隊時間:375669 出隊時間:391976
1000萬 LinkedBlockingQueue入隊時間:701363 出隊時間:711217
doubleBufferedQueue:
package test.MoreThread.d;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import test.MoreThread.l.linkedBlockingQueue;

import comrt.util.DoubleBufferedQueue;

/**
 * Micro-benchmark driver for {@link DoubleBufferedQueue}.
 *
 * <p>Starts {@link #threadNumber} producer tasks and {@link #threadNumber}
 * consumer tasks on two separate fixed thread pools. Every task reports its own
 * wall-clock duration in milliseconds; the driver logs the sum of those
 * durations for each side.
 *
 * <p>Sample runs: enqueue time 9510 / dequeue time 10771, and
 * enqueue time 8169 / dequeue time 9789.
 */
public class doubleBufferedQueue {
    private static final Logger log = LoggerFactory
            .getLogger(doubleBufferedQueue.class);

    /** Number of elements every producer task offers. */
    public final static int size1 = 1000000;

    /** Queue under test; the capacity of each of its buffers is {@link #size1}. */
    public static DoubleBufferedQueue<Object> queue = new DoubleBufferedQueue<Object>(
            size1);

    /** Number of producer tasks and, separately, of consumer tasks. */
    public final static int threadNumber = 10;

    /**
     * Set once the producer side has finished. It is written by the producer
     * coordinator thread and polled in a tight loop by the consumer tasks, so it
     * must be volatile: with a plain field the consumers are not guaranteed to
     * ever see the update and could spin forever.
     */
    public static volatile boolean isOver = false;

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        // Producer side.
        Thread thread1 = new Thread(new Runnable() {
            public void run() {
                long allTime;
                try {
                    allTime = runTasks(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                    return;
                } finally {
                    // Always release the consumers, even when waiting for the
                    // producers was interrupted.
                    doubleBufferedQueue.isOver = true;
                }
                log.info("入隊列總共執行時間:" + allTime);
            }
        });
        thread1.start();

        // Consumer side.
        Thread thread2 = new Thread(new Runnable() {
            public void run() {
                try {
                    long allTime_out = runTasks(false);
                    log.info("出隊列總共執行時間:" + allTime_out);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                }
            }
        });
        thread2.start();
    }

    /**
     * Runs {@link #threadNumber} tasks on a dedicated fixed pool and adds up the
     * durations they return. A task that fails is logged and contributes nothing
     * to the sum.
     *
     * @param producers {@code true} to run producer tasks, {@code false} to run
     *                  consumer tasks
     * @return sum of the per-task durations in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for a task
     */
    private static long runTasks(boolean producers) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadNumber);
        try {
            ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();
            for (int i = 0; i < threadNumber; i++) {
                Callable<Long> task;
                if (producers) {
                    task = new ExecDoubleBufferedQueue();
                } else {
                    task = new ExecDoubleBufferedQueue_Out();
                }
                results.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> fs : results) {
                try {
                    total += fs.get();
                } catch (ExecutionException e) {
                    log.info("" + e);
                }
            }
            return total;
        } finally {
            // One shutdown is enough; already submitted tasks still run to completion.
            pool.shutdown();
        }
    }
}

/**
 * Producer task: offers {@code size1} integers and returns the elapsed
 * wall-clock time in milliseconds. The result of {@code offer} is ignored, so
 * elements rejected because the write buffer is full are silently dropped.
 */
class ExecDoubleBufferedQueue implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        for (int i = 0; i < doubleBufferedQueue.size1; i++) {
            doubleBufferedQueue.queue.offer(i);
        }
        return System.currentTimeMillis() - start;
    }
}

/**
 * Consumer task: polls in a busy loop until the producers are reported done and
 * returns the elapsed wall-clock time in milliseconds. Polls that find the queue
 * empty are part of the measured time, and elements still queued when the flag
 * flips are not drained.
 */
class ExecDoubleBufferedQueue_Out implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        while (!doubleBufferedQueue.isOver) {
            doubleBufferedQueue.queue.poll();
        }
        return System.currentTimeMillis() - start;
    }
}
concurrentLinkedQueue:
package test.MoreThread.c;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Micro-benchmark driver for {@link ConcurrentLinkedQueue}.
 *
 * <p>Starts {@link #threadNumber} producer tasks and {@link #threadNumber}
 * consumer tasks on two separate fixed thread pools. Every task reports its own
 * wall-clock duration in milliseconds; the driver logs the sum of those
 * durations for each side.
 *
 * <p>Sample runs: enqueue time 10557 / dequeue time 13716, and
 * enqueue time 25298 / dequeue time 25332.
 */
public class concurrentLinkedQueue {
    private static final Logger log = LoggerFactory
            .getLogger(concurrentLinkedQueue.class);

    /** Queue under test. */
    public static ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();

    /** Number of elements every producer task offers. */
    public final static int size1 = 1000000;

    /** Number of producer tasks and, separately, of consumer tasks. */
    public final static int threadNumber = 10;

    /**
     * Set once the producer side has finished. It is written by the producer
     * coordinator thread and polled in a tight loop by the consumer tasks, so it
     * must be volatile: with a plain field the consumers are not guaranteed to
     * ever see the update and could spin forever.
     */
    public static volatile boolean isOver = false;

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        // Producer side.
        Thread thread1 = new Thread(new Runnable() {
            public void run() {
                long allTime;
                try {
                    allTime = runTasks(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                    return;
                } finally {
                    // Always release the consumers, even when waiting for the
                    // producers was interrupted.
                    concurrentLinkedQueue.isOver = true;
                }
                log.info("隊列總共執行時間:" + allTime);
            }
        });
        thread1.start();

        // Consumer side.
        Thread thread2 = new Thread(new Runnable() {
            public void run() {
                try {
                    long allTime_out = runTasks(false);
                    log.info("出隊列總共執行時間:" + allTime_out);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                }
            }
        });
        thread2.start();
    }

    /**
     * Runs {@link #threadNumber} tasks on a dedicated fixed pool and adds up the
     * durations they return. A task that fails is logged and contributes nothing
     * to the sum.
     *
     * @param producers {@code true} to run producer tasks, {@code false} to run
     *                  consumer tasks
     * @return sum of the per-task durations in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for a task
     */
    private static long runTasks(boolean producers) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadNumber);
        try {
            ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();
            for (int i = 0; i < threadNumber; i++) {
                Callable<Long> task;
                if (producers) {
                    task = new Exec();
                } else {
                    task = new Exec_Out();
                }
                results.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> fs : results) {
                try {
                    total += fs.get();
                } catch (ExecutionException e) {
                    log.info("" + e);
                }
            }
            return total;
        } finally {
            // One shutdown is enough; already submitted tasks still run to completion.
            pool.shutdown();
        }
    }
}

/**
 * Producer task: offers {@code size1} integers to the queue and returns the
 * elapsed wall-clock time in milliseconds.
 */
class Exec implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        for (int i = 0; i < concurrentLinkedQueue.size1; i++) {
            concurrentLinkedQueue.queue.offer(i);
        }
        return System.currentTimeMillis() - start;
    }
}

/**
 * Consumer task: polls in a busy loop until the producers are reported done and
 * returns the elapsed wall-clock time in milliseconds. Polls that find the queue
 * empty are part of the measured time, and elements still queued when the flag
 * flips are not drained.
 */
class Exec_Out implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        while (!concurrentLinkedQueue.isOver) {
            concurrentLinkedQueue.queue.poll();
        }
        return System.currentTimeMillis() - start;
    }
}
arrayBlockingQueue:
package test.MoreThread.a;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Micro-benchmark driver for {@link ArrayBlockingQueue}.
 *
 * <p>Starts {@link #threadNumber} producer tasks and {@link #threadNumber}
 * consumer tasks on two separate fixed thread pools. Every task reports its own
 * wall-clock duration in milliseconds; the driver logs the sum of those
 * durations for each side.
 *
 * <p>Sample runs: enqueue time 21080 / dequeue time 22025, and
 * enqueue time 17689 / dequeue time 19654.
 */
public class arrayBlockingQueue {
    private static final Logger log = LoggerFactory
            .getLogger(arrayBlockingQueue.class);

    /** Number of elements every producer task offers; also the queue capacity. */
    public final static int size1 = 1000000;

    /** Queue under test, bounded to {@link #size1} elements. */
    public static ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(
            size1);

    /** Number of producer tasks and, separately, of consumer tasks. */
    public final static int threadNumber = 10;

    /**
     * Set once the producer side has finished. It is written by the producer
     * coordinator thread and polled in a tight loop by the consumer tasks, so it
     * must be volatile: with a plain field the consumers are not guaranteed to
     * ever see the update and could spin forever.
     */
    public static volatile boolean isOver = false;

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        // Producer side.
        Thread thread1 = new Thread(new Runnable() {
            public void run() {
                long allTime;
                try {
                    allTime = runTasks(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                    return;
                } finally {
                    // Always release the consumers, even when waiting for the
                    // producers was interrupted.
                    arrayBlockingQueue.isOver = true;
                }
                log.info("隊列總共執行時間:" + allTime);
            }
        });
        thread1.start();

        // Consumer side.
        Thread thread2 = new Thread(new Runnable() {
            public void run() {
                try {
                    long allTime_out = runTasks(false);
                    log.info("出隊列總共執行時間:" + allTime_out);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                }
            }
        });
        thread2.start();
    }

    /**
     * Runs {@link #threadNumber} tasks on a dedicated fixed pool and adds up the
     * durations they return. A task that fails is logged and contributes nothing
     * to the sum.
     *
     * @param producers {@code true} to run producer tasks, {@code false} to run
     *                  consumer tasks
     * @return sum of the per-task durations in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for a task
     */
    private static long runTasks(boolean producers) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadNumber);
        try {
            ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();
            for (int i = 0; i < threadNumber; i++) {
                Callable<Long> task;
                if (producers) {
                    task = new ExecArrayBlockingQueue();
                } else {
                    task = new ExecArrayBlockingQueue_Out();
                }
                results.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> fs : results) {
                try {
                    total += fs.get();
                } catch (ExecutionException e) {
                    log.info("" + e);
                }
            }
            return total;
        } finally {
            // One shutdown is enough; already submitted tasks still run to completion.
            pool.shutdown();
        }
    }
}

/**
 * Producer task: offers {@code size1} integers and returns the elapsed
 * wall-clock time in milliseconds. {@code offer} does not block and its result
 * is ignored, so elements offered while the bounded queue is full are silently
 * dropped.
 */
class ExecArrayBlockingQueue implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        for (int i = 0; i < arrayBlockingQueue.size1; i++) {
            arrayBlockingQueue.queue.offer(i);
        }
        return System.currentTimeMillis() - start;
    }
}

/**
 * Consumer task: polls in a busy loop until the producers are reported done and
 * returns the elapsed wall-clock time in milliseconds. Polls that find the queue
 * empty are part of the measured time, and elements still queued when the flag
 * flips are not drained.
 */
class ExecArrayBlockingQueue_Out implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        while (!arrayBlockingQueue.isOver) {
            arrayBlockingQueue.queue.poll();
        }
        return System.currentTimeMillis() - start;
    }
}
linkedBlockingQueue:
package test.MoreThread.l;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Micro-benchmark driver for {@link LinkedBlockingQueue}.
 *
 * <p>Starts {@link #threadNumber} producer tasks and {@link #threadNumber}
 * consumer tasks on two separate fixed thread pools. Every task reports its own
 * wall-clock duration in milliseconds; the driver logs the sum of those
 * durations for each side.
 *
 * <p>Sample runs: enqueue time 38236 / dequeue time 52555, and
 * enqueue time 30646 / dequeue time 38573.
 */
public class linkedBlockingQueue {
    private static final Logger log = LoggerFactory
            .getLogger(linkedBlockingQueue.class);

    /** Number of elements every producer task offers; also the queue capacity. */
    public final static int size1 = 1000000;

    /** Queue under test, bounded to {@link #size1} elements. */
    public static LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(
            size1);

    /** Number of producer tasks and, separately, of consumer tasks. */
    public final static int threadNumber = 10;

    /**
     * Set once the producer side has finished. It is written by the producer
     * coordinator thread and polled in a tight loop by the consumer tasks, so it
     * must be volatile: with a plain field the consumers are not guaranteed to
     * ever see the update and could spin forever.
     */
    public static volatile boolean isOver = false;

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        // Producer side.
        Thread thread1 = new Thread(new Runnable() {
            public void run() {
                long allTime;
                try {
                    allTime = runTasks(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                    return;
                } finally {
                    // Always release the consumers, even when waiting for the
                    // producers was interrupted.
                    linkedBlockingQueue.isOver = true;
                }
                log.info("入隊列總共執行時間:" + allTime);
            }
        });
        thread1.start();

        // Consumer side.
        Thread thread2 = new Thread(new Runnable() {
            public void run() {
                try {
                    long allTime_out = runTasks(false);
                    log.info("出隊列總共執行時間:" + allTime_out);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("" + e);
                }
            }
        });
        thread2.start();
    }

    /**
     * Runs {@link #threadNumber} tasks on a dedicated fixed pool and adds up the
     * durations they return. A task that fails is logged and contributes nothing
     * to the sum.
     *
     * @param producers {@code true} to run producer tasks, {@code false} to run
     *                  consumer tasks
     * @return sum of the per-task durations in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for a task
     */
    private static long runTasks(boolean producers) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadNumber);
        try {
            ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();
            for (int i = 0; i < threadNumber; i++) {
                Callable<Long> task;
                if (producers) {
                    task = new ExecLinkedBlockingQueue();
                } else {
                    task = new ExecLinkedBlockingQueue_Out();
                }
                results.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> fs : results) {
                try {
                    total += fs.get();
                } catch (ExecutionException e) {
                    log.info("" + e);
                }
            }
            return total;
        } finally {
            // One shutdown is enough; already submitted tasks still run to completion.
            pool.shutdown();
        }
    }
}

/**
 * Producer task: offers {@code size1} integers and returns the elapsed
 * wall-clock time in milliseconds. {@code offer} does not block and its result
 * is ignored, so elements offered while the bounded queue is full are silently
 * dropped.
 */
class ExecLinkedBlockingQueue implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        for (int i = 0; i < linkedBlockingQueue.size1; i++) {
            linkedBlockingQueue.queue.offer(i);
        }
        return System.currentTimeMillis() - start;
    }
}

/**
 * Consumer task: polls in a busy loop until the producers are reported done and
 * returns the elapsed wall-clock time in milliseconds. Polls that find the queue
 * empty are part of the measured time, and elements still queued when the flag
 * flips are not drained.
 */
class ExecLinkedBlockingQueue_Out implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        long start = System.currentTimeMillis();
        while (!linkedBlockingQueue.isOver) {
            linkedBlockingQueue.queue.poll();
        }
        return System.currentTimeMillis() - start;
    }
}
DoubleBufferedQueue雙緩沖隊列
package comrt.util;

import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Thread-safe bounded FIFO queue built from two equally sized arrays.
 *
 * <p>Writers append to the <em>write buffer</em> under {@code writeLock};
 * readers remove from the <em>read buffer</em> under {@code readLock}. When a
 * reader finds the read buffer empty and the write buffer non-empty, it takes
 * {@code writeLock} as well and swaps the two arrays, so readers and writers
 * only contend with each other at swap time.
 *
 * <p>Lock order is always {@code readLock} first, then {@code writeLock}.
 * A reader that waits for data ({@link #take()}, timed {@link #poll(long, TimeUnit)})
 * keeps {@code readLock} while it waits, so {@link #peek()} and
 * {@link #iterator()} called meanwhile wait until that reader is done.
 *
 * <p>Writers can only use the space of the write buffer: a full write buffer
 * rejects (or blocks) writers until the next swap, even if the read buffer is
 * partly drained.
 *
 * <p>NOTE(review): both buffers are {@code transient} and no {@code readObject}
 * is defined, so a deserialized instance has no buffers; serialization looks
 * unsupported in practice - confirm before relying on it.
 *
 * @param <E> element type; {@code null} elements are not permitted
 */
public class DoubleBufferedQueue<E> extends AbstractQueue<E> implements
        BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 1011398447523020L;

    /** Buffer capacity used when a non-positive capacity is requested. */
    public static final int DEFAULT_QUEUE_CAPACITY = 5000000;

    /** Timeout used by {@link #offer(Object)} and {@link #poll()}: do not wait. */
    public static final long DEFAULT_MAX_TIMEOUT = 0;

    /** Upper bound on wake-up/retry rounds inside the timed offer and poll. */
    public static final long DEFAULT_MAX_COUNT = 10;

    /** Guards the read buffer, {@code readCount} and the read indices. */
    private ReentrantLock readLock;
    /** Guards the write buffer, {@code writeCount} and the write indices. */
    private ReentrantLock writeLock;
    /** Signalled (on {@code writeLock}) when a swap empties the write buffer. */
    private Condition notFull;
    /** Signalled (on {@code writeLock}) when the write buffer becomes non-empty. */
    private Condition awake;

    // Write and read buffers.
    private transient E[] writeArray;
    private transient E[] readArray;

    // Number of elements currently held by each buffer.
    private volatile int writeCount;
    private volatile int readCount;

    // Write buffer indices (tail = next free slot).
    private int writeArrayTP;
    private int writeArrayHP;

    // Read buffer indices (head = next element to hand out).
    private int readArrayTP;
    private int readArrayHP;

    /** Capacity of each of the two buffers. */
    private int capacity;

    /**
     * Creates a queue whose two buffers each hold {@code capacity} elements.
     *
     * @param capacity size of each buffer; a value {@code <= 0} selects
     *                 {@link #DEFAULT_QUEUE_CAPACITY}
     */
    @SuppressWarnings("unchecked")
    public DoubleBufferedQueue(int capacity) {
        this.capacity = capacity > 0 ? capacity : DEFAULT_QUEUE_CAPACITY;
        // Size the buffers from the effective capacity, not the raw argument,
        // so that a non-positive argument really yields the default size.
        readArray = (E[]) new Object[this.capacity];
        writeArray = (E[]) new Object[this.capacity];
        readLock = new ReentrantLock();
        writeLock = new ReentrantLock();
        notFull = writeLock.newCondition();
        awake = writeLock.newCondition();
    }

    /** Appends to the write buffer. Caller holds {@code writeLock} and has checked for space. */
    private void insert(E e) {
        writeArray[writeArrayTP] = e;
        ++writeArrayTP;
        ++writeCount;
    }

    /** Removes the head of the read buffer. Caller holds {@code readLock} and has checked {@code readCount > 0}. */
    private E extract() {
        E e = readArray[readArrayHP];
        readArray[readArrayHP] = null; // drop the reference so the slot is clean after the next swap
        ++readArrayHP;
        --readCount;
        return e;
    }

    /**
     * Exchanges the two buffers. Caller holds both locks, the read buffer is
     * empty and the write buffer is not.
     */
    private void swapBuffers() {
        E[] drained = readArray;
        readArray = writeArray;
        writeArray = drained;
        readCount = writeCount;
        readArrayHP = 0;
        readArrayTP = writeArrayTP;
        writeCount = 0;
        writeArrayHP = 0;
        writeArrayTP = 0;
        // The whole write buffer is free again, so wake every blocked writer.
        notFull.signalAll();
    }

    /**
     * Swaps the buffers if the write buffer holds data; otherwise optionally
     * waits for a writer. Must only be called with {@code readLock} held and the
     * read buffer empty.
     *
     * @param timeout    nanoseconds to wait when the write buffer is empty
     * @param isInfinite when {@code true} and {@code timeout <= 0}, wait until a
     *                   writer signals
     * @return {@code 0} after a swap or when not waiting, {@code -1} after an
     *         untimed wait, otherwise the remaining nanoseconds reported by the
     *         timed wait (non-positive on timeout); no swap is performed after a
     *         wait, the caller has to call again
     * @throws InterruptedException if interrupted while waiting
     */
    private long queueSwap(long timeout, boolean isInfinite)
            throws InterruptedException {
        writeLock.lock();
        try {
            if (writeCount > 0) {
                swapBuffers();
                return 0;
            }
            try {
                if (isInfinite && timeout <= 0) {
                    awake.await();
                    return -1;
                } else if (timeout > 0) {
                    return awake.awaitNanos(timeout);
                } else {
                    return 0;
                }
            } catch (InterruptedException ie) {
                awake.signal(); // pass a possibly consumed signal on
                throw ie;
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Inserts the element, waiting up to the given time for space in the write
     * buffer. At most {@link #DEFAULT_MAX_COUNT} wake-ups are tried.
     *
     * @return {@code true} if the element was added, {@code false} on timeout
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanoTime = timeout > 0 ? unit.toNanos(timeout) : 0;
        writeLock.lockInterruptibly();
        try {
            for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {
                if (writeCount < writeArray.length) {
                    insert(e);
                    if (writeCount == 1) {
                        awake.signal(); // write buffer just became non-empty
                    }
                    return true;
                }
                if (nanoTime <= 0) {
                    return false; // timed out
                }
                try {
                    nanoTime = notFull.awaitNanos(nanoTime);
                } catch (InterruptedException ie) {
                    notFull.signal();
                    throw ie;
                }
            }
            return false;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes the head of the queue, waiting up to the given time for an element.
     * With a non-positive timeout a single swap attempt is made and no waiting
     * happens.
     *
     * @return the head, or {@code null} if none became available in time
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanoTime = timeout > 0 ? unit.toNanos(timeout) : 0;
        readLock.lockInterruptibly();
        try {
            if (readCount > 0) {
                return extract();
            }
            for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {
                long remaining = queueSwap(nanoTime, false);
                if (readCount > 0) {
                    return extract();
                }
                if (remaining <= 0) {
                    return null; // nothing to swap in and no time left
                }
                nanoTime = remaining;
            }
            return null;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Removes the head of the queue without waiting.
     *
     * @return the head, or {@code null} if the queue is empty or the calling
     *         thread is interrupted (the interrupt status is preserved)
     */
    @Override
    public E poll() {
        try {
            return poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Returns the head of the queue without removing it. If the read buffer is
     * empty but the write buffer is not, the buffers are swapped first so that
     * freshly offered elements are visible.
     *
     * @return the head, or {@code null} if the queue is empty
     */
    @Override
    public E peek() {
        readLock.lock();
        try {
            if (readCount <= 0) {
                writeLock.lock();
                try {
                    if (writeCount > 0) {
                        swapBuffers();
                    }
                } finally {
                    writeLock.unlock();
                }
            }
            return readCount > 0 ? readArray[readArrayHP] : null;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Inserts the element without waiting.
     *
     * @return {@code true} if added; {@code false} if the write buffer is full or
     *         the calling thread is interrupted (the interrupt status is preserved)
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean offer(E e) {
        try {
            return offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Inserts the element, waiting as long as necessary for space in the write
     * buffer. Space appears when a reader swaps the buffers.
     *
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        writeLock.lockInterruptibly();
        try {
            try {
                while (writeCount >= writeArray.length) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                notFull.signal();
                throw ie;
            }
            insert(e);
            if (writeCount == 1) {
                awake.signal();
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes the head of the queue, waiting as long as necessary for an element.
     *
     * @return the head, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        readLock.lockInterruptibly();
        try {
            while (readCount <= 0) {
                // Either swaps immediately or waits for a writer; after a wait
                // the loop calls again to perform the actual swap.
                queueSwap(0, true);
            }
            return extract();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns how many more elements writers can add before the write buffer is
     * full.
     */
    @Override
    public int remainingCapacity() {
        writeLock.lock();
        try {
            return writeArray.length - writeCount;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Moves all currently available elements into {@code c}.
     *
     * @return number of elements transferred
     * @throws NullPointerException     if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} currently available elements into
     * {@code c}, in queue order.
     *
     * @return number of elements transferred
     * @throws NullPointerException     if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        int moved = 0;
        while (moved < maxElements) {
            E e = poll();
            if (e == null) {
                break;
            }
            c.add(e);
            moved++;
        }
        return moved;
    }

    /**
     * Returns a read-only iterator over a snapshot of the queue taken at call
     * time, in queue order (read buffer first, then write buffer). Later changes
     * to the queue are not reflected and {@code remove()} is unsupported.
     */
    @Override
    public Iterator<E> iterator() {
        List<E> snapshot = new ArrayList<E>();
        readLock.lock();
        try {
            writeLock.lock();
            try {
                for (int i = 0; i < readCount; i++) {
                    snapshot.add(readArray[readArrayHP + i]);
                }
                for (int i = 0; i < writeCount; i++) {
                    snapshot.add(writeArray[i]);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
        return Collections.unmodifiableList(snapshot).iterator();
    }

    /**
     * Returns the number of elements in the queue, counting both buffers. The
     * per-buffer counts remain available through {@link #unsafeReadSize()} and
     * {@link #WriteSize()}. Readers may remove elements concurrently, so the
     * value can be stale by the time it is returned.
     */
    @Override
    public int size() {
        // writeLock excludes swaps, so no element is counted in both buffers.
        writeLock.lock();
        try {
            long total = (long) readCount + writeCount;
            return total > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) total;
        } finally {
            writeLock.unlock();
        }
    }

    /** Returns the number of elements currently in the write buffer. */
    public int WriteSize() {
        writeLock.lock();
        try {
            return writeCount;
        } finally {
            writeLock.unlock();
        }
    }

    /** Returns the read buffer count without taking a lock. */
    public int unsafeReadSize() {
        return readCount;
    }

    /** Returns the write buffer count without taking a lock. */
    public int unsafeWriteSize() {
        return writeCount;
    }

    /** Returns the capacity of each buffer. */
    public int capacity() {
        return capacity;
    }

    /** Returns a short fill-level description of both buffers, read without locking. */
    public String toMemString() {
        return "--read: " + readCount + "/" + capacity + "--write: "
                + writeCount + "/" + capacity;
    }
}