使用Apache Arrow助力PySpark數據處理
開源大數據EMR 2019-05-30 1802瀏覽量
Apache Arrow從Spark 2.3版本開始被引入,通過列式存儲,zero copy等技術,JVM 與Python 之間的數據傳輸效率得到了大量的提升。本文主要介紹一下Apache Arrow以及Spark中的使用方法。
列式存儲簡介
在介紹Spark中使用Apache Arrow之前,先簡單的介紹一下Apache Arrow以及他背后的一些技術背景。
在大數據時代之前,大部分的存儲引擎使用的是按行存儲的形式,很多早期的系統,如交易系統、ERP系統等每次處理的是增、刪、改、查某一個實體的所有信息,按行存儲的話能夠快速的定位到單個實體並進行處理。如果使用列存儲,對某一個實體的不同屬性的操作就需要進行多次隨機讀寫,效率將會是非常差的。
隨着大數據時代的到來,尤其是數據分析的不斷發展,任務不需要一次讀取實體的所有屬性,而只關心特定的某些屬性,並對這些屬性進行aggregate等復雜的操作等。這種情況下行存儲將需要讀取額外的數據,形成瓶頸。而選擇列存儲將會減少額外數據的讀取,對相同屬性的數據還可以進行壓縮,大大的加快了處理速度。
以下是行存儲和列存儲的對比說明,摘自Apache Arrow 官網,上面是一個二維表,由三個屬性組成,分別是session_id, timestamp和source_ip。左側為行存儲在內存中的表示,數據按行依次存儲,每一行按照列的順序存儲。右側為列存儲在內存中的表示,每一列單獨存放,根據batch size等屬性來控制一次寫入的列簇大小。這樣當查詢語句只涉及少數列的時候,比如圖中SQL查詢,只需要過濾session_id列,避免讀取所有數據列,減少了大量的I/O損耗,同時考慮到CPU pipeline以及使用CPU SIMD技術等等,將大大的提升查詢速度。
Apache Arrow
在大數據領域,列式存儲的靈感來自Google於2010年發表的Dremel論文,文中介紹了一種支持嵌套結構的存儲格式,並且使用了列式存儲的方式提升查詢性能,在Dremel論文中還介紹了Google如何使用這種存儲格式實現並行查詢的。這篇論文影響了Hadoop生態系統發展,之后的Apache Parquet和Apache ORC作為列式存儲格式已經被廣大的Hadoop生態系統使用,如Spark、Hive、Impala等等。
Apache Arrow在官網上是這樣定義的,Apache Arrow是一個跨語言、跨平台的內存數據結構。從這個定義中我們可以看到Apache Arrow與Apache Parquet以及Apache ORC的區別。Parquet與ORC設計的目的針對磁盤數據,在列存儲的基礎上使用了高效率的壓縮算法進行壓縮,比如使用snappy、gzip和zlib等算法對列數據進行壓縮。所以大部分情況下在數據讀取的時候需要首先對數據進行反壓縮,並有一定的cpu使用損耗。而Arrow,作為在內存中的數據,並不支持壓縮(當然寫入磁盤是支持壓縮的),Arrow使用dictionary-encoded來進行類似索引的操作。
除了列存儲外,Arrow在數據在跨語言的數據傳輸上具有相當大的威力,Arrow的跨語言特性表示在Arrow的規范中,作者指定了不同數據類型的layout,包括不同原始數據類型在內存中占的比特數,Array數據的組成以及Null值的表示等等。根據這些定義后,在不同的平台和不同的語言中使用Arrow將會采用完全相同的內存結構,因此在不同平台間和不同語言間進行高效數據傳輸成為了可能。在Arrow之前如果要對不同語言數據進行傳輸必須要使用序列化與反序列化技術來完成,耗費了大量的CPU資源和時間,而Arrow由於根據規范在內存中的數據結構一致,可以通過共享內存, 內存映射文件等技術來共享Arrow內存結構,省去了序列化與反序列過程。
Spark與Apache Arrow
介紹完Arrow的背景后,來看一下Apache Spark如何使用Arrow來加速PySpark處理的。一直以來,使用PySpark的客戶都在抱怨python的效率太低,導致了很多用戶轉向了使用Scala進行開發。這主要是由於Spark使用Scala語言開發,底層啟動的是JVM,而PySpark是Scala中PythonRDD對象啟動的一個Python子進程,Python與JVM的通信使用了Py4J, 通過Py4J Python程序能夠動態的訪問JVM中的Java對象,這一過程使用了linux pipe,在底層JVM需要對RDD進行序列化,在Python端需要對RDD進行反序列化,當數據量較大的時候效率遠不如直接使用Scala。流程如下圖。
很多數據科學家以及分析人員習慣使用python來進行處理,尤其是使用Pandas和Numpy庫來對數據進行后續處理,Spark 2.3以后引入的Arrow將會大大的提升這一效率。我們從代碼角度來看一下實現,在Spark 2.4版本的dataframe.py代碼中,toPandas的實現為:
if use_arrow: try: from pyspark.sql.types import _check_dataframe_convert_date, \ _check_dataframe_localize_timestamps import pyarrow batches = self._collectAsArrow() if len(batches) > 0: table = pyarrow.Table.from_batches(batches) pdf = table.to_pandas() pdf = _check_dataframe_convert_date(pdf, self.schema) return _check_dataframe_localize_timestamps(pdf, timezone) else: return pd.DataFrame.from_records([], columns=self.columns) except Exception as e: # We might have to allow fallback here as well but multiple Spark jobs can # be executed. So, simply fail in this case for now. msg = ( "toPandas attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true, but has reached " "the error below and can not continue. Note that " "'spark.sql.execution.arrow.fallback.enabled' does not have an effect " "on failures in the middle of computation.\n %s" % _exception_message(e)) warnings.warn(msg) raise
如果使用了Arrow(Spark 2.4默認使用),比較重要的一行是_collectAsArrow(),_collectAsArrow()實現為:
def _collectAsArrow(self): """ Returns all records as a list of ArrowRecordBatches, pyarrow must be installed and available on driver and worker Python environments. .. note:: Experimental. """ with SCCallSiteSync(self._sc) as css: sock_info = self._jdf.collectAsArrowToPython() return list(_load_from_socket(sock_info, ArrowStreamSerializer()))
這里面使用了ArrowStreamSerializer(),而ArrowStreamSerializer定義為
class ArrowStreamSerializer(Serializer): """ Serializes Arrow record batches as a stream. """ def dump_stream(self, iterator, stream): import pyarrow as pa writer = None try: for batch in iterator: if writer is None: writer = pa.RecordBatchStreamWriter(stream, batch.schema) writer.write_batch(batch) finally: if writer is not None: writer.close() def load_stream(self, stream): import pyarrow as pa reader = pa.open_stream(stream) for batch in reader: yield batch def __repr__(self): return "ArrowStreamSerializer"
可以看出在這里面,jvm對數據根據Arrow規范設置好內存數據結構進行列式轉化后,Python層面並不需要任何的反序列過程,而是直接讀取,這也是Arrow高效的原因之一。
對比看那一下如果不使用Arrow方法為:
def collect(self): """Returns all the records as a list of :class:`Row`. >>> df.collect() [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] """ with SCCallSiteSync(self._sc) as css: sock_info = self._jdf.collectToPython() return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
序列化方法為PickleSerializer,需要對每一條數據使用PickleSerializer進行反序列化。
那如何通過這一特性來進行我們的開發呢,Spark提供了Pandas UDFs功能,即向量化UDF,Pandas UDF主要是通過Arrow將JVM里面的Spark DataFrame傳輸給Python生成pandas DataFrame,並執行用於定義的UDF。目前有兩種類型,一種是Scalar,一種是Grouped Map。
這里主要介紹一下Scalar Python UDFs的使用,以及可能的場景。 Scalar Python UDFs可以在select和withColumn中使用,他的輸入參數為pandas.Series類型,輸出參數為相同長度的pandas.Series。Spark內部會通過Arrow將列式數據根據batch size獲取后,批量的將數據轉化為pandas.Series類型,並在每個batch都執行用戶定義的function。最后將不同batch的結果進行整合,獲取最后的數據結果。
以下是官網的一個例子:
import pandas as pd from pyspark.sql.functions import col, pandas_udf from pyspark.sql.types import LongType # Declare the function and create the UDF def multiply_func(a, b): return a * b multiply = pandas_udf(multiply_func, returnType=LongType()) # The function for a pandas_udf should be able to execute with local Pandas data x = pd.Series([1, 2, 3]) print(multiply_func(x, x)) # 0 1 # 1 4 # 2 9 # dtype: int64 # Create a Spark DataFrame, 'spark' is an existing SparkSession df = spark.createDataFrame(pd.DataFrame(x, columns=["x"])) # Execute function as a Spark vectorized UDF df.select(multiply(col("x"), col("x"))).show() # +-------------------+ # |multiply_func(x, x)| # +-------------------+ # | 1| # | 4| # | 9| # +-------------------+
首先定義udf,multiply_func,主要功能就是將a、b兩列的數據對應行數據相乘獲取結果。然后通過pandas_udf裝飾器生成Pandas UDF。最后使用df.selecct方法調用Pandas UDF獲取結果。這里面要注意的是pandas_udf的輸入輸出數據是向量化數據,包含了多行,可以根據spark.sql.execution.arrow.maxRecordsPerBatch來設置。
可以看出Pandas UDF使用非常簡單,只需要定義好Pandas UDF就可以了。有了Pandas UDF后我們可以很容易的將深度學習框架和Spark進行結合,比如在UDF中使用一些深度學習框架,比如scikit-learn,我們可以對批量的數據分別進行訓練。下面是一個簡單的例子,利用Pandas UDF來進行訓練:
# Load necessary libraries from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import * import pandas as pd from scipy.optimize import leastsq import numpy as np # Create the schema for the resulting data frame schema = StructType([StructField('ID', LongType(), True), StructField('p0', DoubleType(), True), StructField('p1', DoubleType(), True)]) # Define the UDF, input and outputs are Pandas DFs @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def analyze_player(sample_pd): # return empty params in not enough data if (len(sample_pd.shots) <= 1): return pd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [ 0 ], 'p1': [ 0 ]}) # Perform curve fitting result = leastsq(fit, [1, 0], args=(sample_pd.shots, sample_pd.hits)) # Return the parameters as a Pandas DF return pd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [result[0][0]], 'p1': [result[0][1]]}) # perform the UDF and show the results player_df = df.groupby('player_id').apply(analyze_player) display(player_df)
除此之外還可以使用TensorFlow和MXNet等與Spark進行融合,近期阿里雲EMR Data Science集群將會推出相應的功能,整合EMR Spark與深度學習框架之間調度與數據交換功能,希望大家關注。
https://www.codercto.com/a/68911.html
用ApacheArrow加速PySpark
內容簡介:Pandas、Numpy是做數據分析最常使用的Python包,如果數據存在Hadoop又想用Pandas做一些數據處理,通常會使用PySpark的可以看到,近500w數據的toPandas操作,開啟arrow后,粗略耗時統計從39s降低為2s。如何開啟arrow,就是spark.sql.execution.arrow.enabled=true這個配置了,spark2.3開始支持。
本文轉載自:http://tech.dianwoda.com/2019/04/01/yong-apachearrowjia-su-pyspark/,本站轉載出於傳遞更多信息之目的,版權歸原作者或者來源機構所有。
用ApacheArrow加速PySpark
https://www.codercto.com/a/68911.html
Pandas、Numpy是做數據分析最常使用的 Python 包,如果數據存在Hadoop又想用Pandas做一些數據處理,通常會使用PySpark的 DataFrame.toPandas() 這個方法。讓人不爽的是,這個方法執行很慢,數據量越大越慢。
做個測試
Using Python version 2.7.14 (default, Oct 5 2017 02:28:52) SparkSession available as 'spark'. >>> def test(): ... from pyspark.sql.functions import rand ... from better_utils import TimeUtil ... start = TimeUtil.now_unix() ... df = spark.range(1 << 22).toDF('id').withColumn("x", rand()) ... df.toPandas() ... cost = TimeUtil.now_unix() - start ... print "耗時:{}s".format(cost) ... >>> test() 耗時:39s >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true") >>> test() /Users/yulian/anaconda3/envs/python2/lib/python2.7/site-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream warnings.warn("pyarrow.open_stream is deprecated, please use " 耗時:2s >>>
可以看到,近500w數據的toPandas操作,開啟arrow后,粗略耗時統計從39s降低為2s。
如何開啟arrow,就是spark.sql.execution.arrow.enabled=true這個配置了,spark2.3開始支持。
另外需要安裝pip install pyarrow。
是什么
Arrow是一種跨語言的基於內存的列式數據結構。
在分布式系統內部,每個系統都有自己的內存格式,大量的 CPU 資源被消耗在序列化和反序列化過程中,並且由於每個項目都有自己的實現,沒有一個明確的標准,造成各個系統都在重復着復制、轉換工作,這種問題在微服務系統架構出現之后更加明顯,Arrow 的出現就是為了解決這一問題。作為一個跨平台的數據層,我們可以使用 Arrow 加快大數據分析項目的運行速度。

需要明確的是,Apache Arrow 不是一個引擎,也不是一個存儲系統,它是用來處理分層的列式內存數據的一系列格式和算法。
為什么
PySpark中使用DataFrame.toPandas()將數據從Spark DataFrame轉換到Pandas中是非常低效的。
Spark和Python基於Socket通信,使用serializers/deserializers交換數據。
Python的反序列化pyspark.serializers.PickleSerializer使用cPickle模塊的標准pickle格式。
Spark先把所有的行匯聚到driver上,然后通過初始轉換,以消除Scala和 Java 之間的任何不兼容性,使用Pyrolite庫的org.apache.spark.api.python.SerDeUtil.AutoBatchedPickler去把Java對象序列化成pickle格式。
然后序列化后的數據分批發送個Python的worker子進程,這個子進程會反序列化每一行,拼成一個大list;最后利用 pandas.DataFrame.from_records() 從這個list來創建一個Pandas DataFrame。
上面的過程有兩個明顯問題:
1)即使使用CPickle,Python的序列化也是一個很慢的過程。
2)利用 from_records 來創建一個 pandas.DataFrame 需要遍歷Python list,將每個value轉換成Pandas格式。
Arrow可以優化這幾個步驟:
1)一旦數據變成了Arrow的內存格式,就不再有序列化的需要,因為Arrow數據可以直接發送到Python進程。
2)當在Python里接收到Arrow數據后,pyarrow可以利用zero-copy技術,一次性的從整片數據來創建 pandas.DataFrame,而不需要輪詢去處理每一行記錄。另外轉換成Arrow數據的過程可以在JVM里並行完成,這樣可以顯著降低driver的壓力。
Arrow有點可以總結為:
* 序列化友好
* 向量化
序列化友好指的是,Arrow提供了一個內存格式,該格式本身是跨應用的,無論你放到哪,都是這個格式,中間如果需要網絡傳輸這個格式,那么也是序列化友好的,只要做下格式調整(不是序列化)就可以將數據發送到另外一個應用里。這樣就大大的降低了序列化開銷。
向量化指的是,首先Arrow是將數據按block進行傳輸的,其次是可以對立面的數據按列進行處理的。這樣就極大的加快了處理速度。
感興趣的話可以看下Python各種序列化方案的對比:
http://satoru.rocks/2018/08/fastest-way-to-serialize-array/