淺析如何實現一個並發請求控制函數並限制並發數


  面試題:

1、批量請求:要實現批量請求,而且並不需要按順序發起請求(如果需要按順序可以存入隊列中,按優先級則可以存入優先隊列中),所以這里我們存入數組中即可,然后進行遍歷,取出數字中的每一項丟去fetch中進行調用。

2、可控制並發度:控制並發數,一個簡單的辦法就是對數組進行切片,分成一段一段,完成一段后再調用另一段。這里我們可以使用遞歸或者循環來實現,我覺得遞歸比較直觀,所以這里使用遞歸來實現。

本題的難點就在這里,之前做錯了就是這一步。在控制並發數的同時,每結束一個請求並發起一個新的請求。依舊使用遞歸的方式,但這次添加一個請求隊列,然后我們只要維護這個隊列,每次發起一個請求就添加進去,結束一個就丟出來,繼而實現了控制並發。

3、全部請求結束,執行 callback:因為是異步請求,我們無法寄希望於安裝正常的順序在函數調用后執行,但是每次fetch有返回結果會調用then或者catch,我們可以在這個時候判斷請求數組是否為空就可以知道是否全部被調用完

  做過爬蟲的都知道,要控制爬蟲的請求並發量,其實也就是控制其爬取頻率,以免被封IP,還有的就是以此來控制爬蟲應用運行內存,否則一下子處理N個請求,內存分分鍾會爆。而 python 爬蟲一般用多線程來控制並發,

  然而如果是 node.js 爬蟲,由於其 單線程無阻塞 性質以及事件循環機制,一般不用多線程來控制並發(當然node.js也可以實現多線程,此處非重點不再多講),而是更加簡便地直接在代碼層級上實現並發。

  為圖方便,開發者在開發node爬蟲一般會找一個並發控制的npm包,然而第三方的模塊有時候也並不能完全滿足我們的特殊需求,這時候我們可能就需要一個自己定制版的並發控制函數。

  下面我們用15行代碼實現一個並發控制的函數。

1、參數

  首先,一個基本的並發控制函數,基本要有以下3個參數:

  • list {Array} - 要迭代的數組
  • limit {number} - 控制的並發數量
  • asyncHandle {function} - 對list的每一個項的處理函數

2、設計

  以下以爬蟲為實例進行講解:設計思路其實很簡單,假如並發量控制是 5

(1)首先,瞬發 5 個異步請求,我們就得到了並發的 5 個異步請求

// limit = 5
while(limit--) { handleFunction(list) }

(2)然后,這 5 個異步請求中無論哪一個先執行完,都會繼續執行下一個list

let recursion = (arr) => { return asyncHandle(arr.shift()).then(()=>{ // 迭代數組長度不為0, 遞歸執行自身
            if (arr.length!==0) return recursion(arr) // 迭代數組長度為0,結束 
            else return 'finish'; }) }

(3)等list所有的項迭代完之后的回調

return Promise.all(allHandle)

3、具體代碼:上述步驟組合起來,就是

/** * @params list {Array} - 要迭代的數組 * @params limit {Number} - 並發數量控制數 * @params asyncHandle {Function} - 對`list`的每一個項的處理函數,參數為當前處理項,必須 return 一個Promise來確定是否繼續進行迭代 * @return {Promise} - 返回一個 Promise 值來確認所有數據是否迭代完成 */ let mapLimit = (list, limit, asyncHandle) => { let recursion = (arr) => { return asyncHandle(arr.shift()).then(()=>{ if (arr.length!==0) return recursion(arr)   // 數組還未迭代完,遞歸繼續進行迭代
                else return 'finish'; }) }; let listCopy = [].concat(list); let asyncList = []; // 正在進行的所有並發異步操作
    while(limit--) { asyncList.push( recursion(listCopy) ); } return Promise.all(asyncList);  // 所有並發異步操作都完成后,本次並發控制迭代完成
}

  這里解釋一下代碼:

(1)recursion函數用來處理傳參的回調函數asyncHandle,並將要迭代的數組作為參數傳入,方便asyncHandle函數回調繼續處理。

(2)recursion函數將要迭代的數組的第一個元素作為參數傳給asyncHandle進行業務處理,asyncHandle需返回promise,才能進入 then 回調判斷要迭代的數組是否迭代完,未迭代完的話就繼續調用自身。

(3)concat深拷貝數組避免數據污染

4、測試demo

  模擬一下異步的並發情況

var dataLists = [1,2,3,4,5,6,7,8,9,11,100,123]; var count = 0; mapLimit(dataLists, 3, (curItem)=>{ return new Promise(resolve => { count++ setTimeout(()=>{ console.log(curItem, '當前並發量:', count--) resolve(); }, Math.random() * 5000) }); }).then(response => { console.log('finish', response) })

  手動拋出異常中斷並發函數測試:

var dataLists = [1,2,3,4,5,6,7,8,9,11,100,123]; var count = 0; mapLimit(dataLists, 3, (curItem)=>{ return new Promise((resolve, reject) => { count++ setTimeout(()=>{ console.log(curItem, '當前並發量:', count--) if(curItem > 4) reject('error happen') resolve(); }, Math.random() * 5000) }); }).then(response => { console.log('finish', response) })

  並發控制情況下,迭代到5,6,7時,判斷6>4,手動拋出異常,停止了后續迭代。

原文鏈接:https://segmentfault.com/a/1190000013128649


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM