pomelo研究筆記-RPC服務端


POMELO 採用多進程的架構能夠非常好的實現游戲server(進程)的擴展性,達到支撐較多在線用戶、減少server壓力等要求。

進程間通信採用RPC的形式來完畢,pomelo的RPC實現的相當靜止。

採用相似例如以下的方式就能夠調用remoteserver提供的服務:

proxies.user.test.service.echo(routeParam, 'hello', function(err, resp) {
    if(err) {
        console.error(err.stack);
        return;
    }
    console.log(resp);
});

上面的一段RPC調用能夠理解為:
調用namespace類型為user、server類型為test的service模塊的echo接口
如今聽着有些拗口。沒關系,且聽我慢慢來分析:)


服務端源代碼分析

pomelo-rpc的源代碼我閱讀+debug了不下30次,以下我將按照從底層數據交換模塊到上層業務邏輯分發處理的方式依次介紹服務端與client的源代碼架構。

1. 基於socket.io模塊的數據通信模塊

一般來說我們在寫socket數據通信模塊有幾個問題是必需要去解決的,譬如說:
  • 粘包的問題
  • 丟包以及亂序的問題
  • ip地址過濾
  • 緩沖隊列的實現
  • 與上層模塊的交互模式
這里把pomelo-rpc實現過的來說一說。nodejs 內置一個events模塊。這也導致了把一個模塊封裝成一個事件收發器是相當自然的一件事情:
var Acceptor = function(opts, cb) {
    EventEmitter.call(this);
    this.bufferMsg = opts.bufferMsg;
    this.interval = opts.interval || 300;
    this.whitelist= opts.whitelist;

    this._interval = null;
    this.sockets = {};
    this.msgQueues = {};

    this.server = null;
    this.notify = cb;
};

util.inherits(Acceptor, EventEmitter);

利用node內置的util提供的繼承函數。簡單兩句話Acceptor繼承了events.翻開nodejs源代碼 inherits 函數的實現也是相當簡單:

var inherits = function(sub, super) {
    var tmp = function() {}
    tmp.prototype = super.prototype;
    sub.prototype = new tmp();
}

通過這樣的寄生組合式的繼承避免了調用兩次父類的構造函數,這里就不多展開了。

看到Acceptor構造函數接收一些配置信息:

bufferMsg: 配置是否啟用緩沖隊列interval: 配置定時數據發送模塊的間隔, Acceptor開啟監聽的時候。依據配置信息來確定是否開啟一個定時器,定時刷新緩沖:

    if(this.bufferMsg) {
        this._interval = setInterval(function() {
            flush(self);
        }, this.interval);
    }

flush函數主要做的是負責把緩沖的數據通過socket.io接口寫出去:

var flush = function(acceptor) {
    var sockets = acceptor.sockets;
    var queues = acceptor.msgQueues;
    var queue, socket;

    for(var socketId in queues) {
        socket = sockets[socketId];
        if(!socket) {
            delete queues[socketId];
            continue;
        }
        queue = queues[socketId];
        if(!queue.length) {
            continue;
        }
        socket.emit('message', queue);
        queues[socketId] = [];
    }
};

每一個client鏈接相應一個數據緩沖隊列,通過發送’message’消息的方式把數據發出。


IP地址過濾
開啟監聽后,假設有client鏈接(on connection 事件),第一件事情是IP地址過濾,IP地址白名單也是通過構造函數注入:whitelist.若IP地址非法則關閉鏈接,輸出警告信息。


數據處理模塊
上層模塊通知配置信息注入一個notify回調函數, acceptor監聽到數據后首先把數據拋給上層。上層處理完畢后推斷假設需要緩沖則寫入隊列。否則立即發送出去:

 acceptor.notify.call(null, pkg.msg, function() {
        var args = Array.prototype.slice.call(arguments);   
        for(var i = 0, l = args.length; i < l; i++) {
            if(args[i] instanceof Error) {
                args[i] = cloneError(args[i]);
            }
        }
        var resp = {id: pkg.id, resp: Array.prototype.slice.call(args)};
        if(acceptor.bufferMsg) {
            enqueue(socket, acceptor, resp);
        }
        else {
            socket.emit('message', resp);
        }
    });

2. 路由請求分發模塊

架在acceptor模塊上面的是gateway模塊,該模塊主要負責acceptor模塊的創建銷毀以及狀態控制。首先在創建acceptor模塊的時候傳入一個函數:

this.acceptor = this.acceptorFactory.create(opts, function(msg, cb) {
        dispatcher.route(msg, cb);
    });

通過工廠方法來構建一個acctpor實例。這樣底層數據處理模塊能夠方便的更換通信協議。這里回調函數做的一個工作是調用分發函數,把請求交給詳細的服務提供方。來看看dispatcher的實現:

var Dispatcher = function(services) {
    EventEmitter.call('this');
    var self = this;
    this.on('reload', function(services) {
        self.services = services;
    });
    this.services = services;
};
util.inherits(Dispatcher, EventEmitter);

相同Dispatcher模塊也變成一個事件收發器。同一時候構造器接收一個services參數。

依據改參數配合路由請求時傳入的參數,就能把請求交給詳細的子模塊。

所以,dispatcher.route(msg, cb);僅僅只是是匹配下參數調用相應接口罷了。看到構造器還監聽了一個reload事件,該事件有什么作用呢?這事實上就是pomelo的RPC 熱拔插模塊的實現。實現起來比較簡單:

var watchServices = function(gateway, dispatcher) {
    var paths = gateway.opts.paths;
    var app = gateway.opts.context;

    for(var i = 0; i < paths.length; i++) {
        (function(index) {
            fs.watch(paths[index].path, function(event, name) {
                if(event === 'change') {
                    var res = {};
                    var item = paths[index];
                    var m = Loader.load(item.path, app);
                    if(m) {
                        res[namespace] = res[namespace] || {};
                        for(var s in m) {
                            res[item.namespace][s] = m[s];
                        }
                    }
                    dispatcher.emit('reload', res);
                }
            });
        })(i);
    }
};

gateway模塊在啟動的時候會依據配置信息調用一個watchServices監聽模塊的變化。假設數據文件發生變化則又一次載入services並通知路由分發模塊。

為了保證server與client的正常通信。除了底層數據格式的一致,另一個是路由信息的匹配。假設調用Gateway傳入的配置路徑是例如以下形式:

var paths = [
    {namespace: 'user', path: __dirname + '/remote/test'}
];

假設當前文件夾下有/remote/test/service.js文件。文件包括兩個接口test1/test2

load之后返回的對象形式例如以下:

{
    service: {
        test1: 'function xxx',
        test2: 'function yyy'
    }
}

同一時候在pomelo你們有系統RPC服務以及自己定義RPC服務,完整的路由信息例如以下:

services: {
    sys: {
        sys_module1: {
            sys_module1_interface1: 'xxx'
        }
    },
    user: {
        user_module1: {
            user_module1_interface1: 'yyy'
        }
    }
}

服務端其它東西都比較簡單了,為了理清楚脈絡,以上代碼是經過刪減的。假設有興趣能夠到這里取。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM