在nodejs的官方網站中有關於C++擴展的詳細說明,其中包含了從"hello world"到對象封裝的一系列示例。其中的“callback”節是關於回調函數的,美中不足的是,這個回調是阻塞的回調。
官方示例的回調函數用JS代碼來模擬的話,大致是這個樣子:
function syncCallback(callback) {
// 業務代碼
// 業務代碼
callback();
}
使用C++擴展的一個最大好處就是處理一些CPU密集的業務,因此這部分代碼一定是比較耗時的,否則用C++去實現完全沒有意義。業務代碼中的阻塞操作,例如傳統文件讀寫、密集計算等都會導致nodejs原始線程的阻塞,導致后來的請求無法得到及時響應,嚴重影響node的並發性能。
有服務器程序開發的朋友肯定已經想到用多線程的方法解決這個問題。是的,我要分享的就是在C++擴展中用多線程的方法處理回調,從而達到解決復雜的業務同時保證node線程的無阻塞特性。
node C++擴展中,可以使用libuv提供的線程方法,非常方便的進行線程調度。
下面是具體代碼,詳細解釋見注釋
#include <v8.h>
#include <node.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
using namespace node;
using namespace v8;
//
// 定義線程入參結構體
//
// a: 整型入參1
// b: 整型入參2
// result: 在工作線程里面計算好的a+b
// name: 線程名稱,由JS調用代碼指定
// callback: 回調函數
struct reqData
{
int result;
int a;
int b;
char name[128];
Persistent<Function> callback;
};
//
// 定義工作線程處理函數
//
// 入參為libuv指定的結構體格式
// 沒有返回值
// 具體的業務處理函數
void workerFunc(uv_work_t* req)
{
// 從uv_work_t的結構體中獲取我們定義的入參結構
reqData* request = (reqData*)req->data;
// 計算結果
request->result = request->a + request->b;
// 模擬耗時業務
for(int i = 0; i < 50; ++i) {
// 模擬密集運算導致的阻塞
Sleep(1000);
// 打印每次循環的內容
printf("[%s %04d] I am working.\n", request->name, i);
}
}
//
// 定義線程的回調函數
// req: 處理后的數據結構體
// status: 線程狀態
void afterWorkFunc(uv_work_t* req, int status)
{
HandleScope scope;
// 獲取我們定義的數據結構
reqData* request = (reqData*)req->data;
// 釋放請求結構體
delete req;
// 構造JS回調函數的arguments
Handle<Value> argv[2];
argv[0] = Undefined(); // err
argv[1] = Integer::New(request->result); // data
// 下面代碼相當於JS中的
// try {
// callback.apply(global, arguments);
// } catch (err) {
// throw err;
// }
TryCatch try_catch;
request->callback->Call(Context::GetCurrent()->Global(), 2, argv);
if (try_catch.HasCaught())
{
FatalException(try_catch);
}
// 回收資源
request->callback.Dispose();
delete request;
}
// 定義模塊的導出函數
static Handle<Value> test(const Arguments& args)
{
HandleScope scope;
// 判斷入參是否滿足條件
if ( args.Length() < 3 || !args[0]->IsNumber() || !args[1]->IsNumber() )
{
return ThrowException(Exception::TypeError(String::New("Bad argument")));
}
// 獲取參數,並轉化格式
ssize_t int1 ( args[0]->Int32Value() );
ssize_t int2 ( args[1]->Int32Value() );
char nameBuffer[128] = {0};
args[2]->ToString()->WriteAscii(nameBuffer);
// 檢查回調參數是否為函數
if ( args[3]->IsFunction() )
{
// 構造數據結構
Local<Function> callback = Local<Function>::Cast(args[3]);
reqData* request = new reqData;
request->callback = Persistent<Function>::New(callback);
request->a = int1;
request->b = int2;
strcpy(request->name, nameBuffer);
uv_work_t* req = new uv_work_t();
req->data = request;
// 調用libuv的線程處理函數
uv_queue_work(uv_default_loop(), req, workerFunc, afterWorkFunc);
}
else
{
return ThrowException(Exception::TypeError(String::New("Callback missing")));
}
return Undefined();
}
extern "C"
{
// 相當於JS中的
//
// exports.test = function Test(){};
//
static void init(Handle<Object> target)
{
target->Set(String::NewSymbol("test"), FunctionTemplate::New(test)->GetFunction());
}
}
NODE_MODULE(asyncAddon, init);
