生產者和消費者模式的好處是能夠實現異步和解耦,即生產者生產出消息后不需要立馬等到消息的執行結果而繼續向下執行,在多線程技術中采用同步隊列的方式來達到消息的生產者和消費者解耦的目的。
我們這個實例中實現是生產者不停的往同步隊列中塞數據,而消費者從同步隊列中取出數據進行處理。
Wrong類代碼:
package com.zte.ems.thread; import java.util.concurrent.ConcurrentLinkedQueue; public class Wrong { private ConcurrentLinkedQueue<Integer> linkedQueues = new ConcurrentLinkedQueue<Integer>(); // 私有構造函數,防止通過new的方式創建,另外在構造函數中啟動內部線程 private Wrong() { Worker work = new Worker(); new Thread(work).start(); } private static Wrong wrong = new Wrong(); // wr:使用單例創建Wrong對象,這個多個線程便可以共享一個wrong對象了。 public static Wrong getWrInstance() { if (wrong == null) { wrong = new Wrong(); return wrong; } else { return wrong; } } // 將生成出來的數據添加到隊列中 public void put(Integer number) { linkedQueues.add(number); System.out.println("加入到同步隊列的數據為:" + number); } // 另一個線程不停得從隊列中取出數據 public void get() { while (!linkedQueues.isEmpty()) { Integer number = linkedQueues.remove(); System.out.println("從同步隊列中取出的數字為:" + number); } } public class Worker implements Runnable { @Override public void run() { while (true) { get(); } } } }
ThreadMain類代碼:
package com.zte.ems.thread; public class ThreadMain { public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 100; i++) { if (i % 5 == 0) { Wrong.getWrInstance().put(i); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); } }
最終實現的結果:

由於我們只是簡單的模擬,因此生產出來的消息非常簡單,只是一些數字,但是在實際的項目開發中,生產的消息可以是具體的業務類。然后消費者再從Wrong類下的同步隊列中進行取出處理。
另外,在單一服務器內部我們可以通過多線程共享同步隊列的方式來實現異步和解耦,即處在業務邏輯前面的線程將消息數據輸出寫入到同步隊列中,處在業務邏輯后面的線程再從同步隊列中讀取出消息數據進行處理。
但是在目前比較流行的分布式系統環境下, 多個服務器集群則是通過分布式消息隊列的方式來實現異步和解耦,比較流行的消息中間件有ActiveMQ、kafka、RocketMQ等。分布式消息隊列可以看成是內存隊列的分布式部署。
