三個生產者向一個內存容器中產生數據,另外三個消費者從容器中消費數據。
public class Main { public static void main(String[] args) throws Exception { //內存緩沖區 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10); //生產者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); //消費者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //創建線程池運行,這是一個緩存的線程池,可以創建無窮大的線程,沒有任務的時候不創建線程。空閑線程存活時間為60s(默認值) ExecutorService cachePool = Executors.newCachedThreadPool(); cachePool.execute(p1); cachePool.execute(p2); cachePool.execute(p3); cachePool.execute(c1); cachePool.execute(c2); cachePool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // cachePool.shutdown(); // cachePool.shutdownNow(); } }
public class Provider implements Runnable{ //共享緩存區 private BlockingQueue<Data> queue; //多線程間是否啟動變量,有強制從主內存中刷新的功能。即時返回線程的狀態 private volatile boolean isRunning = true; //id生成器,因為多個producer共享這個變量 private static AtomicInteger count = new AtomicInteger(); //隨機對象 private static Random r = new Random(); public Provider(BlockingQueue queue){ this.queue = queue; } public void run() { while(isRunning){ try { //隨機休眠0 - 1000 毫秒 表示獲取數據(產生數據的耗時) Thread.sleep(r.nextInt(1000)); //獲取的數據進行累計... int id = count.incrementAndGet(); //比如通過一個getData方法獲取了 Data data = new Data(Integer.toString(id), "數據" + id); System.out.println("當前線程:" + Thread.currentThread().getName() + ", 獲取了數據,id為:" + id + ", 進行裝載到公共緩沖區中..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ System.out.println("提交緩沖區數據失敗...."); //do something... 比如重新提交 } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop(){ this.isRunning = false; } }
public class Consumer implements Runnable{ private BlockingQueue<Data> queue; public Consumer(BlockingQueue queue){ this.queue = queue; } //隨機對象 private static Random r = new Random(); public void run() { while(true){ try { //獲取數據 Data data = this.queue.take(); //進行數據處理。休眠0 - 1000毫秒模擬耗時 Thread.sleep(r.nextInt(1000)); System.out.println("當前消費線程:" + Thread.currentThread().getName() + ", 消費成功,消費數據為id: " + data.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public final class Data { private String id; private String name; public Data(String id, String name){ this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString(){ return "{id: " + id + ", name: " + name + "}"; } }