Spark—15分鍾教程


作者|Andrea Ialenti
編譯|VK
來源|Towards Datas Science

正如在我幾乎所有關於這個工具的文章中都寫到,Spark和SQL一樣非常容易使用。但不管我花多少時間寫代碼,我只是無法在我的大腦中永久性地存儲Spark API(有人會說我的記憶就像RAM一樣,小而易失)。

無論你是想快速入門介紹sparksql,還是急於編寫你的程序,還是像我一樣需要一份備忘單,我相信你會發現這篇文章很有用。

這篇文章的目的是介紹sparksql的所有主要函數/特性,在片段中,你將始終看到原始的SQL查詢及其在PySpark中的翻譯。

我將在這個數據集上執行我的代碼:https://drive.google.com/file/d/1kCXnIeoPT6p9kS_ANJ0mmpxlfDwK1yio/view

在幾個月前,我為另一篇文章創建了這個數據集,它由三個簡單的表組成:

基礎知識

Apache Spark是一個用於大規模並行數據處理的引擎。這個框架的一個令人驚奇的特性是它以多種語言公開api:我通常使用Scala與它交互,但是也可以使用SQL、Python甚至Java和R。

當我們編寫Spark程序時,首先要知道的是,當我們執行代碼時,我們不一定要對數據執行任何操作。實際上,該工具有兩種類型的API調用:轉換和操作。

Spark轉換背后的范例被稱為“延后計算”,這意味着實際的數據計算在我們要求采取行動之前不會開始。

為了理解這一概念,設想一下你需要對一個列執行SELECT和重命名的情況:如果不調用某個操作(例如collect或count),那么你的代碼只不過是定義了所謂的Spark執行計划。

Spark以有向無環圖(非常著名的DAG)組織執行計划。此結構描述將要執行的確切操作,並使調度器能夠決定在給定時間執行哪個任務。

正如Miyagi先生告訴我們的:

  1. 上蠟:定義DAG(變換)

  2. 脫蠟:執行DAG(動作)

與Spark交互

太好了,我們從哪里開始交互?使用Spark有多種方法:

  • 使用IDE:我建議使用IntelliJ或PyCharm,但我想你可以選擇任何你想要的東西。查看附錄中的PyCharm快速入門(在本地運行查詢)。我認為可以從你的本地環境使用遠程Spark executor,但說實話,我從來沒有進行過這種配置。

  • Jupyter Notebooks+Sparkmagic:Sparkmagic是一組工具,用於通過Spark REST服務器Livy與遠程Spark集群交互工作[1]。這是在AWS、Azure或googlecloud等雲系統上工作時使用Spark的主要方式。大多數雲提供商都有一項服務,可以在大約10分鍾內配置集群和notebooks 。

  • 通過使用spark shell的終端:有時你不希望在你和數據之間有任何東西(例如,對一個表進行超級快速的檢查);在這種情況下,你只需打開一個終端並啟動spark shell。

文章的代碼主要用於IDE。

在編寫任何查詢之前,我們需要導入一些庫並啟動一個Spark會話(使用DatasetDataFrame 的API編程)。下面的PySpark和Scala代碼段將加載你需要的所有內容(假設你已經配置了系統)。之后,為了簡單起見,我們將只看到PySpark代碼。除了一些細微差別外,scalaapi基本相同。

PySpark

# 導入Spark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 初始化Spark會話
spark = SparkSession.builder \
    .master("local") \
    .appName("SparkLikeABoss") \
    .getOrCreate()

Scala

//  導入Spark
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

//  初始化Spark會話
val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

解釋數據集、數據幀和RDD之間的差異篇幅將過長,所以我跳過這一部分,假裝它不存在。

基本操作

你能寫的最簡單的查詢可能是你所用過的最重要的查詢。讓我們看看如何使用Sales表進行基本操作。

簡單的Select語句和顯示數據

#  以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT *
FROM sales_table
'''
#   執行計划
sales_table_execution_plan = sales_table.select(col("*"))
#   Show (Action) - 顯示5行,列寬不受限制
sales_table_execution_plan.show(5, True)
#  以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id AS the_order_id,
       seller_id AS the_seller_id,
       num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
#  以一行代碼執行計划和顯示出來
sales_table_execution_plan = sales_table.select(
    col("order_id").alias("the_order_id"),
    col("seller_id").alias("the_seller_id"),
    col("num_pieces_sold").alias("the_number_of_pieces_sold")
).show(5, True)

我們在代碼片段中所做的第一件事是定義執行計划;只有當我們獲得show操作時,才會執行該計划。

我們可以在Spark計划中調用的其他操作示例包括:

  • collect()—返回整個數據集

  • count()—返回行數

  • take(n)-從數據集中返回n行

  • show(n,truncate=False)-顯示n行。你可以決定截斷結果或顯示字段的所有長度

另一個值得注意的有趣的事情是列是由col對象標識的。在本例中,我們讓Spark推斷這些列屬於哪個數據幀。

我們可以使用語法execution_plan_variable[“column_name”]來指定列來自哪個執行計划。使用此替代語法,我們可以得到:

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id AS the_order_id,
       seller_id AS the_seller_id,
       num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
#  以一行代碼執行計划和顯示出來
sales_table_execution_plan = sales_table.select(
    sales_table["order_id"].alias("the_order_id"),
    sales_table["seller_id"].alias("the_seller_id"),
    sales_table["num_pieces_sold"].alias("the_number_of_pieces_sold")
).show(5, True)

在處理連接時,限定字段的源表尤為重要(例如,兩個表可能有兩個同名字段,因此僅使用col對象不足以消除歧義)。Scala中的語法略有不同:

// Qualify the source execution plan in Scala
sales_table.col("order_id")

重命名和添加列

有時我們只想重命名一個列,或者我們想添加一個新的列並進行一些計算(例如,在以下情況下):

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id,
       product_id,
       seller_id,
       date,
       num_pieces_sold AS pieces,
       bill_raw_text
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
    withColumnRenamed("num_pieces_sold", "pieces")

sales_table_execution_plan.show()
#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id,
       product_id,
       seller_id,
       date,
       num_pieces_sold,
       bill_raw_text,
       num_pieces_sold % 2 AS num_pieces_sold_is_even
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
    withColumn("num_pieces_sold_is_even", col("num_pieces_sold")%2)

sales_table_execution_plan.show()

簡單聚合

Spark支持所有主要的聚合函數。以下示例僅指“簡單”的示例(例如平均值、總和、計數等)。稍后將介紹數組的聚合。

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT product_id,
       SUM(num_pieces_sold) AS total_pieces_sold,
       AVG(num_pieces_sold) AS average_pieces_sold,
       MAX(num_pieces_sold) AS max_pieces_sold_of_product_in_orders,
       MIN(num_pieces_sold) AS min_pieces_sold_of_product_in_orders,
       COUNT(num_pieces_sold) AS num_times_product_sold
FROM sales_table
GROUP BY product_id
'''
sales_table_execution_plan = sales_table.groupBy(
    col("product_id")
).agg(
    sum("num_pieces_sold").alias("total_pieces_sold"),
    avg("num_pieces_sold").alias("average_pieces_sold"),
    max("num_pieces_sold").alias("max_pieces_sold_of_product_in_orders"),
    min("num_pieces_sold").alias("min_pieces_sold_of_product_in_orders"),
    count("num_pieces_sold").alias("num_times_product_sold")
)

sales_table_execution_plan.show()

顯示架構

顯示命令的“table”有點誤導人;更精確的定義是“顯示執行計划”。使用Spark API,我們可以一個接一個地傳遞多個操作;使用printSchema API,如果在磁盤上寫入執行計划的結果,我們將輸出最終表的樣子。

在下面的示例中,我們重命名一些列,進行聚合,然后添加另一列。

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
-- 創建一個臨時表,進行一些重命名
CREATE TABLE temp_1 AS
SELECT seller_id AS the_seller,
       num_pieces_sold AS pieces,
       product_id
FROM sales_table;
--對新表進行聚合
CREATE TABLE temp_2 AS
SELECT product_id,
       SUM(pieces) AS total_pieces
FROM temp_1
GROUP BY product_id;
-- 添加列
SELECT a.*,
       1 AS fake_column
FROM temp2 a;
'''
sales_table_execution_plan = sales_table. \
    withColumnRenamed("seller_id", "the_seller"). \
    withColumnRenamed("num_pieces_sold", "pieces").\
groupBy(
    col("product_id")
).agg(
    sum("pieces").alias("total_pieces")
).withColumn("fake_column", lit(1))

#   輸出 Schema
sales_table_execution_plan.printSchema()

printSchema的輸出是:

root
 |-- product_id: string (nullable = true)
 |-- total_pieces: double (nullable = true)
 |-- fake_column: integer (nullable = false)

請注意,printSchema不會觸發操作;相反,Spark會評估執行計划,以了解DAG在輸出列中的位置。由於這個原因,這個操作比show快得多,show會觸發DAG的執行。

解釋執行計划

可以通過explain API獲得有關觸發操作時引擎將執行的操作的更詳細的說明。在這種情況下,我們將獲得Spark將執行的操作的詳細說明。讓我們對上一個查詢調用explain:

#   輸出 Schema
sales_table_execution_plan.printSchema()
== Physical Plan ==
*(2) HashAggregate(keys=[product_id#361], functions=[sum(cast(pieces#379 as double))])
+- Exchange hashpartitioning(product_id#361, 200)
   +- *(1) HashAggregate(keys=[product_id#361], functions=[partial_sum(cast(pieces#379 as double))])
      +- *(1) Project [product_id#361, num_pieces_sold#364 AS pieces#379]
         +- *(1) FileScan parquet [product_id#361,num_pieces_sold#364] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<PATH_TO_FILE>/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:string,num_pieces_sold:string>

老實說,我從來沒有發現explain API太有用,尤其是當DAG開始變得龐大和復雜時。在Spark UI中可以找到一個更好的視圖,它公開了相同信息的圖形表示。

Select Distinct

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT DISTINCT seller_id,
       date
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(
    col("seller_id"), col("date")
).distinct()

#   輸出 Schema
sales_table_execution_plan.show()

Case When

在Spark中很好地實現了該操作(不需要特殊的udf);讓我們簡單地用sales_table將每一行插入到不同的bucket中,具體取決於num_pieces_selled:

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT seller_id,
       CASE WHEN num_pieces_sold < 30 THEN 'Lower than 30',
            WHEN num_pieces_sold < 60 THEN 'Between 31 and 60'
            WHEN num_pieces_sold < 90 THEN 'Between 61 and 90'
            ELSE 'More than 91' AS sales_bucket
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(
    col("seller_id"),
    when(col("num_pieces_sold") < 30, "Lower than 30").
    when(col("num_pieces_sold") < 60, "Between 31 and 60").
    when(col("num_pieces_sold") < 90, "Between 61 and 90").
    otherwise("More than 91").alias("sales_bucket")
)

sales_table_execution_plan.show()

Union All

有時我們需要將流分成多個部分,然后將所有內容合並到一個表中;在SQL中,這是用UNION ALL表示的。在spark2.1中,在執行union all操作之前必須對列進行排序。

幸運的是,spark2.3使用列名來對齊合並的執行計划。在下面的示例中,我們首先將表拆分為兩部分,然后將這些部分合並在一起(完全沒有必要,但它將演示如何使用API):

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
CREATE TABLE part_1 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold > 50;
CREATE TABLE part_2 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold <= 50;
SELECT *
FROM part_1
 UNION ALL
SELECT *
FROM part_2
'''
#   分離part1
sales_table_execution_plan_part_1 = sales_table.where(col("num_pieces_sold") > 50)

#   分離part2
sales_table_execution_plan_part_2 = sales_table.where(col("num_pieces_sold") <= 50)

#   合並
sales_table_execution_plan = sales_table_execution_plan_part_1.unionByName(sales_table_execution_plan_part_2)

sales_table_execution_plan.explain()

讓我們看看解釋,看看幕后發生了什么:

Union
:- *(1) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
:  +- *(1) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) > 50))
:     +- *(1) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...
+- *(2) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
   +- *(2) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) <= 50))
      +- *(2) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...

表正在合並。

Spark的Join

當代碼出現性能問題時,連接通常是我們首先要查看的地方。Spark引擎在並行化非連接操作方面相當出色,但在連接任務時可能需要進行調整。

我寫了一整篇關於這個主題的文章,所以我不會再深入討論這個問題:如果你想知道更多,或者你遇到了一些連接性能問題,我建議你看看:https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c

同時,這里是連接的語法。在示例中,我們將連接Sales和Sellers表。

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")

'''
SELECT a.*,
       b.*
FROM sales_table a
    LEFT JOIN sellers_table b
        ON a.seller_id = b.seller_id
'''
#   左連接
left_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="left")

#   內連接
inner_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="inner")

#   右連接
right_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="right")

#  全外連接
full_outer_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="full_outer")

除了傳統的連接類型(左、右、內、交叉等),Spark還支持半連接和反連接;這兩個基本上是在Spark中表示操作和不表示操作的一種方式:

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")


'''
SELECT *
FROM sales_table
WHERE seller_id IN (SELECT seller_id FROM sellers_table)
'''
# 左半連接是在SQL中表示IN操作的一種方式
semi_join_execution_plan = sales_table.join(sellers_table, 
                on=sales_table["seller_id"] == sellers_table["seller_id"], 
                how="left_semi")

semi_join_execution_plan.show()
#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")

'''
SELECT *
FROM sales_table
WHERE seller_id NOT IN (SELECT seller_id FROM sellers_table)
'''
# 左反連接是在SQL中表示NOT IN操作的一種方式
anti_join_execution_plan = sales_table.join(sellers_table,
                on=sales_table["seller_id"] == sellers_table["seller_id"],
                how="left_anti")

anti_join_execution_plan.show()

Window函數

window函數對定義為frame或window的特定行子集執行計算。典型的例子是子群的排序。在我們的玩具數據集中,假設我們想知道,對於每個賣家來說,什么是銷售最多的產品。要提取這些信息,我們需要:

  1. 定義我們將應用排序函數的“分區”:我們需要對每個賣家的產品執行一次排序操作

  2. 應用我們的首選排序函數:dense_rank, ``rank, row_number。下面是Spark中的窗口函數列表。

下圖是我們希望如何分區數據的示例:

#   導入 Window
from pyspark.sql.window import Window

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT seller_id,
       product_id,
       total_pieces,
       dense_rank() OVER (PARTITION BY seller_id ORDER BY total_pieces DESC) as rank
FROM (
    SELECT seller_id,
           product_id,
           SUM(total_pieces_sold) AS total_pieces
    FROM sales_table
    GROUP BY seller_id,
           product_id
)
'''

sales_table_agg = sales_table.groupBy(col("seller_id"), col("product_id")).agg(sum("num_pieces_sold").alias("total_pieces"))

#  定義窗口:在賣方ID上對表進行分區,並根據銷售的總塊對每個組進行排序
window_specifications = Window.partitionBy(col("seller_id")).orderBy(col("total_pieces").asc())

# 應用dense_rank函數,根據上面的規范創建窗口
sales_table_agg.withColumn('dense_rank', dense_rank().over(window_specifications)).show()

字符串

數據科學家在處理數據時面臨的另一組非常常見的操作,包括從字符串中提取信息。當然,有很多Spark API可以對文本數據進行幾乎任何(基本)操作。

讓我們先從簡單的LIKE運算符開始,然后再討論正則表達式的用法。對於API的完整列表,我將參考文檔;下面是可能使用最多的API。

Like

在下面的示例中,我們希望使用sales表來選擇bill_raw_text類似於“ab%cd%”的所有字符串(即,以字符串ab開頭,中間有一個字符串cd。

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT *
WHERE bill_raw_text LIKE 'ab%cd%'
'''
sales_table_execution_plan = sales_table.where(
    col('bill_raw_text').like("ab%cd%")
)

sales_table_execution_plan.show()

有時我們想要找到的模式更復雜,無法用簡單的通配符來表達。在這種情況下,我們需要使用正則表達式。讓我們深入研究幾個函數。在下面的示例中,我們將始終應用相同的正則表達式。

(ab[cd]{2,4})|(aa[abcde]{1,2})

Like的正則表達式(Regex)

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT *
FROM sales_table
WHERE bill_raw_text RLIKE '(ab[cd]{2,4})|(aa[abcde]{1,2})'
'''
sales_table_execution_plan = sales_table.where(
    col('bill_raw_text').rlike("(ab[cd]{2,4})|(aa[abcde]{1,2})")
)

sales_table_execution_plan.show()

用正則表達式提取模式

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT DISTINCT REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') AS extracted_pattern
WHERE REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') <> "
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(
    #  最后一個整數表示要提取哪一組
    regexp_extract(col('bill_raw_text'), "(ab[cd]{2,4})|(aa[abcde]{1,2})", 0).alias("extracted_pattern")
).where(col("extracted_pattern") != "").distinct()

sales_table_execution_plan.show(100,False)

數組操作

數組是一種數據類型,。Spark實現了很多函數來操作數組(准確地說,從2.4版開始就是這樣)。讓我們深入了解基本情況。

數組聚合

將列轉換為數組與調用聚合函數一樣簡單。Spark 2.3有兩種主要的數組聚合函數collect_set和collect_list:第一種只包含唯一的元素,而后一種只是將組轉換為列表。

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
       COLLECT_LIST(num_pieces_list) AS num_pieces_sold_list,
       seller_id
FROM sales_table
GROUP BY seller_id
'''
sales_table_execution_plan = sales_table.groupBy(col("seller_id")).agg(
    collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"),
    collect_list(col("num_pieces_sold")).alias("num_pieces_sold_list"),

)

sales_table_execution_plan.show(10, True)

分解陣列

聚合的逆操作是“數組分解”,即從水平數組生成“垂直”列。為此,我們可以使用explode函數。

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
CREATE TABLE sales_table_aggregated AS
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
       seller_id
FROM sales_table
GROUP BY seller_id;
SELECT EXPLODE(num_pieces_sold_set) AS exploded_num_pieces_set
FROM sales_table_aggregated;
'''
sales_table_execution_aggregated = sales_table.groupBy(col("seller_id")).agg(
    collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set")
)

sales_table_execution_exploded = sales_table_execution_aggregated.select(
    explode(col("num_pieces_sold_set")).alias("exploded_num_pieces_set")
)

sales_table_execution_exploded.show(10, True)

其他使用數組的操作(從Spark 2.4開始)

不幸的是,Spark 2.3不支持對數組執行太多操作。幸運的是,Spark 2.4可以!Spark 2.4之后提供的一些功能包括:

  • array_except(array1,array2)-返回array1中的元素數組,而不是array2中的元素,沒有重復項。

  • array_intersect(array1,array2)-返回array1和array2相交的元素數組,不包含重復項。

  • array_join(array,delimiter[,nullReplacement])-使用分隔符和可選字符串連接給定數組的元素。

  • array_max(array)-返回數組中的最大值。跳過空元素。

  • array_min(array)-返回數組中的最小值。跳過空元素。

  • array_sort(array)-按升序對輸入數組進行排序。輸入數組的元素必須是可排序的。空元素將放在返回數組的末尾。

等等。以上定義直接取自參考文獻。我建議你查一下,以便有更多的細節!

UDFs

最后,用戶定義函數。當我們在默認的api中找不到轉換時,udf就是一種方法。

UDF是一個定制函數,程序員可以像我們目前看到的所有api一樣定義並應用於列。它們提供了最大的靈活性(我們幾乎可以在其中編寫任何代碼);缺點是Spark將它們視為黑匣子,因此內部的Spark引擎優化器(Catalyst)無法進行任何優化:udf可能會減慢我們的代碼速度。

作為一個示例,讓我們實現一個UDF,它模擬函數array_repeat(element,count),該函數返回一個包含元素count次的數組。

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

#  創建將在UDF中使用的函數
def array_repeat_custom(element, count):
  list = ["{}".format(element) for x in range(0, count)]
  return list

#   將函數轉換為UDF。指出UDF的返回類型是一種很好的做法
#   在本例中,返回類型是字符串數組
array_repeat_custom_udf = udf(array_repeat_custom,  ArrayType(StringType()))

#   以Parquet格式讀取源表
sales_table = spark.read.parquet("./data/sales_parquet")

#   調用UDF
sales_table_execution_plan = sales_table.select(
    array_repeat_custom_udf(col("num_pieces_sold"), lit(3)).alias("sample_array")
)

sales_table_execution_plan.show()

除了UDF的語法之外,我建議你關注上面使用的lit函數。有些Spark函數只接受列作為輸入:如果需要使用常量,則可能需要將該常量轉換為“列”。lit函數會創建一列文字值。

下一步

我希望我能夠證明Spark並不比SQL更難,他們基本上是一樣的。

你可以想象,這篇文章的標題有點誇張:實際上精通這個工具需要15分鍾以上的時間;但我相信以上是一個很好的快速入門!

我的建議是開始使用上面的api,因為它們將覆蓋70%的用例。當你對基礎知識有信心時,我建議你寫下面兩篇文章,那是一位值得信賴的作者(lol)幾個月前寫的。第一個問題將挑戰你在使用此工具進行開發時遇到的一些經典問題,而第二個問題是對Spark Joins的深入研究。

https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c

https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565

附錄-配置PyCharm

在本地(非分布式)環境中安裝Spark是一項非常簡單的任務。在本附錄中,我將向你展示PyCharm Community Edition的基本配置,以便使用Python運行Spark。有五個簡單步驟:

  1. 下載PyCharm社區版

  2. 下載Spark

  3. 安裝PySpark

  4. 配置PyCharm以執行正確的Spark executor

  5. 測試是否一切正常

兩個注意事項:

  • 我假設你的系統中正確安裝了Java。

  • 在Windows上,需要安裝Winutils,這是運行Hadoop所需的一組二進制文件。查看此Git repo了解更多信息:https://github.com/steveloughran/winutils。

下載PyCharm社區版

幸運的是,JetBrains有一個PyCharm的開源版本。我們可以簡單地從他們的網站下載最新版本。安裝很簡單。

下載Spark

我們只需要從Spark官方網站下載一個壓縮文件。在我寫作時,有兩個主要版本可用:3.0.1和2.4.7。對於文章的范圍,我們可以選擇其中之一。

一旦下載完成,我們只需要在一個合適的位置解壓包。

安裝PySpark

現在是運行PyCharm並安裝所需的所有軟件包的時候了。首先,讓我們打開PyCharm,創建一個新項目和一個新的虛擬環境。

最后,直接從PyCharm安裝PySpark:

注意,為了啟用提示,我們還應該安裝pyspark-stubs包。

配置PyCharm以執行正確的Spark executor

希望我們沒有出現任何錯誤,所以我們只需要指示PyCharm運行正確的Spark執行器。它位於我們解壓縮Spark本身的文件夾中。讓我們為PyCharm項目創建一個運行配置。

測試是否一切正常

要測試Spark是否正常工作,只需運行以下代碼片段

# 導入庫
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 初始化Spark會話
spark = SparkSession.builder \
    .master("local") \
    .appName("SparkLikeABoss") \
    .getOrCreate()

print(spark.version)

原文鏈接:https://towardsdatascience.com/15-minutes-to-spark-89cca49993f0

歡迎關注磐創AI博客站:
http://panchuang.net/

sklearn機器學習中文官方文檔:
http://sklearn123.com/

歡迎關注磐創博客資源匯總站:
http://docs.panchuang.net/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM