- 感知階段
隨着軟件業的發展,互聯網用戶的日漸增多,並發這門藝術的興起似乎是那么合情合理。每日PV十多億的淘寶,處理並發的手段可謂是業界一流。用戶訪問淘寶首頁的平均等待時間只有區區幾秒,但是服務器所處理的流程十分復雜。首先負責首頁的服務器就有好幾千台,通過計算把與用戶路由最近的服務器處理首頁的返回。其次是網頁上的資源,就JS和CSS文件就有上百個,還有圖片資源等。它能在幾秒內加載出來可見阿里幾千名頂尖工程師的智慧是如何登峰造極。
而在大型電商網站中,他們的服務或者應用解耦之后,是通過消息隊列在彼此間通信的。消息隊列和應用之間的架構關系就是生產者消費者模型。
在介紹之前,先找找現實間的模型。筆者最近發覺,很多技術模型是和生活中的模型息息相關的。相信多數人都進過肯德基和麥當勞消費,筆者進店消費的時候發現他們的點單流程和並發模型十分接近。雖然每家店的流程有所差異,但是大概就只有兩種模型。在肯德基里,你點單之后點單員會把所點的食物完成封裝之后拿來你面前,然后讓你結賬,有時候有些耗時操作沒完成就會留下一個餐台號稍后送來。而在麥當勞的點餐模型大致是,你點完快餐之后要求你立即付款,付完款之后下一位點餐,而取餐的是在旁邊等待,另一個服務員專責負責配餐。

肯德基流程

麥當勞點餐圖
在並發模型中,肯德基比較傾向於一個線程把所有的服務都做完,而麥當勞傾向於服務解耦,讓他們更專注於自己的業務。而肯德基的模型與BIO服務器的模型設計類似,麥當勞的模型則與生產者消費者模型十分相似。
- 生產消費者模型
生產者消費者模型具體來講,就是在一個系統中,存在生產者和消費者兩種角色,他們通過內存緩沖區進行通信,生產者生產消費者需要的資料,消費者把資料做成產品。生產消費者模式如下圖。
在日益發展的服務類型中,譬如注冊用戶這種服務,它可能解耦成好幾種獨立的服務(賬號驗證,郵箱驗證碼,手機短信碼等)。它們作為消費者,等待用戶輸入數據,在前台數據提交之后會經過分解並發送到各個服務所在的url,分發的那個角色就相當於生產者。消費者在獲取數據時候有可能一次不能處理完,那么它們各自有一個請求隊列,那就是內存緩沖區了。做這項工作的框架叫做消息隊列。
- 生產者消費者模型的實現
生產者是一堆線程,消費者是另一堆線程,內存緩沖區可以使用List數組隊列,數據類型只需要定義一個簡單的類就好。關鍵是如何處理多線程之間的協作。這其實也是多線程通信的一個范例。
在這個模型中,最關鍵就是內存緩沖區為空的時候消費者必須等待,而內存緩沖區滿的時候,生產者必須等待。其他時候可以是個動態平衡。值得注意的是多線程對臨界區資源的操作時候必須保證在讀寫中只能存在一個線程,所以需要設計鎖的策略。
下面這個例子是書上介紹的,生產者負責生產一個數字並存入緩沖區,消費者從緩沖區中取出數據並且求出它的平方並輸出。
package ProducterAndConsumer.Version1;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生產者
* @author ctk
* 生產者消費者模型
*/
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;// 內存緩沖區
private static AtomicInteger count = new AtomicInteger();// 總數 原子操作
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producting id:" + Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data + " 加入隊列");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.err.println(" 加入隊列失敗");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
package ProducterAndConsumer.Version1;
/**
* 消費者
* @author ctk
*/
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id :"+Thread.currentThread().getId());
Random r = new Random();
try{
while(true){
PCData data = queue.take();
if(data != null)
{
int re = data.getData() * data.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
}catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
package ProducterAndConsumer.Version1;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
/**
* 主函數
* @author ctk
*
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10);
Producer p1 = new Producer(queue);
Producer p2 = new Producer(queue);
Producer p3 = new Producer(queue);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
Thread.sleep(10*1000);
p1.stop();
p2.stop();
p3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
package ProducterAndConsumer.Version1;
/**
* 容器數據類型
* @author ctk
*
*/
public class PCData {
private final int intData;
public PCData(int d){
intData = d;
}
public PCData(String d){
intData = Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+intData;
}
}
因為BlockingQueue是一個阻塞隊列,它的存取可以保證只有一個線程在進行,所以根據邏輯,生產者在內存滿的時候進行等待,並且喚醒消費者隊列,反過來消費者在飢餓狀態下等待並喚醒生產者進行生產。
下面的兩個版本是使用notify/wait()和await()/signal()方法進行設計的。在結構上是一致遵從模型圖的。
package ProducterAndConsumer.Version2;
import java.util.List;
/**
* 消費者
*
* @author ctk
*
*/
public class Consumer implements Runnable {
private List<PCData> queue;
public Consumer(List<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
if (Thread.currentThread().isInterrupted())
break;
PCData data = null;
synchronized (queue) {
if (queue.size() == 0) {
queue.wait();
queue.notifyAll();
}
data = queue.remove(0);
}
System.out.println(
Thread.currentThread().getId() + " 消費了:" + data.get() + " result:" + (data.get() * data.get()));
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package ProducterAndConsumer.Version2;
import java.util.List;
import java.util.Random;
/**
* 生產者
*
* @author MacBook
*
*/
public class Producer implements Runnable {
private List<PCData> queue;
private int length;
public Producer(List<PCData> queue, int length) {
this.queue = queue;
this.length = length;
}
@Override
public void run() {
try {
while (true) {
if (Thread.currentThread().isInterrupted())
break;
Random r = new Random();
long temp = r.nextInt(100);
System.out.println(Thread.currentThread().getId() + " 生產了:" + temp);
PCData data = new PCData();
data.set(temp);
synchronized (queue) {
if (queue.size() >= length) {
queue.notifyAll();
queue.wait();
} else
queue.add(data);
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package ProducterAndConsumer.Version2;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
List<PCData> queue = new ArrayList<PCData>();
int length = 10;
Producer p1 = new Producer(queue,length);
Producer p2 = new Producer(queue,length);
Producer p3 = new Producer(queue,length);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
}
}
package ProducterAndConsumer.Version2;
/**
* 基本數據類型
* @author ctk
*
*/
public class PCData {
private long value;
public void set(long value){
this.value = value;
}
public long get(){
return value;
}
}
package ProducterAndConsumer.Version3;
import java.util.List;
/**
* 消費者
* @author ctk
*
*/
public class Consumer implements Runnable{
private List<PCData> queue;
public Consumer(List<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
if (Thread.currentThread().isInterrupted())
break;
PCData data = null;
Main.lock.lock();
if (queue.size() == 0){
Main.full.signalAll();
Main.empty.await();
}
Thread.sleep(1000);
data = queue.remove(0);
Main.lock.unlock();
System.out.println("消費者ID:"+Thread.currentThread().getId()+" 消費了:"+data.getData()+" result:"+(data.getData()*data.getData()));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package ProducterAndConsumer.Version3;
import java.util.List;
import java.util.Random;
/**
* 生產者
* @author ctk
*
*/
public class Producter implements Runnable{
private List<PCData> queue;
private int len;
public Producter(List<PCData> queue,int len){
this.queue = queue;
this.len = len;
}
@Override
public void run() {
try{
while(true){
if(Thread.currentThread().isInterrupted())
break;
Random r = new Random();
PCData data = new PCData();
data.setData(r.nextInt(500));
Main.lock.lock();
if(queue.size() >= len)
{
Main.empty.signalAll();
Main.full.await();
}
Thread.sleep(1000);
queue.add(data);
Main.lock.unlock();
System.out.println("生產者ID:"+Thread.currentThread().getId()+" 生產了:"+data.getData());
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package ProducterAndConsumer.Version3;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
public static ReentrantLock lock = new ReentrantLock();
public static Condition empty = lock.newCondition();
public static Condition full = lock.newCondition();
public static void main(String[] args) {
List<PCData> queue = new ArrayList<PCData>();
int length = 10;
Producter p1 = new Producter(queue,length);
Producter p2 = new Producter(queue,length);
Producter p3 = new Producter(queue,length);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
}
}
package ProducterAndConsumer.Version3;
public class PCData {
private int data;
public int getData() {
return data;
}
public void setData(int data) {
this.data = data;
}
}
await的版本我個人寫出來之后感覺,每次控制台只輸出了一句話,說明在同一時間內生產者或者消費者只有一個是激活的,而wait的版本,一次可能有多個生成者激活。我個人覺得wait的版本更接近我的構想。


- 生產消費者模型思維
下午翻書,偶然發現並行計算的流水線思維。並行計算的要點就是分治法思維,如果能證明分割的兩部分在因果上沒有關聯,則可以進行並行計算。譬如書上的例子(A+B)*C,這個算式是不能使用並行計算分割的,因為它的結果是A+B之后的結果乘以C。但是並行流水線的思維是,我們可以請兩個工人,每個工人負責一步的處理。
分解后的架構是:P1:D = A + B;P2:R = D*3;
在這兩個線程處理中並不需要存在因果,所以他們可以並行計算了。
設計這個模式是基於生產消費者模型的,流水線需要使用流水線傳遞半成品,流水線就是內存緩沖區,對於P2來說,P1就是生產者,而對於系統需要的結果來說,P2就是生產者。

- 后記
偶然讀到一本書,上面提到的建立高速公路的學習方法是十分高效的學習方法,在學習新的技術的時候它們或多或少都會在現實中有所映射,所以讀萬卷書行萬里路,經歷和學術需要並行增長。技術模型不僅應用在技術領域,管理領域也可以參照思考,learn more,study less。
轉自:https://www.cnblogs.com/chentingk/p/6497107.html

