最近加入一個Spark項目,作為臨時的開發人員協助進行開發工作。該項目中不存在測試的概念,開發人員按需求進行編碼工作后,直接向生產系統部署,再由需求的提出者在生產系統檢驗程序運行結果的正確性。在這種原始的工作方式下,產品經理和開發人員總是在生產系統驗證自己的需求、代碼。可以想見,各種直接交給用戶的錯誤導致了一系列的事故和不信任。為了處理各類線上問題,大家都疲於奔命。當工作進行到后期,每一個相關人都已經意氣消沉,常常對工作避之不及。
為了改善局面,我嘗試了重構部分代碼,將連篇的SQL分散到不同的方法里,並對單個方法構建單元測試。目的是,在編碼完成后,首先在本地執行單元測試,以實現:
- 部署到生產系統的代碼中無SQL語法錯誤。
- 將已出現的bug寫入測試用例,避免反復出現相同的bug。
- 提前發現一些錯誤,減少影響到后續環節的問題。
- 通過自動化減少開發和程序問題處理的總時間花費。
- 通過流程和結果的改善,減少開發人員的思維負擔,增加與其他相關人的互信。
本文將介紹我的Spark單元測試實踐,供大家參考、批評。
本文中的Spark API是PySpark,測試框架為pytest。
對於希望將本文當作單元測試教程使用的讀者,本文會假定讀者已經准備好了開發和測試所需要的環境。如果沒有也沒有關系,文末的參考部分會包含一些配置環境相關的鏈接。
本文鏈接:https://www.cnblogs.com/hhelibeb/p/10534862.html
原創內容,轉載請注明
概念
定義
單元測試是一種測試方法,它的對象是單個程序單元/組件,目的是驗證軟件的每個組件都符合設計要求。
單元是軟件中最小的可測試部分。它通常包含一些輸入和單一的輸出。
本文中的單元就是python函數(function)。
單元測試通常是程序開發人員的工作。
原則
為了實現單元測試,函數最好符合一個條件,
- 對於相同的輸入,函數總有相同的輸出。
這要求函數的輸出結果不依賴內外部狀態。
它的輸出結果的確定不應該依賴輸入參數外的任何內容,例如,不可以因為本地測試環境中沒有相應的數據庫就產生“連接數據庫異常”導致無法返回結果。如果是類方法的話,也不可以依據一個可能被改變的類屬性來決定輸出。
同時,函數內部不能存在“副作用”。它不應該改變除了返回結果以外的任何內容,例如,不可以改變全局可變狀態。
滿足以上條件的函數,可以被稱為“純函數”。
代碼實踐
下面是數據和程序部分。
數據
假設我們的服務對象是一家水果運銷公司,公司在不同城市設有倉庫,現有三張表,其中inventory包含水果的總庫存數量信息,inventory_ratio包含水果在不同城市的應有比例,
目標是根據總庫存數量和比例算出水果在各地的庫存,寫入到第三張表inventory_city中。三張表的列如下,
1. inventory. Columns: “item”, “qty”.
2. inventory_ratio. Columns: “item”, “city”, “ratio”.
3. inventory_city. Columns: “item”, “city”, “qty”.
第一版代碼
用最直接的方式實現這一功能,代碼將是,
from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''') result.write.csv(path="somepath/inventory_city", mode="overwrite")
這段代碼可以實現計算各城市庫存的需求,但測試起來會不太容易。特別是如果未來我們還要在這個程序中增加其他邏輯的話,不同的邏輯混雜在一起后,測試和修改都會變得麻煩。
所以,在下一步,我們要將部分代碼封裝到一個函數中。
有副作用的函數
創建一個名為get_inventory_city的函數,將代碼包含在內,
from pyspark.sql import SparkSession def get_inventory_city(): spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''')
result.write.csv(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()
顯然,這是一個不太易於測試的函數,因為它,
- 沒有輸入輸出參數,不能直接根據給定數據檢驗運行結果。
- 包含對數據庫的讀/寫,這意味着它要依賴外部數據庫。
- 包含對spark session的獲取/創建,這和計算庫存的邏輯也毫無關系。
我們把這些函數中的多余的東西稱為副作用。副作用和函數的核心邏輯糾纏在一起,使單元測試變得困難,也不利於代碼的模塊化。
我們必須另外管理副作用,只在函數內部保留純邏輯。
無副作用的函數
按照上文中提到的原則,重新設計函數,可以得到,
from pyspark.sql import SparkSession, DataFrame def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame): inventory.createOrReplaceTempView('v_inventory') ratio.createOrReplaceTempView('v_ratio') result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''') return result if __name__ == "__main__": spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() inventory = spark.sql('''select * from inventory''') ratio = spark.sql('''select * from inventory_ratio''') result = get_inventory_city(spark, inventory, ratio) result.write.csv(path="somepath/inventory_city", mode="overwrite")
修改后的函數get_inventory_city有3個輸入參數和1個返回參數,函數內部已經不再包含對spark session和數據庫表的處理,這意味着對於確定的輸入值,它總會輸出不變的結果。
這比之前的設計更加理想,因為函數只包含純邏輯,所以調用者使用它時不會再受到副作用的干擾,這使得函數的可測試性和可組合性得到了提高。
測試代碼
創建一個test_data目錄,將csv格式的測試數據保存到里面。測試數據的來源可以是手工模擬制作,也可以是生產環境導出。
然后創建測試文件,添加代碼,
from inventory import get_inventory_city from pyspark.sql import SparkSession spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() def test_get_inventory_city(): #導入測試數據 inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv") ratio = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv") #執行函數 result = get_inventory_city(spark, inventory, ratio) #驗證拆分后的總數量等於拆分前的總數量 result.createOrReplaceTempView('v_result') inventory.createOrReplaceTempView('v_inventory') qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''') qty_after_split = spark.sql('''select sum(qty) as qty from v_result''') assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']
執行測試,可以看到以下輸出內容
============================= test session starts =============================
platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0
rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item
test_get_inventory_city.py .2019-03-21 14:16:24 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
[100%]
========================= 1 passed in 18.06 seconds ==========================
這樣一個單元測試例子就完成了。
相比把程序放到服務器測試,單元測試的運行速度更快,開發者不用再擔心測試會對生產作業和用戶造成影響,也可以更早發現在編碼期間犯下的錯誤。它也可以成為自動化測試的基礎。
待解決的問題
目前我已經可以在項目中構建初步的單元測試,但依然面臨着一些問題。
運行時間
上面這個簡單的測試示例在我的聯想T470筆記本上需要花費18.06秒執行完成,而實際項目中的程序的復雜度要更高,執行時間也更長。執行時間過長一件糟糕的事情,因為單元測試的執行花費越大,就會越被開發者拒斥。面對顯示器等待單元測試執行完成的時間是難捱的。雖然相比於把程序丟到生產系統中執行,這種單元測試模式已經可以節約不少時間,但還不夠好。
接下來可能會嘗試的解決辦法:提升電腦配置/改變測試數據的導入方式。
有效范圍
在生產實踐中構建純函數是一件不太容易的事情,它對開發者的設計和編碼能力有相當的要求。
單元測試雖然能幫助發現一些問題和確定問題代碼范圍,但它似乎並不能揭示錯誤的原因。只靠單元測試,不能完全證明代碼的正確性。
筆者水平有限,目前寫出的代碼中仍有很多單元測試力所不能及的地方。可能需要在實踐中對它們進行改進,或者引入其它測試手段作為補充。
參考
一些參考內容。
配置
Getting Started with PySpark on Windows
閱讀