Spark PySpark數據類型的轉換原理—Writable Converter


Spark目前支持三種開發語言:Scala、Java、Python,目前我們大量使用Python來開發Spark App(Spark 1.2開始支持使用Python開發Spark Streaming App,我們也准備嘗試使用Python開發Spark Streaming App),在這期間關於數據類型的問題曾經困擾我們很長時間,故在此記錄一下心路歷程。
 
Spark是使用Scala語言開發的,Hadoop是使用Java語言開發的,Spark兼容Hadoop Writable,而我們使用Python語言開發Spark (Streaming) App,Spark Programming Guides(Spark 1.5.1)其中有一段文字說明了它們相互之間數據類型轉換的關系:
 
 
也說是說,我們需要處理兩個方向的轉換:
 
(1)Writable => Java Type => Python Type;
(2)Python Type => Java Type => Writable;
 
其中Java Type與Python Type之間數據類型的轉換依賴開源組件Pyrolite,相應的數據類型轉換如下:
 
(1)Python Type => Java Type;
 
 
(2)Java Type => Python Type;
 
 
也就是說,Pyrolite已經為Java Type與Python Type之間的數據類型轉換建立了“標准”,我們僅僅需要處理Writable與Java Type之間的數據轉換就可以了。
 
從上圖“Writable Support”中可以看出PySpark已經為我們解決了常用的數據類型轉換問題,但可以理解為“基本”數據類型,遇到復雜的情況,還是需要我們特殊處理,PySpark已經為我們考慮到了這種業務場景,為我們提供接口Converter(org.apache.spark.api.python.Converter),使得我們可以根據自己的需要擴展數據類型轉換機制:
 
 
接口Converter僅僅只有一個方法convert,其中T表示源數據類型,U表示目標數據類型,參數obj表示源數據值,返回值表示目標數據值。
 
Spark Programming Guides(Spark 1.5.1)也為我們舉例說明了一個需要自定義Converter的場景:
 
 
ArrayWritable是Hadoop Writable的一種,因為Array涉及到元素數據類型的問題,因此使用時需要實現相應的子類,如元素數據類型為整型:
 
 
從上面的描述可知,PySpark使用ArrayWritable時涉及到如下兩個方向的數據類型轉換:
 
(1)Tuple => Object[] => ArrayWritable;
(2)ArrayWritable => Object[] => Tuple;
 
我們以IntArrayWritable為例說明如何自定義擴展Converter,同理也需要處理兩個方向的數據類型轉換:Tuple => Object[] => ArrayWritable、ArrayWritable => Object[] => Tuple。
 
(1)Tuple => Object[] => IntArrayWritable;
 
假設我們有一個list,list的元素類型為tuple,而tuple的元素類型為int,我們需要將這個list中的所有數據以SequenceFile的形式保存至HDFS。對於list中的每一個元素tuple,Pyrolite可以幫助我們完成Tuple => Object[]的轉換,而Object[] => IntArrayWritable則需要我們自定義Converter實現。
 
 
PySpark中使用這個Converter寫入數據:
 
 
注意:SequenceFile的數據結構為<key, value>,為了簡單起見,key指定為com.sina.dip.spark.converter.IntArrayWritable,value指定為org.apache.hadoop.io.NullWritable(即空值)。
 
運行上述程序時,因為有使用到我們自定義的類,因此需要將com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter編譯打包為獨立的Jar:converter.jar,並通過參數指定,如下:
 
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_save_data_to_seqfile.py
 
(2)IntArrayWritable => Object[] => Tuple;
 
我們需要將(1)中寫入SequenceFile的Key(IntArrayWritable)還原為list,其中list的元素類型為tuple,tuple的元素類型為int,IntArrayWritable => Object[]也需要用到我們自定義的Converter(Object[] => Tuple由Pyrolite負責):
 
 
PySpark使用這個Converter讀取數據:
 
 
同(1),我們需要將com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter編譯打包為獨立的Jar:converter.jar,並通過參數指定,如下:
 
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_read_data_from_seqfile.py
 
輸出結果:
 
 
可以看出,通過自定義擴展的Converter:com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter,我們實現了IntArrayWritable(com.sina.dip.spark.converter.IntArrayWritable)與Tuple(Python)之間的轉換。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM