Spark存儲原理——數據寫入過程


        Spark數據的寫入過程的入口點位doPutIterator方法。下面是一些方法的調用關系圖:

        在該方法中,根據數據是否緩存到內存中處理。如果不緩存到內存中,則調用BlockManager的putIterator方法直接存儲到磁盤中;如果緩存到內存中,則先判斷數據存儲級別是否對數據進行了反序列化操作:如果設置了反序列化操作,則調用putIteratorAsValues方法,直接操作值類型數據;如果沒有反序列化,則調用putIteratorAsBytes方法,操作字節類型數據。在將數據寫入到內存中時,還需要判斷在內存展開(unroll)該數據的大小是否足夠,當內存足夠時,寫入內存,否則寫入到磁盤。寫入完成時,一方面將數據的元數據信息發送給Driver終端,請求更新元數據信息,另一方面判斷是否需要創建數據副本,如果需要,則調用replicate方法,通過Netty方式寫到遠程節點上。下面是寫數據邏輯的簡易邏輯圖:

一、寫入內存

Spark內存結構圖:

        Spark內存大致分為兩部分:圖中下半部分是已經使用的內存,內存里存放entries中,該entries由不同數據塊生成的MemoryEntry構成;圖中下半部分為可用內存,這些內存用於嘗試展開數據塊(展開的動作並不是直接寫數據,而是占位置),這里面並不是一下子將所有數據展開到內存中,而是進行每一步操作時,先檢查內存大小是否足夠,如果內存大小不夠,則需要先對內存的某些數據釋放,給准備寫入的數據提供充足的空間,而釋放的數據則被寫入到磁盤中。那么選擇哪些內存數據進行釋放呢?在內存中的數據,是以LinkedHashMap保存,保存了數據的插入順序,根據FIFO策略進行計算釋放,釋放的空間足夠時,就將內存釋放的數據寫入到磁盤中。

數據寫入內存過程:

  1. 獲取信息:在數據塊展開前,為該展開線程獲取初始化內存,該內存大小為unrollMemoryThreshold,獲取完畢后返回是否成功的結果keepUnrolling;
  2. 遍歷:如果Iterator[T]存在元素且keepUnrolling為true,則繼續向前遍歷Iterator[T],內存展開元素的數量elementsUnrolled自增1。如果Iterator[T]到頭或者keepUnrolling為false,則調到步驟4;
  3. 檢測內存:每當memoryCheckPeriod繼16次展開之后,就進行一次內存檢測,檢測展開的內存大小是否超過當前分配的內存。如果沒有超過,則繼續展開;如果內存不足,則根據增長因子計算需要增加的內存大小。如果申請成功,則將申請得到的內存加入到CurrentUnrollMemory,而展開線程獲取的內存大小為:當前展開大小*內存增長因子;
  4. 展開內存:判斷數據塊是否展開成功,如果失敗,則記錄為內存不足,並退出;如果成功,則繼續下一步;
  5. 申請內存:先估算該數據塊在內存中的存儲大小,然后比較數據塊展開的內存和數據塊在內存中的存儲大小,如果數據塊展開的內存<=數據塊存儲大小,說明內存不足,需要申請他們之間的差值大小的內存。如果申請成功,則調用transferUnrollToStorage方法處理;如果數據塊展開的內存>數據塊存儲的大小,那么先釋放多余內存,在調用transferUnrollToStorage方法;
  6. 在transferUnrollToStorage方法中釋放該數據塊在內存展開的空間,然后判斷內存是否足夠寫入數據。如果足夠,則將數據塊放到內存里,否則,發送寫入內存失敗的信息。

二、寫入磁盤

        寫入磁盤的邏輯就相對簡單得多,調用DiskStore的put方法,該方法提供了寫文件的回調方法writeFunc。該方法先獲取寫入文件句柄,然后將數據序列化為數據流,最后根據回調方法寫入文件中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM