筆者最近對項目進行優化,順帶就改了些東西,先把請求方式優化了,使用到了web worker。發現目前還沒有太多對web worker實際使用進行介紹使用的文章,大多是一些API類的講解,除了涉及到一些WebGL的文章,所以總結了這個文章,給大家參考參考。以下內容以默認大家對web worker已經有了初步了解,不會講解基礎知識。
一、為什么要開子線程
筆者這個項目是一個存儲系統的中后台管理GUI,一些數據需要通過CronJob定時地去獲取,並且業務對數據的即時性要求高,大量的和持久的HTTP請求是不可避免的,並且該項目部署了HTTP/2,帶寬和並發數可以極大,並且頁面上有很多的可視化儀表大盤,HTTP請求的返回數據中存在有大量的儀表盤統計信息,這些信息從服務端返回來后,並不能直接使用,需要修正為我們頁面需要的格式,並做時間格式化等其他工作,這個工作是有一定耗時的。而且我們的項目不需要兼容IE系列,哈哈哈,針對這些點於是決定優(瞎)化(弄)。
筆者一開始想到的就是使用HTML5的新特性web worker,然后將HTTP的請求工作從主線程放到子線程里面去做,工作完成后,返回子線程數據即可。這樣可以降低主線程中的負荷,使主線程可以坐享其成。一旦子線程中發起的請求成功或錯誤后,子線程返回給主線程請求的response對象或者直接返回請求得到的數據或錯誤信息。最終的方案里,選擇的是直接返回請求得到的數據,而不是response對象,這個在后面會詳細說明為什么這樣做。子線程對於處於復雜運算,特別是搭配wasm,對於處理WebGL幀等有極大的性能優勢。以往的純JS視頻解碼,筆者只看到過能夠解碼MPEG1(大概240P畫面)的canvas庫,因為要達到60幀的畫面流暢度,就必須保證1幀的計算時間要小於16ms,如果要解碼1080P的畫面甚至4K,JS可能跑不過來了,而且長時間的計算會嚴重阻塞主線程,影響頁面性能,如果能開啟子線程把計算任務交給子線程做,並通過wasm加快計算速度,這將在前端領域創造極大的可能性。
二、為什么要設計線程池
如果只開一個線程,工作都在這一個子線程里做,不能保證它不阻塞。如果無止盡的開啟而不進行控制,可能導致運行管理平台應用時,瀏覽器的內存消耗極高:一個web worker子線程的開銷大概在5MB左右。
無論這5MB內存是否已被這個子線程完全使用,還是說僅僅是給這個子線程預規划的內存空間,但這個空間確實是被占用了。並且頻繁地創建和終止線程,對性能的消耗也是極大的。所以我們需要通過線程池來根據瀏覽器所在計算機的硬件資源對子線程的工作進行規划和調度,以及對僵屍線程的清理、新線程的開辟等等。根據測試,在頁面關閉以后,主線程結束,子線程的內存占用會被一並釋放,這點不需要做額外的處理。
三、設計線程池
對於線程池,我們需要實現的功能有如下這些:
1. 初始化線程
通過 Navagitor 對象的 HardWareConcurrecy 屬性可以獲取瀏覽器所屬計算機的CPU核心數量,如果CPU有超線程技術,這個值就是實際核心數量的兩倍。當然這個屬性存在兼容性問題,如果取不到,則默認為4個。我們默認有多少個CPU線程數就開多少個子線程。線程池最大線程數量就這么確定了,簡單而粗暴:
class FetchThreadPool { constructor (option = {}){ const { inspectIntervalTime = 10 * 1000, maximumWorkTime = 30 * 1000 } = option; this.maximumThreadsNumber = window.navigator.hardwareConcurrency || 4; this.threads = []; this.inspectIntervalTime = inspectIntervalTime; this.maximumWorkTime = maximumWorkTime; this.init(); }
......
}
獲取到最大線程數量后,我們就可以根據這個數量來初始化所有的子線程了,並給它們額外加上一個我們需要的屬性:
init (){ for (let i = 0; i < this.maximumThreadsNumber; i ++){ this.createThread(i); } setInterval(() => this.inspectThreads(), this.inspectIntervalTime); } createThread (i){ // Initialize a webWorker and get its reference. const thread = work(require.resolve('./fetch.worker.js')); // Bind message event. thread.addEventListener('message', event => { this.messageHandler(event, thread); }); // Stick the id tag into thread. thread['id'] = i; // To flag the thread working status, busy or idle. thread['busy'] = false; // Record all fetch tasks of this thread, currently it is aimed to record reqPromise. thread['taskMap'] = {}; // The id tag mentioned above is the same with the index of this thread in threads array. this.threads[i] = thread; }
其中:
id為數字類型,表示這個線程的唯一標識,
busy為布爾類型,表示這個線程當前是否處於工作繁忙狀態,
taskMap為對象類型,存有這個線程當前的所有工作任務的key/value對,key為任務的ID taskId,value為這個任務的promise的resolve和reject回調對象。
由上圖還可以看出,在初始化每個子線程時我們還給這個子線程在主線程里綁定了接收它消息的事件回調。在這個回調里面我們可以針對子線程返回的消息,在主線程里做對應的處理:
messageHandler (event, thread){ let {channel, threadCode, threadData, threadMsg} = event.data; // Thread message ok. if (threadCode === 0){ switch (channel){ case 'fetch': let {taskId, code, data, msg} = threadData; let reqPromise = thread.taskMap[taskId]; if (reqPromise){ // Handle the upper fetch promise call; if (code === 0){ reqPromise.resolve(data); } else { reqPromise.reject({code, msg}); } // Remove this fetch task from taskMap of this thread. thread.taskMap[taskId] = null; } // Set the thread status to idle. thread.busy = false; this.redirectRouter(); break; case 'inspection': // console.info(`Inspection info from thread, details: ${JSON.stringify(threadData)}`); // Give some tips about abnormal worker thread. let {isWorking, workTimeElapse} = threadData; if (isWorking && (workTimeElapse > this.maximumWorkTime)){ console.warn(`Fetch worker thread ID: ${thread.id} is hanging up, details: ${JSON.stringify(threadData)}, it will be terminated.`); fetchThreadPool.terminateZombieThread(thread); } break; default: break; } } else { // Thread message come with error. if (threadData){ let {taskId} = threadData; // Set the thread status to idle. thread.busy = false; let reqPromise = thread.taskMap[taskId]; if (reqPromise){ reqPromise.reject({code: threadCode, msg: threadMsg}); } } } }
這里處理的邏輯其實挺簡單的:
1). 首先規定了子線程和主線程之間通信的數據格式:
{ threadCode: 0, threadData: {taskId, data, code, msg}, threadMsg: 'xxxxx', channel: 'fetch', }
其中:
threadCode: 表示這個消息是否正確,也就是子線程在post這次message的時候,是否是因為報錯而發過來,因為我們在子線程中會有這個設計機制,用來區分任務完成后的正常的消息和執行過程中因報錯而發送的消息。如果為正常消息,我們約定為0,錯誤消息為1,暫定只有1。
threadData: 表示消息真正的數據載體對象,如果threadCode為1,只返回taskId,以幫助主線程銷毀找到調用上層promise的reject回調函數。Fecth取到的數據放在data內部。
threadMsg: 表示消息錯誤的報錯信息。非必須的。
channel: 表示數據頻道,因為我們可能通過子線程做其他工作,在我們這個設計里至少有2個工作,一個是發起fetch請求,另外一個是響應主線程的檢查(inspection)請求。所以需要一個額外的頻道字段來確認不同工作。
這個數據格式在第4步的子線程的設計中,也會有對應的體現。
2). 如果是子線程回復的檢查消息,那么根據子線程返回的狀態決定這個子線程是否已經掛起了,如果是就把它當做一個僵屍線程殺掉。並重新創建一個子線程,替換它原來的位置。
3). 在任務結束后,這個子線程的busy被設置成了false,表示它重新處於閑置狀態。
4). 在給子線程派發任務的時候,我們post了taskId,在子線程的回復信息中,我們可以拿到這個taskId,並通過它找到對應的promise的resolve或者reject回調函數,就可以響應上層業務中Fetch調用,返回從服務端獲取的數據了。
2、執行主線程中Fetch調用的工作
首先,我們在主線程中封裝了統一調用Fetch的收口,頁面所有請求均走這個唯一入口,對外暴露Get和Post方法,里面的業務有關的部分代碼可以忽略:
const initRequest = (url, options) => { if (checkRequestUnInterception(url)){ return new Promise(async (resolve, reject) => { options.credentials = 'same-origin'; options.withCredentials = true; options.headers = {'Content-Type': 'application/json; charset=utf-8'}; fetchThreadPool.dispatchThread({url, options}, {resolve, reject}); }); } }; const initSearchUrl = (url, param) => (param ? url + '?' + stringify(param) : url); export const fetchGet = (url, param) => (initRequest(initSearchUrl(url, param), {method: 'GET'})); export const fetchPost = (url, param) => (initRequest(url, {method: 'POST', body: JSON.stringify(param)}));
在線程池中,我們實現了對應的方法來執行Fetch請求:
dispatchThread ({url, options}, reqPromise){ // Firstly get the idle thread in pools. let thread = this.threads.filter(thread => !thread.busy)[0]; // If there is no idle thread, fetch in main thread. if (!thread){ thread = fetchInMainThread({url, options}); } // Stick the reqPromise into taskMap of thread. let taskId = Date.now(); thread.taskMap[taskId] = reqPromise; // Dispatch fetch work to thread. thread.postMessage({ channel: 'fetch', data: {url, options, taskId} }); thread.busy = true; }
這里調度的邏輯是:
1). 首先遍歷當前所有的子線程,過濾出閑置中的子線程,取第一個來下發任務。
2). 如果沒有閑置的子線程,就直接在主線程發起請求。后面可以優化的地方:可以在當前子線程中隨機找一個,來下發任務。這也是為什么每個子線程不直接使用task屬性,而給它一個taskMap,就是因為一個子線程可能同時擁有兩個及以上的任務。
3、定時輪訓檢查線程與終結僵屍線程
inspectThreads (){ if (this.threads.length > 0){ this.threads.forEach(thread => { // console.info(`Inspection thread ${thread.id} starts.`); thread.postMessage({ channel: 'inspection', data: {id: thread.id} }); }); } } terminateZombieThread (thread){ let id = thread.id; this.threads.splice(id, 1, null); thread.terminate(); thread = null; this.createThread(id); }
從第1步的代碼中我們可以得知初始化定時檢查 inspectThreads 是在整個線程池init的時候執行的。對於檢查僵屍線程和執行 terminateZombieThread 也是在第1步中的處理子線程信息的回調函數中進行的。
4. 子線程的設計
子線程的設計,相對於線程池來說就比較簡單了:
export default self => { let isWorking = false; let startWorkingTime = 0; let tasks = []; self.addEventListener('message', async event => { const {channel, data} = event.data; switch (channel){ case 'fetch': isWorking = true; startWorkingTime = Date.now(); let {url, options, taskId} = data; tasks.push({url, options, taskId}); try { // Consider to web worker thread post data to main thread uses data cloning // not change the reference. So, here we don't post the response object directly, // because it is un-cloneable. If we persist to post id, we should use Transferable // Objects, such as ArrayBuffer, ImageBitMap, etc. And this way is just like to // change the reference(the control power) of the object in memory. let response = await fetch(self.origin + url, options); if (response.ok){ let {code, data, msg} = await response.json(); self.postMessage({ threadCode: 0, channel: 'fetch', threadData: {taskId, code, data, msg}, }); } else { const {status, statusText} = response; self.postMessage({ threadCode: 0, channel: 'fetch', threadData: {taskId, code: status, msg: statusText || `http error, code: ${status}`}, }); console.info(`%c HTTP error, code: ${status}`, 'color: #CC0033'); } } catch (e){ self.postMessage({ threadCode: 1, threadData: {taskId}, threadMsg: `Fetch Web Worker Error: ${e}` }); } isWorking = false; startWorkingTime = 0; tasks = tasks.filter(task => task.taskId !== taskId); break; case 'inspection': // console.info(`Receive inspection thread ${data.id}.`); self.postMessage({ threadCode: 0, channel: 'inspection', threadData: { isWorking, startWorkingTime, workTimeElapse: isWorking ? (Date.now() - startWorkingTime) : 0, tasks }, }); break; default: self.postMessage({ threadCode: 1, threadMsg: `Fetch Web Worker Error: unknown message channel: ${channel}}.` }); break; } }); };
首先,在每個子線程聲明了 taksk 用來保存收到的任務,是為后期一個子線程同時做多個任務做准備的,當前並不需要,子線程一旦收到請求任務,在請求完后之前, isWorking 狀態一直都為 true 。所有子線程有任務以后,會直接在主線程發起請求,不會隨機派發給某個子線程。
然后,我們在正常的Fecth成功后的數據通信中,post的是對response處理以后的結構化數據,而不是直接post這個response對象,這個在第一章節中有提到,這里詳細說一下:
Fetch請求的response對象並非單純的Object對象。在子線程和主線程之間使用postMessage等方法進行數據傳遞,數據傳遞的方式是克隆一個新的對象來傳遞,而非直接傳遞引用,但response對象作為一個非普通的特殊對象是不可以被克隆的......。要傳遞response對象只有就需要用到HTML5里的一些新特性比如 Transferable object 的 ArrayBuffer 、 ImageBitmap 等等,通過它們可以直接傳遞對象的引用,這樣做的話就不需要克隆對象了,進而避免因對response對象進行克隆而報錯,以及克隆含有大量數據的對象帶來的高額開銷。這里我們選擇傳遞一個普通的結構化Object對象來現實基本的功能。
對於子線程中每次給主線程post的message,也是嚴格按照第1步中說明的那樣定義的。
還有一點需要說明:筆者的項目都是基於webpack的模塊化開發,要直接使用一個web worker的js文件,筆者選了"webworkify-webpack"這個庫來處理模塊化的,這個庫還執行在子線程中隨意import其他模塊,使用比較方便:
import work from 'webworkify-webpack';
所以,在第1步中才出現了這樣的創建子線程的方式: const thread = work(require.resolve('./fetch.worker.js'));
該庫把web worker的js文件通過 createObjectURL 方法把js文件內容轉成了二進制格式,這里請求的是一個二進制數據的鏈接(引用),將會到內存中去找到這個數據,所以這里並不是一個js文件的鏈接:
如果你的項目形態和筆者不同,大可不必如此,按照常規的web worker教程中的指導方式走就行。
筆者這個項目在主線程和子線程之間只傳遞了很少量的數據,速度非常快,一旦你的項目需要去傳遞大量數據,比如說一個異常復雜的大對象,如果直接傳遞結構化對象,速度會很慢,可以先字符串化了以后再發送,避免了在post的過程中時間消耗過大。
筆者捕捉到的一個postMessage的消耗,如果數據量小的話,還算正常:
5. 通過子線程發起請求
// ... @catchError async getNodeList (){ let data = await fetchGet('/api/getnodelist'); !!data && store.dispatch(nodeAction.setNodeList(data)); }, // ...
fetchGet最終會在子線程中執行。
數據回來了:
從截圖中可以看出,和直接在主線程中發起的Fetch請求不同的是,在子線程中發起的請求,在Name列里會增加一個齒輪在開頭以區分。
需要注意的一點是:如果子線程被終結,無法查看返回信息等,因為這些數據的占用內存已經隨子線程的終結而被回收了。
我們在子線程中寫一個明顯的錯誤,也會回調reject,並在控制台報錯:
從開發者工具里可以檢測到這8個子線程:
大概的設計就是如此,目前這個線程池只針對Fetch的任務,后續還需要在業務中進行優化和增強,已適配更多的任務。針對其他的任務,在這里架子其實已基本實現,需要增加對不同channel的處理。
四、Web Worker的兼容性
從caniuse給出的數據來看,兼容性異常的好,甚至連IE系列都在好幾年前就已經支持:
但是...,這個兼容性只能說明能否使用Web Woker,這里的兼容並不能表明能在其中做其他操作。比如標准規定,可以在子線程做做計算、發起XHR請求等,但不能操作DOM對象。筆者在項目中使用的Fetch,而非Ajax,然后Fecth在IE系列(包括Edge)瀏覽器中並不支持,會直接報錯。在近新版本的Chrome、FireFox、Opera中均無任何問題。后來作者換成了Axios這種基於原生的XHR封裝的庫,在IE系列中還是要報錯。后來又換成了純原生的XmlHttpRequest,依舊報錯。這就和標准有出入了......。同學們可以試試,不知到筆者的方法是否百分百正確。但欣慰的是前幾天的新聞說微軟未來在Edge瀏覽器開發中將使用Chromium內核。
至於Web Woker衍生出來的其他新特性,比如 Shared Web Woker等,或者在子線程中再開子線程,這些特性的使用在各個瀏覽器中並不統一,有些支持,有些不支持,或者部分支持,所以對於這些特性暫時就不要去考慮它們了。
五、展望
在前端開發這塊(沒用Web前端了,是筆者認為現在的前端開發已經不僅限於Web平台了,也不僅限於前端了),迄今為止活躍度是非常之高了。新技術、新標准、新協議、新框(輪)架(子)的出現是非常快速的。技術跌該更新頻率極高,比如這個Web Worker,四年前就定稿了,筆者現在針對它寫博客......。一個新技術的出現可能不能造成什么影響,但是多種新技術的出現和搭配使用將帶來翻天覆地的變化。前端的發展越來越多地融入了曾經只在Client、Native端出現的技術。特別是近年來的WebGL、WebAssembly等新技術的推出,都是具有意義的。