1、任何的高並發,請求總是會有一個順序的
2、java的隊列的數據結構是先進先出的取值順序
3、BlockingQueue類(線程安全)(使用方法可以百度)
一般使用LinkedBlockingQueue
利用以上幾點,我們可以把高並發時候的請求放入一個隊列,隊列的大小可以自己定義,比如隊列容量為1000個數據,那么可以利用過濾器或者攔截器把當前的請求放入隊列,如果隊列的容量滿了,其余的請求可以丟掉或者作出相應回復
具體實施:
利用生產者、消費者模型:
將隊列的請求一一處理完。
上代碼:
1 /** 2 * @author fuguangli 3 * @description 前沿消費者類 4 * @Create date: 2017/3/7 5 * @using EXAMPLE 6 */ 7 public class Customer implements Runnable{ 8 9 10 /** 11 * 拋出異常 特殊值 阻塞 超時 12 插入 add(e) offer(e) put(e) offer(e, time, unit) 13 移除 remove() poll() take() poll(time, unit) 14 檢查 element() peek() 不可用 不可用 15 16 */ 17 private BlockingQueue blockingQueue; 18 private AtomicInteger count = new AtomicInteger(); 19 public Customer(BlockingQueue blockingQueue) { 20 this.blockingQueue = blockingQueue; 21 } 22 23 /** 24 * When an object implementing interface <code>Runnable</code> is used 25 * to create a thread, starting the thread causes the object's 26 * <code>run</code> method to be called in that separately executing 27 * thread. 28 * <p/> 29 * The general contract of the method <code>run</code> is that it may 30 * take any action whatsoever. 31 * 32 * @see Thread#run() 33 */ 34 @Override 35 public void run() { 36 System.out.println("消費者線程啟動..."); 37 LockFlag.setCustomerRunningFlag(true); 38 try { 39 while (LockFlag.getProducerRunningFlag()){ 40 System.out.println(Thread.currentThread().getId()+"I'm Customer.Queue current size="+blockingQueue.size()); 41 String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS); 42 if(data!=null){ 43 System.out.println(Thread.currentThread().getId()+"*************正在消費數據 data="+data); 44 }else{ 45 //表示超過取值時間,視為生產者不再生產數據 46 System.out.println(Thread.currentThread().getId()+"隊列為空無數據,請檢查生產者是否阻塞"); 47 } 48 Thread.sleep(50); 49 } 50 System.err.println("消費者程序執行完畢"); 51 } catch (InterruptedException e) { 52 e.printStackTrace(); 53 System.err.println("消費者程序退出"); 54 LockFlag.setCustomerRunningFlag(false);//異常退出線程 55 Thread.currentThread().interrupt(); 56 } 57 } 58 }
1 package com.qysxy.framework.queue; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.TimeUnit; 5 import java.util.concurrent.atomic.AtomicInteger; 6 7 /** 8 * @author fuguangli 9 * @description 隊列生產者類 10 * @Create date: 2017/3/7 11 * @using EXAMPLE 12 */ 13 public class Producer implements Runnable{ 14 15 16 /** 17 * 拋出異常 特殊值 阻塞 超時 18 插入 add(e) offer(e) put(e) offer(e, time, unit) 19 移除 remove() poll() take() poll(time, unit) 20 檢查 element() peek() 不可用 不可用 21 22 */ 23 private BlockingQueue blockingQueue; 24 private AtomicInteger count = new AtomicInteger(); 25 public Producer(BlockingQueue blockingQueue) { 26 this.blockingQueue = blockingQueue; 27 } 28 29 /** 30 * When an object implementing interface <code>Runnable</code> is used 31 * to create a thread, starting the thread causes the object's 32 * <code>run</code> method to be called in that separately executing 33 * thread. 34 * <p/> 35 * The general contract of the method <code>run</code> is that it may 36 * take any action whatsoever. 37 * 38 * @see Thread#run() 39 */ 40 @Override 41 public void run() { 42 System.out.println("生產者線程啟動..."); 43 LockFlag.setProducerRunningFlag(true); 44 try { 45 while (LockFlag.getProducerRunningFlag()){ 46 String data = "data:"+count.incrementAndGet(); 47 if(blockingQueue.offer(data,10, TimeUnit.SECONDS)){ 48 //返回true表示生產數據正確 49 System.out.println("^^^^^^^^^^^^^^正在生產數據 data="+data); 50 }else { 51 //表示阻塞時間內還沒有生產者生產數據 52 System.out.println("生產者異常,無法生產數據"); 53 } 54 Thread.sleep(50); 55 56 } 57 } catch (InterruptedException e) { 58 e.printStackTrace(); 59 System.err.println("生產者程序退出"); 60 LockFlag.setProducerRunningFlag(false);//異常退出線程 61 Thread.currentThread().interrupt(); 62 } 63 } 64 }
1 package com.qysxy.framework.queue; 2 3 /** 4 * @author fuguangli 5 * @description 前沿生產者消費者模型的鎖類 6 * @Create date: 2017/3/7 7 */ 8 public class LockFlag { 9 /** 10 * 生產者互斥鎖 11 */ 12 private static Boolean producerRunningFlag = false; 13 /** 14 * 消費者互斥鎖 15 */ 16 private static Boolean customerRunningFlag = false; 17 18 public static Boolean getProducerRunningFlag() { 19 return producerRunningFlag; 20 } 21 22 public static void setProducerRunningFlag(Boolean producerRunningFlag) { 23 LockFlag.producerRunningFlag = producerRunningFlag; 24 } 25 26 public static Boolean getCustomerRunningFlag() { 27 return customerRunningFlag; 28 } 29 30 public static void setCustomerRunningFlag(Boolean customerRunningFlag) { 31 LockFlag.customerRunningFlag = customerRunningFlag; 32 } 33 }
1 package com.qysxy.framework.queue; 2 3 import javax.servlet.http.HttpServletRequest; 4 import javax.servlet.http.HttpServletResponse; 5 import java.util.Queue; 6 import java.util.concurrent.*; 7 8 /** 9 * @author fuguangli 10 * @description 前沿隊列實用類,用於大量並發用戶 11 * @Create date: 2017/3/7 12 */ 13 public class BlockingQueueHelper { 14 15 16 private static final Integer maxQueueSize = 1000; 17 private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize); 18 private static ExecutorService threadPool = Executors.newCachedThreadPool(); 19 20 21 public static BlockingQueue getBlockingQueue() { 22 if (blockingQueue == null) { 23 blockingQueue = new LinkedBlockingQueue(maxQueueSize); 24 } 25 return blockingQueue; 26 } 27 28 /** 29 * @param o 隊列處理對象(包含request,response,data) 30 */ 31 public static void requestQueue(Object o) { 32 //檢測當前的隊列大小 33 if (blockingQueue != null && blockingQueue.size() < maxQueueSize) { 34 //可以正常進入隊列 35 if (blockingQueue.offer(o)) { 36 //添加成功,檢測數據處理線程是否正常 37 if (LockFlag.getCustomerRunningFlag()) { 38 //說明處理線程類正常運行 39 } else { 40 //說明處理線程類停止,此時,應重新啟動線程進行數據處理 41 LockFlag.setCustomerRunningFlag(true); 42 43 //example:run 44 Customer customer = new Customer(blockingQueue); 45 threadPool.execute(customer); 46 47 } 48 49 } else { 50 //進入隊列失敗,做出相應的處理,或者嘗試重新進入隊列 51 52 } 53 } else { 54 //隊列不正常,或隊列大小已達上限,做出相應處理 55 56 } 57 58 } 59 }
好了,這時候,利用過濾器或者攔截器將每個請求封裝成隊列元素進行處理就行。
當然了,對於多應用服務器的部署架構來說,數據庫也需要加鎖,數據庫隔離級別下篇再說。