Zookeeper C API 指南八(Zookeeper C API 應用示例)


前面七講我們基本上介紹完了 Zookeeper C API 的所有內容,本文將結合一個小例子講講如何在你的實際項目中使用 Zookeeper 服務。

設想如下場景:

假設程序 A 需要 7* 24 小時在線對外提供服務,但是 A 程序在生產環境下總是不穩定,時常崩潰,不過幸運的是解決方案很簡單,在 A 程序崩潰以后只需要重啟它就可以了。當然如此簡單的問題你可以提出多種解決方案,比方說自己實現一個服務程序,每隔一定時間去輪詢 A 的狀態,如果發現 A 崩潰了,立即重啟它,並向管理人員報告問題。不過我們並不打算這么做,畢竟本文主題是講 Zookeeper C API 的應用,所以我們采用 Zookeeper 服務來解決該問題。

若采用 Zookeeper 服務可以按照如下方案解決問題,程序 A 在啟動時創建一個臨時(ZOO_EPHEMERAL) znode 節點 /A,然后按照正常流程對外提供服務。另外監控程序對 /A 節點設置監視,當 /A 節點消失(說明 A 程序已經崩潰)時,重啟 A 程序。假設 A 的名稱是 QueryServer,即對外提供查詢服務的程序,具體提供什么查詢服務由應用自身決定,我們這里只是簡單地模擬一下。QueryServer 在啟動時創建一個 /QueryServer 的臨時節點(ZOO_EPHEMERAL),然后,程序 QueryServerd 監控 /QueryServer 節點,當 /QueryServer 節點消失(說明 A 程序已經崩潰)時,重啟 QueryServer 程序。

下面是 QueryServer 的實現代碼:

/*
 * =============================================================================
 *
 *       Filename:  QueryServer.c
 *
 *    Description:  QueryServer
 *
 *        Created:  02/15/2013 08:48:49 PM
 *
 *         Author:  Fu Haiping (forhappy), haipingf@gmail.com
 *        Company:  ICT ( Institute Of Computing Technology, CAS )
 *
 * =============================================================================
 */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_log.h>

void QueryServer_watcher_g(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            printf("[[[QueryServer]]] Connected to zookeeper service successfully!\n");
        } else if (state == ZOO_EXPIRED_SESSION_STATE) { 
            printf("Zookeeper session expired!\n");
        }
    }  
}

void QueryServer_string_completion(int rc, const char *name, const void *data)
{
    fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc);
    if (!rc) {
        fprintf(stderr, "\tname = %s\n", name);
    }
}

void QueryServer_accept_query()
{
    printf("QueryServer is running...\n");
}

int main(int argc, const char *argv[])
{
    const char* host = "127.0.0.1:2181,127.0.0.1:2182,"
        "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
    int timeout = 30000;
    
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    zhandle_t* zkhandle = zookeeper_init(host,
            QueryServer_watcher_g, timeout, 0, "hello zookeeper.", 0);
    if (zkhandle == NULL) {
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");
        exit(EXIT_FAILURE);
    }

    // struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}};
    // struct ACL_vector ALL_PERMS = {1, ALL_ACL};
    int ret = zoo_acreate(zkhandle, "/QueryServer", "alive", 5,
           &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL,
           QueryServer_string_completion, "zoo_acreate");
    if (ret) {
        fprintf(stderr, "Error %d for %s\n", ret, "acreate");
        exit(EXIT_FAILURE);
    }

    do {
        // 模擬 QueryServer 對外提供服務.
        // 為了簡單起見, 我們在此調用一個簡單的函數來模擬 QueryServer.
        // 然后休眠 5 秒,程序主動退出(即假設此時已經崩潰).
        QueryServer_accept_query();
        sleep(5);
    } while(false);

    zookeeper_close(zkhandle);
}

Makefile如下:

all:QueryServer

QueryServer:QueryServer.o
    gcc -L/usr/local/lib/ -lzookeeper_mt -o $@ $^ 

QueryServer.o:QueryServer.c
    gcc -DTHREADED -I/usr/local/include/zookeeper -o $@ -c $^

.PHONY:clean

clean:
    rm QueryServer.o QueryServer

 

QueryServerd 代碼如下:

/*
 * =============================================================================
 *
 *       Filename:  QueryServerd.c
 *
 *    Description:  QueryServer daemon using zookeeper. 
 *
 *        Created:  02/15/2013 08:48:49 PM
 *
 *         Author:  Fu Haiping (forhappy), haipingf@gmail.com
 *        Company:  ICT ( Institute Of Computing Technology, CAS )
 *
 * =============================================================================
 */
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_log.h>

void QueryServerd_watcher_global(zhandle_t * zh, int type, int state,
                            const char *path, void *watcherCtx);
static void QueryServerd_dump_stat(const struct Stat *stat);
void QueryServerd_stat_completion(int rc, const struct Stat *stat,
                             const void *data);
void QueryServerd_watcher_awexists(zhandle_t *zh, int type, int state,
                              const char *path, void *watcherCtx);
static void QueryServerd_awexists(zhandle_t *zh);

void
QueryServerd_watcher_global(zhandle_t * zh, int type, int state,
                            const char *path, void *watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            printf("Connected to zookeeper service successfully!\n");
        } else if (state == ZOO_EXPIRED_SESSION_STATE) { 
            printf("Zookeeper session expired!\n");
        }
    }
}

static void
QueryServerd_dump_stat(const struct Stat *stat)
{
    char tctimes[40];
    char tmtimes[40];
    time_t tctime;
    time_t tmtime;

    if (!stat) {
        fprintf(stderr, "null\n");
        return;
    }
    tctime = stat->ctime / 1000;
    tmtime = stat->mtime / 1000;

    ctime_r(&tmtime, tmtimes);
    ctime_r(&tctime, tctimes);

    fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
            "\tmtime=%s\tmzxid=%llx\n"
            "\tversion=%x\taversion=%x\n"
            "\tephemeralOwner = %llx\n",
            tctimes, stat->czxid,
            tmtimes, stat->mzxid,
            (unsigned int) stat->version, (unsigned int) stat->aversion,
            stat->ephemeralOwner);
}

void
QueryServerd_stat_completion(int rc, const struct Stat *stat,
                             const void *data)
{
    // fprintf(stderr, "%s: rc = %d Stat:\n", (char *) data, rc);
    // QueryServerd_dump_stat(stat);
}

void
QueryServerd_watcher_awexists(zhandle_t *zh, int type, int state,
                              const char *path, void *watcherCtx)
{
    if (state == ZOO_CONNECTED_STATE) {
        if (type == ZOO_DELETED_EVENT) {
            printf("QueryServer gone away, restart now...\n");
            // re-exists and set watch on /QueryServer again.
            QueryServerd_awexists(zh);
            pid_t pid = fork();
            if (pid < 0) {
                fprintf(stderr, "Error when doing fork.\n");
                exit(EXIT_FAILURE);
            }
            if (pid == 0) { /* child process */
                // 重啟 QueryServer 服務.
                execl("/tmp/QueryServer/QueryServer", "QueryServer", NULL);
                exit(EXIT_SUCCESS);
            }
            sleep(1); /* sleep 1 second for purpose. */
        } else if (type == ZOO_CREATED_EVENT) {
            printf("QueryServer started...\n");
        }
    }

    // re-exists and set watch on /QueryServer again.
    QueryServerd_awexists(zh);
}

static void
QueryServerd_awexists(zhandle_t *zh)
{
    int ret =
        zoo_awexists(zh, "/QueryServer",
                     QueryServerd_watcher_awexists,
                     "QueryServerd_awexists.",
                     QueryServerd_stat_completion,
                     "zoo_awexists");
    if (ret) {
        fprintf(stderr, "Error %d for %s\n", ret, "aexists");
        exit(EXIT_FAILURE);
    }
}

int
main(int argc, const char *argv[])
{
    const char *host = "127.0.0.1:2181,127.0.0.1:2182,"
        "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
    int timeout = 30000;

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    zhandle_t *zkhandle = zookeeper_init(host,
                                         QueryServerd_watcher_global,
                                         timeout,
                                         0, "QueryServerd", 0);
    if (zkhandle == NULL) {
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");
        exit(EXIT_FAILURE);
    }

    QueryServerd_awexists(zkhandle);
    // Wait for asynchronous zookeeper call done.
    getchar();

    zookeeper_close(zkhandle);

    return 0;
}

 

Makefile 如下:

all:QueryServerd

QueryServerd:QueryServerd.o
    gcc -L/usr/local/lib/ -lzookeeper_mt -o $@ $^ 

QueryServerd.o:QueryServerd.c
    gcc -g -DTHREADED -I/usr/local/include/zookeeper -o $@ -c $^

.PHONY:clean

clean:
    rm QueryServerd.o QueryServerd

 

首先執行 QueryServerd,

forhappy@haiping-ict:/tmp/QueryServerd$ ./QueryServerd 
Connected to zookeeper service successfully!

然后執行 QueryServer,

forhappy@haiping-ict:/tmp/QueryServer$ ./QueryServer 
QueryServer is running...
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer

可見 Queryerver 創建了 /QueryServer 節點,5 秒后 QueryServer 模擬程序崩潰而退出,那么此時在 QueryServerd 端輸出如下:

Connected to zookeeper service successfully!
QueryServer started... # QueryServerd 感知到 QueryServer 已正常啟動.
QueryServer gone away, restart now... # 5 秒鍾后,QueryServer 崩潰,QueryServerd 准備重啟 QueryServer.
QueryServer is running... # QueryServer 正在運行,以下 3 行是 QueryServer 輸出結果。
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer
QueryServer started... # QueryServerd 感知到 QueryServer 已正常啟動.
QueryServer gone away, restart now...# 又過了 5 秒鍾后,QueryServer 崩潰,QueryServerd 准備重啟 QueryServer.
QueryServer is running... # QueryServer 再次運行,以下 3 行是 QueryServer 輸出結果。
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer
QueryServer started... # QueryServerd 再次感知到 QueryServer 已正常啟動,如此反復.
QueryServer gone away, restart now...
QueryServer is running...
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer
QueryServer started...

即 QueryServer 每 5 秒鍾崩潰一次,然后又被 QueryServerd 重啟,模擬了上面的應用場景。

好了 Zookeeper C API 的應用小示例講完了,可能應用場景選取的不好,不過大致可以一些說明問題吧,如果你想看 Zookeeper 更貼近現實的應用場景,可以參考淘寶的一篇文章《ZooKeeper典型應用場景一覽》和 IBM developerWorks 的一篇博文《分布式服務框架 Zookeeper -- 管理分布式環境中的數據》。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM