高並發場景-請求合並(二)揭秘HystrixCollapser-利用Queue和線程池異步實現


背景

在互聯網的高並發場景下,請求會非常多,但是數據庫連接池比較少,或者說需要減少CPU壓力,減少處理邏輯的,需要把單個查詢,用某些手段,改為批量查詢多個后返回。
如:支付寶中,查詢“個人信息”,用戶只會觸發一次請求,查詢自己的信息,但是多個人同時這樣做就會產生多次數據庫連接。為了減少連接,需要在JAVA服務端進行合並請求,把多個“個人信息”查詢接口,合並為批量查詢多個“個人信息”接口,然后以個人信息在數據庫的id作為Key返回給上游系統或者頁面URL等調用方。

目的

  1. 減少訪問數據庫的次數
  2. 單位時間內的多個請求,合並為一個請求。讓業務邏輯層把單個查詢的sql,改為批量查詢的sql。或者邏輯里面需要調用redis,那批量邏輯里面就可以用redis的pipeline去實現。
  3. 本次需要使用JDK原生手段來實現請求合並,因為大家不一定會有Hystrix,所以用原生辦法實現,並解析HystrixCollapser里面是如何實現的。

點贊再看,關注公眾號:【地藏思維】給大家分享互聯網場景設計與架構設計方案
掘金:地藏Kelvin https://juejin.im/user/5d67da8d6fb9a06aff5e85f7

主要解決手段

  1. SpringCloud的Hystrix的自定義HystrixCollapse和HystrixCommand
  2. SpringCloud的Hystrix注解方式。
  3. 沒有服務治理框架時,利用JDK隊列、定時任務線程池處理。

在上一章已經說了第一二種,鑒於有同學沒有SpringCloud,所以使用第3種來做請求合並,並一起分析請求合並的原理。

建議先看第一章,第二章相當於為HystrixCollapser的內部原理描述
高並發場景-請求合並(一)SpringCloud中Hystrix請求合並

交互流程

請求合並-SpringCloud (1).png

開發

本章節為利用JDK原生包開發,所以沒有SpringCloud那么多東西要配置,編寫代碼只有一個類。

1. 創建請求層

只需要暴露單個查詢的接口,業務邏輯層里做請求合並的邏輯。

@RestController
public class UserController {
    @Autowired
    private UserBatchWithFutureServiceImpl userBatchWithFutureServiceImpl;
   @RequestMapping(method = RequestMethod.GET,value = "/userbyMergeWithFuture/{id}")
    public User userbyMergeWithFuture(@PathVariable Long id) throws InterruptedException, ExecutionException {
        User user = this.userBatchWithFutureServiceImpl.getUserById(id);
        return user;
    }
}

2. 請求合並邏輯層

  1. 創建請求合並邏輯入口
  2. 創建阻塞隊列,用於累計多個請求參數
  3. 創建CompletableFuture類,為了本條線程阻塞,等批量查詢處理完后,異步獲取當前id對應的User結果信息。
  4. 執行CompletableFuture.get方法等待異步結果通知。
@Component
public class UserBatchWithFutureServiceImpl {
    /** 積攢請求的阻塞隊列 */
    private LinkedBlockingDeque<UserQueryDto> requestQueue = new LinkedBlockingDeque<>();

    public User getUserById(Long id) throws InterruptedException, ExecutionException {

        UserQueryDto userQueryDto = new UserQueryDto();
        userQueryDto.setId(id);
        CompletableFuture<User> completedFuture = new CompletableFuture<>();
        userQueryDto.setCompletedFuture(completedFuture);

        requestQueue.add(userQueryDto);

        User user = completedFuture.get();
        return user;
    }

HystrixCollapser也是利用這種辦法來做異步通知的手段,讓請求接口主線程在獲得真正結果前阻塞等待。

3. 定時任務

在相同的類下創建定時任務,利用@PostConstruct讓當前類的Bean構造完后執行該方法,生成一個5秒定時任務。
大家可以設定定時的時間,我為了比較方便測試,而用了5秒。

    /** 線程池數量 */
    private int threadNum = 1;
    /** 定時間隔時長 */
    private long period = 5000;
    @PostConstruct
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum);
        // 每5秒執行一次
        scheduledExecutorService.scheduleAtFixedRate(new UserBatchThread(), 0, createDeviceMergePeriod,
                TimeUnit.MILLISECONDS);
    }

HystrixCollapser的每隔n毫秒就會處理一次執行單個方法轉批量方法,也是通過這類來實現的。

4. 在UserBatchWithFutureServiceImpl 類下創建內部類

創建內部類為了定時任務執行此邏輯,並且為了代碼整潔,不在創建線程池時編寫大方法塊的代碼。

在內部類里面主要邏輯:

  1. 從存放請求接口參數的requestQueue 隊列中,獲取所有成員,並放入當此觸發任務邏輯的局部變量中
  2. 並且取出關鍵的請求參數id放入局部變量List中。
  3. 只要獲取出變量,則進行批量查詢
  4. 最后利用CompletedFuture異步通知並喚醒getUserById方法等待的線程。
public class UserBatchThread implements Runnable {

        @Override
        public void run() {
            List<UserQueryDto> requestQueueTmp = new ArrayList<>();
            // 存放批量查詢的入參
            List<Long> requestId = new ArrayList<>();

            // 把出請求層放入的消息queue的元素取出來
            int size = requestQueue.size();
            for (int i = 0; i < size; i++) {
                UserQueryDto request = requestQueue.poll();
                if (Objects.nonNull(request)) {
                    requestQueueTmp.add(request);
                    requestId.add(request.getId());
                }
            }

            if (!requestId.isEmpty()) {
                try {
                    List<User> response = getUserBatchById(requestId);
                    Map<Long, User> collect = response.stream().collect(
                            Collectors.toMap(detail -> detail.getId(), Function.identity(), (key1, key2) -> key2));
                    // 通知請求的線程
                    for (UserQueryDto request : requestQueueTmp) {
                        request.getCompletedFuture().complete(collect.get(request.getId()));
                    }

                } catch (Exception e) {
                    // 通知請求的線程-異常
                    requestQueueTmp.forEach(request -> request.getCompletedFuture().obtrudeException(e));
                }
            }
        }

    }

    public List<User> getUserBatchById(List<Long> ids) {
        System.out.println("進入批量處理方法" + ids);
        List<User> ps = new ArrayList<>();
        for (Long id : ids) {
            User p = new User();
            p.setId(id);
            p.setUsername("dizang" + id);
            ps.add(p);
        }
        return ps;
    }

請求接口中入隊列的元素,就會從這里取出,HystrixCollasper也是利用這種poll方法原子性的獲取隊列里面元素,不會被定時任務的多次觸發而重復的獲取,只要滿足有至少一個都會做批量查詢,所以HystrixCollasper合並請求時,即使n毫秒內只有一個請求,也會去處理。

測試驗證

  1. 同上一章一樣觸發Swagger-ui頁面
  2. 請求兩次不同的參數
  3. 結果如下圖中,console日志已經輸出了兩次請求的入參

output.png

總結

到這里相信大家都已經完成了合並請求了。這次沒有依賴框架,基於原生做法,利用隊列存查詢所需的入參,然后利用線程池定時地獲取隊列的入參,再批量處理,利用線程的Future做異步返回結果。這樣我們就理解了SpringCloud的HystrixCollasper的內部流程了。希望能夠幫助沒有框架的項目,或者公司技術棧不合適的情況下的同學。

本文Demo

都在我springcloud的demo里面了,看provider-hystrix-request-merge這個工程下的內容,在UserBatchWithFutureServiceImpl類中。

https://gitee.com/kelvin-cai/spring-cloud-demo


歡迎關注公眾號,文章更快一步

我的公眾號 :地藏思維

掘金:地藏Kelvin

簡書:地藏Kelvin

我的Gitee: 地藏Kelvin https://gitee.com/kelvin-cai


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM