Pandas | Spark | |
工作方式 | 單機single machine tool,沒有並行機制parallelism 不支持Hadoop,處理大量數據有瓶頸 |
分布式並行計算框架,內建並行機制parallelism,所有的數據和操作自動並行分布在各個集群結點上。以處理in-memory數據的方式處理distributed數據。 支持Hadoop,能處理大量數據 |
延遲機制 | not lazy-evaluated | lazy-evaluated |
內存緩存 | 單機緩存 | persist() or cache()將轉換的RDDs保存在內存 |
DataFrame可變性 | Pandas中DataFrame是可變的 | Spark中RDDs是不可變的,因此DataFrame也是不可變的 |
創建 | 從spark_df轉換:pandas_df = spark_df.toPandas() | 從pandas_df轉換:spark_df = SQLContext.createDataFrame(pandas_df) 另外,createDataFrame支持從list轉換spark_df,其中list元素可以為tuple,dict,rdd |
list,dict,ndarray轉換 | 已有的RDDs轉換 | |
CSV數據集讀取 | 結構化數據文件讀取 | |
HDF5讀取 | JSON數據集讀取 | |
EXCEL讀取 | Hive表讀取 | |
外部數據庫讀取 | ||
index索引 | 自動創建 | 沒有index索引,若需要需要額外創建該列 |
行結構 | Series結構,屬於Pandas DataFrame結構 | Row結構,屬於Spark DataFrame結構 |
列結構 | Series結構,屬於Pandas DataFrame結構 | Column結構,屬於Spark DataFrame結構,如:DataFrame[name: string] |
列名稱 | 不允許重名 | 允許重名 修改列名采用alias方法 |
列添加 | df[“xx”] = 0 | df.withColumn(“xx”, 0).show() 會報錯 from pyspark.sql import functions df.withColumn(“xx”, functions.lit(0)).show() |
列修改 | 原來有df[“xx”]列,df[“xx”] = 1 | 原來有df[“xx”]列,df.withColumn(“xx”, 1).show() |
顯示 | df 不輸出具體內容,輸出具體內容用show方法 輸出形式:DataFrame[age: bigint, name: string] |
|
df 輸出具體內容 | df.show() 輸出具體內容 | |
沒有樹結構輸出形式 | 以樹的形式打印概要:df.printSchema() | |
df.collect() | ||
排序 | df.sort_index() 按軸進行排序 | |
df.sort() 在列中按值進行排序 | df.sort() 在列中按值進行排序 | |
選擇或切片 | df.name 輸出具體內容 | df[] 不輸出具體內容,輸出具體內容用show方法 df[“name”] 不輸出具體內容,輸出具體內容用show方法 |
df[] 輸出具體內容, df[“name”] 輸出具體內容 |
df.select() 選擇一列或多列 df.select(“name”) 切片 df.select(df[‘name’], df[‘age’]+1) |
|
df[0] df.ix[0] |
df.first() | |
df.head(2) | df.head(2)或者df.take(2) | |
df.tail(2) | ||
切片 df.ix[:3]或者df.ix[:”xx”]或者df[:”xx”] | ||
df.loc[] 通過標簽進行選擇 | ||
df.iloc[] 通過位置進行選擇 | ||
過濾 | df[df[‘age’]>21] | df.filter(df[‘age’]>21) 或者 df.where(df[‘age’]>21) |
整合 | df.groupby(“age”) df.groupby(“A”).avg(“B”) |
df.groupBy(“age”) df.groupBy(“A”).avg(“B”).show() 應用單個函數 from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() 應用多個函數 |
統計 | df.count() 輸出每一列的非空行數 | df.count() 輸出總行數 |
df.describe() 描述某些列的count, mean, std, min, 25%, 50%, 75%, max | df.describe() 描述某些列的count, mean, stddev, min, max | |
合並 | Pandas下有concat方法,支持軸向合並 | |
Pandas下有merge方法,支持多列合並 同名列自動添加后綴,對應鍵僅保留一份副本 |
Spark下有join方法即df.join() 同名列不自動添加后綴,只有鍵值完全匹配才保留一份副本 |
|
df.join() 支持多列合並 | ||
df.append() 支持多行合並 | ||
缺失數據處理 | 對缺失數據自動添加NaNs | 不自動添加NaNs,且不拋出錯誤 |
fillna函數:df.fillna() | fillna函數:df.na.fill() | |
dropna函數:df.dropna() | dropna函數:df.na.drop() | |
SQL語句 | import sqlite3 pd.read_sql(“SELECT name, age FROM people WHERE age >= 13 AND age <= 19”) |
表格注冊:把DataFrame結構注冊成SQL語句使用類型 df.registerTempTable(“people”) 或者 sqlContext.registerDataFrameAsTable(df, “people”) sqlContext.sql(“SELECT name, age FROM people WHERE age >= 13 AND age <= 19”) |
功能注冊:把函數注冊成SQL語句使用類型 sqlContext.registerFunction(“stringLengthString”, lambda x: len(x)) sqlContext.sql(“SELECT stringLengthString(‘test’)”) |
||
兩者互相轉換 | pandas_df = spark_df.toPandas() | spark_df = sqlContext.createDataFrame(pandas_df) |
函數應用 | df.apply(f)將df的每一列應用函數f | df.foreach(f) 或者 df.rdd.foreach(f) 將df的每一列應用函數f df.foreachPartition(f) 或者 df.rdd.foreachPartition(f) 將df的每一塊應用函數f |
map-reduce操作 | map(func, list),reduce(func, list) 返回類型seq | df.map(func),df.reduce(func) 返回類型seqRDDs |
diff操作 | 有diff操作,處理時間序列數據(Pandas會對比當前行與上一行) | 沒有diff操作(Spark的上下行是相互獨立,分布式存儲的) |
轉載請注明:寧哥的小站 » Spark與Pandas中DataFrame對比(詳細)