worker_threads 的出現讓 Node.js 擁有多工作線程,但這個概念不同於Java等其它后端語言中的多線程。
Node.js 通過提供 cluster、child_process API 創建子進程的方式來賦予Node.js “多線程”能力。但是這種創建進程的方式會犧牲共享內存,並且數據通信必須通過json進行傳輸。(有一定的局限性和性能問題)
基於此 Node.js V10.5.0 提供了 worker_threads,它比 child_process 或 cluster更輕量級。 與child_process 或 cluster 不同,worker_threads 可以共享內存,通過傳輸 ArrayBuffer 實例或共享 SharedArrayBuffer 實例來實現。
這里有一個誤區:很多人可能認為在node.js核心模塊中添加一個新的模塊,來創建線程以及實現線程間同步問題,從而解決CPU密集型操作的問題?
但事實並非如此,Node.js 並沒有其它支持多線的程語言(如:java),諸如"synchronized"之類的關鍵字來實現線程同步的概念。Node.js的 worker_threads 區別於它們的多線程。如果添加線程,語言本身的性質將發生變化,所以不能將線程作為一組新的可用類或函數添加。
我們可以將其理解為:JavaScript和Node.js永遠不會有線程,只有基於Node.js 架構的多工作線程。
這張圖很好的詮釋了多工作線程機制。(1.理解node.js的event loop機制 2.和其它多線程語言對比性理解)
前置知識
Node.js 保持了JavaScript在瀏覽器中單線程的特點。它的優勢是沒有線程間數據同步的性能消耗也不會出現死鎖的情況。所以它是線程安全並且性能高效的。
單線程有它的弱點,無法充分利用多核CPU 資源,CPU 密集型計算可能會導致 I/O 阻塞,以及出現錯誤可能會導致應用崩潰。
為了解決單線程弱點:
瀏覽器端: HTML5 制定了 Web Worker 標准(Web Worker 的作用,就是為 JavaScript 創造多線程環境,允許主線程創建 Worker 線程,將一些任務分配給后者運行)。
Node端:采用了和 Web Worker相同的思路來解決單線程中大量計算問題 ,官方提供了 child_process 模塊和 cluster 模塊, cluster 底層是基於child_process實現。
child_process、cluster都是用於創建子進程,然后子進程間通過事件消息來傳遞結果,這個可以很好地保持應用模型的簡單和低依賴。從而解決無法利用多核 CPU 和程序健壯性問題。
Node V10.5.0: 提供了實驗性質的 worker_threads模塊,才讓Node擁有了多工作線程。
Node V12.0.0:worker_threads 已經成為正式標准,可以在生產環境放心使用。
也有很多開發者認為 worker_threads 違背了nodejs設計的初衷,事實上那是它並沒有真正理解 worker_threads 的底層原理。其次是每一種語言的出現都有它的歷史背景和需要解決的問題,在技術發展的過程中各種語言都是在取長補短,worker_threads 的設計就是技術發展的需要。
一、Nodejs事件循環模型
Node.js
Node.js是構建在 Chrome’s V8 引擎之上的JavaScript 運行時環境。事件驅動(event-driven)和非阻塞 I/O 模型(non-blocking I/O model)的語言特性使 Node.js 天生高效(efficient)且輕量(lightweight)。它使用 npm 作為包管理器。
Event Loop
事件循環(Event Loop)分發 I/O 任務,最終工作線程(Work Thread)將任務丟到線程池(Thread Pool)里去執行,而事件循環只要等待執行結果就可以了。
將上一張圖再細化
- Node Standard Library:Node.js 標准庫
- Node Bindings:將 V8 等暴露的 C/C++ 接口轉成JavaScript Api
- Chrome v8:JavaScript 引擎,采用 C/C++ 編寫
- libuv:由事件循環(Event Loop)和線程池(Async I/O)組成,負責所有 I/O 任務的分發與執行
- Client 請求到達 node api,該請求被添加到Event Queue(事件隊列)。這是因為Node.js 無法同時處理多個請求。
- Event Loop(事件循環) 始終檢查 Event Queue 中是否有待處理事件,如果有就從 Event Queue 中從前到后依次取出,然后提供服務。
- Event Loop 是單線程非阻塞I/O,它會把請求發送給 C++ Thread Pool(線程池)去處理,底層是基於C++ Libuv 異步I/O模型結構可以支持高並發。
- 現在 C++ Thread Pool有大量的請求,如數據庫請求,文件請求等。
- 任何線程完成任務時,Callback(回調函數)就會被觸發,並將響應發送給 Event Loop。
- 最終 Event Loop 會將請求返回給 Client。
二、一個實驗
我們以計算10,000,000以內的素數為實驗。
這個實驗會涉及到父子進程通信。比官方 http server 並發請求更有說服力。
block_primes.js
const min = 2
const max = 1e7
function generatePrimes(start, range) {
let primes = []
let isPrime = true
let end = start + range
for (let i = start; i < end; i++) {
for (let j = min; j < Math.sqrt(end); j++) {
if (i !== j && i%j === 0) {
isPrime = false
break
}
}
if (isPrime) {
primes.push(i)
}
isPrime = true
}
return primes
}
const primes = generatePrimes(min, max)
console.log(primes.length)
單線程計算
// 運行
$ time node block_primes.js
// 輸出
// 664579
8.11s user
0.03s system
99% cpu
8.147 total
結論:單核利用率 99%,總耗時 8.147s
三、cluster [了解]
如今的機器基本都是多核 cpu。為了能充分利用 cpu 計算能力,node.js V0.8(2012-06-22) 新增了一個內置模塊 cluster。它可以通過一個父進程管理一堆子進程的方式來實現集群的功能。
cluster 底層就是 child_process,master 進程做總控,啟動 1 個 agent 和 n 個 worker,agent 來做任務調度,獲取任務,並分配給某個空閑的 worker 來做。
需要注意的是:每個 worker 進程通過使用 child_process.fork() 函數,基於 IPC(Inter-Process Communication,進程間通信),實現與 master 進程間通信。
fork 出的子進程擁有和父進程一致的數據空間、堆、棧等資源(fork 當時),但是是獨立的,也就是說二者不能共享這些存儲空間。 那我們直接用 fork 自己實現不就行了。
這樣的方式僅僅實現了多進程。多進程運行還涉及父子進程通信,子進程管理,以及負載均衡等問題,這些特性 cluster 幫你實現了。
cluster_primes.js
該方法明顯是一個 cpu 密集型計算。 我本地電腦配置為 MacBook Pro (15-inch, 2018) ,運行該測試代碼,生成的報告顯示,需要9.731s 時間:
// 計算 start, 至 start + range 之間的素數
function generatePrimes(start, range) {
let primes = []
let isPrime = true
let end = start + range
for (let i = start; i < end; i++) {
for (let j = min; j < Math.sqrt(end); j++) {
if (i !== j && i%j === 0) {
isPrime = false
break
}
}
if (isPrime) {
primes.push(i)
}
isPrime = true
}
return primes
}
/**
* - 加載clustr模塊
* - 設定啟動進程數為cpu個數
*/
var cluster = require('cluster')
var numCPUs = require('os').cpus().length
// 素數的計算
const min = 2
const max = 1e7 // = 10000000
let primes = []
if (cluster.isMaster) {
const range = Math.ceil((max - min) / numCPUs)
let start = min
for (var i = 0; i < numCPUs; i++) {
const worker = cluster.fork() // 啟動子進程
// 在主進程中,這會發送消息給特定的工作進程
worker.send({ start: start, range: range })
start += range
worker.on('message', (msg) => {
primes = primes.concat(msg.data)
worker.kill()
})
}
// 當任何一個工作進程關閉的時候,cluster 模塊都將會觸發 'exit' 事件
cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died')
})
} else {
// 監聽子進程發送的信息
process.on('message', (msg) => {
console.log(msg)
const { start, range} = msg
const data = generatePrimes(start, range)
// 在工作進程中,這會發送消息給主進程
process.send({ data: data })
})
}
// 執行
$ time node cluster_primes.js
// 輸出 可以看出一共啟動了 12個子進程
{ start: 2500004, range: 833334 }
{ start: 833336, range: 833334 }
{ start: 3333338, range: 833334 }
{ start: 2, range: 833334 }
{ start: 1666670, range: 833334 }
{ start: 4166672, range: 833334 }
{ start: 5000006, range: 833334 }
{ start: 5833340, range: 833334 }
{ start: 6666674, range: 833334 }
{ start: 7500008, range: 833334 }
{ start: 8333342, range: 833334 }
{ start: 9166676, range: 833334 }
worker 31008 died
worker 31009 died
worker 31010 died
worker 31011 died
worker 31012 died
worker 31013 died
worker 31014 died
worker 31015 died
worker 31018 died
worker 31019 died
worker 31016 died
worker 31017 died
// 性能
6.68s user
0.24s system
519% cpu
1.332 total
結論:啟動了12個子進程,cpu利用率為519%,總耗時1.332s
四、child_process [了解]
在Node.js中,提供了一個 child_process 模塊,通過它可以開啟多個子進程,在多個子進程之間可以共享內存空間,可以通過子進程的互相通信來實現信息的交換。
child_process_main.js
const { fork } = require('child_process')
const worker = fork(__dirname + '/child_process_worker.js')
var numCPUs = require('os').cpus().length
// 接收工作進程計算結果
let max = 1e7
let min = 2
let start = 2
let primes = []
const range = Math.ceil((max - min) / numCPUs)
for (var i = 0; i < numCPUs; i++) {
worker.send({ start: start, range: range })
start += range
worker.on('message', (msg) => {
primes = primes.concat(msg.data)
worker.kill()
})
}
child_process_worker.js
// 素數的計算
function generatePrimes(start, range) {
let primes = []
let isPrime = true
let end = start + range
for (let i = start; i < end; i++) {
for (let j = 2; j < Math.sqrt(end); j++) {
if (i !== j && i%j === 0) {
isPrime = false
break
}
}
if (isPrime) {
primes.push(i)
}
isPrime = true
}
return primes
}
// 監聽子進程發送的信息
process.on('message', (msg) => {
const { start, range} = msg
console.log(msg)
const data = generatePrimes(start, range)
// 在工作進程中,這會發送消息給主進程
process.send({ data: data })
})
// 收到kill信息,進程退出
process.on('SIGHUP', function() {
process.exit()
})
// 執行
$ time node child_process_main.js
// 輸出
{ start: 2, range: 833334 }
{ start: 833336, range: 833334 }
{ start: 1666670, range: 833334 }
{ start: 2500004, range: 833334 }
{ start: 3333338, range: 833334 }
{ start: 4166672, range: 833334 }
{ start: 5000006, range: 833334 }
{ start: 5833340, range: 833334 }
{ start: 6666674, range: 833334 }
{ start: 7500008, range: 833334 }
{ start: 8333342, range: 833334 }
{ start: 9166676, range: 833334 }
// 性能
5.94s user
0.06s system
100% cpu
5.998 total
結論:啟動了12個子進程,cpu利用率為100%,總耗時5.998s
五、worker_threads
1、加載 worker_threads 模塊
node.js v10.5.0 引入的實驗性質API,開啟時需要使用 --experimental-worker 參數。
node.js v12.0.0 里面默認開啟,也預示着您可以將該特性用於生產環境中。
/ v 10.15.3
$ node -e "require('worker_threads'); console.log('success');"
// 輸出
internal/modules/cjs/loader.js:584
throw err;
^
Error: Cannot find module 'worker_threads'
at Function.Module._resolveFilename (internal/modules/cjs/loader.js:582:15)
at Function.Module._load (internal/modules/cjs/loader.js:508:25)
at Module.require (internal/modules/cjs/loader.js:637:17)
at require (internal/modules/cjs/helpers.js:22:18)
at [eval]:1:1
at Script.runInThisContext (vm.js:119:20)
at Object.runInThisContext (vm.js:326:38)
at Object.<anonymous> ([eval]-wrapper:6:22)
at Module._compile (internal/modules/cjs/loader.js:701:30)
at evalScript (internal/bootstrap/node.js:589:27)
----------------------------------------------------------
// v 10.15.3
$ node --experimental-worker -e
"require('worker_threads'); console.log('success');"
// 輸出
success
// v 12.6.0
$ node -e "require('worker_threads'); console.log('success');"
// 輸出
success
2、官方介紹
Workers (threads) (工作線程)對於執行CPU密集型的JavaScript操作非常有用。它們對I/O密集型工作沒有多大幫助。js的內置異步I/O操作比 Workers 效率更高。
worker_threads 比使用 child_process 或 cluster可以獲得的並行性更輕量級。 此外,worker_threads 可以有效地共享內存。
3、Hello world
例子:threads_example1.js
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
// This code is executed in the main thread and not in the worker.
// Create the worker.
const worker = new Worker(__filename);
// Listen for messages from the worker and print them.
worker.on('message', (msg) => { console.log(msg); });
} else {
// This code is executed in the worker and not in the main thread.
// Send a message to the main thread.
parentPort.postMessage('Hello world!');
}
- Worker: 該類用於創建 worker對象。有一個必填參數__filename(文件路徑),該文件會被worker執行。同時我們可以在主線程中通過worker.on監聽message事件
- isMainThread: 該對象用於區分是主線程(true)還是工作線程(false)
- parentPort: 該對象的 postMessage 方法用於 worker 線程向主線程發送消息
測試
$ node threads_example1.js
Hello world!
4、對實驗進行改造
我們使用 worker_threads
對該程序進行改造。
worker_threads_example.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads')
function generatePrimes(start, range) {
let primes = []
let isPrime = true
let end = start + range
for (let i = start; i < end; i++) {
for (let j = 2; j < Math.sqrt(end); j++) {
if (i !== j && i%j === 0) {
isPrime = false
break
}
}
if (isPrime) {
primes.push(i)
}
isPrime = true
}
return primes
}
if (isMainThread) {
const max = 1e7
const min = 2
let primes = []
const threadCount = +process.argv[2] || 2
const threads = new Set()
console.log(`Running with ${threadCount} threads...`)
const range = Math.ceil((max - min) / threadCount)
let start = min
for (let i = 0; i < threadCount - 1; i++) {
const myStart = start
threads.add(new Worker(__filename, { workerData: { start: myStart, range }}))
start += range
}
threads.add(new Worker(__filename, { workerData: { start, range: range + ((max - min + 1) % threadCount)}}))
for (let worker of threads) {
worker.on('error', (err) => { throw err })
worker.on('exit', () => {
threads.delete(worker)
console.log(`Thread exiting, ${threads.size} running...`)
if (threads.size === 0) {
// console.log(primes.join('\n'))
}
})
worker.on('message', (msg) => {
primes = primes.concat(msg)
})
}
} else {
const data = generatePrimes(workerData.start, workerData.range)
parentPort.postMessage(data)
}
該代碼中在構造 worker的時候 傳入了一個名為workerData的對象,這是我們希望線程在開始運行時可以訪問的數據。
workerData 可以是任何一個JavaScript 值。
測試
// 開啟 1 個工作線程
$ time node worker_threads_example.js 1
Running with 1 threads...
Thread exiting, 0 running...
8.25s user| 0.04s system| 100% cpu| 8.286 total
// 開啟 2 個工作線程
$ time node worker_threads_example.js 2
Running with 2 threads...
Thread exiting, 1 running...
Thread exiting, 0 running...
7.22s user| 0.04s system| 175% cpu| 4.127 total
// 開啟 4 個工作線程
$ time node worker_threads_example.js 4
Running with 4 threads...
Thread exiting, 3 running...
Thread exiting, 2 running...
Thread exiting, 1 running...
Thread exiting, 0 running...
6.75s user| 0.05s system| 313% cpu| 2.171 total
// 開啟 8 個工作線程
$ time node worker_threads_example.js 8
Running with 8 threads...
Thread exiting, 7 running...
Thread exiting, 6 running...
Thread exiting, 5 running...
Thread exiting, 4 running...
Thread exiting, 3 running...
Thread exiting, 2 running...
Thread exiting, 1 running...
Thread exiting, 0 running...
6.53s user| 0.08s system| 473% cpu| 1.397 total
$ time node worker_threads_example.js 12
Running with 12 threads...
Thread exiting, 11 running...
Thread exiting, 10 running...
Thread exiting, 9 running...
Thread exiting, 8 running...
Thread exiting, 7 running...
Thread exiting, 6 running...
Thread exiting, 5 running...
Thread exiting, 4 running...
Thread exiting, 3 running...
Thread exiting, 2 running...
Thread exiting, 1 running...
Thread exiting, 0 running...
6.51s user| 0.10s system| 515% cpu| 1.282 total
$ time node worker_threads_example.js 60
7.67s user| 0.40s system| 471% cpu| 1.712 total
結論:
工作線程數從1 提升到 12 ,我們發現耗時從8.286s 提升到 1.282s,cpu 利用率從 100% 提升到了 515%。
當我門再次把工作線程數調大到60的時候,user 耗時達到7.67s,cpu利用率降低到471%,總耗時上升到 1.712s,所以並不是工作線程數越多越好。
worker_threads 極大的提升了cpu利用率,提高了程序的運行性能。但使用過程中需要合理控制。