楔子
現在相信你已經對DAG的工作原理有了基本的理解,那么下面來看看Dask如何使用DAG來創建健壯的、可擴展的workload(控制器)。
下面我們要完成兩件事:使用Dask的DataFrame API來分析結構化數據集;研究一些有用的診斷工具,並使用low-level Delayed API來創建一個簡單的自定義任務圖。
但是我們需要先安裝Dask,直接pip install dask[complete] -i https://pypi.tuna.tsinghua.edu.cn/simple即可,Dask是Python的一個第三方庫。
import sys
import dask
print(dask.__version__) # 2.28.0
print(sys.version[: 5]) # 3.8.1
但是我們需要先安裝Dask,直接pip install dask -i https://pypi.tuna.tsinghua.edu.cn/simple即可,Dask是Python的一個第三方庫。
你好Dask:看一下你的DataFrame API
任何數據科學項目的一個基本步驟都是對數據集執行探索性分析,在探索性分析期間,你需要檢查數據是否存在缺失值、異常值和任何其他數據質量問題。如果出現了臟數據,需要對其進行清洗,以確保對數據所做的任何結論不會受到錯誤或異常數據的影響。在使用Dask DataFrame API的第一個示例中,我們將逐步讀取數據文件,並掃描數據以查找缺失值,並刪除由於丟失太多數據或者對分析沒有幫助的列。
檢測Dask對象的元數據(metadata)
下面我們首先需要讀取數據集,關於數據集,書中使用的是上一篇博客中說的NYC停車罰單數據,但是我們這里暫時不使用那個數據集。我們可以在kaggle上下載其它的數據集,這里我下載的是:https://www.kaggle.com/kenshoresearch/kensho-derived-wikimedia-data
里面的item.csv。
沒有特殊說明,我們的代碼都是在jupyter notebook上面運行。
import dask.dataframe as dd
df = dd.read_csv(r"C:\Users\satori\Desktop\item\item.csv")
df
如果你是一個經驗豐富的pandas使用者,那么你會發現上面的代碼非常的熟悉,因為它們在語法上是等價的。但是結果卻不一定是你熟悉的,pandas的DataFrame對象在打印的時候會嘗試將數據的開頭和結尾顯示出來,最上方是列名,左側是索引。但是我們看到Dask DataFrame在打印的時候會顯示元數據,列名在頂部,而下方則是每個列各自的數據類型,因為Dask會非常努力、並且智能地從數據中推斷數據類型,就像pandas所做的那樣。但是Dask在這方面的能力也是會受到限制的,Dask是用來處理不能被單機讀取的中大型數據集的,而pandas是完全基於內存操作,所以它可以輕松而快速地掃描整個數據集,從而判斷每個列的最佳數據類型。另一方面,Dask必須能夠很好的處理分散在分布式文件系統中多個機器的數據集,因此Dask DataFrame使用隨機抽樣的方法從一小部分數據樣本中分析和推斷數據類型。但是這樣有一個問題,如果數百萬或數十億行中只有一個異常行,那么這個異常行就不太可能被隨機挑選出來,這將導致Dask選擇不兼容的數據類型,可能會導致在后面的執行計算中出現錯誤。因此避免這種情況的最佳實踐是顯式地設置數據類型,而不是依賴於Dask的推斷。
事實上這一點很好理解,因為pandas是將數據全部讀取到內存中,即便是分塊,它最終也是可以選擇一個合適的類型。但是對於Dask而言,顯然是沒有辦法這么做的,因為數據量就已經決定了光靠隨機采樣是達不到百分之百的准確率。
或者更好的做法是使用能夠表示數據類型的二進制文件(比如:Parquet),類型是什么在文件中進行體現,這樣就完全可以避免類型推斷帶來的問題。我們將在后續系列來討論這個問題,目前我們就讓Dask自己推斷數據類型。
顯然我們來看看Dask DataFrame的輸出,因為這和pandas DataFrame的輸出還是有很大差別的。Dask DataFrame顯示的是元數據信息,它告訴了我們Dask的任務調度器在處理該文件時是如何對任務進行分解的。
npartitions顯示了將DataFrame分隔成多少個分區,這里是65個,由於該文件的大小是3.85GB,在65個分區中,每個分區的大小大概是60.65MB。這意味着Dask不是一次性將文件加載到內存中,而是每個Dask工作線程一次處理一個60.65MB的文件塊。而且dd.read_csv在讀取這個大文件時,幾乎是瞬間完成的,這也說明了該函數在讀取csv文件時會執行懶加載。
Dask將文件分成可以獨立處理的多個小塊,而不是馬上將整個文件讀取到內存中,而這些塊就叫做分區。而Dask DataFrame中的每一個分區,都相當於是一個較小的pandas DataFrame。
所以Dask是將大文件分割成多個分區,一次處理一個分區。
圖中的Dask DataFrame是由兩個分區組成,因此單個Dask DataFrame由兩個較小的pandas DataFrame組成,每個分區都可以加載到內存中進行處理。工作節點先拾取分區1進行處理,然后將結果保存在臨時存儲空間中。然后拾取分區2進行處理,也將結果保存在一個臨時空間中,然后將兩個結果合並並返回給我們。因為工作節點一次可以處理較小的數據片段,所以可以將工作分配個多個機器,或者在本地的情況下,也可以在非常大的數據集上繼續工作,而不會導致內存不足產生的錯誤(內存溢出)。
注意df返回的元數據,我們還有一個信息沒有說,就是最下面的65 tasks,這表示Dask DataFrame由65個任務組成。這表示Dask創建了一個有65個節點的有向無環圖來處理數據。而我們恰好也有65個分區,表示每個分區里面有一個任務,注意:分區數和任務數不一定是一致的,因為一個分區可以對應多個任務。我們這里是65個分區,所以如果有65個worker的話,那么可以同時處理整個文件,但如果只有一個worker,那么Dask將依次循環遍歷每一個分區。現在讓我們嘗試計算一下,整個文件中每個列的缺失值。
# 可以看到使用的api和pandas的DataFrame基本是一致的
missing_value = df.isnull().sum()
"""
Dask Series Structure:
npartitions=1
en_description int64
item_id ...
dtype: int64
Dask Name: dataframe-sum-agg, 196 tasks
"""
雖然在計算的方法上和pandas是一致的,但是結果和之前一樣,返回的Series對象並沒有給出我們期望的輸出。返回的不是缺失值的統計值,而是一些關於預期結果的元數據信息。而且通過輸出信息我們發現返回的結果看起來像是一系列int64,但是實際數據在哪里呢?其實Dask還沒有進行處理,因為采用的是惰性計算。這意味着Dask實際上做的是准備一個DAG,然后存儲在missing_value變量中,在顯式執行任務圖之前,不會計算數據。這種行為使得快速構建復雜任務圖成為可能,而不必等待每個中間步驟完成,通過返回信息的最后一行我們知道此時的任務數量已經增加到了196。Dask從DAG中獲取了前65個任務,這些任務用於讀取數據文件創建DataFrame,然后DataFrame又添加了131個任務來檢查null值以及計算sum,最終將所有部分收集到一個單一的Series對象中並返回答案。
missing_count = ((missing_value / df.index.size) * 100)
missing_count
"""
Dask Series Structure:
npartitions=1
en_description float64
item_id ...
dtype: float64
Dask Name: mul, 329 tasks
"""
在執行計算之前,我們要求Dask將缺失值的數量轉成百分比,顯然要除以DataFrame的總行數,再乘以100。注意:任務數量增加的同時,返回的Series對象的數據類型也從int64變成了float64。這是因為觸發操作的結果不是整數,因此Dask自動將結果轉換為浮點數,正如Dask嘗試從文件中推斷數據類型一樣,它也會嘗試推斷某個操作如何影響輸出的數據類型。由於我們已經像DAG中添加了一個用於兩個數相除的操作,Dask推斷我們可能會從整數移動到浮點數,並相應的改變元數據。
用compute方法運行計算
現在我們准備運行並生成結果了。
from dask.diagnostics import ProgressBar # 繪制進度條
# 在計算的時候會自動顯示進度條
with ProgressBar():
missing_count_pct = missing_count.compute()
missing_count_pct
結果得到的是一個pandas的Series對象,也和我們平時使用pandas得到的結果也是一樣的,我們看到總共花費了37.7s。所以當你想要得到計算結果時,需要調用compute方法,這會告訴Dask我們需要你開始真正地執行了。我們看到這個過程類似於Spark中的RDD,每一步的操作都是一個懶加載(transorm),當遇見action操作時才會計算結果,如果你不了解Spark也沒有關系,因為這很好理解。總之df可以經歷很多很多的操作,每一個操作的結果也可以使用變量進行保存,但它們都是一個懶加載,不會立即執行。所以相當於是記錄了前前后后的血緣關系,"誰"通過"什么操作"得到了"誰",有人發現了這不就是DAG嗎?是的,我們一開始就說了任務調度器就是使用了DAG的概念,而且Spark也是如此,所以這兩者在某種程度上是比較相似的。當執行compute方法時,才會真正從頭開始計算。我們還使用了ProgressBar來顯示任務進度條,以及花費的時間,這是Dask提供的幾種跟蹤手段之一,用於跟蹤運行中的任務,在使用本地任務調度器尤其方便。由於我們目前是在本地、沒有使用集群,所以調度器就是本地任務調度器。
select = missing_count_pct[missing_count_pct != 0].index # 篩選出缺失值百分比不為零的列
with ProgressBar():
df_select = df[select].persist()
df_select
有趣的是select是一個pandas里的對象,但是我們可以將它和Dask DataFrame的方法一起使用,因為Dask DataFrame的每個分區都是一個pandas DataFrame。在這種情況下,pandas里的對象對所有線程都可用,因此它們可以在計算中使用它。在集群上運行時,pandas Series對象會被序列化並廣播到所有工作節點。
另外我們看到除了compute之外,還有一個persist。這里調用compute返回是pandas中的對象,調用persist返回的依舊是Dask中的對象,這里是Dask DataFrame。但是這兩者確實都發生了計算,從進度條我們也可以看出來,df[select].persist()返回的Dask DataFrame只有兩個字段,證明Dask的確真正執行了df[select]邏輯。
另外,persist根據任務調度器的不同還會有不同的表現,如果任務調度器支持異步計算,那么persist會立即返回,返回值包含一個Dask Future對象;但如果任務調度器只支持阻塞式計算,那么persist也會處於阻塞狀態,並且在計算之后依舊返回Dask中的對象,這里是Dask DataFrame。我們下面還會啰嗦一下persist,但是相比你已經猜到persist的使用場景了。
使用persist讓復雜的計算更高效
這里我們再來啰嗦一下persist,有些時候我們不需要全部的列,因此我們需要將不要的列過濾掉,否則每次計算時都要加載額外的列。但是我們不能使用compute,因為這樣就直接得到pandas中的對象了,所以返回的依舊得是Dask中的對象。回想一下,任務圖中的節點一旦執行,它的中間結果就會被丟棄,因為要最小化內存使用,沒有persist的話,這意味着想對做一些額外的事情(查看DataFrame的前五行)的話,我們將不得不再次重新運行整個轉換鏈。為了避免多次重復的計算,Dask允許我們存儲計算的中間結果,以便重用它們,這樣的話就不需要重頭計算了,而這一步就通過persist來完成。此外,如果Dask需要內存時,可能會從內存中刪除一些分區,這些被刪除的分區將在需要時被動態計算。盡管重新計算分區需要花些時間,但是它仍然比重新計算整個任務圖要快的多。所以如果你有一個需要多次重用、並且非常大非常復雜的DAG,那么適當地使用persist進行持久化對於加速計算是非常有用的。
以上我們便結束了對Dask DataFrame的基本了解,現在你已經知道了如何通過幾行代碼讀取數據並開始為探索性分析做准備。而上面的代碼有一個最讓人興奮的特點,那就是無論你在單台機器處理還是在多台機器處理,無論分析的是幾MB的數據還是幾PB的數據,它們的工作原理都是一樣的。另外由於它和pandas的代碼非常相似,你可以對之前的代碼進行很少的修改即可實現通過Dask執行並行計算。在后續系列我們將更深入地分析,但是目前我們有一個當務之急,我們需要挖掘一下Dask如何使用DAG來管理我們剛才說的任務分發。
可視化DAG
到目前為止,你已經知道了有向無環圖(DAG)是如何工作的,並且了解了Dask是使用DAG來安排DataFrame的分布式計算。只不過我們還沒有看到調度器創建DAG的具體過程,然而我們可以通過某些手段實現,Dask可以使用第三方庫graphviz對任務調度器創建的DAG可視化,安裝了這個庫之后,我們便可以檢查所有支持Dask Delayed對象的DAG,直接調用一個visualize函數即可。當然在安裝這個庫之后還不夠,我們還需要去https://graphviz.org/download/
網站下載相應的graphviz程序,如果是Windows直接進入到https://www2.graphviz.org/Packages/stable/windows/
里面下載即可,我這里是一個zip包,解壓之后將bin目錄配置到環境變量即可(最好重啟一下機器,我這里是需要重啟的)。
使用Dask Delayed對象可視化一個簡單的DAG
我們之前使用的是Dask DataFrame,現在我們后退一步,降低一個級別:Dask Delayed對象。之所以這么做的原因是,即使是簡單的DataFrame對象,Dask為其創建的DAG中的節點也會有很多,這會加大可視化的難度。
import dask.delayed as delayed
def inc(i):
return i + 1
def add(x, y):
return x + y
# 關於delayed我們后面會說,這里可以認為將函數變成一個Delayed對象
# 然后傳參方式依舊不變
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
# 進行可視化
z.visualize()
這圖是程序幫我們畫的,自下而上分析的話,還是很形象的,兩個inc函數執行得到的結果傳遞給add函數。邏輯很簡單,重點是里面的delayed函數,它是負責將普通的函數轉化為一個Delayed對象。而且我們看到Delayed對象在調用時,可以接收普通的值,也可以接收別的Delayed對象在調用之后的返回值(當然最終得到的也是一個普通的值)。
type(x) # dask.delayed.Delayed
這圖是程序幫我們畫的,自下而上分析的話,還是很形象的,兩個inc函數執行得到的結果傳遞給add函數。邏輯很簡單,重點是里面的delayed函數,它是負責將普通的函數轉化為一個Delayed對象。而Delayed對象代表了DAG中的節點(task),像我們之前的DataFrame是在Delayed之上的更高一級的對象,它對應多個Delayed對象。代碼中的x表示函數inc的延遲求值,因為它不是被立刻執行的,當然Delayed對象還可以引用其它的Delayed對象,這一點從z的定義上也能看出來。這些Delayed對象連接在一起最終構成了一個圖,對於要求值的z,首先要求x和y被計算出來。所以這便是一個簡單的DAG示意圖,對z的求值有一個很明顯的依賴鏈,需要按照順序求值,並且有一個准確定義的起點和終點。
從圖中我們看到add函數是有依賴的,但是inc函數沒有,因此如果inc函數執行時間比較的話,那么並行計算就很有意義。
使用循環和容器對象可視化一個復雜的DAG
讓我們看一個稍微復雜的栗子。
import dask.delayed as delayed
def add_two(x):
return x + 2
data = [1, 5, 8, 10]
# step_1是一個列表, 里面是對add_two函數的延遲求值
step_1 = [delayed(add_two)(i) for i in data]
# 這里將內置函數sum也變成了Delayed對象
total = delayed(sum)(step_1)
total.visualize()
整體沒什么難度,現在我們弄得再復雜一些。
import dask.delayed as delayed
def add_two(x):
return x + 2
def sum_two_numbers(x,y):
return x + y
def multiply_four(x):
return x * 4
data = [1, 5, 8, 10]
step_1 = [delayed(add_two)(i) for i in data]
step_2 = [delayed(multiply_four)(j) for j in step_1]
total = delayed(sum)(step_1)
total.visualize()
所以我們看到可以將多個計算連在一起,而無需立即計算中間結果。
使用persist降低DAG的復雜性
我們繼續,將上一步計算出的結果和自己本身再加起來,然后再求和。
import dask.delayed as delayed
def add_two(x):
return x + 2
def multiply_four(x):
return x * 4
def sum_two_numbers(x,y):
return x + y
data = [1, 5, 8, 10]
step_1 = [delayed(add_two)(i) for i in data]
step_2 = [delayed(multiply_four)(j) for j in step_1]
total = delayed(sum)(step_1)
# 將data中的每一個值都和total進行相加,得到data2
data2 = [delayed(sum_two_numbers)(k, total) for k in data]
# 然后對data2再進行求和
total2 = delayed(sum)(data2)
total2.visualize()
當然我相信邏輯依舊是很簡單的,但問題是如果我們重復幾次的話,那么這個DAG就會變得很大。類似的,如果我們的原始列表中有100個數字,不是4個,那么DAG也會變得非常非常的大,可以嘗試一下。不過問題是為什么大型DAG難以處理,原因就是持久性。
正如之前提到的,每次在延遲對象上調用compute方法時,Dask都會逐步遍歷完整的DAG來生成結果,這對於簡單的計算來說是可以的。但如果處理的是非常大的分布式數據集, 那么一次又一次的重復計算很快會變得效率低下,而解決的一種辦法就是持久化想要重用的中間結果,但是這對DAG來說會有什么影響呢。
import dask.delayed as delayed
def add_two(x):
return x + 2
def multiply_four(x):
return x * 4
def sum_two_numbers(x,y):
return x + y
data = [1, 5, 8, 10]
step_1 = [delayed(add_two)(i) for i in data]
step_2 = [delayed(multiply_four)(j) for j in step_1]
total = delayed(sum)(step_1)
# 將total持久化
total_persist = total.persist()
total_persist.visualize()
某個節點想要開始執行,必須要保證所有指向它的節點都完成,total_persist是由total持久化得到的,所以沒有任何節點指向它,對於此時得到的DAG是一個空的。
import dask.delayed as delayed
def add_two(x):
return x + 2
def multiply_four(x):
return x * 4
def sum_two_numbers(x,y):
return x + y
data = [1, 5, 8, 10]
step_1 = [delayed(add_two)(i) for i in data]
step_2 = [delayed(multiply_four)(j) for j in step_1]
total = delayed(sum)(step_1)
# 將total持久化
total_persist = total.persist()
data2 = [delayed(sum_two_numbers)(k, total_persist) for k in data]
total2 = delayed(sum)(data2)
total2.visualize()
我們看懂這個DAG只有一半部分,這是因為total_persist已經被計算、並且持久化了,所以Dask可以使用持久化數據,而不是重新計算整個DAG,從而減少生成結果所需要的計算數量。
當然了你可以嘗試繪制出更大的DAG,盡管不適合顯示,但是卻能讓你知道:Dask可以非常優雅地處理復雜的應用。
任務調度
正如我們之前提到的,Dask在其API中使用了惰性計算的概念,而它帶來的效果我們也已經看到了,當我們想看到結果時,必須調用compute方法。考慮到處理pb級數據會花費很長時間,這是非常有利的,因為在請求結果之前不會發生任何的計算,我們可以連續定義很多的操作,而不必等待一個操作完成后再定義下一個操作。
惰性計算
惰性(延遲)計算還允許Dask將工作分割為更小的邏輯塊,這可以避免將操作的整個數據結構都加載到內存中,正如我們之前讀取的item.csv文件,大小是3.85GB,分成了65個分區,平均每個分區處理的大小是60.65MB。
但是當我們像Dask請求結果時究竟發生了什么呢?首先你定義的所有計算都是由DAG表示,它是對你想要的結果進行計算的每一步計划。然而這些計划並沒有指定到底該使用哪些資源執行計算,因此我們必須要考慮兩件事:計算要在哪里發生,以及必要時計算的結果又要發送到哪里。和關系型數據庫不同,在工作開始前,Dask不會預先確定每個任務的精確運行時位置。相反,任務調度器會動態地評估哪些工作已經完成,哪些工作還沒有完成,以及哪些資源可以實時地接收額外的工作。這將允許Dask優雅地處理分布式計算中出現的大量問題,包括worker故障恢復,網絡不可靠性,以及worker完成任務的速度不一致等問題。此外,任務調度器可以跟蹤中間結果的存儲位置,減少數據的傳輸以及從頭計算,在集群上操作Dask,這將大大的提高效率。
數據本地性
由於Dask可以很容易地將我們的代碼從單機擴展到數百、數千個服務器上,因此任務調度器必須做出明智的決定,判斷哪些任務應該在哪些機器上執行。Dask使用一個集中的任務調度器來協調這些工作,為此每個Dask工作節點都要像任務調度器報告它有哪些可用數據以及它正在做哪些工作,然后任務調度器不斷評估地集群的狀態,為用戶提交的計算提供公平、高效的執行計划。在大多數情況下,如果任務調度器在集群中的機器之間可以均勻地分配工作,那么計算就可以很快速和高效地完成。但很多時候卻並不是這樣的,比如一台服務器比其他服務器的任務更重,或者硬件不如其他服務器強大,或者不能快速地訪問數據等等,這些情況都會導致該服務器的任務執行落后於其它服務器,有可能別人服務器都執行完畢了,該服務器還在執行中,那么為了保證高效,應該將該服務器的任務適當的減少,以避免成為瓶頸。而任務調度器的動態特性便允許它在無法避免的情況下,對這些情況作出反應。
為了獲得最佳性能,Dask集群應該使用分布式文件系統,比如:S3或者HDFS來負責數據存儲。那么這是為什么呢?來考慮一下反例,如果一個文件只存儲在一台機器上,那么另一台機器如果想計算的話是不是要將數據發送到該服務器中呢?而且如果你用過Spark的話,那么你應該聽過一句話:移動數據不如移動計算,因為數據(大型)的移動是非常耗費時間的。
所以兩台服務器,如果數據都在一台服務器上,那么另一台服務器肯定要先將數據傳輸到本地,然后才可以讀取,而一旦涉及到網絡間的數據傳輸,那么耗時是少不了的。
而解決這個問題的辦法就是將文件進行分割,切分成多個小塊,不同機器上存儲不同的塊,而這也是分布式文件系統所做的。而且Dask的任務調度器也會考慮數據局部性,或者數據的物理位置,判斷計算應該在哪里進行。
但是完全地避免數據間的移動也是不太可能的,一些數據必須通過廣播到集群中的所有機器,不過任務調度器會盡最大努力讓移動的數據量達到最小化。因為當數據量比較小的時候,這可能沒有太大影響,但是當數據量非常大的時候,在網絡中移動數據的影響就會大很多,所以最小化數據移動通常會帶來更高的計算性能。
希望你現在對DAG在Dask將大量工作分解為更易於管理的部分時所扮演的角色,有一個更好的理解。我們在后續系列中還會回顧Delayed API,但請記住:整個系列中,我們看到的Dask的每一部分都是由Dealyed對象支持的,並且你還可以隨時對DAG進行可視化。盡管在實踐中,你可能不需要如此詳細地對計算進行故障排除,但是理解Dask的底層機制將會幫助你更好地識別任務中的潛在問題和瓶頸。
總結
- Dask DataFrame上的計算是由使用DAG的任務調度器所支持的。
- 計算是惰性的,需要調用compute方法來執行計算並檢驗結果。
- 可以調用任何Dask對象上的visualize方法來對可視化底層的DAG。
- 可以通過persist對計算的中間結果進行存儲和重用,從而簡化計算。
- 應該將計算往數據靠近,而不是將數據往計算靠近,因為移動數據所以造成的網絡和IO延遲對整個程序的影響是很大的。