一 前言
看了一下,距離上一篇博客的發表已經過去了4個月,時間過得好快啊。本篇博客是JDFS系列的第三篇博客,JDFS的目的是為了實現一個分布式的文件管理系統,前兩篇實現了基本的上傳、下載功能,但是那還不能算作分布式文件管理。本篇博客將在前兩篇的基礎上增加一系列分布式的功能,比如流式雲存儲,就是客戶端把本地的文件切分成若干片后,以冗余的方式存儲到分布式數據結點上;所謂的流式傳遞讀者可以在網上搜索一下HDFS的流式傳遞,基本上就是那個意思,正文中會詳細介紹這個,此處不再贅述。除了分布式存儲外,當然我們還得支持客戶端顯示特定文件的信息,比如這個文件有多少個block組成,每一個block存儲在哪幾個數據結點上等,我們不妨把文件的這些信息稱之為文件元信息,元信息是存儲在name node結點上面的。然后有了雲存儲、文件元信息的顯示,當然少不了文件的下載,文件的下載會從不同的數據結點下載文件的block,然后再本地把所有的block合並成一個完整的文件。最后支持刪除操作,客戶端向name node結點發起刪除文件的操作,name node會通知所有存儲該文件的數據結點刪除對應的block,最后name node結點刪除存儲對應文件的元信息。
上一段提到了三個角色:客戶端、數據結點(data node)、name node,。所有的操作都有客戶端發起,文件的正文信息冗余地存儲在data node上,而文件的元信息存儲於name node上。說到這,讀者朋友們可能會感覺很熟悉,沒錯,HDFS在存儲文件時也是這么干的。本篇博客將會聚焦於文件的“流式雲存儲”,而其他功能將會在本系列博客的第四篇進行介紹。JDFS系列博客截止目前一共三篇,內容上相互承接,因此如果讀者朋友是第一次閱讀本篇博客的話,筆者建議大家先閱讀前兩篇博客,以保持連貫性,鏈接在下面:
JDFS相關代碼已經更新到github, 其鏈接地址請點擊我, 如果讀者感覺本文對你有所幫助,不妨給JDFS的github點個star, 嘿嘿~~
Note: 網絡問題是一個非常復雜的問題,再加上多線程,epoll調試起來非常困難。截止目前JDFS的功能並不是十分完美,偶爾會出錯,但是這些錯誤很難重現,一時半會並不能全部解決,但是這並不影響對JDFS總體設計的介紹,總體的框架是沒問題的。這些潛在的問題會在后續版本想辦法解決,爭取早日得到一個穩定版的JDFS.
PS: 本篇博客是博客園用戶“cs小學生”的原創作品,轉載請注明原作者和原文鏈接,謝謝。
二 實驗平台
前文提到了分布式結點,包括data node, name node等,目前筆者並沒有條件搭建一個真正的分布式環境。但是好在有免費版的vmware player, 所以筆者用vmware 創建了4個虛擬機,每一個虛擬機模擬一個結點,四個虛擬結點的名字分別是data node 1, data node 2, data node3, data node4, 為了方便后續代碼的處理,四個虛擬結點分別分配了一個串號:1,2,3,4 。串號唯一標志了一個虛擬結點。為什么不用ip地址來標記呢?因為結點的ip 地址並不是一直保持不變的。根據筆者這段時間做實驗的經驗來看,每隔幾天,虛擬機的ip地址都會變好一次,使用ifconfig命令就可以看到虛擬ubuntu的ip v4地址。
在筆者的實驗里:data node 2上面跑的是name node server,充當name node角色, data node 1,, 3,4 上面跑的是data node server代表了數據結點, 其中data node 3上面同時還運行着客戶端結點。筆者只用了四個結點來測試,其原因是JDFS代碼剛剛更新,先從簡單入手,在較少的結點上進行測試,方便些,等功能穩定了再在多個結點上進行測試。
實驗平台截圖如下:
如上圖所示,是data node server, name node server都啟動后的截圖,上面三個是數據結點, 最下面的是name node。
三 一些遺留的問題
筆者在上篇博客的基礎上添加分布式功能的時候,發現了幾個bug, 在介紹正式功能前我們先來看看這幾個bug
1. 都是水平觸發惹的禍
熟悉linux網絡編程的人大都知道,epoll是高並發服務端設計的首選,epoll中有兩個概念:水平觸發、邊緣觸發。在epoll中我們通過不斷地調用epoll_wait()函數來檢測當前監聽的<ip:port>上面是否有讀寫事件發生,水平觸發和邊緣觸發的區別就在於調用epoll_wait()的時候如何報告讀寫事件。以報告讀事件為例(客戶端發數據給服務端),對於水平觸發:如果當前的socket fd內部的緩沖區中有數據,epoll_wait()就會無條件地告訴調用者:hi, 緩沖區里現在有數據,快來讀啊。即使服務端已經指派了一個線程來處理這個socket fd上的數據,只要epoll_wait()執行時,數據還沒被處理完,epoll_wait()就會報告讀事件。而對於邊緣觸發:在第一次報告socket_fd上有讀數據后,以后epoll_wait()執行時,即使socket fd內部的緩沖區里有數據也不報告,只有緩沖區再一次變空后,再有數據到來才會報告讀事件;簡言之,只有緩沖區的狀態從“空”變為“不空”才會報告事件,所以如果線程讀取一個緩沖區的數據,如果沒有讀完,那么緩沖區讓然處於“不空”的狀態,因此沒有發生“狀態”的改變,所以不會報告事件。
我們假想一種場景,在水平觸發的情況下,epoll_wait()報告了一個讀事件,並且把該事件派發給線程1來處理,然后epoll_wait()再一次執行的時候線程1沒有把數據處理完,於是epoll_wait()再次報告讀事件,並且派發給線程2來執行。加入這一片數據邏輯上只應該由同一個線程來處理,那么此時錯誤就會發生。筆者寫完流式雲存儲部分的功能測試的時候,一開始老是出錯,發現線程讀的數據始終少了一部分,而且少的這部分數據恰好就是http_request頭部的長度(注:頭部是筆者自定義的一個數據結構,客戶端每次發送的數據都會帶一個頭部,用來描述數據正文的相關信息,比如正文長度等,同樣服務端的線程在讀socket fd的數據時,也是首先讀取頭部,解析后再讀取正文)。
說道這,估計讀者已經猜出了原因了,筆者之前的代碼使用的是水平觸發,假設線程1讀取頭部解析到需要讀取4096字節的正文數據后就開始使用recv()來讀取相應的數據,與此同時,epoll_wait()檢測到同一個socket fd上還有數據,於是把該讀事件作為作業加入到作業隊列里,假設線程2競爭到該作業,於是開始讀取一個頭部(實際上是正文數據),解析后發現不是有效的頭部於是丟棄該作業,而線程1由於線程2已經讀取了一個頭部,而少讀取了一個頭部,這就是前文所述bug的來龍去脈。
那么讀者可能要問了之前怎么沒有發現這個bug呢?答案是之前不存在發生這個bug的機會,在本篇博客之前,在做測試時,只有一個服務端和客戶端,這樣的話,很可能下一次epoll_wait()被調用的時候線程1已經把數據讀取完畢了,問題主要發送在流式傳遞(詳情見后文)的過程中:客戶端--->data node 1--->data node3--->data node 4,筆者實驗的時候問題主要發生在數據流式傳遞到data node 3和 data node 4的情況,應該是隨着結點數目的增多以及代碼邏輯的復雜性的增加導致了線程還沒把數據讀取完畢另外一個線程就也開始讀取同一個socket fd上的數據了。
於是筆者后來采用了邊緣觸發的方式,使用邊緣觸發后,上述的bug大大減少了,但是在很少的情況下讓然會出現數據讀取不全的問題。經過反復測試與思考,筆者想出了一個可能導致出錯的場景(后來驗證確實是這個原因):我們假設有一片數據[a,b] a表示起始位置,b表示結束位置。由於網絡的原因,這片數據不是一下就到達,而是分為兩次[a,m]和[m+1,b]到達,假設在[a,m]到達后,在線程1讀取完[a,m]后[m+1,b]由於網絡延遲或者其他原因沒有到達,這時候socket fd的內部緩沖區已經空了,過了一會[m+1,b]已經到達了,結果緩沖區的狀態由“空”變為“不空”,這種情況下即使是邊緣觸發也會報告讀事件,此時線程1卡住了,等待數據[m+1,b]而線程池里空閑的線程檢測到剛才epoll報告的讀事件也開始讀取同一個socket fd的數據,於是bug又發生了,雖然這次發生的概率比較低。
針對這個bug場景,解決的辦法是使用EPOLLONESHOT標志,使用了這個標志后,不管緩沖區狀態是否發生變化,都不會報告事件,這樣線程1可以安心地把數據讀完,讀完后再把EPOLLONESHOT標志復位,這樣epoll_wait()又可以愉快地報告讀事件了。
2. malloc & fopen: 都是越界寫數據惹的禍
這個bug的截圖當時筆者恰好保存了,我們先來看下截圖吧:
從圖上看是malloc相關的錯誤,很奇怪的是經過筆者的反復調試,最終定位的發生錯誤的位置在一個fopen語句,malloc的報錯為什么會在fopen的時候出現呢。網上搜索了一下好像fopen內部會調用malloc()函數,但是筆者反復檢查了fopen的參數,也沒有傳遞非法參數啊。后來看了下fopen的前邊有一個數據寫入的操作,寫入的是一個用malloc分配的內存,然后看了下malloc分配該數據的地方,發現分配的長度不對,長度應該是 xx*sizeof(int)錯寫成了xx,改了后,於是錯誤就消失了。猜想原因可能是這樣的:寫數據的地方越界了,而fopen內部再一次調用malloc時發現了某些字段被越界寫破壞了,於是就報錯了。
3. ftell: 都是磁盤緩沖惹的禍
在添加分布式功能的時候,有一個操作是:在本地把文件分成若干個block(假設5個block),並且存儲於磁盤,然后客戶端在挨個讀取每個block,用ftell獲取文件長度,但是發現在處理block1-4都正常,而ftell block5的長度總是少了一部分。於是筆者另外單獨寫了一個main函數來ftell block5的長度是正確的,只有在JDFS里ftell這個block長度才會出錯,經過一番思考,靈光乍現,可能是內存里有一部分在緩沖里沒有寫到磁盤里,JDFS是分片后立即ftell block的長度,還沒來得及寫入磁盤,而單獨寫的main函數是執行時已經過了一段時間,緩沖已寫入磁盤。然后再檢測分片函數時發現,原始文件分片存儲后沒有調用fclose()函數,加上fclose()后bug就消失了,因為fclose會刷新緩沖區的數據將之存儲到磁盤里面。
4. 線程池:都是變量初始化惹的禍
還有一個是線程池相關的bug,這里簡單的提一下,原代碼里面是在創建了線程之后,才開始初始化諸如mutex變量,這樣是不對的,應該先初始化后再創建線程,否則讀到為初始化的數據導致邏輯錯誤。
四 流式雲存儲
哎呀,總算到了講正題的地方了,哈哈~~。
http_request_buffer結構體是JDFS最重要的一個數據結構,其中的一個字段request_kind代表了不同的請求類型,客戶端向服務端發請求,服務端向另外一個服務端發請求都得帶上一個頭部,最初支持查詢、上傳、下載三個功能,這次更新后添加了很多其他的功能,讓我們先來看看request_kind現在都有哪些功能吧。下面是截取的該數據結構的代碼注釋:
1 /* 2 ** request_kind, 0:query file size 1: upload 2: download 3,4: ack 3 ** 5: query node list alive 6: query one certain node alive 4 ** 7: ack alive of one certain node 5 ** 8: query ip form node serial 9: return to client,not found 6 ** 10: stream transform 7 ** 11: client post clousd store,giving the meta info 12: failed ack 8 ** 13: query file meta info, read or delete 14: not exsists ack 9 ** 15: update meta info 16: update failed ack 10 ** 17: wait meta complete 18: not complete 11 ** 19: delet meta file 12 ** 20: delete file 13 */
其中主要是0,1,2,5,6,8,10,11,13,15,17,19,20. 本文涉及到的主要是:5,6,8,10,11,15,17。下面我們就開始分別詳細的介紹~~
1. 流式雲存儲總體流程的簡介
我們先來描述一下從客戶端發出請求到服務端接收完數據,這一個過程中都發生了哪些事情。下面我們簡稱client 為C, data node 1,2,3為D1, D2, D3, name node server為 NS. 客戶端要處理的文件名為CRLS.pdf(其實就是算法導論,哈哈)
我們的目標是:C把CRLS.pdf切分成m份,然后向NS查詢當前虛擬集群里還“活着”的data node list,然后為每一份數據指定存儲於哪幾個數據結點(例如CRLS.pdf.blk1存儲於D1, D2, D3), 以CRLS.pdf存儲於D1, D2, D3為例,C把CRLS.pdf.blk1按照10K的大小切分成n個分片,把每一分片上傳給D1, 以分片S1為例:前文我們提到過正文之前有一個頭部http_request_buffer,我們在這個頭部里記錄結點D2,D3的信息,於是D1從C接收到分片S1后,緊接着檢測到還需要“流式”傳遞給D2,D3,於是D1把該片數據傳遞給D2,當然傳遞前清除頭部的D2,以免后續重復傳遞給D2,同理,D2接收到數據后在傳遞給D3.經過一段時間后CRLS.pdf.blk1傳遞完畢,CRLS.pdf.blk2,3,...m按照同樣的流程處理,最終整個CRLS.pdf被分成了m份,冗余地存儲於不同的數據結點上。而對於CRLS.pdf被分成了多少個block,每一個block存儲於哪些數據結點這樣的元信息存儲於NS上,這其實就是模仿了HDFS的處理過程,在這里我們向HDFS致敬。PS: 筆者寫JDFS的目的不是為了實現一個和HDFS功能類似的系統,而是鍛煉運用若干技術實現解決一個實際問題的能力,或者說是鍛煉設計軟件架構從而實現一個完整的系統而不是簡單寫一個具有單一功能的程序的能力。
每一個data node在接收完某個block比如CRLS.pdf.blk1的時候需要向NS報告“我已經接收完畢CRLS.pdf的第一個block了,請將之記錄到對應文件的元信息里吧”。NS端記錄着CRLS.pdf的每一個block應當存儲於哪些data node里,當所有的data node都接收完畢對應的block后,NS就會在該文件的元信息里標記為已完成存儲。這樣后續C就可以請求該文件的元信息,並根據元信息到不同的data node下載對應的block,然后合並block還原出原來的文件,后續還可以請求NS從虛擬集群里刪除這個文件。
那么對於特定的data node,比如D1接收完CRLS.pdf.blk1,怎么才算接收完畢呢?對於data node 1,線程池里同時有N個線程在處理接收CRLS.pdf.blk1的不同的分片。筆者是這樣的處理的:在發送數據的頭部,記錄該分片是所在block的第幾個分片,以及所在的block是原始文件的第幾個block. 當D1檢測到所接收的數據分片是對應的block的最后一片分片的時候,D1就會等待其他該block的其他分片被處理完畢,然后發消息給NS請求更新文件的元信息。為了支持該操作,我們定義了一個二維數組node_block_meta_info[m][n], 其中m是原始文件被分成的block的數目,n是特定block對應的分片的信息。如果某個線程接收到第一個block的第一個分片,那么它就負責分配node_block_meta的第一維空間,如果線程接收到了第x個block的第一個分片,那么負責創造對應的第二維存儲空間,也即node_block_meta_info[x]=....malloc()....
在上文的基礎上,假設某個線程接收到CRLS.pdf.blk1的第m片數據,於是就設置node_block_meta_info[1][m]=1,意為該分片被成功接收了,當然必須是node_block_meta_info!=NULL&&node_block_meta_info[1]!=NULL的時候才能寫入,否則應該等待相應的內存被分配后在寫入。這樣一來,線程檢測某個block是否被全部接收就容易了,直接遍歷node_block_meta_info數組就行了。關於node_block_meta_info的free,也很容易,接收完對應的block比如x,free x位置處的內存,整個文件都接收完畢,就free第一維的node_block_meta_info。
關於誰來創建文件的問題,之前由於客戶端是確認一個分片到達后再上傳下一個分片,因此服務端只要簡單判斷磁盤里是否有該文件,沒有則創建就可以了。加入分布式功能后,D1往D2或者D3流式傳遞數據就不一定是串行的,有可能第1個分片還沒到達,第2個分片已經接收完畢了,那么文件由誰來創建的?類似地,我們也讓接收第一個分片的線程創建,其他的線程等待,這樣會不會經常卡住其他線程呢?基本不會,比如文件被分為5個block,則頂多卡住其他線程5次,而一個block被分成上百個分片,這樣一來卡的情況可以忽略不計。
接下來我們分別從客戶端、name node、data node的角度來分析如何實現流式雲存儲
2 客戶端邏輯
客戶端實現了如下幾個函數,具體代碼詳見前言里的github鏈接
1 int JDFS_cloud_query_node_list( char *server_ip, int server_port, node_list *nli); 2 int JDFS_cloud_store_file(char *file_name, int flag); 3 int JDFS_cloud_store_one_block(char *file_name, int block_num,char nodes_8_bits, node_list *nli, int total_num_of_blocks); 4 int Split_file_into_several_parts(char *file_name, int part_size); 5 int Extract_part_file_and_store(FILE *fp,char *new_file_name,int range_begin,int range_end); 6 int JDFS_wait_file_transform_completed(char *file_name, char *ip_str, int port);
客戶端的邏輯比較簡單,JDFS_cloud_store_file()是客戶端執行的最初入口,參數flag如果為0表示文件不用切分為若干block,否則按照實現定義的規則切分文件為若干個block。該函數會首先調用JDFS_cloud_query_node_list()函數,該函數會向name node server查詢當前集群里活躍的data node列表,該列表存儲在類型為node_list類型的參數里。這個時候客戶端得到了活躍的結點列表,也知道了應該把文件分為幾個block,接下來就要為每個block選取若干個結點來存儲了,截止目前筆者簡單地為每個block選取前3個data node,后續穩定后,會實現一個函數按照特定的規則為每一個block選取相應的結點。查詢活躍結點的截圖如下:
緊接着,調用Split_file_into_several_parts()函數和Extract_part_file_and_store()函數將原始文件進行切分。然后調用JDFS_cloud_store_one_block()把某個特定的block上傳給選擇的data node,注意參數nodes_8_bits是一個字符型數據,如果某個bit被設置了就意味着對應的data node 也需要接收此block,服務端的流式傳遞需要查詢這個數據。最后客戶端傳完數據后,data node之間的流式傳遞可能還沒有結束,此時客戶端調用JDFS_wait_file_transform_completed()函數等待文件全部傳遞完畢,該函數內部會每隔一段時間向name node請求標記該文件為已完成,name node會讀取文件元信息,如果確實完成了全部的存儲,就會標記為完成,並告訴客戶端我已經更新狀態為完成了,否則不做更改。
下圖是data node接收到數據后的截圖:
下面是傳輸過程中命令行打印的一些消息的截圖:
3. Name node 端的邏輯
name node端實現了以下幾個函數
1 int make_socketfd_nonblocking(int socket_fd); 2 void *Http_server_callback_query_nodelist(void *arg); 3 int DataNode_alive_or_not(char *ip, int port); 4 void *Http_server_callback_query_ip_from_node_serial(void *arg); 5 void *Http_server_callback__cloudstr_meta(void *arg); 6 void *Http_server_callback_update_meta_info(void *arg); 7 void *Http_server_callback_wait_meta_complete(void *arg);
由於改版后服務端使用了邊緣觸發,邊緣觸發下數據讀取最好使用非阻塞方式,因此make_socketfd_nonblocking()就是用來設置對應的socket fd為非阻塞模式的。Http_server_callback_query_nodelist()用來響應服務端查詢活躍的data node結點列表,name node會首先讀取配置文件里記錄的所有存在的data node的ip,port,然后它會調用DataNode_alive_or_not()函數檢測ip,port對應的data node是否活躍,搜集完所有的活躍結點后,發送給客戶端。前文中提到,某一個data node接收完數據后需要把數據流式傳遞給其他data node, 所以需要先向name node查詢對應串號的data node的ip,port, 而 Http_server_callback_query_ip_from_node_serial()就是用來響應這個請求的。Http_server_callback__cloudstr_meta()函數用來響應客戶端請求存儲文件的請求用的,具體是用來創建該文件對應的元信息文件。前文說過,某個data node存儲完一個block后需要告訴name node更新對應的元數據信息,而Http_server_callback_update_meta_info()就是用來響應該請求的。前文還說過,客戶端發送完所有block后,需要間隔式地發送請求給name node請求標記文件為已完成存儲,而Http_server_callback_wait_meta_complete()就是用來響應這個請求的。
4. data node端的邏輯
對於data node端,主要更新了void *Http_server_callback_upload(void *arg);函數以支持流式傳遞,並且增加了Http_stream_transform_file(callback_arg_upload *cb_arg_upload, char *namenode_ip, int namenode_port)來做具體的流式傳遞工作,所謂流式傳遞即為某個data node接收完一片數據后,緊接着傳遞給另一個需要該片數據的data node.
4.1 void *Http_server_callback_upload(void *arg)
該函數用來接收客戶端或者其他data node發送過來的數據分片,為支持流式傳遞,做了一些更改。
首先前文討論過關於由哪個線程創建文件的問題以及如何確定某個block的分片全部接收完畢,這部分都是在data node server端來做的,首先該函數如果檢測到當前接收到的分片是某個block的第一個分片,那么就創建文件,並且為node_block_meta_info數組分配內存空間,並做相應的初始化。接收完數據后,該函數會調用Http_stream_transform_file()把該片數據傳遞給下一個需要該數據的data node, 傳遞完成后,需做如下判斷:
如果該片數據是對應block的最后一片數據,那么就等待該block的所有數據片接收完畢,然后釋放node_block_meta_info數組對應的內存空間,如果該片數據是最后一個block的最后一個分片,那么就等待所有block傳遞完畢后徹底釋放node_block_meta_info數組。
4.2 int Http_stream_transform_file(callback_arg_upload *cb_arg_upload, char *namenode_ip, int namenode_port);
該函數的功能也很簡明,首先檢測是否存在下一需要改數據分片的data node,如果有的話,根據該data node的串號,向name node查詢對應的ip地址和端口號port,然后把該數據分片發送給<ip,port>標志的data node.
五. 結束語
至此本篇博客就結束了,主要是在上一篇的基礎上增加了分布式存儲的功能,本文簡要的介紹了流式雲存儲的執行流程,以及三個角色:name node, data node, client如何共同協作完成這一功能的。具體的代碼邏輯請見前言里給出的github鏈接。當然目前JDFS的功能並不是完美穩定的,主要是因為網絡+線程池+epoll使得一些潛在的bug很難重現與調試;當然后續會不斷地對JDFS進行優化,調試,爭取早日達到穩定狀態。
聯系方式:聯系方式:https://github.com/junhuster/