gpfdist原理解析


gpfdist原理解析

 

前言gpfdist作為批量向postgresql寫入數據的工具,了解其內部原理有助於正確使用以及提供更合適的數據同步方案。文章先簡要介紹gpfdist的整體流程,然后針對重要步驟詳細展開。文章有的地方可能探索不夠深入,感興趣的可以繼續深入。如有錯誤請指出。

1 整體流程

Gpfdist的整體流程可簡單分為4步。

(1) 解析參數;

(2) 從指定的端口列表中搜尋可用端口;

(3) 監聽第一個可用端口;

(4) 注冊該端口的可讀事件,等待連接請求;

(5) 響應各類事件。

 

下面通過源碼及注釋詳細介紹上述過程。

int main(int argc, const char* const argv[])
{
    if (gpfdist_init(argc, argv) == -1)
        gfatal(NULL, "Initialization failed");
    return gpfdist_run();
}

Main函數很簡短,調用了gpfdist_initgpfdist_run,其中gpfdist_run比較簡單,源碼如下,僅僅調用了libevent的事件分發函數,以回調形式響應各類事件(主要是socket讀寫事件)。

int gpfdist_run()
{
    return event_dispatch();
}

 gpfdist_init比較復雜,完成了libevent的初始化、事件綁定、http服務啟動等功能,源碼如下。其中aprApache可移植運行庫在該項目中主要用於資源管理,不影響理解gpfdist原理,這里不再介紹,有興趣的可參考https://apr.apache.org/

int gpfdist_init(int argc, const char* const argv[])
{
    /*初始化apr資源池*/
    if (0 != apr_app_initialize(&argc, &argv, 0))
        gfatal(NULL, "apr_app_initialize failed");
    atexit(apr_terminate);

    if (0 != apr_pool_create(&gcb.pool, 0))
        gfatal(NULL, "apr_app_initialize failed");

    //apr_signal_init(gcb.pool);
    gcb.session.tab = apr_hash_make(gcb.pool);

    //解析命令行參數
parse_command_line(argc, argv, gcb.pool);
......
    event_init();
signal_register();
//啟動http服務
http_setup();
.....

 gpfdist_init通過調用http_setup函數完成http服務的啟動,http_setup源碼如下,主要功能是測試哪些端口可以使用

http_setup(void)
{
    SOCKET f;
    int on = 1;
    struct linger linger;
    struct addrinfo hints;
    struct addrinfo *addrs, *rp;
    int  s;
    int  i;

    char service[32];
    const char *hostaddr = NULL;
    //綁定gpfdist的文件讀寫函數,用於從文件或其他方式讀寫數據
gpfdist_send    = gpfdist_socket_send;
    gpfdist_receive = gpfdist_socket_receive;
   ......
/* 下面的內容就是從指定端口列表中測試哪些端口可用*/ for (;;) { //利用第一個端口組成socket使用的網絡地址 snprintf(service,32,"%d",opt.p); memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_STREAM; /* tcp socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ s = getaddrinfo(hostaddr, service, &hints, &addrs); ....... /* 測試地址是否可用,這個for循環只會執行一次,因為rp->ai_next=0*/ for (rp = addrs; rp != NULL; rp = rp->ai_next) { gprint(NULL, "Trying to open listening socket:\n"); print_listening_address(rp); /* * getaddrinfo gives us all the parameters for the socket() call * as well as the parameters for the bind() call. */ f = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); //設置keep_alive linger等屬性 ...... if (bind(f, rp->ai_addr, rp->ai_addrlen) != 0) { ...... } /* listen with a big queue */ if (listen(f, opt.z)) { ...... } gcb.listen_socks[gcb.listen_sock_count++] = f; gprint(NULL, "Opening listening socket succeeded\n"); } ...... } /* * 為上述可用端口綁定可讀事件響應函數do_accept,用於接收客戶端的連接。 */ for (i = 0; i < gcb.listen_sock_count; i++) { /* when this socket is ready, do accept */ event_set(&gcb.listen_events[i], gcb.listen_socks[i], EV_READ | EV_PERSIST, do_accept, 0); ...... if (event_add(&gcb.listen_events[i], 0)) gfatal(NULL, "cannot set up event on listen socket: %s", strerror(errno)); } }

自此http服務已經建立起來,並准備好接收postgresql segment的連接。

 

2 核心數據結構間的聯系

  接下來說明一下gpfdist中的幾個核心數據結構及其之間的關系,便於對下文代碼邏輯關系的理解。

  session_t是一次會話,由成員key唯一標識,key = tid:pathtid = xid.cid.sn,其中xid是事務idcid是查詢命令id,每次查詢時屬於同一個sqlsegment請求的xidcid相同,但由於各segment請求的path可能不同,因此同一個查詢的不同segment請求可能屬於不同session。另外注意tid長度不能超過1023字節。

  request_t代表一個segment的請求,因此session_t對應多個request_t

  fstream_t代表屬於同一session_trequest_t想要請求的數據流,其成員glob_and_copy_t包含多個文件地址,fstream_t會順序讀取這些文件回應給segment

 1 核心數據結構

3 接受連接

  http服務接收到客戶端連接后由do_accept函數響應,該函數首先接收客戶端連接,並給該連接設置非阻塞等屬性,接着創建request_t對象並初始化其部分屬性,最后調用setup_read函數為該接綁定讀事件響應函數do_read_request,到此gpfdist已經與客戶端建立了連接並開始等待客戶端的http請求。

static void do_accept(int fd, short event, void* arg)
{
    address_t           a;
    socklen_t           len = sizeof(a);
    SOCKET              sock;
    request_t*          r;
    apr_pool_t*         pool;
    int                 on = 1;
    struct linger       linger;

    /* do the accept */
    if ((sock = accept(fd, (struct sockaddr*) &a, &len)) < 0)
    {
        gwarning(NULL, "accept failed");
        goto failure;
    }

    /* set to non-blocking, and close-on-exec */
    ......
    /* set keepalive, reuseaddr, and linger */
    ......
    /* create a pool container for this socket */
    ......
    /* 調用setup_read為上述socket設置讀事件響應函數do_read_request */
    if (setup_read(r))
    {
        http_error(r, FDIST_INTERNAL_ERROR, "internal error");
        request_end(r, 1, 0);
    }
    return;
}

 

 接收請求后的處理

  如圖2gpfdist接收到http請求解析出相關參數,包含tid、cid、文件路徑等信息,然后綁定到對應session上,根據請求類型分別調用不同函數完成對segment的響應。下面着重講解路徑提取、session綁定兩個操作的細節。

 2 接收請求

 

1)路徑提取

  segment請求中路徑參數格式如下所示:

1.csv空格t*.csv

(注意:該串不能含有相對路徑”..”)

gpfdist會遍歷該字符串,以空格為分隔符提取所有文件路徑,並在每個路徑前拼接gpfdist啟動時命令行輸入的目錄,最終得到如下路徑:

/home/test/data/1.csv 空格/home/test/data/t*.csv

轉換后的路徑將用於后面的文件讀取寫入操作。

 

2session與連接綁定

  接收到segmenthttp請求后需要將其與session綁定,流程如圖3。首先根據請求的key查找對應的session是否存在,存在則請求與session綁定,否則就新建並初始化fstream_tsession對象。

 

 3 綁定session

 

  新建fstream_t時會重新組織文件路徑並檢查是否有操作權限。首先把上文轉換后的路徑以空格分開,然后將每一個路徑中包含的通配符解析成具體的文件名,得到如下的路徑列表(這里假設目錄下存在t1.csv  t2.csv):

/home/test/data/1.csv

/home/test/data/t1.csv

/home/test/data/t2.csv

后嘗試打開上述文件以測試是否有操作權限。

4 GET請求

  如果segmentGET請求 對應的socket會被設置可寫事件響應函數do_write,其流程如4

4 發送數據

 

  在讀取一個數據塊時,gpfdist采用整行讀取方式,即每次回應的業務數據一定是源文件的完整若干行,目前gpfdist對於csv文件僅支持\n  \r  \r\n 三種行分隔符,但可通過修改scan_csv_records_crlf函數支持其他類型的行分隔符,另外csv文件允許數據中含有行分隔符;對於text格式的文件,行分隔只支持\n

  gpfdist會將本次讀取到的數據的元信息填充到回應頭部,包含本次回應的業務數據的長度、行數、文件名、在文件中的偏移等信息。

5 POST請求

  圖5gpfdistpost請求(寫請求)的處理流程,不再詳細展開。

  5 數據寫入文件

6 外表文件個數與segment數量的關系

  在此只針對文件形式的讀外表進行分析,讀外表的創建語句如下:

create external table test
(
  id integer,
  name varchar
)
location (‘gpfdist://$IP:$PORT/$file_name[,..])
format ‘csv’(delimiter’,’)
;

  從以上語句可以看出,外表可以配置多個文件,但應注意配置的文件數量與segment存在以下關系:

(1) 只有一個文件(通配符計為一個文件)

  每個segment都會請求該文件的數據,當數據量小時,有的segment可能獲取不到數據,這不會對表的讀取造成任何影響。

 

(2) 配置兩個以上文件

  • 文件數量 < segment數量

    postgresql會給每個segment分配一個文件進行讀取。

  • 文件數量 > segment

    gpfdist報錯,讀表失敗。

 

 

參考:

https://docs.greenplum.org/6-12/common/gpdb-features.html

https://greenplum.org/readable-external-protocol-gpfdist/

https://greenplum.org/introduction-writable-gpfdist/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM