NodeJS Addon 多線程通信


某個產品的Mac客戶端計劃基於electron實現,因為現有SDK有C API,原理上用NodeJS Addon來封裝成JS API就可使用了。但涉及到與Addon多線程交互,翻找資料沒能找到到底該怎麼做,只好翻看NodeJS實現找到實現思路。

實現思路上,NodeJS 的 V8 引擎運行在 libuv 的單一事件循環線程上,客戶端前台頁面邏輯跑在libuv事件循環中,後台SDK跑在獨立的線程上。業務需要前後台線程之間互相通信:前台訪問後台直接調用API,然後異步回調;後台通知前台則需要喚醒libuv線程來完成。

 

libuv 唯一線程安全的接口是 uv_async_send,  用它可以喚醒libuv線程執行指定代碼。

 

后台SDK -> NodeJS 調用流程:

0. SDK 實例初始化時

   初始化一個uv_async_t: 

   uv_async_init(uv_default_loop(), &uv_async, node_event_process);

   注意必須在libuv線程執行,node_event_process 是被喚醒時執行的方法。

 

1.  將參數臨時存放在SDK實例

 

2.  喚醒libuv線程

  uv_async_send(&uv_async)

  libuv 在UNIX下的實現是,uv_async_init時創建一個用於喚醒的fd(pipe,Linux上為eventfd),事件循環在epoll(macOS上為kqueue)上等待它,send時就是往fd寫入數據,等待中的調用就會返回。

  要注意的是uv_async_send每次調用並不保證都會執行回調,只保證能喚醒線程,不能作為調用方式。

 

3. SDK線程執行 std::condition_variable.wait() 掛起等待libuv線程執行結果

 

4. libuv線程喚醒,讀取SDK實例臨時參數執行node_event_process調用

    通過V8接口,調用JSON庫,將參數解析為V8 Object對象

    通過CallID查找事務回調的CallBack Function,或者提供bind方法綁定的Callback Function,執行調用就會返回Javascript世界開始執行

 

5. 執行完成,將返回值臨時放到SDK實例上

    返回的 v8::Local<v8::Value> 需先在libuv線程轉換為自己的類型,否則直接傳遞到SDK線程讀取會導致崩潰

 

6. std::condition_variable.notify_one()  喚醒SDK線程繼續執行

 

7. SDK wait() 繼續執行,讀取返回值

 

參考代碼如下:

// JavaScript-facing wrapper (Nan::ObjectWrap) around one native `rcs_state`.
//
// Threading, as used by the functions in this file:
//  - the SDK thread calls node_event_deliver(), which hands an EventItem to
//    this object via waitForEventProcess();
//  - the libuv loop thread runs node_event_process(), which consumes the item,
//    stores the result and calls notifyEventComplete().
class RcsSdk : public Nan::ObjectWrap {
public:
    // Module initializer: registers the "RcsSdk" constructor on the exports.
    static NAN_MODULE_INIT(Init);

    // Stores `callback` under call id `cid`.
    void addCallback(int cid, Nan::Callback* callback);
    // Removes and returns the callback stored under `cid`, or NULL if there is
    // none. node_do_event_process() deletes the returned callback after use.
    const Nan::Callback* removeCallback(int cid);
    // Returns the callback bound to `method` through bind(), or NULL.
    const Nan::Callback* getBindMethod(const char* method);

    // Wakes the libuv loop by calling uv_async_send() on uv_async.
    void notifyMainLoop();

#ifdef CALL_NODE_USE_ASYNC_QUEUE
    // Queue of pending events for the asynchronous delivery mode (the code
    // using it is not part of this listing).
    std::queue<EventItem*> asyncEventQueue;
#else
    // Synchronous delivery mode. Both functions are defined outside this
    // listing; node_event_deliver() calls the first from the SDK thread and
    // node_event_process() calls the second from the loop thread.
    void waitForEventProcess(EventItem* item);
    void notifyEventComplete();
    // Event currently handed over to the loop thread (NULL when none).
    EventItem* syncEventItem;
    // Result string of the most recently processed event (NULL when none).
    char* syncEventResult;
#endif
    // Locked around accesses to syncEventItem / syncEventResult.
    std::mutex eventSyncMutex;
    // Native SDK state owned by this wrapper.
    rcs_state *state;

private:
    // Parameter names and order match the out-of-class definition; every
    // string parameter has the same type, so swapping two of them compiles
    // silently.
    explicit RcsSdk(const char* number, const char* imei,
                    const char* devicevendor, const char* devicemodel,
                    const char* deviceos, const char* deviceosversion,
                    const char* clientVendor, const char* clientVersion,
                    const char* storage, const char* appId,
                    const char* syspath, int clientType);
    ~RcsSdk();

    // JS constructor entry point and the `bind(event, fn)` prototype method.
    static NAN_METHOD(New);
    static NAN_METHOD(bind);

    // Adds the SDK API methods to the prototype (defined outside this listing).
    static void addApiMethods(v8::Local<v8::FunctionTemplate> &tpl);
    // Persistent handle to the JS constructor function, set in Init().
    static Nan::Persistent<v8::Function> constructor;

    std::condition_variable eventCondition;
    // Async handle used to wake the libuv loop; its `data` points at this object.
    uv_async_t uv_async;
    // One-shot API callbacks keyed by call id.
    std::map<int, const Nan::Callback*> callbacks;
    // Listener callbacks keyed by event name, registered through bind().
    std::map<std::string, const Nan::Callback*> bindMethods;
};




// Registry that maps each native rcs_state to its RcsSdk wrapper; entries are
// added by the RcsSdk constructor and removed by its destructor.
static std::map<rcs_state*, RcsSdk*> sdkMap;
// Locked around the sdkMap accesses in this file.
static std::mutex globalMutex;

// Looks up the RcsSdk wrapper registered for the native state `R`.
// Returns NULL when no wrapper is registered for it.
static RcsSdk* node_get_sdk(rcs_state *R){
    // RAII guard: the mutex is released on every exit path.
    std::lock_guard<std::mutex> guard(globalMutex);
    std::map<rcs_state*, RcsSdk*>::iterator found = sdkMap.find(R);
    if(found == sdkMap.end()){
        return NULL;
    }
    return found->second;
}

char* node_event_deliver(struct rcs_state *R, const char* event, const char* json){
    //LuaSdk 后台線程
    LOG_DEBUG("node_event_deliver begin, event:%s json:%s", event, json);
    RcsSdk* sdk = node_get_sdk(R);
    char* result = NULL;
    if(NULL == sdk){
        LOG_DEBUG("node_event_deliver sdk NULL");
        return NULL;
    }

    EventItem item;
    item.event = event;
    item.data = (char *)json;
    sdk->waitForEventProcess(&item);
    sdk->eventSyncMutex.lock();
    result = sdk->syncEventResult;
    sdk->eventSyncMutex.unlock();
    LOG_DEBUG("node_event_deliver end");
    return result;
}
// Extracts the integer "id" property from a parsed callback payload.
//
// Returns -1 when `json` is empty or not an object, or when reading or
// converting the property fails (for example a throwing getter). Otherwise
// returns the property converted to int32; a missing "id" converts to 0, as
// in the original code.
static int node_get_cid(v8::Handle<v8::Value> json){
    // An empty handle must not be dereferenced; parseJson can produce one if
    // JSON.parse threw.
    if(json.IsEmpty() || !json->IsObject()){
        return -1;
    }
    v8::Local<v8::Object> ojson = json.As<v8::Object>();
    Nan::MaybeLocal<v8::Value> id = Nan::Get(ojson, Nan::New("id").ToLocalChecked());
    if(id.IsEmpty()){
        return -1;
    }
    return Nan::To<int32_t>(id.ToLocalChecked()).FromMaybe(-1);
}
// Parses a JSON string with the JavaScript built-in, i.e. evaluates
// JSON.parse.call(JSON, jsonString) in the current context.
//
// Returns the parsed value. If JSON.parse throws (malformed input), the
// exception is caught here and `undefined` is returned, so callers never
// receive an empty handle.
static v8::Handle<v8::Value> parseJson(v8::Handle<v8::Value> jsonString) {
    v8::Handle<v8::Context> context = Nan::GetCurrentContext();
    v8::Handle<v8::Object> global = context->Global();

    v8::Handle<v8::Object> JSON = global->Get(Nan::New("JSON").ToLocalChecked())->ToObject();
    v8::Handle<v8::Function> JSON_parse = v8::Handle<v8::Function>::Cast(JSON->Get(Nan::New("parse").ToLocalChecked()));

    // Without a TryCatch a SyntaxError would leave a pending exception and an
    // empty return handle.
    Nan::TryCatch tryCatch;
    v8::Handle<v8::Value> parsed = JSON_parse->Call(JSON, 1, &jsonString);
    if(tryCatch.HasCaught() || parsed.IsEmpty()){
        LOG_ERROR("parseJson: JSON.parse failed");
        return Nan::Undefined();
    }
    return parsed;
}
// Dispatches one event into JavaScript and returns what the JS function returned.
//
// The payload `item->data` is parsed as JSON and passed as the only argument.
//  - event "callback": the one-shot callback stored under the payload's "id"
//    is removed from `sdk`, invoked, and deleted.
//  - any other event: the listener bound to that event name is invoked.
// The returned handle is empty when no matching callback exists.
static v8::Local<v8::Value> node_do_event_process(RcsSdk* sdk, EventItem *item){
    v8::Handle<v8::Value> payload = parseJson(Nan::New(item->data).ToLocalChecked());
    v8::Local<v8::Value> args[] = {payload};
    v8::Local<v8::Value> outcome;

    const bool isApiCallback = (std::strcmp(item->event, "callback") == 0);
    if(!isApiCallback){
        // Listener registered through bind(); it stays registered after the call.
        LOG_DEBUG("node_do_event_process:%s %s", item->event, item->data);
        const Nan::Callback * listener = sdk->getBindMethod(item->event);
        if(listener != NULL){
            outcome = listener->Call(1, args);
        }else{
            LOG_DEBUG("node listener callback=NULL");
        }
        return outcome;
    }

    // One-shot API callback: ownership passes to us on removal.
    const int cid = node_get_cid(payload);
    LOG_DEBUG("node api callback cid:%d", cid);
    const Nan::Callback * oneShot = sdk->removeCallback(cid);
    if(oneShot != NULL){
        outcome = oneShot->Call(1, args);
        delete oneShot;
    }else{
        LOG_ERROR("node api callback=NULL");
    }
    return outcome;
}
// Runs on the main (libuv loop) thread after the SDK thread called uv_async_send.
// uv_async_send can be called from any thread, but libuv does not guarantee one
// callback per call; it only guarantees the loop is woken at least once.
// See: http://docs.libuv.org/en/v1.x/async.html
// int uv_async_send(uv_async_t* async)
// Wakeup the event loop and call the async handle’s callback.
//
// Note It’s safe to call this function from any thread. The callback will be
// called on the loop thread.
// Warning libuv will coalesce calls to uv_async_send(), that is, not every call to
// it will yield an execution of the callback. For example: if uv_async_send() is
// called 5 times in a row before the callback is called, the callback will only be
// called once. If uv_async_send() is called again after the callback was called, it
// will be called again.
//
// Takes the pending EventItem from the RcsSdk stored in `handle->data`,
// dispatches it into JavaScript, stores a heap copy of a string result in
// sdk->syncEventResult (NULL for non-string results) and finally calls
// sdk->notifyEventComplete().
static void node_event_process(uv_async_t *handle){
    // Node.js main thread.
    LOG_DEBUG("node_event_process begin");
    Nan::HandleScope scope;
    RcsSdk *sdk = (RcsSdk*)handle->data;

    // Take the pending item under the lock so a coalesced wake-up finds NULL.
    sdk->eventSyncMutex.lock();
    EventItem *item = sdk->syncEventItem;
    sdk->syncEventItem = NULL;
    sdk->eventSyncMutex.unlock();
    if(NULL == item){
        LOG_ERROR("node_event_process sync syncEventItem=NULL");
    }else{
        v8::Local<v8::Value> ret = node_do_event_process(sdk, item);
        char *result = NULL;
        // Only String results are supported for now; V8 values must be
        // converted on the main thread.
        if(!ret.IsEmpty() && ret->IsString()){
            // The Utf8String must stay alive while its buffer is copied: a
            // temporary would free the buffer at the end of the statement,
            // leaving a dangling pointer.
            Nan::Utf8String utf8(ret);
            if(*utf8 != NULL){
                // Per the original note, this buffer is released before the
                // next call, so at most one previous result stays allocated
                // (the release code is not part of this function).
                result = (char*)RCS_MALLOC(strlen(*utf8)+1);
                assert(result);
                if(result != NULL){
                    strcpy(result, *utf8);
                }
            }
        }
        sdk->eventSyncMutex.lock();
        sdk->syncEventResult = result;
        sdk->eventSyncMutex.unlock();
    }
    sdk->notifyEventComplete();
    LOG_DEBUG("node_event_process end");
}

// RcsSdk implementation

// Persistent handle to the JS constructor, filled in by Init().
Nan::Persistent<v8::Function> RcsSdk::constructor;

// Module initializer: builds the "RcsSdk" function template, attaches the
// prototype methods, remembers the constructor and exports it on `target`.
NAN_MODULE_INIT(RcsSdk::Init) {
    v8::Local<v8::String> className = Nan::New("RcsSdk").ToLocalChecked();

    v8::Local<v8::FunctionTemplate> tpl = Nan::New<v8::FunctionTemplate>(New);
    tpl->SetClassName(className);
    // Per the original note, any value > 0 works for the internal field count.
    tpl->InstanceTemplate()->SetInternalFieldCount(1);

    // Populate the JS prototype.
    Nan::SetPrototypeMethod(tpl, "bind", bind);
    addApiMethods(tpl);

    // The same constructor function is both cached and exported.
    v8::Local<v8::Function> ctor = Nan::GetFunction(tpl).ToLocalChecked();
    constructor.Reset(ctor);
    Nan::Set(target, className, ctor);
}

// Creates the native SDK state, wires up its listeners/callbacks, prepares the
// libuv async handle, then initializes and starts the SDK and registers this
// wrapper in sdkMap.
//
// uv_async_init() is called with uv_default_loop(), so this constructor is
// expected to run on the libuv loop thread (RcsSdk::New calls it).
RcsSdk::RcsSdk(const char* number, const char* imei, const char* devicevendor,
               const char* devicemodel, const char* deviceos, const char* deviceosversion,
               const char* clientVendor, const char* clientVersion, const char* storage,
               const char* appId, const char* syspath, int clientType)
#ifndef CALL_NODE_USE_ASYNC_QUEUE
// Synchronous mode only: start with no pending event and no stored result.
:syncEventItem(NULL),
syncEventResult(NULL)
#endif
{
    LOG_DEBUG("begin initSdk: %s %s %s %s %s", appId, number, clientVendor, clientVersion, storage);

    state = rcs_state_new();

    // Install the native listener table (filled in by node_get_listeners).
    rcs_listeners listeners;
    node_get_listeners(listeners);
    rcs_set_listeners(state, &listeners);
    
    // Note: this local shadows the `callbacks` member map inside the constructor.
    rcs_callbacks callbacks;
    node_get_callbacks(callbacks);
    rcs_set_callbacks(state, &callbacks);
    
    // Each state has exactly one uv_async_t used to interrupt the main thread.
    uv_async_init(uv_default_loop(), &uv_async, node_event_process);
    // node_event_process() recovers this object from handle->data.
    uv_async.data = this;
    
    // The fourth argument is a fixed "node-imsi" string rather than a parameter.
    rcs_init(state, number, imei, "node-imsi",
             devicevendor, devicemodel, deviceos, deviceosversion,
             clientVendor,clientVersion, storage, appId, syspath, clientType);
    rcs_start(state);
    
    // NOTE(review): registration happens after rcs_start(); node_event_deliver()
    // returns NULL for a state that is not in sdkMap yet — confirm no event can
    // arrive before this point.
    globalMutex.lock();
    sdkMap[state] = this;
    globalMutex.unlock();
}

// Stops the native SDK, removes this wrapper from sdkMap and frees the
// callbacks still owned by the two callback maps.
RcsSdk::~RcsSdk() {
    if(state){
        rcs_stop(state, 0);
        globalMutex.lock();
        sdkMap.erase(state);
        globalMutex.unlock();
        state = NULL;
    }

    // Both maps hold owning pointers (allocated with `new` by the API methods
    // and by bind()); without this they leak when the wrapper is destroyed.
    for(auto& entry : callbacks){
        delete entry.second;
    }
    callbacks.clear();
    for(auto& entry : bindMethods){
        delete entry.second;
    }
    bindMethods.clear();

    // NOTE(review): uv_async is never closed with uv_close(). libuv requires a
    // handle's memory to stay valid until its close callback has run, which
    // cannot be done from here while the handle is a direct member — consider
    // heap-allocating the handle.
}
void RcsSdk::addCallback(int cid, Nan::Callback* callback){
    callbacks[cid] = callback;
}
// Detaches the callback stored under `cid` and returns it.
// Returns NULL (and leaves the map untouched) when the id is unknown.
const Nan::Callback* RcsSdk::removeCallback(int cid){
    std::map<int, const Nan::Callback*>::iterator found = callbacks.find(cid);
    if(found == callbacks.end()){
        return NULL;
    }
    const Nan::Callback* const detached = found->second;
    callbacks.erase(found);
    return detached;
}
// Returns the listener bound to the event name `method`, or NULL when nothing
// is bound. The entry stays in the map.
const Nan::Callback* RcsSdk::getBindMethod(const char* method){
    const Nan::Callback* bound = NULL;
    std::map<std::string, const Nan::Callback*>::iterator found = bindMethods.find(method);
    if(found != bindMethods.end()){
        bound = found->second;
    }
    return bound;
}
void RcsSdk::notifyMainLoop(){
    uv_async_send(&uv_async);
}

// JS entry point for `new RcsSdk(...)` and for the plain call `RcsSdk(...)`.
//
// Expects exactly 12 arguments: number, imei, devicevendor, devicemodel,
// deviceos, deviceosversion, clientVendor, clientVersion, storage, appId,
// sysPath (strings) and clientType (number). Throws "Wrong number of
// arguments" otherwise. A plain call is forwarded to the constructor.
NAN_METHOD(RcsSdk::New) {
    // Both call forms need exactly 12 arguments. Checking first also keeps the
    // fixed-size argv copy below in bounds; a plain call with other counts
    // previously reached the same error through the forwarded constructor call.
    if(info.Length() != 12){
        Nan::ThrowError(Nan::Error("Wrong number of arguments"));
        return;
    }

    if (!info.IsConstructCall()) {
        // Invoked as plain function `RcsSdk(...)`, turn into construct call.
        const int argc = info.Length();
        v8::Local<v8::Value> argv[12];
        for(int i=0; i<argc; i++){
            argv[i] = info[i];
        }
        v8::Local<v8::Function> cons = Nan::New(constructor);
        info.GetReturnValue().Set(cons->NewInstance(argc, argv));
        return;
    }

    // Invoked as constructor: `new RcsSdk(...)`
    CHECK_ARGS_STRING(info, 0, "number");
    CHECK_ARGS_STRING(info, 1, "imei");
    CHECK_ARGS_STRING(info, 2, "devicevendor");
    CHECK_ARGS_STRING(info, 3, "devicemodel");
    CHECK_ARGS_STRING(info, 4, "deviceos");
    CHECK_ARGS_STRING(info, 5, "deviceosversion");
    CHECK_ARGS_STRING(info, 6, "clientVendor");
    CHECK_ARGS_STRING(info, 7, "clientVersion");
    CHECK_ARGS_STRING(info, 8, "storage");
    CHECK_ARGS_STRING(info, 9, "appId");
    CHECK_ARGS_STRING(info, 10, "sysPath");
    CHECK_ARGS_NUMBER(info, 11, "clientType");

    // Named Utf8String objects keep their buffers alive until the constructor
    // call below has returned. `*Nan::Utf8String(x)` on a temporary would
    // leave a dangling pointer at the end of the statement.
    Nan::Utf8String number(info[0]);
    Nan::Utf8String imei(info[1]);
    Nan::Utf8String devicevendor(info[2]);
    Nan::Utf8String devicemodel(info[3]);
    Nan::Utf8String deviceos(info[4]);
    Nan::Utf8String deviceosversion(info[5]);
    Nan::Utf8String clientVendor(info[6]);
    Nan::Utf8String clientVersion(info[7]);
    Nan::Utf8String storage(info[8]);
    Nan::Utf8String appId(info[9]);
    Nan::Utf8String sysPath(info[10]);
    int clientType = info[11]->Uint32Value();

    RcsSdk *sdk = new RcsSdk(*number, *imei, *devicevendor, *devicemodel, *deviceos, *deviceosversion,
                             *clientVendor, *clientVersion, *storage, *appId, *sysPath, clientType);
    sdk->Wrap(info.This());
    info.GetReturnValue().Set(info.This());
}

// JS method `sdk.bind(eventName, fn)`: registers `fn` as the listener that
// node_do_event_process() invokes for events named `eventName`.
// Throws "Wrong arguments" unless called with (string, function).
NAN_METHOD(RcsSdk::bind) {
    RcsSdk* sdk = Nan::ObjectWrap::Unwrap<RcsSdk>(info.This());
    if(info.Length() != 2 || ! info[0]->IsString() || !info[1]->IsFunction()){
        Nan::ThrowError(Nan::Error("Wrong arguments"));
        return;
    }
    // Keep the Utf8String alive while its buffer is used as the map key; a
    // temporary would be destroyed before the key is built.
    Nan::Utf8String method(info[0]);
    if(*method == NULL){
        Nan::ThrowError(Nan::Error("Wrong arguments"));
        return;
    }
    Nan::Callback *callback = new Nan::Callback(info[1].As<v8::Function>());
    // NOTE(review): rebinding an event overwrites the previous Nan::Callback
    // pointer without freeing it.
    sdk->bindMethods[*method] = callback;
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM