Spark目前支持三種開發語言:Scala、Java、Python,目前我們大量使用Python來開發Spark App(Spark 1.2開始支持使用Python開發Spark Streaming App,我們也准備嘗試使用Python開發Spark Streaming App),在這期間關於數據類型的問題曾經困擾我們很長時間,故在此記錄一下心路歷程。
Spark是使用Scala語言開發的,Hadoop是使用Java語言開發的,Spark兼容Hadoop Writable,而我們使用Python語言開發Spark (Streaming) App,Spark Programming Guides(Spark 1.5.1)其中有一段文字說明了它們相互之間數據類型轉換的關系:

也說是說,我們需要處理兩個方向的轉換:
(1)Writable => Java Type => Python Type;
(2)Python Type => Java Type => Writable;
其中Java Type與Python Type之間數據類型的轉換依賴開源組件Pyrolite,相應的數據類型轉換如下:
(1)Python Type => Java Type;

(2)Java Type => Python Type;

也就是說,Pyrolite已經為Java Type與Python Type之間的數據類型轉換建立了“標准”,我們僅僅需要處理Writable與Java Type之間的數據轉換就可以了。
從上圖“Writable Support”中可以看出PySpark已經為我們解決了常用的數據類型轉換問題,但可以理解為“基本”數據類型,遇到復雜的情況,還是需要我們特殊處理,PySpark已經為我們考慮到了這種業務場景,為我們提供接口Converter(org.apache.spark.api.python.Converter),使得我們可以根據自己的需要擴展數據類型轉換機制:

接口Converter僅僅只有一個方法convert,其中T表示源數據類型,U表示目標數據類型,參數obj表示源數據值,返回值表示目標數據值。
Spark Programming Guides(Spark 1.5.1)也為我們舉例說明了一個需要自定義Converter的場景:

ArrayWritable是Hadoop Writable的一種,因為Array涉及到元素數據類型的問題,因此使用時需要實現相應的子類,如元素數據類型為整型:

從上面的描述可知,PySpark使用ArrayWritable時涉及到如下兩個方向的數據類型轉換:
(1)Tuple => Object[] => ArrayWritable;
(2)ArrayWritable => Object[] => Tuple;
我們以IntArrayWritable為例說明如何自定義擴展Converter,同理也需要處理兩個方向的數據類型轉換:Tuple => Object[] => ArrayWritable、ArrayWritable => Object[] => Tuple。
(1)Tuple => Object[] => IntArrayWritable;
假設我們有一個list,list的元素類型為tuple,而tuple的元素類型為int,我們需要將這個list中的所有數據以SequenceFile的形式保存至HDFS。對於list中的每一個元素tuple,Pyrolite可以幫助我們完成Tuple => Object[]的轉換,而Object[] => IntArrayWritable則需要我們自定義Converter實現。

PySpark中使用這個Converter寫入數據:

注意:SequenceFile的數據結構為<key, value>,為了簡單起見,key指定為com.sina.dip.spark.converter.IntArrayWritable,value指定為org.apache.hadoop.io.NullWritable(即空值)。
運行上述程序時,因為有使用到我們自定義的類,因此需要將com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter編譯打包為獨立的Jar:converter.jar,並通過參數指定,如下:
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_save_data_to_seqfile.py
(2)IntArrayWritable => Object[] => Tuple;
我們需要將(1)中寫入SequenceFile的Key(IntArrayWritable)還原為list,其中list的元素類型為tuple,tuple的元素類型為int,IntArrayWritable => Object[]也需要用到我們自定義的Converter(Object[] => Tuple由Pyrolite負責):

PySpark使用這個Converter讀取數據:

同(1),我們需要將com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter編譯打包為獨立的Jar:converter.jar,並通過參數指定,如下:
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_read_data_from_seqfile.py
輸出結果:

可以看出,通過自定義擴展的Converter:com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter,我們實現了IntArrayWritable(com.sina.dip.spark.converter.IntArrayWritable)與Tuple(Python)之間的轉換。