一.並行和並發區別:
-
並行:是指兩者同時執行一件事。比如賽跑,兩個人都在不停的往前跑;
-
並發:是指資源有限的情況下,兩者交替輪流使用資源。比如一段路(單核CPU資源)同時只能過一個人,A走一段后,讓給B,B用完繼續給A ,交替使用,目的是提高效率。
二.什么叫線程安全
-
線程安全就是說多線程訪問同一代碼,不會產生不確定的結果。
-
反之線程不安全就是,多線程在訪問同一代碼時,會發生不確定因素,例如死鎖,數據不一致性等。
三.LinkedBlockingQueue
LinkedBlockingQueue是一個線程安全的阻塞隊列,它實現了BlockingQueue接口,BlockingQueue接口繼承自java.util.Queue接口,並在這個接口的基礎上增加了take和put方法,這兩個方法正是隊列操作的阻塞版本。
由於LinkedBlockingQueue實現是線程安全的,實現了先進先出等特性,是作為生產者消費者的首選;LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認最大是Integer.MAX_VALUE;其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
示例代碼:
package com.lky.test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Test; /** * @Title: testBlockQueue.java * @Package com.lky.test * @Description:多線程模擬實現生產者消費者模型(阻塞隊列) * @author lky * @date 2015年10月24日 下午5:08:01 * @version V1.0 */ public class testBlockQueue { private Log log = LogFactory.getLog(testBlockQueue.class); /** * @Title: testBlockQueue.java * @Package com.lky.test * @Description: 定義阻塞隊列 * @author lky * @date 2015年10月24日 下午5:07:28 * @version V1.0 */ public class Basket { // 隊列的最大容量為3 BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3); // 如果隊列不滿,則放入,否則阻塞等待 public void produce(String apple) throws InterruptedException { basket.put(apple); } // 如果隊列不為空,則取出,否則阻塞等待 public String consumer() throws InterruptedException { return basket.take(); } } /** * @Title: testBlockQueue.java * @Package com.lky.test * @Description: 定義生產者 * @author lky * @date 2015年10月24日 下午5:18:17 * @version V1.0 */ public class Produce implements Runnable { private Basket basket; private String fruit; public Produce(String fruit, Basket basket) { this.basket = basket; this.fruit = fruit; } @Override public void run() { try { while (true) { log.info("[" + Thread.currentThread().getName() + "]" + "開始生產apple----->" + this.fruit); basket.produce(fruit); log.info("apple生產完畢!!!!"); Thread.sleep(1000); } } catch (Exception e) { log.error("生產蘋果異常!!!!!"); } } } /** * @Title: testBlockQueue.java * @Package com.lky.test * @Description: 定義消費者 * @author lky * @date 2015年10月24日 下午5:24:31 * @version V1.0 */ public class Consumer implements Runnable { private Basket basket; public Consumer(Basket basket) { this.basket = basket; } @Override public void run() { try { while (true) { String fruit = basket.consumer(); log.info("[" + Thread.currentThread().getName() + "]" + "取到一個水果: " + fruit); Thread.sleep(1000); } } catch (Exception e) { log.error("消費者取蘋果異常!!!!"); } } } @Test public void test() { System.out.println(Runtime.getRuntime().availableProcessors());//獲取當前系統的cpu數目 Basket basket = new Basket(); Produce produce1 = new Produce("apple", basket); Produce produce2 = new Produce("banna", basket); Consumer consumer = new Consumer(basket); // 新建一個線程池 ExecutorService service = Executors.newCachedThreadPool(); service.submit(produce1); service.submit(produce2); service.submit(consumer); try { Thread.sleep(20000); } catch (Exception e) { log.error("程序異常錯誤!!!!"); } service.shutdown(); } }
四 .ConcurrentLinkedQueue
ConcurrentLinkedQueue是Queue的一個安全實現.Queue中元素按FIFO原則進行排序.采用CAS操作,來保證元素的一致性。
示例代碼:
package com.lky.test; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * @Title: testNBlockQueue.java * @Package com.lky.test * @Description: 多線程模擬實現生產者消費者模型(非阻塞式隊列) * @author lky * @date 2015年10月24日 下午8:02:14 * @version V1.0 */ public class testNBlockQueue { private static Log log = LogFactory.getLog(testNBlockQueue.class); private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); private static int count = 2; private static CountDownLatch latch = new CountDownLatch(count); private static class Poll implements Runnable { @Override public void run() { while (!queue.isEmpty()) { log.info(Thread.currentThread().getName() + "消費一個商品: " + queue.poll()); } latch.countDown(); } } public static void main(String args[]) throws InterruptedException { long timeStart = System.currentTimeMillis(); ExecutorService eService = Executors.newFixedThreadPool(4); // 生產商品 for (int i = 0; i < 100000; ++i) { queue.offer(i); } // 消費者 for (int i = 0; i < count; ++i) { eService.submit(new Poll()); } latch.await();// 使得主線程阻塞,直到latch.getCount()為0 System.out.println("Cost time: " + (System.currentTimeMillis() - timeStart)); eService.shutdown(); } }