高並發處理:請求合並


場景:在高並發的查詢場景下,如果查詢的參數都是相似的,類似id=1,id=2,id=3這種那么就可以通過請求合並來解決

請求合並就是每隔一段時間就將這段時間內的請求合並到一起進行批量查詢,減少查詢數據庫的操作。

請求合並是以時間換空間的方式

類似於Redis就是以空間換時間的方式

技術實現要求:

LinkedBlockQueue阻塞隊列

ScheduledThreadPoolExecutor 定時任務線程池

CompleteableFuture future阻塞機制

@Service
public class CommodityService {
    @Autowired
    QueryServiceRemoteCall queryServiceRemoteCall;

    /**
     * 請求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區分
     * CompletableFuture將處理結果返回
     */
    class Request{
        String code;
        CompletableFuture completableFuture;
    }
    /*
    LinkedBlockingQueue是一個阻塞的隊列,內部采用鏈表的結果,通過兩個ReenTrantLock來保證線程安全
    LinkedBlockingQueue與ArrayBlockingQueue的區別
    ArrayBlockingQueue默認指定了長度,而LinkedBlockingQueue的默認長度是Integer.MAX_VALUE,也就是無界隊列,在移除的速度小於添加的速度時,容易造成OOM。
    ArrayBlockingQueue的存儲容器是數組,而LinkedBlockingQueue是存儲容器是鏈表
    兩者的實現隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
     */
    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();   //這里因為是測試,所以使用的是無界隊列

    @PostConstruct
    public void init(){
        //定時任務線程池,創建一個支持定時、周期性或延時任務的限定線程數目(這里傳入的是1)的線程池
        //scheduleAtFixedRate是周期性執行 schedule是延遲執行 initialDelay是初始延遲 period是周期間隔 后面是單位
        //這里我寫的是周期性執行10毫秒執行一次
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(()->{
            int size = queue.size();
            //如果隊列沒數據,表示這段時間沒有請求,直接返回
            if(size==0){
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合並了"+size+"個請求");
            //將隊列的請求消費到一個集合保存
            for (int i=0;i<size;i++){
                list.add(queue.poll());
            }
            //拿到我們需要去數據庫查詢的特征,保存為集合
            List<String> commodityCodes = new ArrayList<>();
            for (Request request : list) {
                commodityCodes.add(request.code);
            }
            //將參數傳入service處理
            Map<String, HashMap<String, Object>> response = queryServiceRemoteCall.queryCommodityByCodeBatch(commodityCodes);
            //將處理結果返回各自的請求
            for (Request request : list) {
                Map<String,Object> result = response.get(request.code);
                request.completableFuture.complete(result);    //completableFuture.complete方法完成賦值,這一步執行完畢,阻塞的請求可以繼續執行了
            }
        },0,10,TimeUnit.MILLISECONDS);
    }

    public Map<String,Object> queryCommodity(String code) throws ExecutionException, InterruptedException {
        Request request = new Request();
        request.code = code;
        CompletableFuture<Map<String,Object>> future = new CompletableFuture<>();
        request.completableFuture =  future;
        //將對象傳入隊列
        queue.add(request);
        //如果這時候沒完成賦值,那么就會阻塞,知道能夠拿到值
        return future.get();
    }
@SpringBootTest
public class RequestMerge {
    @Autowired
    CommodityService commodityService;
    @Test
    void context() throws ExecutionException, InterruptedException {
        //CountDownLatch來讓主線程等待
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i=0;i<100;i++){
            final String code = "code"+i;
            Thread thread = new Thread(() -> {
                try {
                    Map<String, Object> map = commodityService.queryCommodity("000" + code);
                    System.out.println(Thread.currentThread().getName() + "的查詢結果是:" + map);
                } catch (Exception e) {
                    System.out.println(Thread.currentThread().getName() + "出現異常:" + e.getMessage());
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
            thread.setName("price-thread-"+code);
            thread.start();
        }
        countDownLatch.await();
    }
}
@Service
public class QueryServiceRemoteCall {
    /**
     * 模擬從數據庫查詢
     * @param codes
     * @return
     */
    public Map<String,HashMap<String, Object>> queryCommodityByCodeBatch(List<String> codes) {
        Map<String,HashMap<String, Object>> result = new HashMap();
        for (String code : codes) {
            HashMap<String, Object> hashMap = new HashMap<>();
            hashMap.put("commodityId", new Random().nextInt(999999999));
            hashMap.put("code", code);
            hashMap.put("phone", "huawei");
            hashMap.put("isOk", "true");
            hashMap.put("price","4000");
            result.put(code,hashMap);
        }
        return result;
    }
}

缺點:請求的時間在執行實際的邏輯之前增加了等待時間,不適合低並發的場景。

優點:可以將每段時間內的請求合並,減少數據庫的壓力,避免大量重復語句的查詢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM