libuv 簡單使用


libuv 簡單使用

來源:https://zhuanlan.zhihu.com/p/50497450

前序:說說為啥要研究libuv,其實在很久之前(大概2年前吧)玩nodejs的時候就對這個核心庫非常感興趣,不過由於當年水平確實比較菜,大概看了看之后實在沒能靜下心來看下去。18年初的時候,360直播雲官網做了React同構,那個時候我問自己如果真有百萬並發,每天億級的訪問量有沒有信心保證中間node層一次不掛(或者不出任何事故),其實我到今天仍然是沒有足夠底氣的。原因有兩個吧:一是對nodejs和它底層的內容還遠遠不夠了解,二是對監控層面做的不夠好。我們大概也都知道alinode,他們早在3 4年前就在nodejs上做了很多工作,比如v8內存監控等,但是比較遺憾的是alinode至今沒有開源。於是乎有了我的第一篇關於libuv的文章,后面爭取還會更新nodejs、v8等相關的內容。 本文從下面幾個方面來介紹libuv,通過fs、net兩方面介紹libuv的思想。

如何安裝、使用libuv這個框架

首先我們可以在libuv上找到libuv這個框架,在README.md里,我們就可以在Build Instructions找到安裝方法,作者電腦操作系統是macox(所以后面的實例也是以linux、unix為主,不會講windows)。我們首先把項目clone到我們的電腦上,在項目根目錄執行一下的命令,在執行過程中可能會出現各種底層庫沒有安裝的情況,按照提示自行安裝就可以了,作者在執行 xcodebuild 的時候發現不能加上 -target All 的參數,不加的話可以順利build過去。

$ ./gyp_uv.py -f xcode
$ xcodebuild -ARCHS="x86_64" -project uv.xcodeproj \
     -configuration Release -target All

build完成后 我們可以在項目目錄里找到 build/Release/libuv.a 文件,這個就是編譯后的文件了,我們稍后會用到。 准備工作做好之后我們就可以創建一個C或者C++的工程了,在Mac上我一般使用xcode來編寫oc、c、c++的項目。 首先創建一個C項目,這個時候我們需要把我們之前編譯的libuv.a的文件加入到項目的依賴中,我們在Build Phases中的 Link Binary with Libraries中添加libuv.a的路徑,同時我們需要在項目根目錄引入uv.h等文件頭。准備工作做好之后,我們就開始學習怎么寫標准的hello world了 哈哈哈哈。

#include <stdio.h>
#include <stdlib.h>
#include <uv.h>

int main() {
    uv_loop_t *loop = malloc(sizeof(uv_loop_t));
    uv_loop_init(loop);

    printf("Now quitting.\n");
    uv_run(loop, UV_RUN_DEFAULT);

    uv_loop_close(loop);
    free(loop);
    return 0;
}

上述代碼僅僅初始化了一個loop循環,並沒有執行任何內容,然后就close且退出了。雖然上述代碼並沒有利用libuv的async功能,但是給我們展示了 uv_loop_init uv_run 兩個核心函數。我們稍后會介紹他們做了什么。

先從一個數據結構開始

在開始介紹整個整個libuv之前,我不得不首先介紹一個數據結構,因為這個數據結構在libuv里無處不在,這個數據結構就是--循環雙向鏈表。 我們在項目根目錄下的src目錄可以找到queue.h的頭文件。不錯,這個數據結構就是用宏實現的,那我讓我們一起來學習一下什么是鏈表。

鏈表的定義:

鏈表是一種物理存儲單元上非連續、非順序的存儲結構

那什么是雙向鏈表呢?

雙向鏈表其實就是頭尾相連

那什么是雙向循環鏈表呢?

看圖我們就明白了,所謂的循環鏈表就是把頭尾相連。

來看一下 queue.h 是怎么實現的

#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - offsetof(type, field)))

#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

上述代碼我只截取了部分的實現 其實這里我只想講兩個點 1:QUEUE_NEXT 的實現

(*(QUEUE **) &((*(q))[0]))

在這個宏里,他為什么用這個復雜的方式來實現呢? 其實他有兩個目的:強制類型轉換、成為左值

*(q))[0]

這個步驟是取到數組的第一個元素

(QUEUE **)

這個步驟進行強制類型轉換

(*(nnn) &(xxx))

這個步驟目的就是為了使xxx成為左值

2:QUEUE_DATA 獲取鏈表的值 巧妙的使用了地址的偏移量來完成

來看一個使用queue.h的demo吧

#include "queue.h"
#include <stdio.h>

static QUEUE* q;
static QUEUE queue;

struct user_s {
    int age;
    char* name;
    QUEUE node;
};

int main() {
    struct user_s* user;
    struct user_s john;
    struct user_s henry;

    john.name = "john";
    john.age = 44;
    henry.name = "henry";
    henry.age = 32;

    QUEUE_INIT(&queue);
    QUEUE_INIT(&john.node);
    QUEUE_INIT(&henry.node);
    QUEUE_INIT(&willy.node);
    QUEUE_INIT(&sgy.node);

    ((*(&queue))[0]) = john.node;
    (*(QUEUE **) &((*(&queue))[0])) = &john.node;

    QUEUE_INSERT_TAIL(&queue, &john.node);
    QUEUE_INSERT_TAIL(&queue, &henry.node);

    q = QUEUE_HEAD(&queue);

    user = QUEUE_DATA(q, struct user_s, node);

    printf("Received first inserted user: %s who is %d.\n",
           user->name, user->age);

    QUEUE_REMOVE(q);

    QUEUE_FOREACH(q, &queue) {
        user = QUEUE_DATA(q, struct user_s, node);

        printf("Received rest inserted users: %s who is %d.\n",
               user->name, user->age);
    }

    return 0;
}

從上面代碼可以總結出5個方法 QUEUE_INIT 隊列初始化 QUEUE_INSERT_TAIL 插入到隊尾 QUEUE_HEAD 頭部第一個元素 QUEUE_DATA 獲得元素的內容 QUEUE_REMOVE 從隊列中移除元素

那雙向循環鏈表就先簡單介紹到這。

libuv的核心

libuv為什么可以這么高效呢?實際他使用了操作系統提供的高並發異步模型

linux: epoll

freebsd: kqueue

windows: iocp

每個我們常見的操作系統都為我們封裝了類似的高並發異步模型,那libuv其實就是對各個操作系統進行封裝,最后暴露出統一的api供開發者調用,開發者不需要關系底層是什么操作系統,什么API了。 我們來看一下同步模型和異步模型的區別

阻塞模型

 

我們在一個線程中調用網絡請求,之后線程就會被阻塞,直到返回結果才能繼續執行線程

異步模型

 

在異步模型中 我們調用網絡請求后不在去直接調用accept阻塞線程,而是輪詢fd是否發生變化,在返回內容后我們在調用cb執行我們的代碼,這個過程是非阻塞的。 說了這么多我們通過2個例子了解一下其中的原理。

學習如何建立一個socket

我們首先了解一下 C是如何創建socket的,之后我們在看一下如果通過高並發異步模型來創建socket,最后我們在了解一下 libuv下怎么創建socket。

C如何創建一個socket呢?

#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>

#define MYPORT  8887
#define QUEUE   20
#define BUFFER_SIZE 1024

int main()
{
    //定義sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX協議) AF_ROUTE(路由套接字) AF_KEY(秘鑰套接字)
    // SOCK_STREAM(字節流套接字) SOCK_DGRAM
    int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);

    ///定義sockaddr_in
    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(MYPORT);
    server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    ///bind,成功返回0,出錯返回-1
    if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
    {
        perror("bind");
        exit(1);
    }

    printf("監聽%d端口\n", MYPORT);
    ///listen,成功返回0,出錯返回-1
    if(listen(server_sockfd, QUEUE) == -1)
    {
        perror("listen");
        exit(1);
    }

    ///客戶端套接字
    char buffer[BUFFER_SIZE];
    struct sockaddr_in client_addr;
    socklen_t length = sizeof(client_addr);

    printf("等待客戶端連接\n");
    ///成功返回非負描述字,出錯返回-1
    int conn = accept(server_sockfd, (struct sockaddr*)&client_addr, &length);
    if(conn<0)
    {
        perror("connect");
        exit(1);
    }
    printf("客戶端成功連接\n");

    while(1)
    {
        memset(buffer,0,sizeof(buffer));
        long len = recv(conn, buffer, sizeof(buffer), 0);
        //客戶端發送exit或者異常結束時,退出
        ;
        if(strcmp(buffer,"exit\n")==0 || len<=0) {
            printf("出現異常");
            break;
        }

        printf("來自客戶端數據:\n");
        fwrite(buffer, len, 1, stdout);
        send(conn, buffer, len, 0);
        printf("發送給客戶端數據:\n");
        fwrite(buffer, len, 1, stdout);
    }
    close(conn);
    close(server_sockfd);
    return 0;
}

代碼一大坨,其實上我們簡單拆分一下

第一步:創建socket 文件描述符
第二步:定義socket addr
第三步:綁定文件描述符和地址  bind
第四步:監聽文件描述符 listen
第五步:等待socket返回內容 accept
第六步:接收信息 recv

那我們如何使用kqueue來創建socket呢?

由於作者電腦是macos,所以只能使用kqueue,不能使用epoll。

#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>

#define MYPORT  8887
#define QUEUE   20
#define BUFFER_SIZE 1024

int main()
{
    // 定義sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX協議) AF_ROUTE(路由套接字) AF_KEY(秘鑰套接字)
    // SOCK_STREAM(字節流套接字) SOCK_DGRAM
    int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);

    // 定義sockaddr_in
    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(MYPORT);
    server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    // bind,成功返回0,出錯返回-1
    if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
    {
        perror("bind");
        exit(1);
    }

    printf("監聽%d端口\n", MYPORT);
    // listen,成功返回0,出錯返回-1
    if(listen(server_sockfd, QUEUE) == -1)
    {
        perror("listen");
        exit(1);
    }

    //創建一個消息隊列並返回kqueue描述符
    int kq =  kqueue();
    struct kevent change_list;  //想要監控的事件
    struct kevent event_list[10000];  //用於kevent返回
    char buffer[1024];
    int nevents;
    // 監聽sock的讀事件
    EV_SET(&change_list, server_sockfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
    while(1) {
        printf("new loop...\n");
        // 等待監聽事件的發生
        nevents = kevent(kq, &change_list, 1, event_list, 2, NULL);
        if (nevents < 0) {
            printf("kevent error.\n");  // 監聽出錯
        } else if (nevents > 0) {
            printf("get events number: %d\n", nevents);
            for (int i = 0; i < nevents; ++i) {
                printf("loop index: %d\n", i);
                struct kevent event = event_list[i]; //監聽事件的event數據結構
                int clientfd = (int) event.ident;  // 監聽描述符
                // 表示該監聽描述符出錯
                if (event.flags & EV_ERROR) {
                    close(clientfd);
                    printf("EV_ERROR: %s\n", strerror(event_list[i].data));
                }
                // 表示sock有新的連接
                if (clientfd == server_sockfd) {
                    printf("new connection\n");
                    struct sockaddr_in client_addr;
                    socklen_t client_addr_len = sizeof(client_addr);
                    int new_fd = accept(server_sockfd, (struct sockaddr *) &client_addr, &client_addr_len);
                    long len = recv(new_fd, buffer, sizeof(buffer), 0);
                    char remote[INET_ADDRSTRLEN];
                    printf("connected with ip: %s, port: %d\n",
                           inet_ntop(AF_INET, &client_addr.sin_addr, remote, INET_ADDRSTRLEN),
                           ntohs(client_addr.sin_port));
                    send(new_fd, buffer, len, 0);
                }
            }
        }
    }
    return 0;
}

我們可以看到,listen之前都是一樣的,不在贅述,簡化一下后面的步驟

第一步:創建 kqueue描述符
第二部:監聽socket讀事件 EV_SET
第三步:綁定kq 和 change_list kevent

一直while循環直到kevent返回可以的文件描述符數量 那到這里其實我們就完全弄懂了 如何直接用C寫出高並發異步是怎么運行的。那么我們就看看使用libuv的例子吧

使用libuv的scoket

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>

#define DEFAULT_PORT 7000
#define DEFAULT_BACKLOG 128

uv_loop_t *loop;
struct sockaddr_in addr;

typedef struct {
    uv_write_t req;
    uv_buf_t buf;
} write_req_t;

void free_write_req(uv_write_t *req) {
    write_req_t *wr = (write_req_t*) req;
    free(wr->buf.base);
    free(wr);
}

void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    buf->base = (char*) malloc(suggested_size);
    buf->len = suggested_size;
}

void on_close(uv_handle_t* handle) {
    free(handle);
}

void echo_write(uv_write_t *req, int status) {
    if (status) {
        fprintf(stderr, "Write error %s\n", uv_strerror(status));
    }
    free_write_req(req);
}

void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
    if (nread > 0) {
        write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));
        req->buf = uv_buf_init(buf->base, nread);
        fwrite(buf->base, 30, 1, stdout);
        uv_write((uv_write_t*) req, client, &req->buf, 1, echo_write);
        return;
    }
    if (nread < 0) {
        if (nread != UV_EOF)
            fprintf(stderr, "Read error %s\n", uv_err_name(nread));
        uv_close((uv_handle_t*) client, on_close);
    }

    free(buf->base);
}

void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
    }
    else {
        uv_close((uv_handle_t*) client, on_close);
    }
}

int main() {
    loop = uv_default_loop();

    uv_tcp_t server;
    uv_tcp_init(loop, &server);

    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);

    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r));
        return 1;
    }
    return uv_run(loop, UV_RUN_DEFAULT);
}

實際上整體我們都可以把libuv和我們原生的c kqueue進行一一對應,發現相差不多,唯一不同是我們需要定義 uv_loop 這個內部循環,后面我們在來講套循環機制。

學習如何進行文件讀寫

我們學習完了網絡,那么我們再來看看文件i/o是怎么處理的。

 

剛剛我們玩轉了socket來看這張圖是不是很熟悉?但是發現右側有了很大的不同。文件操作、DNS、用戶代碼不是基於epoll這種模型嗎? 顯而易見我們有了答案,這是為什么呢?其實很簡單文件的很多操作就是同步的,但是libuv為了統一異步,利用開辟線程進行文件等操作模擬了異步的過程!!原來我們用了這么久才發現他是個騙子。哈哈!其實是我們學藝不精。 那其實講到這里文件讀寫其實講的差不多了,我們還是來看看例子吧!

#include <stdio.h>
#include <uv.h>

uv_fs_t open_req;
uv_fs_t _read;

static char buffer[1024];
static uv_buf_t iov;

void on_read(uv_fs_t *req) {
    printf("%s\n",iov.base);
}
void on_open(uv_fs_t *req) {
    printf("%zd\n",req->result);
    iov = uv_buf_init(buffer, sizeof(buffer));
    uv_fs_read(uv_default_loop(), &_read, (int)req->result,
               &iov, 1, -1, on_read);
}
int main() {
    const char* path = "/Users/sgy/koa/package.json";
    // O_RDONLY 、 O_WRONLY 、 O_RDWR 、 O_CREAT
    uv_fs_open(uv_default_loop(), &open_req, path, O_RDONLY, 0, on_open);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    uv_fs_req_cleanup(&open_req);
    return 0;
}

其實libuv底層對文件open和read的操作是分開的。 看到這里文件api沒啥講的了,我們來簡單講講線程池。

線程池

線程池就是對線程的統一管理,預先創建出線程,如果有任務就把任務放到線程池里去執行。

 

通過上圖我們可以看到有任務進來首先會插入到鏈表中進行排隊等待, 直到線程空余就會去鏈表中去取。 通過閱讀 src/threadpool.c文件我們可以了解 MAX_THREADPOOL_SIZE 128 最大線程為128個 default_threads[4] 默認只會開辟4個線程 如果你對底層不了解 那當你在進行大量的文件i/o時 線程池數量就是阻礙你的最大障礙。 為啥最大只能創建128個線程呢?因為大多數操作系統創建一個線程大概花費1M的內存空間,外加用戶本身代碼也要占用大量的內存,所以這里設置了最大128的限制。

了解libuv的循環機制

我們通過網絡和文件了解了libuv,那么我們來看看libuv的循環機制

uv_loop_t *loop;
 loop = uv_default_loop()
 uv_run(loop, UV_RUN_DEFAULT);

首先我們會創建 loop 然后一系列的騷操作之后 最后我們執行了uv_run 嗯嗯 那uv_run 肯定是突破口了 在src/unix/core.c 文件里 我們找到了 uv_run的定義

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);
    ran_pending = uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

從代碼中 我們就可以總結出libuv的運行周期 通過while循環不斷的查詢 loop中是否有停止符 如果有則退出 否則就不停的進行循環。

 

上面的圖已經清楚的描述我們uv_run的流程了 那其中的核心 就在*uvio_poll* 中 例如在 src/unix/linux-core.c 中的uvio_poll函數 我們就可以找到 我們 epoll 熟悉的身影了。實現邏輯也和我們之前使用過的差不多。

總結

洋洋灑灑寫了這么多,最后總結一下也提出自己的思考。 其實libuv底層的 actor模型是非常高效的,很多游戲服務器內核也使用actor模型,那相對於火的不行的go(協程模型) nodejs一直沒有在服務端發揮它的高效呢? 我覺得其實原因很簡單,因為nodejs他並不高效,我覺得nodejs能夠快速的被開發出來並且js運行如此高效 v8功不可沒。但是成也v8敗也v8,JIT優化的在好 依然和編譯型語言相差甚遠。 但是一點的性能是阻礙大數據等框架使用go而不是用nodejs的原因嗎?我覺得其實並不是,最大的原因我覺得是生態!非常多的Apache開源框架使用java編寫,很多大數據使用go來承載,nodejs有什么頂級生態嗎?我覺得並沒有,他大多數面向的是前端這個群體導致他的生態的發展。 謝謝大家能看到這里,上述的心得都是近期整理的,如果有不對的地方歡迎大家多多批評。上述內容如果轉載請附帶原文鏈接,感謝。

 

================= End

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM