上一節《Zookeeper C API 指南三(回調函數)》重點講了 Zookeeper C API 中各種回調函數的原型,本節將切入正題,正式講解 Zookeeper C API。相信大家讀完本文后應該對 Zookeeper C API 的使用有一個比較清晰的認識。
Zookeeper C API 概覽
Zookeeper C API 很規范,接口很容易記憶,大部分接口均以 zoo_ 開頭,只有少量接口以 zookeeper_ 開頭,所有的 API 匯總如下:
void zoo_create_op_init(zoo_op_t * op, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); void zoo_delete_op_init(zoo_op_t * op, const char *path, int version); void zoo_set_op_init(zoo_op_t * op, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); void zoo_check_op_init(zoo_op_t * op, const char *path, int version); ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t * clientid, void *context, int flags); ZOOAPI int zookeeper_close(zhandle_t * zh); ZOOAPI const clientid_t *zoo_client_id(zhandle_t * zh); ZOOAPI int zoo_recv_timeout(zhandle_t * zh); ZOOAPI const void *zoo_get_context(zhandle_t * zh); ZOOAPI void zoo_set_context(zhandle_t * zh, void *context); ZOOAPI watcher_fn zoo_set_watcher(zhandle_t * zh, watcher_fn newFn); ZOOAPI struct sockaddr *zookeeper_get_connected_host(zhandle_t * zh, struct sockaddr *addr, socklen_t * addr_len); ZOOAPI int zookeeper_interest(zhandle_t * zh, int *fd, int *interest, struct timeval *tv); ZOOAPI int zookeeper_process(zhandle_t * zh, int events); ZOOAPI int zoo_state(zhandle_t * zh); ZOOAPI int zoo_acreate(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data); ZOOAPI int zoo_adelete(zhandle_t * zh, const char *path, int version, void_completion_t completion, const void *data); ZOOAPI int zoo_aexists(zhandle_t * zh, const char *path, int watch, stat_completion_t completion, const void *data); ZOOAPI int zoo_awexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget(zhandle_t * zh, const char *path, int watch, data_completion_t completion, const void *data); ZOOAPI int zoo_awget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, data_completion_t completion, const void *data); ZOOAPI int zoo_aset(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget_children(zhandle_t * zh, const char *path, int watch, strings_completion_t completion, const void *data); ZOOAPI int zoo_awget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_completion_t completion, const void *data); ZOOAPI int zoo_aget_children2(zhandle_t * zh, const char *path, int watch, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_awget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_async(zhandle_t * zh, const char *path, string_completion_t completion, const void *data); ZOOAPI int zoo_aget_acl(zhandle_t * zh, const char *path, acl_completion_t completion, const void *data); ZOOAPI int zoo_aset_acl(zhandle_t * zh, const char *path, int version, struct ACL_vector *acl, void_completion_t, const void *data); ZOOAPI int zoo_amulti(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results, void_completion_t, const void *data); ZOOAPI const char *zerror(int c); ZOOAPI int zoo_add_auth(zhandle_t * zh, const char *scheme, const char *cert, int certLen, void_completion_t completion, const void *data); ZOOAPI int is_unrecoverable(zhandle_t * zh); ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel); ZOOAPI void zoo_set_log_stream(FILE * logStream); ZOOAPI void zoo_deterministic_conn_order(int yesOrNo); ZOOAPI int zoo_create(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); ZOOAPI int zoo_delete(zhandle_t * zh, const char *path, int version); ZOOAPI int zoo_exists(zhandle_t * zh, const char *path, int watch, struct Stat *stat); ZOOAPI int zoo_wexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct Stat *stat); ZOOAPI int zoo_get(zhandle_t * zh, const char *path, int watch, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_wget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_set(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version); ZOOAPI int zoo_set2(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); ZOOAPI int zoo_get_children(zhandle_t * zh, const char *path, int watch, struct String_vector *strings); ZOOAPI int zoo_wget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings); ZOOAPI int zoo_get_children2(zhandle_t * zh, const char *path, int watch, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_wget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_get_acl(zhandle_t * zh, const char *path, struct ACL_vector *acl, struct Stat *stat); ZOOAPI int zoo_set_acl(zhandle_t * zh, const char *path, int version, const struct ACL_vector *acl); ZOOAPI int zoo_multi(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results);
除了基本的初始化、銷毀 Zookeeper 句柄(zhandle),設置日志等級、日志流以及一些具有輔助功能 API(zerror(), zoo_state()等) 外,Zookeeper C API 大部分接口可以根據同步和異步特性分為兩類,同步接口以 zoo_* 開頭,異步接口以則以 zoo_a* 開頭。並且除了 zookeeper_init() 以及與 zoo_multi() 或 zoo_amulti() 批量操作相關的 zoo_op_t 初始化外,其他的 API 的第一個參數均為 zhandle_t * zh, 即 Zookeeper 句柄的指針。
Zookeeper C API 分類
- 初始化、銷毀 Zookeeper 句柄
ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t * clientid, void *context, int flags); ZOOAPI int zookeeper_close(zhandle_t * zh);
- 輔助函數
// 設置日志等級、日志流 ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel); ZOOAPI void zoo_set_log_stream(FILE * logStream); ZOOAPI const clientid_t *zoo_client_id(zhandle_t * zh); ZOOAPI int zoo_recv_timeout(zhandle_t * zh); ZOOAPI const void *zoo_get_context(zhandle_t * zh); ZOOAPI void zoo_set_context(zhandle_t * zh, void *context); ZOOAPI watcher_fn zoo_set_watcher(zhandle_t * zh, watcher_fn newFn); ZOOAPI struct sockaddr *zookeeper_get_connected_host(zhandle_t * zh, struct sockaddr *addr, socklen_t * addr_len); ZOOAPI int zookeeper_interest(zhandle_t * zh, int *fd, int *interest, struct timeval *tv); ZOOAPI int zookeeper_process(zhandle_t * zh, int events); ZOOAPI int zoo_state(zhandle_t * zh); ZOOAPI const char *zerror(int c); ZOOAPI int is_unrecoverable(zhandle_t * zh); ZOOAPI void zoo_deterministic_conn_order(int yesOrNo);
- 與 zoo_multi() 和 zoo_amulti() 批量操作相關的 zoo_op_t 初始化
void zoo_create_op_init(zoo_op_t * op, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); void zoo_delete_op_init(zoo_op_t * op, const char *path, int version); void zoo_set_op_init(zoo_op_t * op, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); void zoo_check_op_init(zoo_op_t * op, const char *path, int version);
- Zookeeper C API 同步接口
ZOOAPI int zoo_add_auth(zhandle_t * zh, const char *scheme, const char *cert, int certLen, void_completion_t completion, const void *data); ZOOAPI int zoo_create(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); ZOOAPI int zoo_delete(zhandle_t * zh, const char *path, int version); ZOOAPI int zoo_exists(zhandle_t * zh, const char *path, int watch, struct Stat *stat); ZOOAPI int zoo_wexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct Stat *stat); ZOOAPI int zoo_get(zhandle_t * zh, const char *path, int watch, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_wget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_set(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version); ZOOAPI int zoo_set2(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); ZOOAPI int zoo_get_children(zhandle_t * zh, const char *path, int watch, struct String_vector *strings); ZOOAPI int zoo_wget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings); ZOOAPI int zoo_get_children2(zhandle_t * zh, const char *path, int watch, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_wget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_get_acl(zhandle_t * zh, const char *path, struct ACL_vector *acl, struct Stat *stat); ZOOAPI int zoo_set_acl(zhandle_t * zh, const char *path, int version, const struct ACL_vector *acl); ZOOAPI int zoo_multi(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results);
- Zookeeper C API 異步接口
ZOOAPI int zoo_acreate(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data); ZOOAPI int zoo_adelete(zhandle_t * zh, const char *path, int version, void_completion_t completion, const void *data); ZOOAPI int zoo_aexists(zhandle_t * zh, const char *path, int watch, stat_completion_t completion, const void *data); ZOOAPI int zoo_awexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget(zhandle_t * zh, const char *path, int watch, data_completion_t completion, const void *data); ZOOAPI int zoo_awget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, data_completion_t completion, const void *data); ZOOAPI int zoo_aset(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget_children(zhandle_t * zh, const char *path, int watch, strings_completion_t completion, const void *data); ZOOAPI int zoo_awget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_completion_t completion, const void *data); ZOOAPI int zoo_aget_children2(zhandle_t * zh, const char *path, int watch, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_awget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_async(zhandle_t * zh, const char *path, string_completion_t completion, const void *data); ZOOAPI int zoo_aget_acl(zhandle_t * zh, const char *path, acl_completion_t completion, const void *data); ZOOAPI int zoo_aset_acl(zhandle_t * zh, const char *path, int version, struct ACL_vector *acl, void_completion_t, const void *data); ZOOAPI int zoo_amulti(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results, void_completion_t, const void *data);
Zookeeper C API 初體驗
有了上面的介紹,下面我們來看一看如何使簡單地使用 Zookeeper C API 吧。
在使用 Zookeeper C API 時應注意:
- 頭文件包含 #include <zookeeper/zookeeper.h>
- 如果你需要編譯多線程版本客戶端程序,請添加編譯選項 -DTHREADED,同時鏈接時應鏈接 zookeeper_mt 庫;如果你需要編譯單線程客戶端程序,請不要添加編譯選項 -DTHREADED,同時鏈接時應鏈接 zookeeper_st 庫。
一個基本的程序如下(更詳細的例子可以參看 src/c/src/cli.c):
/* * ============================================================================= * * Filename: zktest.c * * Description: zookeeper api testcase. * * Created: 02/15/2013 08:48:49 PM * * Author: Fu Haiping (forhappy), haipingf@gmail.com * Company: ICT ( Institute Of Computing Technology, CAS ) * * ============================================================================= */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <zookeeper/zookeeper.h> #include <zookeeper/zookeeper_log.h> void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) { printf("Something happened.\n"); printf("type: %d\n", type); printf("state: %d\n", state); printf("path: %s\n", path); printf("watcherCtx: %s\n", (char *)watcherCtx); } void zktest_dump_stat(const struct Stat *stat) { char tctimes[40]; char tmtimes[40]; time_t tctime; time_t tmtime; if (!stat) { fprintf(stderr,"null\n"); return; } tctime = stat->ctime/1000; tmtime = stat->mtime/1000; ctime_r(&tmtime, tmtimes); ctime_r(&tctime, tctimes); fprintf(stderr, "\tctime = %s\tczxid=%llx\n" "\tmtime=%s\tmzxid=%llx\n" "\tversion=%x\taversion=%x\n" "\tephemeralOwner = %llx\n", tctimes, stat->czxid, tmtimes, stat->mzxid, (unsigned int)stat->version, (unsigned int)stat->aversion, stat->ephemeralOwner); } void zktest_stat_completion(int rc, const struct Stat *stat, const void *data) { fprintf(stderr, "%s: rc = %d Stat:\n", (char*)data, rc); zktest_dump_stat(stat); } void zktest_void_completion(int rc, const void *data) { fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc); } void zktest_string_completion(int rc, const char *name, const void *data) { fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc); if (!rc) { fprintf(stderr, "\tname = %s\n", name); } } int main(int argc, const char *argv[]) { const char* host = "127.0.0.1:2181,127.0.0.1:2182," "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185"; int timeout = 30000; zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); zhandle_t* zkhandle = zookeeper_init(host, zktest_watcher_g, timeout, 0, "hello zookeeper.", 0); if (zkhandle == NULL) { fprintf(stderr, "Error when connecting to zookeeper servers...\n"); exit(EXIT_FAILURE); } // struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}}; // struct ACL_vector ALL_PERMS = {1, ALL_ACL}; int ret = zoo_acreate(zkhandle, "/xyz", "hello", 5, &ZOO_OPEN_ACL_UNSAFE, 0 /* ZOO_SEQUENCE */, zktest_string_completion, "acreate"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "acreate"); exit(EXIT_FAILURE); } ret = 0; ret = zoo_aexists(zkhandle, "/xyz", 1, zktest_stat_completion, "aexists"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "aexists"); exit(EXIT_FAILURE); } ret = 0; // Wait for asynchronous zookeeper call done. getchar(); ret = zoo_adelete(zkhandle, "/xyz", -1, zktest_void_completion, "adelete"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "adelete"); exit(EXIT_FAILURE); } // Wait for asynchronous zookeeper call done. getchar(); zookeeper_close(zkhandle); }
下面簡單講講這個程序的結構:
- 首先聲明 host, timeout 變量。
const char* host = "127.0.0.1:2181,127.0.0.1:2182," "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185"; int timeout = 30000;
其中 host 字符串格式為逗號隔開的 IP:PORT對,可以是 Zookeeper 集群中的全部或部分 Zookeeper 實例的 IP:PORT對,我們在第一講《准備工作》中介紹了如何部署一個偽分布式的集群,上述的 host 就是這些 zookeeper 實例的IP:PORT對。
另外 timeout 是 Zookeeper 客戶端連接服務器的超時時間,單位為毫秒,timeout = 30000 說明如果 30 秒內客戶端沒有連接上 Zookeeper 服務則表示連接超時。
- 設置日志等級。
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- 初始化 Zookeeper 句柄(zhandle_t)。
zhandle_t* zkhandle = zookeeper_init(host, zktest_watcher_g, timeout, 0, "hello zookeeper.", 0); if (zkhandle == NULL) { fprintf(stderr, "Error when connecting to zookeeper servers...\n"); exit(EXIT_FAILURE); }
初始化 Zookeeper 句柄 zookeeper_init() 函數原型如下:
ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t * clientid, void *context, int flags);
各個參數解釋如下:
| host | 逗號隔開的 host:port 對, 每個代表一個 zk server, 例如: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" |
| fn | 全局的監視器回調函數,當發生事件通知時,該函數會被調用。 |
| clientid | 客 戶端嘗試重連的先前會話的ID,如果不需要重連先前的會話,則設置為 0。客戶端可以通過調用 zoo_client_id來訪問一個已經連接上的並且有效的會話ID,如果clientid對應的會話超時,或者由於某種原因 clientid變為無效了,那么zookeeper_init 將返回一個非法的 zhandle_t, 通過 zhandle_t 的狀態可以獲知 zookeeper_init 調用失敗的原因。 (通常為 ZOO_EXPIRED_SESSION_STATE). |
| context | 與 zhandle_t 實例相關聯的“上下文對象”(可以通過該參數為 zhandle_t 傳入自定義類型的數據),應用程序可以通過 zoo_get_context 訪問它(例如在監視器回調函數中),當然 zookeeper 內部沒有用到該參數,所以 context 可以設置為 NULL。 |
| flags | 目前為保留參數,設置為 0。 |
- 創建一個 znode 節點。
// struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}}; // struct ACL_vector ALL_PERMS = {1, ALL_ACL}; int ret = zoo_acreate(zkhandle, "/xyz", "hello", 5, &ZOO_OPEN_ACL_UNSAFE, 0 /* ZOO_SEQUENCE */, zktest_string_completion, "acreate"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "acreate"); exit(EXIT_FAILURE); }
這里采用異步方式創建 znode 節點,zoo_acreate() 函數原型為:
ZOOAPI int zoo_acreate(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data);
各個參數解釋如下:
| zh | zookeeper_init() 返回的 zookeeper 句柄。 |
| path | 節點路徑。 |
| value | 該節點保存的數據。 |
| valuelen | 該節點保存數據的大小。 |
| acl | 該節點初始 ACL,ACL 不能為null 或空。 |
| flags | 該參數可以設置為 0,或者創建標識符 ZOO_EPHEMERAL, ZOO_SEQUENCE 的組合或(OR)。 |
| completion | 當創建節點請求完成時會調用該函數,該函數原型詳見第三講《回調函數》一節。同時傳遞給completion的 rc參數為: ZOK 操作完成;ZNONODE 父節點不存在;ZNODEEXISTS 節點已存在;ZNOAUTH 客戶端沒有權限創建節點。ZNOCHILDRENFOREPHEMERALS 臨時節點不能創建子節點。 |
| data | completino函數被調用時,傳遞給 completion 的數據。 |
- 調用 exists() 函數,設置監視器。
ret = zoo_aexists(zkhandle, "/xyz", 1, zktest_stat_completion, "aexists"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "aexists"); exit(EXIT_FAILURE); }
- 調用 delete 函數,刪除 znode 節點。
ret = zoo_adelete(zkhandle, "/xyz", -1, zktest_void_completion, "adelete"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "adelete"); exit(EXIT_FAILURE); }
- 銷毀 zookeeper 句柄
zookeeper_close(zkhandle);
好了,至此本文大致講完了 Zookeeper C API 的分類和幾個基本函數的用法。之所以為 Zookeeper C API 分類是方便記憶,開發者可以迅速找到自己需要的 API;另外,本文還講了幾個基本函數的使用方法,包括 zookeeper_init(),zoo_acreate(), zoo_aexists(), zoo_adelete() 以及 zookeeper_close()。相信大家對 Zookeeper C API 也有了一個大致的了解,第五講我會給大家介紹 Zookeeper C API 中的同步調用的函數(即以 zoo_* 開頭的函數),然后第六講給大家介紹 Zookeeper C API 中的異步調用的函數(即以 zoo_a* 開頭的函數)。
