NSQ部署


一、      簡介

NSQ主要有三個主要程序和一個Web服務程序:

  1. nsqd:是守護進程,接收,緩存,並投遞消息給客戶端
  2. nsqlookupd:是一個守護進程,為消費者提供運行時發現服務,來查找指定話題(topic)的生產者 nsqd
  3. nsq_to_http:消費指定的話題(topic)/通道(channel)和執行 HTTP requests (GET/POST) 到指定的端點。
  4. nsqadmin:是 Web 服務,用來實時的管理你的 NSQ 集群。它通過和 nsqlookupd 實例交流,來確定生產者

除了nsqadmin是獨立提供WEB信息以外,三個主要程序組成類似下面的網絡拓撲,以提供消息服務:

二、      NSQ獲取

下載最新的程序。並將NSQ所在目錄加入${PATH}中。

三、      部署

以下假設我們要在兩台機器上部署NSQ:

192.168.30.54(簡稱54)

192.168.30.55(簡稱55)。

其中nsqlookupd和nsqadmin部署到54,nsq兩台機器上都部署,以冗余nsqd來實現容災。

 

以下假設nsq工具集的路徑已包含在${PATH}環境變量中。

 

1)     54的部署:

  

當程序未指定—config參數時,程序會嘗試從配置文件中讀取配置信息。配置文件是以cfg為擴展名,且與程序名同名的文本文件。

nsqlookupd.cfg

一般情況下nsqlookupd不需要配置文件,我們使用它的缺省配置即可,如果需要它在特殊端口上監聽或修改某些配置,可以修改配置文件

nsqadmin.cfg

nsqd.cfg

2)     55的部署:

55上只需啟動nsqd即可

該上面的配置文件,除了id被設置成2外,與54上全部相同。

四、      監控

4.1 可視化監控

通過http瀏覽器,我們可以看到NSQ的運行情況,見下圖。該圖是由nsqadmin提供的。

 

4.2 自動監控

自動監控腳本,見附錄。

自動監控目前主要監控兩個指標:

1)  運行中的nsqd的數量。主要監控其是否正常運行。

2)  nsqd上各channel最大堆積的消息數量。主要監控下游處理情況。

 

自動監控輸出示例:

說明:

第一行主要監控nsqd數量,特征字段為nsqd_count,當前系統中有2個nsqd。

第二、三行監控nsqd運行情況,特征字段為nsqd。

warning表明是否有報警。off – 不報警,監控項運行正常;on – 報警,該項運行異常。

 

例:

第一行,當前運行的nsqd有兩個(nsqd_count),系統中應有nsqd數量為2(should_be),不報警(warning[off])

第二行,nsqd地址為192.168.30.54:4151,nsqd運行良好(health[OK], 不報警(warning[off])。

五、      異常處理

當NSQ發生異常時,可以通過http://192.168.30.54:4171觀察到,或通過自動監控機制接收到報警。這時可以重啟有問題的進程,以使系統恢復。

 

例如,如果nsqd運行異常,則可以運行如下命令重啟nsqd:

目前已知異常的處理:

1)  系統時間向后調整后, nsqd占用100%的CPU

描述:將系統時間由2015-08-25 11:02:00 向后調整2015-08-24 11:02:00后,nsqd占用100%的CPU

分析:由於是時間向后調整,因此造成nsqd定時器,向前追趕新時間,造成定時器被密集調用。

解決方法:不管是向前(跨度較大時),還是向后調整時間,都建議重啟nsqd。

六、下游消息處理(nsq_to_http)(SUB)

6.1 GET推送

典型的命令行參數:

nsq_to_http --topic=demo --lookupd-http-address=127.0.0.1:4161 --get=http://localhost/nsqphp/sub?msg=%s

參數—get表明消息將以GET請求向WebServer(SUB)推送。%s以可打印的形式被實際消息替代。

nsq_to_http需要三元組(topic, lookupd的http地址,get推送地址)

 

6.2 POST推送(非表單數據)

典型的命令行參數

nsq_to_http --topic=demo --lookupd-http-address=127.0.0.1:4161 --post=http://localhost/nsqphp/sub

參數—post表明消息將以POST請求向WebServer(SUB)推送。消息在http的body部分。

此時推送的數據將以二進制推送,也即HTTP請求的Content-Type:application/octet-stream。如果WebServer是以php實現的,則需要以file_get_contents(“php://input”)的形式讀取數據。

6.3 POST推送(表單數據)

典型的命令行參數

nsq_to_http --topic=demo --lookupd-http-address=127.0.0.1:4161 --post=http://localhost/nsqphp/sub --content-type="application/x-www-form-urlencoded"

參數—post表明消息將以POST請求向WebServer(SUB)推送。消息在http的body部分。

注意—content-type參數,該參數只有在—post的情況下才有效。該參數表明post的消息是以表單的形式組織的,也即可以通過$_POST[]或$_REQUEST[]的方式取到消息

七、其它

1)   nsqphp向nsqd發送信息時各階段超時時間

名稱

超時時間

說明

connectionTimeout

3秒

連接nsqd的超時時間

readWriteTimeout

3秒

收發數據的超時時間

readWaitTimeout

5秒

接收回應的超時時間

 

附錄:

自動監控腳本: 

<?php
/**
 * Created by PhpStorm.
 * User: Lenovo
 * Date: 2015/8/22
 * Time: 18:05
 */

error_reporting(E_ALL || ~E_NOTICE); //ÏÔʾ³ýÈ¥ E_NOTICE Ö®ÍâµÄËùÓдíÎóÐÅÏ¢

// http://192.168.30.54:4161/nodes

function queryLookupdForNsqd($lookupAddrs) {
    foreach ($lookupAddrs as $addr) {
        $text = file_get_contents("http://$addr/nodes");
        // $nodes¸ñʽ
        // {"status_code":200,"status_txt":"OK","data":{"producers":[{"remote_address":"192.168.30.54:38631","hostname":"localhost.localdomain","broadcast_address":"192.168.30.54","tcp_port":4150,"http_port":4151,"version":"0.3.6-alpha","tombstones":[false,false],"topics":["demo","hancj"]},{"remote_address":"192.168.30.55:35621","hostname":"localhost.localdomain","broadcast_address":"192.168.30.55","tcp_port":4150,"http_port":4151,"version":"0.3.6-alpha","tombstones":[false,false],"topics":["demo","hancj"]}]}}
        $nodes = json_decode($text, true);

        $arrNode = array();
        foreach ($nodes['data']['producers'] as $nsqd) {
            $addr = "{$nsqd['broadcast_address']}:{$nsqd['http_port']}";
            //echo $addr, PHP_EOL;
            $arrNode[] = $addr;
        }

        return $arrNode;
    }
}

function queryNsqd($nsqdAddr) {
    $httpAddr = "http://$nsqdAddr/stats?format=json";
    // infoµÄ¸ñʽ
    // {"status_code":200,"status_txt":"OK","data":{"version":"0.3.6-alpha","health":"OK","start_time":1440233123,"topics":[{"topic_name":"demo","channels":[{"channel_name":"nsq_to_http","depth":0,"backend_depth":0,"in_flight_count":0,"deferred_count":0,"message_count":7010004,"requeue_count":1046,"timeout_count":649,"clients":[],"paused":false,"e2e_processing_latency":{"count":0,"percentiles":null}}],"depth":0,"backend_depth":0,"message_count":7010004,"paused":false,"e2e_processing_latency":{"count":0,"percentiles":null}},{"topic_name":"hancj","channels":[{"channel_name":"nsq_to_http","depth":1,"backend_depth":0,"in_flight_count":0,"deferred_count":0,"message_count":0,"requeue_count":0,"timeout_count":0,"clients":[],"paused":false,"e2e_processing_latency":{"count":0,"percentiles":null}}],"depth":0,"backend_depth":0,"message_count":0,"paused":false,"e2e_processing_latency":{"count":0,"percentiles":null}}]}}
    $info = json_decode(file_get_contents($httpAddr), true);

    $nsqd = array('health' => $info['data']['health'], 'channels' => 0, 'topics' => 0);

    foreach ($info['data']['topics'] as $topic) {
        $nsqd['topics'] += 1;
        foreach ($topic['channels'] as $channel) {
            $nsqd['channels'] += 1;
            if ($channel['channel_name'] == 'nsq_to_http') {
                if (!$nsqd['max_channel'] || $nsqd['max_channel']['depth'] < $channel['depth']) {
                    $nsqd['max_channel'] = $channel;
                }
            }
        }
    }

    return $nsqd;
}

function main($argv) {
    if (count($argv) < 3) {
        echo "Usage: {$argv[0]} nsqd_count nsqlookupd_address...\n";
        exit;
    }

    $nsqdCount = $argv[1];
    $nsqlookupd = array_slice($argv, 2);

    $now = date('Y-m-d H:i:s');
    $arrNSQD = queryLookupdForNsqd($nsqlookupd);//array("192.168.30.54:4161",));
    $warn = (count($arrNSQD) == $nsqdCount ? "off" : "on");
    echo "$now nsqd_count[" . count($arrNSQD) . "] should_be[{$nsqdCount}] warning[$warn]", PHP_EOL;
    foreach ($arrNSQD as $nsqdAddr) {
        $info = queryNsqd($nsqdAddr);

        $warn = ((!$info || $info['health'] != 'OK' || $info['max_channel']['depth'] > 100000) ? "on" : "off");
        echo "$now nsqd[$nsqdAddr] health[{$info['health']}] warning[$warn] topics[{$info['topics']}]";
        echo " channels[{$info['channels']}] depth[{$info['max_channel']['depth']}]";
        echo PHP_EOL;
    }
}

main($argv);

  

NSQ官網地址:http://nsq.io/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM