《使用Python和Dask實現分布式並行計算》3. Introducing Dask DataFrame(介紹Dask DataFrame)


楔子

前面我們探索了Dask是如何使用DAG在多台機器上協調和管理復雜任務的,但我們當時只是為了說明Dask和DAG之間的關聯,而舉了一些使用了Delayed API的簡單示例罷了。而這次,我們將更深入地了解DataFrame的API。

Dask DataFrame是構建在Delayed對象上的更高級別的對象,它是圍繞着pandas DataFrame對象對Delayed對象進行了包裝。DataFrame API不需要你自己編寫復雜的函數,因為它本身就包含了一整套轉換方法,比如:笛卡爾積、join、聚合、分組等等,可以說是非常普遍的操作了,我們在后續系列會深入討論這些操作。

為什么使用DataFrame

首先我們一般會將數據分為兩種:結構化數據和非結構化數據,其中結構化數據是由行和列組成,從簡單的電子表格到復雜的關系數據庫系統,結構化數據是存儲信息的一種直觀方式。

作為數據科學家,我們非常喜歡結構化數據,因為很容易將相關信息放在一個可視空間中。至於結構化數據我想完全沒必要在概念上多費口舌,直接把它想象成數據庫的一張表即可。

因此,由於結構話數據的組織和存儲方式,很容易想到由許多不同的方法來操作數據。比如與人員信息的相關的結構化數據,我們可以很容易地找到最早的出身日期,過濾出與特定模式不匹配的人,根據姓名和姓氏將人員分組,或者根據年齡進行排序等等。

ids = [1, 2, 3]
names = ["夏色祭", "神樂mea", "碧居結衣"]
ages = [18, 38, 12]

我們創建了三個列表,使用pandas的我們肯定知道如何將其變成一個DataFrame,當然即使不使用DataFrame這種數據結構我們也依舊可以通過Python的方式實現相應的篩選、變換、聚合等操作,但是對於結構化數據,沒有一種結構能和DataFrame一樣直觀,當然更准確的說應該是一個二維表。

但是對於DataFrame來說,它除了具備二維表的特征之外,還有一些額外的術語:索引和軸。我們先來看看,上面的那幾個列表如果組成DataFrame的話是什么樣子。

一定要注意DataFrame的軸和索引,非常重要,DataFrame的行被引用為0軸,列被引用為1軸。在對DataFrame進行聚合、分隔、拼接的時候需要注意,DataFrame默認是沿着0軸操作的,除非你顯式地指定為1軸,pandas中如此,Dask中亦是如此。

關於0軸和1軸估計有人會一直犯迷糊,本來這里我們沒必要說,但還是提一下吧。

一句話:沿着某個軸操作,可以看成是在該軸所在的方向上進行維度上的伸縮。

光說不練假把式,我們來使用numpy操作一下,因為0軸和1軸的概念是在numpy中先出現的。

import numpy as np

arr1 = np.array([[1, 2], [1, 2], [1, 2]])
arr2 = np.array([[3, 4], [3, 4], [3, 4]])

print(arr1)
"""
[[1 2]
 [1 2]
 [1 2]]
"""
print(arr2)
"""
[[3 4]
 [3 4]
 [3 4]]
"""

如果我們想要得到如下結果,那么我們是應該沿着0軸合並、還是沿着1軸合並呢?

[[1 2 3 4]
 [1 2 3 4]
 [1 2 3 4]]

答案很簡單,沿着哪個軸操作,那么維度的變化就會體現在哪個軸上。這里顯然是1軸的維度發生變化了,那么說明我們要沿着1軸進行合並。

print(np.concatenate((arr1, arr2), axis=1))
"""
[[1 2 3 4]
 [1 2 3 4]
 [1 2 3 4]]
"""
# 顯然0軸還是之前的維度

如果我們對聚合之后的結果進行了sum(默認會降低一個維度),得到的一維數組的第一個元素是3,那么我們是沿着哪個軸進行的sum呢?很簡單,顯然是0軸,因為如果是1軸,那么sum之后第一個元素應該是1+2+3+4=10才對,所以是沿着0軸sum的,最終結果是[3, 6, 9, 12],我們看到1軸在維度上依舊沒有改變。

當然我們這里的維度發生了變化,所以很好觀察,但如果維度沒有變化呢?同樣的道理,指定哪個軸,就沿着哪個軸進行操作。

DataFrame還有一個特點就是具有索引,這個索引和數據庫中的索引是類似的,但是DataFrame的索引可以不是字段的一部分。對於索引,我們最好要保證唯一性(不唯一也可以,但最好唯一),我們可以將某一個或多個字段設置為索引,也可以使用默認的自增索引。索引非常重要,我們后面會繼續說,並且還會介紹常見的索引函數,總之先來看看索引是如何用來形成分區的。

Dask和pandas

正如前面提到的,pandas在分析結構化數據方面非常的流行和強大,但是它最大的限制就在於設計時沒有考慮到可伸縮性。pandas特別適合處理小型結構化數據,並且經過高度優化,可以對存儲在內存中的數據執行快速高效的操作。然而正如我們在一開始舉的廚師的例子一樣,隨着數據量的大幅度增加,單機肯定會讀取不下的,通過集群的方式來處理是最好的選擇。這就是Dask DataFrame API發揮作用的地方:通過為pandas提供一個包裝器,可以智能的將巨大的DataFrame分隔成更小的片段,並將它們分散到多個worker中,因此可以更快、更可靠地完成對巨大數據集的操作。

Dask DataFrame會被分割成多個部門,每個部分稱之為一個分區,每個分區都是一個相對較小的DataFrame,可以分配給任意的worker,並在需要復制時維護其完整血統。關於操作我們之前已經見到了,就是對每個分區單獨操作(多個機器的話則可以並行),然后再將結果合並,其實從直觀上也能推出Dask肯定是這么做的。

管理DataFrame分區

因為分區對性能可以產生如此大的影響,所以你或許會認為管理分區是一件非常苦難且乏味的事情。但是不要擔心:Dask會嘗試通過一些明智的默認值和啟發式方法,幫助你在不需要手動調優的情況下獲得盡可能多的性能。例如:當使用read_csv讀取數據時,每個分區的默認大小是64MB(這也是默認塊大小,當然早期的Hadoop也是這樣的)。對於現在機器來說,即便是個人使用的筆記本一般也是16G內存,64MB似乎有點小啊,但這里的64MB不是針對內存而言,而是因為它可以在網絡間快速的傳輸。如果數據量過大,那么在還沒等數據傳輸完畢,某台機器可能就處理完任務了,那么之后就只能傻等着。所以每一個塊默認是64MB,雖然小,但是源源不斷,細水長流。另外,分區的數量我們也可以自己指定,在創建DataFrame時通過npartitions參數指定即可。

from faker import Faker

f = Faker(locale="zh_CN")
df = pd.DataFrame({"name": [f.name() for _ in range(10)],
                   "province": [f.province() for _ in range(10)],
                   "job": [f.job() for _ in range(10)]})
df

這是一個pandas的DataFrame,想要操作的話都是對整體進行操作的,然后我們來將其轉成Dask中DataFrame。

dask_df = dd.from_pandas(df, npartitions=2)
dask_df

我們通過dd.from_pandas將pandas的DataFrame轉成了Dask的DataFrame,這里我們顯式的指定了兩個分區,但是通常Dask會將其放入到單個分區中,因為它非常小。

此外我們還可以通過其它API,來檢查Dask DataFrame的分區信息。

dask_df.divisions  # (0, 5, 9)
dask_df.npartitions  # 2

這里的divisions和npartitions是很有用的屬性,因為它可以檢測DataFrame是如何分區的。第一個屬性:divisions(0,5,9),顯示了分區的邊界(注意:分區是在索引上創建的)。可能你奇怪了,明明是兩個分區,為什么邊界里面會有三個值。其實聰明如你一定想到了,通過(0, 5, 9)可以得到[0, 5][5, 9],所以這表示第一個分區處理索引為0到索引為5(不包括)的行,第二個分區處理索引為5到索引為9(包括)的行。

很好理解,當然注意了:只有最后一個分區會包含兩個邊界,其它分區只包含上邊界、但是不包含下邊界。

第二個屬性:npartitions,這個沒啥可說的,就是返回分區數量。

dask_df.map_partitions(len).compute()
"""
0    5
1    5
dtype: int64
"""

通過map_partitions可以對每一個分區都作用相同的函數,類比pandas中的map。pandas中的map是對整體數據集,map_partitions是對每一個分區,同樣的道理。當然這個和spark也是類似的,spark中的RDD也是有分區的,對RDD使用map會作用在RDD的每一個分區上。所以我們上面相當於是計算每個分區中的行數,由於有兩個分區,所以返回的Series對象長度為2,而且結果都是5,表示Daks將DataFrame分成相等的兩部分。

但有時我們需要動態改變分區的數量,如果我們有一步過濾數據的操作的話,那么執行之后就會導致每個分區的數據量不一樣,從而對后續操作產生負面影響。因為如果某個分區包含大量的數據,那么並行性的優勢將會大大降低。

# 篩選出名字長度大於2的
dask_df_filter = dask_df[dask_df["name"].str.len() > 2] 
dask_df_filter.map_partitions(len).compute()
"""
0    2
1    4
dtype: int64
"""

此時一個分區有兩個數據,另一個分區有4個,此時就會不均衡。當然這個數據量比較少,所以看起來沒啥影響,但如果數據量非常大的話,在過濾的時候就會出現分區數據不均衡的情況。

# 調用repartition可以對分區進行重塑
dask_df_reduce = dask_df_filter.repartition(npartitions=1)
dask_df_reduce.map_partitions(len).compute()
"""
0    6
dtype: int64
"""

只需要指定分區的數量,Dask就知道自己該怎么做,並保證各個分區間數據量的平衡。如果指定的分區小於現有的分區數,那么Dask將通過連接將現有分區合並起來;如果指定的分區數量大於現有的分區數,那么Dask將把現有分區分割為更小的分區。我們可以在任意時刻調用repartition來重塑分區,但是它和其它的Dask操作一樣是屬於惰性計算。在我們進行compute、head之前,不會實際移動任何數據。

# 這個時候我們再將分區指定為2的話
dask_df_filter.repartition(npartitions=2).map_partitions(len).compute()
"""
0    2
1    4
dtype: int64
"""

如果再恢復到原來的分區數,我們看到每個分區內數據的行數會和原來保持一致。

什么是shuffle

事實上,如果你了解spark的話你會很熟悉這個概念,因為shuffle也是spark中出現的。shuffle是一個耗時的操作,至於為什么我們來介紹一下。在分布式計算中,shuffle是將所有分區廣播給所有worker的過程,當我們執行排序、分組、索引等操作,shuffle是必須的,因為DataFrame的每一行都要和其它行進行比較,來確定正確的相對位置。所以這是一個在時間上代價比較昂貴的操作,因為它需要在網絡上傳輸大量數據。

比如我們要按照某個字段進行聚合,顯然此時就不可以每個分區單獨進行了。假設我們要給salary這個字段的值加100,那么每個分區之間可以單獨操作,彼此之間是不受影響的;但如果要是按照salary進行聚合來計算count,那么不好意思,此時就不是每個分區單獨處理所能解決的了的,比如我們有五個分區,每個分區都有salary為8000的值,這個時候統計的話就需要將值在分區之間進行發送,所以它是一個比較昂貴的操作。而且從名字上也能看出來,shuffle有洗牌的意思,如果把每一條數據想象成一張撲克牌,那么shuffle操作是不是需要將多個分區的數據混合在一起呢。而一旦多個分區的數據需要進行交互,那么就意味着數據的傳輸,即網絡IO,所以它是比較耗時的。

再比如排序,顯然排序也是一個shuffle的操作,因為要涉及所有數據之間的對比。由於我們需要對數據進行各種操作,因此想完全避免shuffle是不太現實的,但是我們可以做一些事情來執行shuffle操作的數據量達到最小,比如確保數據在存儲的時候就是有序的,即可消除使用Dask對數據進行排序的需要。如果可能的話,我們可以在源系統(比如關系型數據庫)中進行排序,這樣會比在分布式系統中排序更快、更有效。其次,使用排序列作為DataFrame的索引將提高連接的效率。所以數據排序之后的查找速度會非常快,因為可以通過DataFrame上定義的分區輕松確定某一行的分區位置。最后,如果必須觸發shuffle的操作,那么在資源允許的情況下可以對結果持久化,這樣如果需要重新計算的話可以避免數據之間的再次移動。

Dask DataFrame的一些限制

現在相信你已經對DataFrame API的用途有了一個很好的了解,那么最后再介紹一下Dask DataFrame的一些限制吧。

首先也是最重要的,Dask DataFrame不會暴露pandas DataFrame的所有API,即使Dask DataFrame是由多個小型的pandas DataFrame組成的,因為pandas的一些功能雖然很好但卻並不適合分布式環境。例如:改變數據格式的函數,insert就不支持。但是pandas的DataFrame則不一樣,我們舉個栗子:

pandas是支持insert直接在本地插入字段的,但是對於Dask DataFrame則是不允許的,而且大型數據集也根本不適合這種靈巧的變換,不過雖然不支持insert,但pop是支持的。此外一些更復雜的窗口操作也不支持,比如:expanding和ewm方法,還有像stack和unstack這種方法也不支持,因為它們往往會導致大量的shuffle。通常這些昂貴的操作並不需要在完整的原始數據集上運行,你應該使用Dask來完成所有常規的數據准備、過濾和轉換,然后將最終的數據集交給pandas。然后你可以對轉化后的數據執行這些對於分布式來說非常昂貴的操作了(但對於pandas而言則沒有什么昂貴的),Dask DataFrame和pandas DataFrame之間的交互非常容易,因此在使用Dask DataFrame分析數據時這個操作將會非常有用。

第二個受限制的地方是關系型操作,比如:join、merge、group by,以及rolling。盡管這些操作Dask DataFrame也是支持的,它們可能會涉及大量的shuffle,而成為程序的性能瓶頸。因此可以讓Dask專注於其它操作,將數據量減小能夠交給pandas,然后讓pandas來執行這些操作;或者使用Dask執行這些操作的時候,只將它們作用在索引上。比如有兩個DataFrame,一個與人員有關,一個與交易有關,按照person ID進行merge,那么我們可以將person ID作為索引並排好序,這樣merge的時候速度會明顯加快。

第三個受限制的地方就是索引方面會有一些挑戰,如果你希望使用DataFrame中的列作為索引,來代替默認的數值索引的話,那么這個列最好是被排過序的,否則的話整個DataFrame會因為它進行大量的shuffle操作。因此最好的辦法就是我們在構建數據的時候,就保證它是有序的,這樣在計算時能夠節省大量的時間。

在Dask處理reset_index這個方法時,你可能會注意到和pandas之間有一個明顯的區別,pandas是在整個DataFrame中計算新的順序索引,而Dask DataFrame中reset_index則類似於map_partitions。這意味着每個分區都有自己的從0開始的順序索引,我們可以看一下Dask DataFrame在使用reset_index之后的樣子。

在reset_index之后,索引會變成從0開始的自增索引。但是對於Dask DataFrame而言,它是每一個分區都作用上reset_index,所以兩個分區的索引都是0 1 2 3 4,因為它們都有5行記錄。那么可不可以對整體所有分區進行reset_index呢?就像pandas那樣作用在所有數據集上,答案很不幸,沒有一種簡單的辦法能做到這一點。因此使用reset_index的時候一定要小心,使用reset_index就意味着你不打算使用索引來對DataFrame進行join、group、sort等操作。

最后,由於Dask DataFrame是由多個pandas DataFrame組成的,因此在pandas中效率低下的操作在Dask中效率也會同樣低下。例如:通過apply和iterrows方法進行迭代在pandas中效率非常低,因此在使用Dask DataFrame時遵循pandas DataFrame的優化原則的話將會給你帶來最佳的性能體驗。如果你已經熟悉了pandas,那么使用Dask將會是一件很輕松的事情,而且不僅可以讓你更好地理解pandas,還能讓你熟悉Dask和分布式原理。

總結

  • Dask DataFrame由於行(0軸)和列(1軸),以及索引組成。
  • DataFrame默認基於0軸進行操作。
  • 通過Dask DataFrame的divisions屬性,可以知道DataFrame是如何分區的。
  • 對Dask DataFrame執行過濾操作會導致每個分區之間的數據量不平衡,為了獲取最佳性能,分區大小應該一致。在過濾了大量數據之后,對DataFrame進行重分區是一個很好的做法。
  • 為了獲取最佳性能,DataFrame應該按照邏輯列進行索引,根據它們的索引進行分區,並按照索引進行預分類。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM