使用Promise並發限制


背景

我們在需要保證代碼在多個異步處理之后執行,我們通常會使用

Promise.all(promises: []).then(fun: function);

Promise.all可以保證,promises數組中所有promise對象都達到resolve狀態,才執行then回調

那么會出現的情況是,你在瞬間發出幾十萬http請求(tcp連接數不足可能造成等待),或者堆積了無數調用棧導致內存溢出.

這個時候需要我們對HTTP的連接數做限制。

內容

//promise並發限制
class PromisePool {
    constructor(max, fn) {
        this.max = max; //最大並發量
        this.fn = fn; //自定義的請求函數
        this.pool = []; //並發池
        this.urls = []; //剩余的請求地址
    }
    start(urls) {
        this.urls = urls; //先循環把並發池塞滿
        while (this.pool.length < this.max) {
            let url = this.urls.shift();
            this.setTask(url);
        }
        //利用Promise.race方法來獲得並發池中某任務完成的信號
        let race = Promise.race(this.pool);
        return this.run(race);
    }
    run(race) {
        race
            .then(res => {
                //每當並發池跑完一個任務,就再塞入一個任務
                let url = this.urls.shift();
                this.setTask(url);
                return this.run(Promise.race(this.pool));
            })
    }
    setTask(url) {
        if (!url) return
        let task = this.fn(url);
        this.pool.push(task); //將該任務推入pool並發池中
        console.log(`\x1B[43m ${url} 開始,當前並發數:${this.pool.length}`)
        task.then(res => {
            //請求結束后將該Promise任務從並發池中移除
            this.pool.splice(this.pool.indexOf(task), 1);
            console.log(`\x1B[43m ${url} 結束,當前並發數:${this.pool.length}`);
        })
    }
}
//test
const URLS = [
    'bytedance.com',
    'tencent.com',
    'alibaba.com',
    'microsoft.com',
    'apple.com',
    'hulu.com',
    'amazon.com'
]
//自定義請求函數
var requestFn = url => {
    return new Promise(resolve => {
        setTimeout(() => {
            resolve(`任務${url}完成`)
        }, 1000)
    }).then(res => {
        console.log('外部邏輯', res);
    })
}
const pool = new PromisePool(5, requestFn); //並發數為5
pool.start(URLS)

從上面可以看出,思路如下:定義一個 PromisePool 對象,初始化一個 pool 作為並發池,然后先循環把並發池塞滿,不斷地調用 setTask 然后通過自己自定義的任務函數(任務函數可以是網絡請求封裝的 promise 對象,或者是其他的),而且每個任務是一個Promise對象包裝的,執行完就 pop 出連接池, 任務push 進並發池 pool 中。

 //利用Promise.race方法來獲得並發池中某任務完成的信號
let race = Promise.race(this.pool);
return this.run(race);
 run(race) {
    race
        .then(res => {
            //每當並發池跑完一個任務,就再塞入一個任務
            let url = this.urls.shift();
            this.setTask(url);
            return this.run(Promise.race(this.pool));
        })
}

這個地方就是不斷通過遞歸的方式,每當並發池跑完一個任務,就再塞入一個任務

其他

npm中有很多實現這個功能的第三方包,比如async-pool、es6-promise-pool、p-limit,也可以直接拿來用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM