nodejs中的並發編程


從sleep的實現說起

在nodejs中,如果要實現sleep的功能主要是通過“setTimeout + promise”實現,也可以通過“循環空轉”來解決。前者是利用定時器實現任務的延遲執行,並通過promise鏈管理任務間的時序與依賴,本質上nodejs的執行線程並沒有真正的sleep,事件循環以及v8仍在運行,是僅僅表現在業務邏輯上sleep;而后者的實現則無疑實在浪費CPU性能,有點類似自旋鎖,不符合大多數場景。

若要實現引擎層面(運行時)的sleep,事情在ECMAScript Latest Draft (ECMA-262)出現之后開始有了轉機。ECMA262規定了 Atomics.wait,它會將調用該方法的代理(引擎)陷入等待隊列並讓其sleep,直到被notify或者超時。該規范在8.10.0以上版本的nodejs上被實現。

事實上,Atomics.wait 的出現主要解決瀏覽器或nodejs的worker之間數據同步的問題。瀏覽器上的web-worker、正式被nodejs@12納入的worker-threads模塊,這些都是ECMAScript多線程模型的具體實現。既然出現多線程那么線程間的同步也就不可避免的被提到,在前端以及nodejs范圍內可以使用Atomics.wait和notify來解決。

說的有些跑題,回到本節,如何實現運行時的sleep呢?很簡單,利用Atomics.wait的等待超時機制:

let sharedBuf = new SharedArrayBuffer(4);
let sharedArr = new Int32Array(sharedBuf);
// 睡眠n秒
let sleep = function(n){
	Atomics.wait(sharedArr, 0, 0, n * 1000);
}

此處的sleep並不是異步方法,它會阻塞執行線程直到超時,因此需要根據業務場景來使用該sleep模型。
關於Atomics.wait的具體使用方法,下文會着重講解。

多線程同步

雖然nodejs多線程使用場景不是很多,但是一旦涉及到多線程,那么線程間同步就必不可少,否則無法解決臨界區的問題。不過nodejs的work_threads對線程的創建不同於c或者java,它使用libuv的API創建線程 “uv_thread_create”,但是在此之前需要初始化一些設施如MessagePort、v8實例設置等,因此創建一個thread並不是一個輕量級的操作,需要結合場景酌情創建適量的threads。

回到正題,多線程間的同步一般需要依賴鎖,而鎖的實現需要依賴於全局變量。在nodejs的work_threads實現中,主線程無法設置全局變量,因此可以通過Atomics實現。正如上例中所示,Atomics.wait依賴 SharedArrayBuffer,這是共享內存的ArrayBuffer,threads之間可通過它共享數據,可真正操作ArrayBuffer時並不直接使用該對象,而是TypeArray。如Atomics.wait,第一個參數必須是Int32Array對象,而該對象指向的緩沖區為SharedArrayBuffer。當線程A因為Atomics.wait而阻塞后,可通過其它線程B調用Atomics.notify進行喚醒從而讓線程A的v8繼續執行。

let { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
var sab = new SharedArrayBuffer(1024);
var int32 = new Int32Array(sab);
if (isMainThread) {
	const  worker  =  new Worker(__filename, {
		workerData: sab
	});
	worker.on('message', (d) => {
		console.log('parent receive message:', d);
	});
	worker.on('error', (e) => {
		console.error('parent receive error', e);
	});
	worker.on('exit', (code) => {
		if (code !==  0)
			console.error(new  Error(`工作線程使用退出碼 ${code} 停止`));
	});

	Atomics.wait(int32, 0, 0); // A
	console.log(int32[0]); // C: 123
} else {
	let buf = workerData;
	let arrs = new Int32Array(buf);
	Atomics.store(arrs, 0, 123); 
	Atomics.notify(arrs, 0); // B
}

上例中,主線程創建thread后,在A處進行阻塞;在新線程中,通過原子操作Atomics.store修改SharedArrayBuffer的第一項為123后,於B處喚醒阻塞在SharedArrayBuffer第一項的其它線程;此時主線程被喚醒,執行console.log(int32[0]),輸出被新線程修改后的SharedArrayBuffer第一項數據123。

分析一個公平、排它、不可重入鎖的實現,它使用Atomics.wait/notify/compareExchange完成線程的同步。

main-thread.js

let  Lock  =  require('./lock').Lock;
let { Worker } =  require('worker_threads');
const  sharedBuffer  =  new SharedArrayBuffer(1 * Int32Array.BYTES_PER_ELEMENT);
const  sharedArray  =  new  Int32Array(sharedBuffer);
let worker = new Worker('./worker-lock.js', {
	workerData:  sharedBuffer
});
Lock.initialize(sharedArray, 0);
const  lock  =  new  Lock(sharedArray, 0);
// 獲取鎖
lock.lock(); 

// 3s后釋放鎖
setTimeout(() => {
	lock.unlock(); // (B)
}, 3000)
worker-thread.js

let  Lock  =  require('./lock').Lock;
let { parentPort, workerData } =  require('worker_threads');
const  sharedArray  =  new  Int32Array(workerData);
const  lock  =  new  Lock(sharedArray, 0);

console.log('Waiting for lock...'); // (A)
// 獲取鎖
lock.lock(); // (B) blocks!
console.log('Unlocked'); // (C)

主線程初始化互斥鎖,同時創建線程,主線程獲取鎖后三秒鍾釋放;
worker線程嘗試獲取鎖,此時鎖已被主線程獲取,因此worker線程在此阻塞,等待3s后主線程釋放鎖被喚醒,繼續執行輸出。

lock.js

const  UNLOCKED  =  0;
const  LOCKED_NO_WAITERS  =  1;
const  LOCKED_POSSIBLE_WAITERS  =  2;
const  NUMINTS  =  1;

class  Lock {
	// 'iab' must be a Int32Array mapping shared memory.
	// 'ibase' must be a valid index in iab, the first of NUMINTS reserved for the lock.
	constructor(iab, ibase) {
		if (!(iab  instanceof  Int32Array  &&  ibase|0  ===  ibase  &&  ibase  >=  0  &&  ibase+NUMINTS  <=  iab.length)) {
			throw  new  Error(`Bad arguments to Lock constructor: ${iab}  ${ibase}`);
		}
		this.iab  =  iab;
		this.ibase  =  ibase;
	}
	static  initialize(iab, ibase) {
		if (!(iab  instanceof  Int32Array  &&  ibase|0  ===  ibase  &&  ibase  >=  0  &&  ibase+NUMINTS  <=  iab.length)) {
			throw  new  Error(`Bad arguments to Lock constructor: ${iab}  ${ibase}`);
		}
		Atomics.store(iab, ibase, UNLOCKED);
		return  ibase;
	}
	// Acquire the lock, or block until we can. Locking is not recursive:
	lock() {
		const  iab  =  this.iab;
		const  stateIdx  =  this.ibase;
		var  c;
		if ((c  =  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS)) !==  UNLOCKED) { // A
			do {
				if (c  ===  LOCKED_POSSIBLE_WAITERS
				||  Atomics.compareExchange(iab, stateIdx, LOCKED_NO_WAITERS, LOCKED_POSSIBLE_WAITERS) !==  UNLOCKED) {
					Atomics.wait(iab, stateIdx, LOCKED_POSSIBLE_WAITERS, Number.POSITIVE_INFINITY);
				}
			} while ((c  =  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_POSSIBLE_WAITERS)) !==  UNLOCKED); // B
		}
	}
	tryLock() {
		const  iab  =  this.iab;
		const  stateIdx  =  this.ibase;
		return  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS) ===  UNLOCKED;
	}
	unlock() {
		const  iab  =  this.iab;
		const  stateIdx  =  this.ibase;
		var  v0  =  Atomics.sub(iab, stateIdx, 1);
		// Wake up a waiter if there are any
		if (v0  !==  LOCKED_NO_WAITERS) {
			Atomics.store(iab, stateIdx, UNLOCKED);
			Atomics.notify(iab, stateIdx, 1);
		}
	}
	toString() {
		return  "Lock:{ibase:"  +  this.ibase  +"}";
	}
}
exports.Lock  =  Lock;

當進程A嘗試獲取鎖成功時,A處判斷語句為false,因此由compareExchange設置狀態為LOCKED_NO_WAITERS,直接執行其后續邏輯;
若進程B此時執行lock獲取鎖時,A處判斷為true,進入do while循環體,在wait處sleep;
進程A通過unlock釋放鎖,會將鎖狀態置為UNLOCKED,同時喚醒阻塞的進程B;
進程B執行循環判斷語句B,此時為false,跳出循環執行B的邏輯。

當然,也可通過tryLock實現自旋鎖或者其他邏輯實現非阻塞等待。

參考

libuv漫談之線程
Atomics
Atomics MDN


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM