作者|Andrea Ialenti
編譯|VK
來源|Towards Datas Science
正如在我幾乎所有關於這個工具的文章中都寫到,Spark和SQL一樣非常容易使用。但不管我花多少時間寫代碼,我只是無法在我的大腦中永久性地存儲Spark API(有人會說我的記憶就像RAM一樣,小而易失)。
無論你是想快速入門介紹sparksql,還是急於編寫你的程序,還是像我一樣需要一份備忘單,我相信你會發現這篇文章很有用。
這篇文章的目的是介紹sparksql的所有主要函數/特性,在片段中,你將始終看到原始的SQL查詢及其在PySpark中的翻譯。
我將在這個數據集上執行我的代碼:https://drive.google.com/file/d/1kCXnIeoPT6p9kS_ANJ0mmpxlfDwK1yio/view
在幾個月前,我為另一篇文章創建了這個數據集,它由三個簡單的表組成:
基礎知識
Apache Spark是一個用於大規模並行數據處理的引擎。這個框架的一個令人驚奇的特性是它以多種語言公開api:我通常使用Scala與它交互,但是也可以使用SQL、Python甚至Java和R。
當我們編寫Spark程序時,首先要知道的是,當我們執行代碼時,我們不一定要對數據執行任何操作。實際上,該工具有兩種類型的API調用:轉換和操作。
Spark轉換背后的范例被稱為“延后計算”,這意味着實際的數據計算在我們要求采取行動之前不會開始。
為了理解這一概念,設想一下你需要對一個列執行SELECT和重命名的情況:如果不調用某個操作(例如collect或count),那么你的代碼只不過是定義了所謂的Spark執行計划。
Spark以有向無環圖(非常著名的DAG)組織執行計划。此結構描述將要執行的確切操作,並使調度器能夠決定在給定時間執行哪個任務。
正如Miyagi先生告訴我們的:
-
上蠟:定義DAG(變換)
-
脫蠟:執行DAG(動作)
與Spark交互
太好了,我們從哪里開始交互?使用Spark有多種方法:
-
使用IDE:我建議使用IntelliJ或PyCharm,但我想你可以選擇任何你想要的東西。查看附錄中的PyCharm快速入門(在本地運行查詢)。我認為可以從你的本地環境使用遠程Spark executor,但說實話,我從來沒有進行過這種配置。
-
Jupyter Notebooks+Sparkmagic:Sparkmagic是一組工具,用於通過Spark REST服務器Livy與遠程Spark集群交互工作[1]。這是在AWS、Azure或googlecloud等雲系統上工作時使用Spark的主要方式。大多數雲提供商都有一項服務,可以在大約10分鍾內配置集群和notebooks 。
-
通過使用spark shell的終端:有時你不希望在你和數據之間有任何東西(例如,對一個表進行超級快速的檢查);在這種情況下,你只需打開一個終端並啟動spark shell。
文章的代碼主要用於IDE。
在編寫任何查詢之前,我們需要導入一些庫並啟動一個Spark會話(使用Dataset和DataFrame 的API編程)。下面的PySpark和Scala代碼段將加載你需要的所有內容(假設你已經配置了系統)。之后,為了簡單起見,我們將只看到PySpark代碼。除了一些細微差別外,scalaapi基本相同。
PySpark
# 導入Spark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 初始化Spark會話
spark = SparkSession.builder \
.master("local") \
.appName("SparkLikeABoss") \
.getOrCreate()
Scala
// 導入Spark
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
// 初始化Spark會話
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
解釋數據集、數據幀和RDD之間的差異篇幅將過長,所以我跳過這一部分,假裝它不存在。
基本操作
你能寫的最簡單的查詢可能是你所用過的最重要的查詢。讓我們看看如何使用Sales表進行基本操作。
簡單的Select語句和顯示數據
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT *
FROM sales_table
'''
# 執行計划
sales_table_execution_plan = sales_table.select(col("*"))
# Show (Action) - 顯示5行,列寬不受限制
sales_table_execution_plan.show(5, True)
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id AS the_order_id,
seller_id AS the_seller_id,
num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
# 以一行代碼執行計划和顯示出來
sales_table_execution_plan = sales_table.select(
col("order_id").alias("the_order_id"),
col("seller_id").alias("the_seller_id"),
col("num_pieces_sold").alias("the_number_of_pieces_sold")
).show(5, True)
我們在代碼片段中所做的第一件事是定義執行計划;只有當我們獲得show操作時,才會執行該計划。
我們可以在Spark計划中調用的其他操作示例包括:
-
collect()—返回整個數據集
-
count()—返回行數
-
take(n)-從數據集中返回n行
-
show(n,truncate=False)-顯示n行。你可以決定截斷結果或顯示字段的所有長度
另一個值得注意的有趣的事情是列是由col對象標識的。在本例中,我們讓Spark推斷這些列屬於哪個數據幀。
我們可以使用語法execution_plan_variable[“column_name”]來指定列來自哪個執行計划。使用此替代語法,我們可以得到:
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id AS the_order_id,
seller_id AS the_seller_id,
num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
# 以一行代碼執行計划和顯示出來
sales_table_execution_plan = sales_table.select(
sales_table["order_id"].alias("the_order_id"),
sales_table["seller_id"].alias("the_seller_id"),
sales_table["num_pieces_sold"].alias("the_number_of_pieces_sold")
).show(5, True)
在處理連接時,限定字段的源表尤為重要(例如,兩個表可能有兩個同名字段,因此僅使用col對象不足以消除歧義)。Scala中的語法略有不同:
// Qualify the source execution plan in Scala
sales_table.col("order_id")
重命名和添加列
有時我們只想重命名一個列,或者我們想添加一個新的列並進行一些計算(例如,在以下情況下):
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id,
product_id,
seller_id,
date,
num_pieces_sold AS pieces,
bill_raw_text
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
withColumnRenamed("num_pieces_sold", "pieces")
sales_table_execution_plan.show()
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id,
product_id,
seller_id,
date,
num_pieces_sold,
bill_raw_text,
num_pieces_sold % 2 AS num_pieces_sold_is_even
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
withColumn("num_pieces_sold_is_even", col("num_pieces_sold")%2)
sales_table_execution_plan.show()
簡單聚合
Spark支持所有主要的聚合函數。以下示例僅指“簡單”的示例(例如平均值、總和、計數等)。稍后將介紹數組的聚合。
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT product_id,
SUM(num_pieces_sold) AS total_pieces_sold,
AVG(num_pieces_sold) AS average_pieces_sold,
MAX(num_pieces_sold) AS max_pieces_sold_of_product_in_orders,
MIN(num_pieces_sold) AS min_pieces_sold_of_product_in_orders,
COUNT(num_pieces_sold) AS num_times_product_sold
FROM sales_table
GROUP BY product_id
'''
sales_table_execution_plan = sales_table.groupBy(
col("product_id")
).agg(
sum("num_pieces_sold").alias("total_pieces_sold"),
avg("num_pieces_sold").alias("average_pieces_sold"),
max("num_pieces_sold").alias("max_pieces_sold_of_product_in_orders"),
min("num_pieces_sold").alias("min_pieces_sold_of_product_in_orders"),
count("num_pieces_sold").alias("num_times_product_sold")
)
sales_table_execution_plan.show()
顯示架構
顯示命令的“table”有點誤導人;更精確的定義是“顯示執行計划”。使用Spark API,我們可以一個接一個地傳遞多個操作;使用printSchema API,如果在磁盤上寫入執行計划的結果,我們將輸出最終表的樣子。
在下面的示例中,我們重命名一些列,進行聚合,然后添加另一列。
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
-- 創建一個臨時表,進行一些重命名
CREATE TABLE temp_1 AS
SELECT seller_id AS the_seller,
num_pieces_sold AS pieces,
product_id
FROM sales_table;
--對新表進行聚合
CREATE TABLE temp_2 AS
SELECT product_id,
SUM(pieces) AS total_pieces
FROM temp_1
GROUP BY product_id;
-- 添加列
SELECT a.*,
1 AS fake_column
FROM temp2 a;
'''
sales_table_execution_plan = sales_table. \
withColumnRenamed("seller_id", "the_seller"). \
withColumnRenamed("num_pieces_sold", "pieces").\
groupBy(
col("product_id")
).agg(
sum("pieces").alias("total_pieces")
).withColumn("fake_column", lit(1))
# 輸出 Schema
sales_table_execution_plan.printSchema()
printSchema的輸出是:
root
|-- product_id: string (nullable = true)
|-- total_pieces: double (nullable = true)
|-- fake_column: integer (nullable = false)
請注意,printSchema不會觸發操作;相反,Spark會評估執行計划,以了解DAG在輸出列中的位置。由於這個原因,這個操作比show快得多,show會觸發DAG的執行。
解釋執行計划
可以通過explain API獲得有關觸發操作時引擎將執行的操作的更詳細的說明。在這種情況下,我們將獲得Spark將執行的操作的詳細說明。讓我們對上一個查詢調用explain:
# 輸出 Schema
sales_table_execution_plan.printSchema()
== Physical Plan ==
*(2) HashAggregate(keys=[product_id#361], functions=[sum(cast(pieces#379 as double))])
+- Exchange hashpartitioning(product_id#361, 200)
+- *(1) HashAggregate(keys=[product_id#361], functions=[partial_sum(cast(pieces#379 as double))])
+- *(1) Project [product_id#361, num_pieces_sold#364 AS pieces#379]
+- *(1) FileScan parquet [product_id#361,num_pieces_sold#364] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<PATH_TO_FILE>/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:string,num_pieces_sold:string>
老實說,我從來沒有發現explain API太有用,尤其是當DAG開始變得龐大和復雜時。在Spark UI中可以找到一個更好的視圖,它公開了相同信息的圖形表示。
Select Distinct
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT DISTINCT seller_id,
date
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(
col("seller_id"), col("date")
).distinct()
# 輸出 Schema
sales_table_execution_plan.show()
Case When
在Spark中很好地實現了該操作(不需要特殊的udf);讓我們簡單地用sales_table將每一行插入到不同的bucket中,具體取決於num_pieces_selled:
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT seller_id,
CASE WHEN num_pieces_sold < 30 THEN 'Lower than 30',
WHEN num_pieces_sold < 60 THEN 'Between 31 and 60'
WHEN num_pieces_sold < 90 THEN 'Between 61 and 90'
ELSE 'More than 91' AS sales_bucket
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(
col("seller_id"),
when(col("num_pieces_sold") < 30, "Lower than 30").
when(col("num_pieces_sold") < 60, "Between 31 and 60").
when(col("num_pieces_sold") < 90, "Between 61 and 90").
otherwise("More than 91").alias("sales_bucket")
)
sales_table_execution_plan.show()
Union All
有時我們需要將流分成多個部分,然后將所有內容合並到一個表中;在SQL中,這是用UNION ALL表示的。在spark2.1中,在執行union all操作之前必須對列進行排序。
幸運的是,spark2.3使用列名來對齊合並的執行計划。在下面的示例中,我們首先將表拆分為兩部分,然后將這些部分合並在一起(完全沒有必要,但它將演示如何使用API):
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
CREATE TABLE part_1 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold > 50;
CREATE TABLE part_2 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold <= 50;
SELECT *
FROM part_1
UNION ALL
SELECT *
FROM part_2
'''
# 分離part1
sales_table_execution_plan_part_1 = sales_table.where(col("num_pieces_sold") > 50)
# 分離part2
sales_table_execution_plan_part_2 = sales_table.where(col("num_pieces_sold") <= 50)
# 合並
sales_table_execution_plan = sales_table_execution_plan_part_1.unionByName(sales_table_execution_plan_part_2)
sales_table_execution_plan.explain()
讓我們看看解釋,看看幕后發生了什么:
Union
:- *(1) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
: +- *(1) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) > 50))
: +- *(1) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...
+- *(2) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
+- *(2) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) <= 50))
+- *(2) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...
表正在合並。
Spark的Join
當代碼出現性能問題時,連接通常是我們首先要查看的地方。Spark引擎在並行化非連接操作方面相當出色,但在連接任務時可能需要進行調整。
我寫了一整篇關於這個主題的文章,所以我不會再深入討論這個問題:如果你想知道更多,或者你遇到了一些連接性能問題,我建議你看看:https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c
同時,這里是連接的語法。在示例中,我們將連接Sales和Sellers表。
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
'''
SELECT a.*,
b.*
FROM sales_table a
LEFT JOIN sellers_table b
ON a.seller_id = b.seller_id
'''
# 左連接
left_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="left")
# 內連接
inner_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="inner")
# 右連接
right_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="right")
# 全外連接
full_outer_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="full_outer")
除了傳統的連接類型(左、右、內、交叉等),Spark還支持半連接和反連接;這兩個基本上是在Spark中表示操作和不表示操作的一種方式:
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
'''
SELECT *
FROM sales_table
WHERE seller_id IN (SELECT seller_id FROM sellers_table)
'''
# 左半連接是在SQL中表示IN操作的一種方式
semi_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="left_semi")
semi_join_execution_plan.show()
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
'''
SELECT *
FROM sales_table
WHERE seller_id NOT IN (SELECT seller_id FROM sellers_table)
'''
# 左反連接是在SQL中表示NOT IN操作的一種方式
anti_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="left_anti")
anti_join_execution_plan.show()
Window函數
window函數對定義為frame或window的特定行子集執行計算。典型的例子是子群的排序。在我們的玩具數據集中,假設我們想知道,對於每個賣家來說,什么是銷售最多的產品。要提取這些信息,我們需要:
-
定義我們將應用排序函數的“分區”:我們需要對每個賣家的產品執行一次排序操作
-
應用我們的首選排序函數:dense_rank
, ``rank
,row_number
。下面是Spark中的窗口函數列表。
下圖是我們希望如何分區數據的示例:
# 導入 Window
from pyspark.sql.window import Window
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT seller_id,
product_id,
total_pieces,
dense_rank() OVER (PARTITION BY seller_id ORDER BY total_pieces DESC) as rank
FROM (
SELECT seller_id,
product_id,
SUM(total_pieces_sold) AS total_pieces
FROM sales_table
GROUP BY seller_id,
product_id
)
'''
sales_table_agg = sales_table.groupBy(col("seller_id"), col("product_id")).agg(sum("num_pieces_sold").alias("total_pieces"))
# 定義窗口:在賣方ID上對表進行分區,並根據銷售的總塊對每個組進行排序
window_specifications = Window.partitionBy(col("seller_id")).orderBy(col("total_pieces").asc())
# 應用dense_rank函數,根據上面的規范創建窗口
sales_table_agg.withColumn('dense_rank', dense_rank().over(window_specifications)).show()
字符串
數據科學家在處理數據時面臨的另一組非常常見的操作,包括從字符串中提取信息。當然,有很多Spark API可以對文本數據進行幾乎任何(基本)操作。
讓我們先從簡單的LIKE運算符開始,然后再討論正則表達式的用法。對於API的完整列表,我將參考文檔;下面是可能使用最多的API。
Like
在下面的示例中,我們希望使用sales表來選擇bill_raw_text類似於“ab%cd%”的所有字符串(即,以字符串ab開頭,中間有一個字符串cd。
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT *
WHERE bill_raw_text LIKE 'ab%cd%'
'''
sales_table_execution_plan = sales_table.where(
col('bill_raw_text').like("ab%cd%")
)
sales_table_execution_plan.show()
有時我們想要找到的模式更復雜,無法用簡單的通配符來表達。在這種情況下,我們需要使用正則表達式。讓我們深入研究幾個函數。在下面的示例中,我們將始終應用相同的正則表達式。
(ab[cd]{2,4})|(aa[abcde]{1,2})
Like的正則表達式(Regex)
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT *
FROM sales_table
WHERE bill_raw_text RLIKE '(ab[cd]{2,4})|(aa[abcde]{1,2})'
'''
sales_table_execution_plan = sales_table.where(
col('bill_raw_text').rlike("(ab[cd]{2,4})|(aa[abcde]{1,2})")
)
sales_table_execution_plan.show()
用正則表達式提取模式
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT DISTINCT REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') AS extracted_pattern
WHERE REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') <> "
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(
# 最后一個整數表示要提取哪一組
regexp_extract(col('bill_raw_text'), "(ab[cd]{2,4})|(aa[abcde]{1,2})", 0).alias("extracted_pattern")
).where(col("extracted_pattern") != "").distinct()
sales_table_execution_plan.show(100,False)
數組操作
數組是一種數據類型,。Spark實現了很多函數來操作數組(准確地說,從2.4版開始就是這樣)。讓我們深入了解基本情況。
數組聚合
將列轉換為數組與調用聚合函數一樣簡單。Spark 2.3有兩種主要的數組聚合函數collect_set和collect_list:第一種只包含唯一的元素,而后一種只是將組轉換為列表。
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
COLLECT_LIST(num_pieces_list) AS num_pieces_sold_list,
seller_id
FROM sales_table
GROUP BY seller_id
'''
sales_table_execution_plan = sales_table.groupBy(col("seller_id")).agg(
collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"),
collect_list(col("num_pieces_sold")).alias("num_pieces_sold_list"),
)
sales_table_execution_plan.show(10, True)
分解陣列
聚合的逆操作是“數組分解”,即從水平數組生成“垂直”列。為此,我們可以使用explode函數。
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
CREATE TABLE sales_table_aggregated AS
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
seller_id
FROM sales_table
GROUP BY seller_id;
SELECT EXPLODE(num_pieces_sold_set) AS exploded_num_pieces_set
FROM sales_table_aggregated;
'''
sales_table_execution_aggregated = sales_table.groupBy(col("seller_id")).agg(
collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set")
)
sales_table_execution_exploded = sales_table_execution_aggregated.select(
explode(col("num_pieces_sold_set")).alias("exploded_num_pieces_set")
)
sales_table_execution_exploded.show(10, True)
其他使用數組的操作(從Spark 2.4開始)
不幸的是,Spark 2.3不支持對數組執行太多操作。幸運的是,Spark 2.4可以!Spark 2.4之后提供的一些功能包括:
-
array_except(array1,array2)-返回array1中的元素數組,而不是array2中的元素,沒有重復項。
-
array_intersect(array1,array2)-返回array1和array2相交的元素數組,不包含重復項。
-
array_join(array,delimiter[,nullReplacement])-使用分隔符和可選字符串連接給定數組的元素。
-
array_max(array)-返回數組中的最大值。跳過空元素。
-
array_min(array)-返回數組中的最小值。跳過空元素。
-
array_sort(array)-按升序對輸入數組進行排序。輸入數組的元素必須是可排序的。空元素將放在返回數組的末尾。
等等。以上定義直接取自參考文獻。我建議你查一下,以便有更多的細節!
UDFs
最后,用戶定義函數。當我們在默認的api中找不到轉換時,udf就是一種方法。
UDF是一個定制函數,程序員可以像我們目前看到的所有api一樣定義並應用於列。它們提供了最大的靈活性(我們幾乎可以在其中編寫任何代碼);缺點是Spark將它們視為黑匣子,因此內部的Spark引擎優化器(Catalyst)無法進行任何優化:udf可能會減慢我們的代碼速度。
作為一個示例,讓我們實現一個UDF,它模擬函數array_repeat(element,count),該函數返回一個包含元素count次的數組。
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
# 創建將在UDF中使用的函數
def array_repeat_custom(element, count):
list = ["{}".format(element) for x in range(0, count)]
return list
# 將函數轉換為UDF。指出UDF的返回類型是一種很好的做法
# 在本例中,返回類型是字符串數組
array_repeat_custom_udf = udf(array_repeat_custom, ArrayType(StringType()))
# 以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
# 調用UDF
sales_table_execution_plan = sales_table.select(
array_repeat_custom_udf(col("num_pieces_sold"), lit(3)).alias("sample_array")
)
sales_table_execution_plan.show()
除了UDF的語法之外,我建議你關注上面使用的lit函數。有些Spark函數只接受列作為輸入:如果需要使用常量,則可能需要將該常量轉換為“列”。lit函數會創建一列文字值。
下一步
我希望我能夠證明Spark並不比SQL更難,他們基本上是一樣的。
你可以想象,這篇文章的標題有點誇張:實際上精通這個工具需要15分鍾以上的時間;但我相信以上是一個很好的快速入門!
我的建議是開始使用上面的api,因為它們將覆蓋70%的用例。當你對基礎知識有信心時,我建議你寫下面兩篇文章,那是一位值得信賴的作者(lol)幾個月前寫的。第一個問題將挑戰你在使用此工具進行開發時遇到的一些經典問題,而第二個問題是對Spark Joins的深入研究。
https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c
https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565
附錄-配置PyCharm
在本地(非分布式)環境中安裝Spark是一項非常簡單的任務。在本附錄中,我將向你展示PyCharm Community Edition的基本配置,以便使用Python運行Spark。有五個簡單步驟:
-
下載PyCharm社區版
-
下載Spark
-
安裝PySpark
-
配置PyCharm以執行正確的Spark executor
-
測試是否一切正常
兩個注意事項:
-
我假設你的系統中正確安裝了Java。
-
在Windows上,需要安裝Winutils,這是運行Hadoop所需的一組二進制文件。查看此Git repo了解更多信息:https://github.com/steveloughran/winutils。
下載PyCharm社區版
幸運的是,JetBrains有一個PyCharm的開源版本。我們可以簡單地從他們的網站下載最新版本。安裝很簡單。
下載Spark
我們只需要從Spark官方網站下載一個壓縮文件。在我寫作時,有兩個主要版本可用:3.0.1和2.4.7。對於文章的范圍,我們可以選擇其中之一。
一旦下載完成,我們只需要在一個合適的位置解壓包。
安裝PySpark
現在是運行PyCharm並安裝所需的所有軟件包的時候了。首先,讓我們打開PyCharm,創建一個新項目和一個新的虛擬環境。
最后,直接從PyCharm安裝PySpark:
注意,為了啟用提示,我們還應該安裝pyspark-stubs包。
配置PyCharm以執行正確的Spark executor
希望我們沒有出現任何錯誤,所以我們只需要指示PyCharm運行正確的Spark執行器。它位於我們解壓縮Spark本身的文件夾中。讓我們為PyCharm項目創建一個運行配置。
測試是否一切正常
要測試Spark是否正常工作,只需運行以下代碼片段
# 導入庫
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 初始化Spark會話
spark = SparkSession.builder \
.master("local") \
.appName("SparkLikeABoss") \
.getOrCreate()
print(spark.version)
原文鏈接:https://towardsdatascience.com/15-minutes-to-spark-89cca49993f0
歡迎關注磐創AI博客站:
http://panchuang.net/
sklearn機器學習中文官方文檔:
http://sklearn123.com/
歡迎關注磐創博客資源匯總站:
http://docs.panchuang.net/