Spark SQL使用時需要有若干“表”的存在,這些“表”可以來自於Hive,也可以來自“臨時表”。如果“表”來自於Hive,它的模式(列名、列類型等)在創建時已經確定,一般情況下我們直接通過Spark SQL分析表中的數據即可;如果“表”來自“臨時表”,我們就需要考慮兩個問題:
(1)“臨時表”的數據是哪來的?
(2)“臨時表”的模式是什么?
通過Spark的官方文檔可以了解到,生成一張“臨時表”需要兩個要素:
(1)關聯着數據的RDD;
(2)數據模式;
也就是說,我們需要將數據模式應用於關聯着數據的RDD,然后就可以將該RDD注冊為一張“臨時表”。在這個過程中,最為重要的就是數據(模式)的數據類型,它直接影響着Spark SQL計算過程以及計算結果的正確性。
目前pyspark.sql.types支持的數據類型:NullType、StringType、BinaryType、BooleanType、DateType、TimestampType、DecimalType、DoubleType、FloatType、ByteType、IntegerType、LongType、ShortType、ArrayType、MapType、StructType(StructField),其中ArrayType、MapType、StructType我們稱之為“復合類型”,其余稱之為“基本類型”,“復合類型”在是“基本類型”的基礎上構建而來的。
這些“基本類型”與Python數據類型的對應關系如下:
NullType | None |
StringType | basestring |
BinaryType | bytearray |
BooleanType | bool |
DateType | datetime.date |
TimestampType | datetime.datetime |
DecimalType | decimal.Decimal |
DoubleType | float(double precision floats) |
FloatType | float(single precision floats) |
ByteType | int(a signed integer) |
IntegerType | int(a signed 32-bit integer) |
LongType | long(a signed 64-bit integer) |
ShortType | int(a signed 16-bit integer) |
下面我們分別介紹這幾種數據類型在Spark SQL中的使用。
1. 數字類型(ByteType、ShortType、IntegerType、LongType、FloatType、DoubleType、DecimalType)
數字類型可分為兩類,整數類型:ByteType、ShortType、IntegerType、LongType,使用時需要注意各自的整數表示范圍;浮點類型:FloatType、DoubleType、DecimalType,使用時不但需要注意各自的浮點數表示范圍,還需要注意各自的精度范圍。
我們以常見的數據類型IntegerType來說明數字類型的使用方法:

a. 模擬“一行兩列”的數據,並通過parallelize方法將其轉換為一個RDD source,這個RDD就是關聯着數據的RDD;
b. 創建數據模式,需要分別為這兩列指定列名、列類型、可否包含空(Null)值;其中模式需要使用StructType表示,每一列的各個屬性(列名稱、列類型、可否包含空(Null)值)需要使用StructField表示;第一列的列名為col1,列類型為IntegerType,不可包含空(Null)值(False);第二列的列名為col2,列類型為IntegerType,不可包含空(Null)值(False);(注意:實際使用中每列的數據類型並不一定相同)
c. 通過applySchema方法將數據模式schema應用於RDD source,這會產生一個SchemaRDD(具有模式的RDD) table;
d. 將SchemaRDD table注冊為一張表:temp_table;
到此我們就完成了創建RDD、創建Schema、注冊Table的整個過程,接下來就可以使用這張表(temp_table)通過Spark(Hive) SQL完成分析。其它數字類型的使用方式類似。
實際上本例中“一行兩列”的數據實際就是IntergerType的表示范圍:[-2147483648, 2147483647],其它數字類型的表示范圍如下:
ByteType | [-128, 127] |
ShortType | [-32768, 32767] |
IntegerType | [-2147483648, 2147483647] |
LongType | [-9223372036854775808, 9223372036854775807] |
FloatType | [1.4E-45, 3.4028235E38] |
DoubleType | [4.9E-324, 1.7976931348623157E308] |
可以看出,雖然我們使用Python編寫程序,這些數據類型的表示范圍與Java中的Byte、Short、Integer、Long、Float、Double是一致的,因為Spark是Scala實現的,而Scala運行於Java虛擬機之上,因此Spark SQL中的數據類型ByteType、ShortType、IntegerType、LongType、FloatType、DoubleType、DecimalType在運行過程中對應的數據實際上是由Java中的Byte、Short、Integer、Long、Float、Double表示的。
在使用Python編寫Spark Application時需要牢記:為分析的數據選擇合適的數據類型,避免因為數據溢出導致輸入數據異常,但這僅僅能夠解決數據輸入的溢出問題,還不能解決數據在計算過程中可能出現的溢出問題。
我們將上述例子中的示例數據修改為(9223372036854775807, 9223372036854775807),數據類型修改為LongType,現在的示例數據實際是LongType所能表示的最大值,如果我們將這兩例值相加,是否會出現溢出的情況呢?

輸出結果:

可以看出,實際計算結果與我們預想的完全一樣,這是因為col1與col2的類型為LongType,那么col1 + col2的類型也應為LongType(原因見后),然而col1 + col2的結果值18446744073709551614已經超過LongType所能表示的范圍([-9223372036854775808, 9223372036854775807]),必然導致溢出。
因為我們使用的是HiveContext(SQLContext目前不被推薦使用),很多時候我們會想到使用“bigint”,

輸出結果依然是:

要解釋這個原因,需要了解一下Hive中數字類型各自的表示范圍:

通過比對可以發現Hive BIGINT的表示范圍與LongType是一致的,畢竟Hive是Java實現的,因此我們可以猜想Hive tinyint、smallint、int、bigint、float、double與Java Byte、Short、Integer、Long、Float、Double是一一對應的(僅僅是猜想,並沒有實際查看源碼驗證),所以我們將LongType的數據類型轉換為BIGINT的方式是行不通的,它們的數值范圍是一樣的。
那么我們應該如何解決溢出問題呢?注意到Hive Numeric Types中的最后一個數字類型:DECIMAL,從Hive 0.11.0引入,Hive 0.13.0開始支持用戶可以自定義“precision”和“scale”。Decimal基於Java BigDecimal實現,可以表示不可變的任務精度的十進制數字,支持常規的數學運算(+,-,*,/)和UDF(floor、ceil、round等),也可以與其它數字類型相互轉換(cast)。使用示例如下:

使用Decimal時需要注意“precision”和“scale”值的選取,Java BigDecimal(BigInteger,后續會提到)取值范圍理論上取決於(虛擬)內存的大小,可見它們是比較消耗內存資源的,因此我們需要根據我們的實際需要為它們選取合適的值,並且需要滿足下述條件:
整數部分位數(precision - scale) + 小數部分位數(scale) = precision
LongType所能表示的最大位數:19,因為在我們的示例中會導致溢出問題,因此我們將數值轉換為Decimal,並指定precision為38,scale為0,這樣我們便可以得到正確的結果:

需要注意的是計算結果類型也變成decimal.Decimal(Python),使用Python編寫Spark Application時,pyspark也提供了DecimalType,它是一種比較特殊的數據類型,它不是Python內建的數據類型,使用時需要導入模塊decimal,使用方式如下:

使用數據類型DecimalType時有兩個地方需要注意:
(1)創建RDD時需要使用模塊decimal中的Decimal生成數據;
(2)DecimalType在Spark 1.2.0環境下使用時會出現異常:java.lang.ClassCastException: java.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal,在Spark 1.5.0環境下可以正常使用,但需要將模塊名稱由“pyspark.sql”修改為“pyspark.sql.types”。
我們明確指定數據的類型是什么,那么什么決定我們常規數學運算(+,-,*,/)之后的結果類型呢?這些數學運行在Hive中實際都是由UDF實現的(org.apache.hadoop.hive.ql.exec.FunctionRegistry),

(1)+

(2)-

(3)*

(4)/

(5)%

可以看出,“+”,“-”,“*”,“%”通過重載支持的數據類型:byte、short、int、long、float、double、decimal,“/”通過重載僅僅支持數據類型:double、decimal,計算的結果類型與輸入類型是相同的,這也意味着:
(1)數學運算“+”、“-”,“*”,“%”時可能會出現隱式轉換(如int + long => long + long);
(2)數學運算“/”則統一將輸入數據轉換為數據類型double或decimal進行運算,這一點也意味着,計算結果相應地為數據類型double或decimal。
2. 時間類型(DateType,TimestampType)
DateType可以理解為年、月、日,TimestampType可以理解為年、月、日、時、分、秒,它們分別對着着Python datetime中的date,datetime,使用示例如下:

輸出結果:

3. StringType、BooleanType、BinaryType、NoneType
這幾種數據類型的使用方法大致相同,就不一一講解了,注意BinaryType對應着使用了Python中的bytearray。

輸出結果:

4. 復合數據類型(ArrayType、MapType、StructType)
復合數據類型共有三種:數組(ArrayType)、字典(MapType)、結構體(StructType),其中數組(ArrayType)要求數組元素類型一致;字典(MapType)要求所有“key”的類型一致,所有“value”的類型一致,但“key”、“value”的類型可以不一致;結構體(StructType)的元素類型可以不一致。
(1)ArrayType
ArrayType要求指定數組元素類型。


(2)MapType
MapType要求指定鍵(key)類型和值(value)類型。


(3)StructType
StructType包含的元素類型可不一致,需要根據元素的次序依次為其指定合適的名稱與數據類型。


綜上所述,Spark(Hive)SQL為我們提供了豐富的數據類型,我們需要根據分析數據的實際情況為其選取合適的數據類型(基本類型、復合類型)、尤其是數據類型各自的表示(精度)范圍以及數據溢出的情況處理。