一 前言
截止到目前為止,雖然並不完美,但是JDFS已經初步具備了完整的分布式文件管理功能了,包括:文件的冗余存儲、文件元信息的查詢、文件的下載、文件的刪除等。本文將對JDFS做一個總體的介紹,主要是介紹JDFS的整體架構,流程圖等,另外還會介紹如何安裝部署運行JDFS.當然正如前面幾篇博客里筆者提到的,JDFS並不完美,有一些潛在的難以發現的bug偶爾會出現,這個有賴於后續的不斷測試、調試來解決。如果你是第一次閱讀JDFS系列博客,筆者建議先讀一下該系列的另外幾篇博客,其鏈接如下:
- JDFS:一款分布式文件管理實用程序第一篇(線程池、epoll、上傳、下載) 點擊我
- JDFS:一款分布式文件管理實用程序第二篇(更新升級、解決一些bug) 點擊我
- JDFS:一款分布式文件管理系統,第三篇(流式雲存儲) 點擊我
- JDFS:一款分布式文件管理系統,第四篇(流式雲存儲續篇) 點擊我
JDFS代碼已經上傳到github,地址請點擊我
PS: 本篇博客是博客園用戶“cs小學生”的原創作品,轉載請注明原作者和原文鏈接,謝謝。
二 JDFS 架構概覽
在前四篇博客中,筆者以文字、代碼片段的方式敘述了JDFS的技術細節,每一篇都是針對某個特定功能的。而現在JDFS已經作為一個整體初步完成了,所以在這里我們將從總體上看一下JDFS長什么樣,由哪些component組成,從客戶端提交任務到任務完成走了一個怎樣的路徑。接下來將用3個小節來分別介紹一下:JDFS服務端的架構、文件流式冗余存儲的流程圖、文件下載的執行流。
1 JDFS服務端的架構
我們先來看一張JDFS server端的架構示意圖:
如上圖所示,即為Server端的架構示意圖,在JDFS中,name node和data node上面跑的server均是采用的圖中的架構。從圖中可以看出,server端主要包括兩個大的部分:監聽客戶端事件的模塊(圖左),線程池&作業隊列(圖右)。監聽模塊監聽客戶端的事件,如果是連接事件,則服務端接受之,不需要線程池的參與,而如果epoll監聽到讀事件(也即客戶端發送數據過來),則把該事件打包成一個task,加入到作業隊列中,線程池中的線程會時時刻刻盯着作業隊列,有task則試圖從隊列中取出task來執行。在這里不管是server端把task加入到作業隊列還是線程池里的線程從task隊列中取出task都需要在同步鎖的保護下互斥地執行,以保證數據的正確訪問。
上一段描述了server端執行的大致過程,現在我們分別介紹下server-epoll端、線程池都有哪些函數來支撐的,然后結合着圖中的執行流以及實現的函數,詳細描述一下服務端的執行細節。
server-epoll端實現的函數如下:
1 int Http_server_bind_and_listen(char *ip, int port, int *server_listen_fd); 2 int Http_server_body(char *ip, int port, int *server_listen_fd, threadpool *pool); 3 void *Http_server_callback(void *arg);
線程池&作業隊列端實現的函數如下:
1 int threadpool_create(threadpool *pool, int num_of_thread, int max_num_of_jobs_of_one_task_queue); 2 int threadpool_add_jobs_to_taskqueue(threadpool *pool, void * (*call_back_func)(void *arg), void *arg); 3 int threadpool_fetch_jobs_from_taskqueue(threadpool *pool, job **job_fetched); 4 void *thread_func(void *arg); 5 int destory_threadpool(threadpool *pool);
其中threadpool_add_jobs_to_taskqueue()是線程池暴露給server-epoll端的接口,用來向作業隊列加入task用的。下面我們結合函數接口來詳細介紹一下服務端啟動的過程。
在c語言里,顯然我們需要寫一個main()函數來啟動server,我們需要先創建線程池,然后啟動服務端,可能的啟動代碼如下:
1 threadpool *pool=(threadpool *)malloc(sizeof(threadpool)); 2 threadpool_create(pool, 6, 20); 3 Http_server_body(ip_str,port,&server_listen_fd,pool); 4 destory_threadpool(pool);
首先我們用threadpool_create()函數創建了一個線程池,該線程池中有6個線程,作業隊列中最大容納20個task,在圖中我們可以看到,矩形框代表着線程池,框的右邊有一個while(1),意為線程池是一個無限循環,循環里不斷從task隊列中取task,執行之,執行完畢后,繼續在while循環里互斥地取下一個task.當然截止目前,作業隊列里面還沒有任何task,此時線程池里面所有的線程都卡在pthread_cond_t類型的變量上,后續一旦有task被加入隊列中,被卡住的線程會被喚醒並同時競爭隊列里的task.
現在讓我們把目光從線程池移到server-epoll端。上圖左邊最上邊帶“server-epoll”字樣的矩形框標志着Http_server_body()被調用,服務端已經啟動了。Http_server_body()會先調用Http_server_bind_and_listen()來監聽server端的<ip,port>,后續客戶端的請求都發送到這個ip-端口所代表的服務端上。監聽端口搞定后,接下來我們需要創建一個epoll,並且把監聽的socket fd加入到epoll中,接下來我們就需要在一個無限循環中while(1)來不斷調用epoll_wait()檢測是否有事件發生。上圖中server-epoll矩形框下面有三條紅色虛線箭頭,旁邊有一個while(1)也即代表了我們所說的:在無限循環中監測事件的發生。從圖中紅虛線箭頭所指向的邏輯可以看出來,客戶端發過來的請求有兩種:其一是客戶端的連接請求,其二是某個socket fd上監測到讀事件(客戶端發送數據過來)。
對於客戶端的連接請求,將由服務端“親自”處理,從圖中我們可以看出:服務端主線程需要首先調用accept()函數接受客戶端的連接請求,接受成功則會得到一個socket-fd代表着某個客戶端與服務端的socket連接。緊接着我們會把該socket-fd設置為非阻塞、邊緣觸發、epolloneshot、epollin. 其中之所以會用到邊緣觸發和epolloneshot的原因在之前的博客里面有詳細介紹,讀者可以到前言里面找到這些連接自行閱讀,此處不再贅述,epollin的意思是監聽該socket-fd上的讀事件。這些屬性都設置好后,需要把該socket-fd再次加入到之前創建的epoll中,使用接口epoll_ctl()可以達到這個目的。這些都做完后,如果客戶端發送數據過來,epoll就會監測並報告,正如圖中我們畫了三個小矩形框代表着監測到的3個讀事件,這個時候server端主線程需要把讀事件包裝成一個task,並調用線程池提供的接口threadpool_add_jobs_to_taskqueue()把該作業加入到task隊列中,當然該函數在此處的實現是線程安全的。我們說了task,task包括回調函數的地址、對應讀事件的socket-fd、epollfd,所謂回調函數以本文為例,是這樣的:回調函數邏輯上是和server-epoll端是一體的,但是調用回調函數的caller並不是server端,而是線程池端通過指針來調用的,簡言之,A處的函數,B處通過函數指針來調用。
現在讓我們把目光再次轉移到線程池里,前文說過線程池剛創建的時候所有的線程都卡在pthread_cond_t類型的變量上,如圖所示,運行一段時間后,server端主線程已經將5個task加入到作業隊列里面了,再加入task的過程中,如果當前task隊列為空,則作業加入后會使用pthread_cond_broadcast()來喚醒所有卡住的線程,此時如圖所示,我們假設線程T2成功競爭獲得task1,於是線程T2從task1中拿到回調函數的地址,和參數,然后調用之。此刻回調函數開始執行,圖中帶“server call back function”字樣的矩形框代表了這一執行過程,callback函數會首先使用recv()函數接收一個頭部(代碼里自定義的頭部,長度是已知的),接收完頭部后,callback()函數會解析頭部看看是哪種類型的請求,並調用具體的接口函數來執行該請求,如圖所示對於data node來說有查詢、上傳、下載等不同的功能接口函數。
以上便是server端的一個完整執行流的過程描述,在JDFS中有兩種計算結點,一個是data node一個是name node,上面跑的server均是按照本小節描述的過程來工作的。
2 流式冗余存儲執行流
先來看一下對應的執行流程圖:
上面這張圖描述了我們把算法導論CLRS-en.pdf流式冗余地存儲到三個data node的全過程。圖中的數字標志了執行的先后順序,有些線條用同一種特殊的顏色比如黑色虛線,意為這幾個步驟邏輯上連接比較緊密。現在我們就順着數字1,2,3....10來詳細看一看具體的過程。在數字1處,用戶User1向Client發出存儲CLRS-en.pdf的請求,client是提供了若干面向用戶的接口,用戶可以直接調用該接口來完成相應的功能,從圖中我們可以看到用戶把CLRS-en.pdf作為參數傳遞給了Client。然后入數字2標志的箭頭所示,Client首先會向Name node發送一個http請求,查詢當前虛擬集群里面還活躍着的data node。順着數字3我們看到name node首先會讀取本地的配置文件(里面記錄了所有data node的ip地址、監聽端口、串號等信息),對於配置文件里面列出的所有<ip,port>對,name node會向<ip,port>標志的服務發送一個查詢是否活躍的請求,這個過程用數字4標志的黃色虛線表示,該虛線是個雙向箭頭,活躍着的data node在接收到name node發過來的查詢請求后,會沿着同一個socket-fd(如圖中的黃色虛線)發送數據給name node告訴它我目前處於活躍狀態,可以提供相應的服務。
接下來的邏輯我們聚焦於數字5,6,7,8標志的黑色虛線,數字5標志的虛線表示name node會把上一步查詢到的活躍的data node的ip地址,端口等發送給Client,接着兩個數字6標志的綠色虛線表示Client按照一定的策略把CLRS-en.pdf分割為3個block,然后基於<活躍的結點列表、本地文件的分塊信息>, Client端會按照一定的策略為每一個block選取若干個data node結點來冗余地存儲該block. 然后Client會把這些信息沿着8標志的黑色虛線發送給name node,告訴name node“我打算把CRLS-en.pdf”分成3個block冗余地存儲於虛擬集群里。Name node會在本地創建一個同名的文件來存儲該文件的元信息。如圖所示,元信息記錄了如下信息:文件名,文件的狀態(0表示打算存儲,但是還沒有完成),文件的block數目,接下來是每一個block對應的具體信息:該block存儲於哪個結點上,標志為0的表示不打算存儲在對應的數據結點上,標志為1的表示應該存儲到對應的數據結點上但是還沒有存儲成功。在JDFS的實現當中,10010010表示第3個block應當冗余存儲到第1,4,7個結點上面。
接下來如圖中9所標志的4條虛線箭頭代表了流式冗余存儲的過程。如圖所示每一個block在傳輸前又繼續被分成了6個pieces(此處簡化了,實際代碼中被分成了100多片),client會挨個遍歷每個block的每一個pieces,並且把它傳輸給datanode3,這個過程是串行的,也即成功傳遞一個piece后才會傳遞下一個piece. datanode3每成功接收一個piece都會給Client發送一個確認信息,這樣Client就知道該piece成功傳遞,否則需要重新傳輸。
接下來的情況比較復雜,也是潛在bug容易出現的地方:流式傳遞。所謂流式傳遞,以上圖為例,每一個data node再接收上一個data node傳輸過來的piece后,需要把這個piece傳遞給下一個data node,同時繼續接收上一個data node發送過來的下一個piece,這個過程就像是字節流一樣,所以被稱之為流式傳遞。筆者曾將在網上找到一篇介紹HDFS的博客,里面介紹HDFS就是采用的流式傳遞的方式來傳輸數據分片,本文就是借鑒HDFS的這個處理方式來實現的,在此感謝HDFS。剛才說了這個過程是比較復雜的,原因在於此時數據的傳遞已經不向Client-datanode3之間那樣是串行傳輸,從datanode3--->datanode2--->datanode1的傳輸過程不再是串行的,對於datanode2來說,很可能第一個block的第2個piece比第1個piece先到達,此時接收順序已經是亂序、並行的了。所以此時面臨的問題是datanodex里的所有線程同時接收block的piece,文件由誰來創建是一個問題,另外在最初筆者經過很多的bug調試后決定使用邊緣觸發、epolloneshot的,這些問題的原因讀者都可以在前幾篇博客里面找到,此處不再贅述。
對於Client來說,在把所有數據分片成功傳輸給datanode3后,剩下的事情就是數字11標志的“wait file to be completed”的過程,注意此時Client的數據雖然成功發送完畢了,但是datanode3-datanode2-datanode1之間的流式傳遞很可能還沒有完成,所以此處需要等待數據傳輸完畢。Client會間歇式地給name node發送請求查詢文件是否存儲完畢,Name node會讀取對應文件的元信息,檢測每一個block對應的元信息,如果所有先前標志為1的數字都變成了2,則說明該文件已經存完畢,name node會把該文件標志為已經完成存儲,然后告訴Client,此時Client收到存儲成功的消息后就會返回。
那么這些原先標志為1的數字怎么變成2了呢?讓我們把目光移動到圖中數字10標志的黑色實線處,每一個data node在成功接收完畢一個完整的block后,就會發送消息給name node,name node此時就會更新該block對應的元信息,也即把相應的1該為2,表示該block成功存儲到對應的datanode上了。前面我們說過,對於data node2,3來說,接收到的數據分片是亂序的,很可能接收到某個block的最后一個分片后,實際上該block的其他分片還沒有被成功傳輸,那么這個問題怎么解決呢?在JDFS中我們讓接收某個block的最后一個分片的線程負責這件事,該線程會等待其他所有分片到達完畢后,發消息給name node告知該block已經接收完畢。具體的實現可以參見github里面的代碼。
以上就是CLRS-en.pdf被分割后,流式冗余存儲到虛擬集群里的全過程,下面我們羅列一下name node, data node, client分別實現的函數接口:
name node的實現代碼:
1 void *Http_server_callback_query_nodelist(void *arg);3 void *Http_server_callback_query_ip_from_node_serial(void *arg); 4 void *Http_server_callback__cloudstr_meta(void *arg); 5 void *Http_server_callback_update_meta_info(void *arg); 6 void *Http_server_callback_wait_meta_complete(void *arg); 7 void *Http_server_callback_delete_meta_file(void *arg);
data node的實現代碼:
1 void *Http_server_callback_query(void *arg); 2 void *Http_server_callback_upload(void *arg); 3 void *Http_server_callback_download(void *arg); 4 void *Http_server_callback_query_node_alive(void *arg); 5 int Http_stream_transform_file(callback_arg_upload *cb_arg_upload, char *namenode_ip, int namenode_port); 6 void *Http_server_callback_delete_file(void *arg);
client端的實現代碼,包括了后續將要介紹的下載、刪除、顯示等功能的代碼:
1 int JDFS_cloud_query_node_list( char *server_ip, int server_port, node_list *nli); 2 int JDFS_cloud_store_file(char *file_name, int flag); 3 int JDFS_cloud_store_one_block(char *file_name, int block_num,char nodes_8_bits, node_list *nli, int total_num_of_blocks); 4 int Split_file_into_several_parts(char *file_name, int part_size); 5 int Extract_part_file_and_store(FILE *fp,char *new_file_name,int range_begin,int range_end); 6 int JDFS_wait_file_transform_completed(char *file_name, char *ip_str, int port); 7 int JDFS_cloud_query_file(char *file_name, JDFS_file_info *jfi,int flag);//flag, 0: query for reading ,1 :query for deleting 8 int JDFS_cloud_query_ip_from_node_serial(int serial_num, char *ip_str); 9 int JDFS_cloud_fetch_file(char *file_name); 10 int JDFS_cloud_merge_file(char *file_name, int num_of_blocks); 11 int JDFS_cloud_delete_file(char *file_name); 12 int JDFS_cloud_delete_file_internal(char *file_name, char *ip_str, int port); 13 int JDFS_cloud_delete_meta_file(char *file_name, char *ip_str, int port); 14 int JDFS_cloud_display_file_info(char *file_name); 16 int JDFS_select_optimal_node_for_one_block(each_block_info *ebi);//return the optimal node num 17 int JDFS_cloud_fetch_one_block(char *file_name, int block_num,int node_num, char *destination_file_path);
3 從雲端下載文件到本地
如上圖所示,CLRS-en.pdf已經被分成若干個block,每一個block冗余地存儲在不同的data node里。這里我們要從雲端把組成文件的所有block下載到本地,然后合並還原原始文件。在數字1標志的箭頭處,用戶user1向Client發出fetch file的請求,並且把文件名CLRS-en.pdf傳給了Client. 如數字2,3標志的箭頭所示,Client先向Name node發出查詢文件CLRS-en.pdf元信息的請求,然后name node在本地的配置文件里找到CLRS-en.pdf的元信息文件,讀取后把相關信息發送給Client. 在數字5標志的箭頭處我們可以看到Client先解析name node發送過來的數據,解析的結果是CLRS-en.pdf有3個block組成,block 1冗余地存儲在data node 2,5上,block2 冗余地存儲於data node 3,4,6,7,8上,block3 冗余地存儲於data node 1,4上。(因為篇幅有限,圖中只列出了data node 1,2,3)。為了獲得CLRS-en.pdf,客戶端需要把組成該文件的3個block都下載下來。客戶端通過特定的策略決定從data node 2下載block1, data node 3下載block2, data node 1 下載block3. 此時Client還不知道data node 1,2,3的ip地址,於是數字6,7,處客戶端向name node查詢data node 1,2,3的ip地址。接下來數字8,9,10處表示客戶端分別從不同的data node下載相應的block, 在11處,Client將3個block合並成原始文件,返回給用戶。至此文件獲取的過程就全部結束了。
另外關於文件的顯示、刪除等邏輯比較直觀,本文將不再對此進行介紹,讀者可以自行閱讀github上的代碼或者參見前言列出的博客。
三 結束語
到此JDFS的整體結束就結束了,原本打算在本篇博客里面介紹下如何安裝、部署運行JDFS的,但是寫到這里,感覺有點累了,又加上篇幅太長閱讀起來容易疲勞,我想還是放在下一篇博客里面介紹吧,在此期間筆者正好可以繼續測試JDFS,解決一下潛在的bug之類的。聯系方式:https://github.com/junhuster/