使用Spark SQL的基礎是“注冊”(Register)若干表,表的一個重要組成部分就是模式,Spark SQL提供兩種選項供用戶選擇:
(1)applySchema

applySchema的方式需要用戶編碼顯示指定模式,優點:數據類型明確,缺點:多表時有一定的代碼工作量。
(2)inferSchema

inferSchema的方式無需用戶編碼顯示指定模式,而是系統自動推斷模式,代碼比較簡潔,但既然是推斷,就可能出現推斷錯誤(即與用戶期望的數據類型不匹配的情況),所以我們需要對其推斷過程有清晰的認識,才能在實際應用中更好的應用。
本文僅僅針對Python(spark-1.5.1)進行介紹,推斷過程是依賴SQLContext(HiveContext是SQLContext的子類) inferSchema實現的:

SQLContext inferSchema已經在1.3版本中被棄用,取而代之的是createDataFrame,inferSchema仍然可以在1.5.1版本中被使用,其實際執行過程就是SQLContext createDataFrame,這里需要注意一個參數samplingRation,它的默認值為None,后續會討論它的具體作用。


這里我們僅僅考慮從RDD推斷數據類型的情況,也就是isinstance(data, RDD)為True的情況,代碼執行流程轉入SQLContext _createFromRDD:

從上述的代碼調用邏輯可以看出,schema為None,代碼執行流程轉入SQLContext _inferSchema:

SQLContext _inferSchema的主要流程大致分為三步:
第一步:獲取RDD的第一行記錄first,而且要求first不能為空值(注意不是None);
第二步:如果first的類型為“dict”,會輸出一條警告信息:推斷模式時建議RDD的元素類型為Row(pyspark.sql.Row),dict已被棄用;
第三步:如果samplingRatio為None,則直接使用first(也就是RDD的第一條記錄)推斷模式;如果samplingRation不為None,則根據該值“篩選”數據推斷模式。
我們將着重介紹第三步的實現邏輯。
1. samplingRatio is None

_infer_schema使用一行記錄row(也就是RDD的第一行記錄)推斷模式,大致分為四個步驟:
(1)如果記錄row的數據類型為dict;


由此我們可以得出items實際就是一個鍵值對列表,其中鍵值對也可以理解為(列名,列值);之所以要進行排序操作(sorted)是為了保證列名順序的一致性(dict.items()並不負責返回的列表元素順序)。
(2)如果記錄row的數據類型為tuple或list,可以細分為三種情況:
a. row的數據類型為Row,模擬處理過程:


b. row的數據類型為namedtuple,模擬處理過程:


c. row的數據類型為其它(tuple or tuple),模擬處理過程:


(3)如果記錄row的數據類型為object;


由(1)、(2)、(3)可以看出,它們最終的邏輯是一致的,就是將記錄row轉換為一個鍵值對列表;如果(1)、(2)、(3)均不匹配,則認為無法推斷,拋出異常即可。
(4)創建模式(StructType)
items中的每一個鍵值對會對應着形成一個StructField,StructField用於描述一個列的模式,它接收三個參數:列名、列類型、可否包含None;列名就是“鍵”,列類型則需要根據“值”推斷(_infer_type),這里默認設置可以包含None。
迭代items中的這些鍵值對會形成一個StructField列表,最后通過StructType創建模式。
這是根據RDD的一行記錄創建模式的過程,這其中還沒有涉及具體的數據類型是如何被推斷的,我們還需要看一下_infer_type:

_infer_type就是根據傳入的obj來推斷類型的,返回值為類型實例,需要處理以下六種情況:
(1)如果obj為None,則類型為NullType;
(2)真的沒有理解,不解釋;
(3)嘗試根據type(obj)直接從_type_mappings中獲取對應的類型信息dataType,_type_mappings就是一個字典,預先保留着一些Python類型與Spark SQL數據類型的對應關系,如下:

如果dataType不為None,則直接返回相應類型的實例即可;需要特殊處理的是DecimalType,考慮到實際數據中可能存在precision和scale不一致的情況,這里統一處理為precision:38,scale:18;如果dataType為None,則表明obj為復合數據類型(數組、字典、結構體)。
(4)如果obj的數據類型為dict,我們需要分別推斷它的鍵類型(遞歸調用_infer_type)、值類型(遞歸調用_infer_type),然后構造MapType實例並返回;
推斷鍵、值類型時,僅僅選取某一個鍵值對:它的鍵、值均不為None,如果存在多個這樣的鍵值對,則選取是隨機的,取決於dict.items();如果找不到這樣的鍵值對,則認為鍵、值的類型均為NullType。
(5)如果obj的數據類型為list或array,則選取其中某一個不為None的元素推斷其類型(遞歸調用_infer_type);如果找不到不為None的元素,則認為元素類型為NullType;最后構造ArrayType實例並返回;
(6)如果(1)、(2)、(3)、(4)、(5)均無法完成推斷,則我們認為obj可能(僅僅是可能)是一個結構體類型(StructType),使用_infer_schema推斷其類型;
2. samplingRatio is not None
samplingRatio為None時,則僅僅選取RDD的第一行記錄參與推斷,這就對這一行記錄的“質量”提出很高的要求,某些情況下它無法代表全局,此時我們可以通過顯示設置samplingRatio,“篩選”足夠多的數據參與推斷過程。
如果samplingRatio的值小於0.99,則使用RDD sample API根據samplingRatio“篩選”部分數據(rdd)參與推斷;否則整個RDD(rdd)的所有記錄參與推斷。
推斷過程可以簡單理解為兩步:
(1)對於RDD中的每一行記錄通過方法_infer_schema推斷出一個類型(map);
(2)將這些類型進行聚合(reduce)。
我們着重看一下聚合的實現邏輯:

聚合的實現邏輯由方法_merge_type完成,需要處理六種情況:
(1)如果a是NullType的實例,則返回b的類型;
(2)如果a不是NullType的實例,b是NullType的實例,則返回a的類型;
(3)如果a和b的類型不相同,則拋出異常;
以下處理過程基於a和b的類型相同。
(4)如果a的類型為StructType(結構體),則以a中的各個元素為模板合並類型(遞歸調用_merge_type),並追加b-a(差集)的元素(類型);
(5)如果a的類型為ArrayType(數組),則合並(遞歸調用_merge_type)兩者的元素類型即可;
(6)如果a的類型為MapType(字典),則需要分別合並兩者的鍵類型(遞歸調用_merge_type)、值類型(遞歸調用_merge_type)。
個人覺得目前的類型聚合邏輯過於簡單,實際使用意義不大。