Spark程序進行單元測試-使用scala


Spark 中進行一些單元測試技巧:
最近剛寫了一點Spark上的單元測試,大概整理了一些

rdd測試

spark程序一般從集群中讀取數據然后通過rdd進行轉換,這其中涉及到集群,每次修改bug,上傳到集群再運行測試,代價還是挺大;所以盡可能先本地進行單元測試,以減少在集群上運行時錯誤,特別是map等各種tranforms動作的邏輯錯誤;以下示例用於測試本地返回rdd相關的方法(利用spark本地模式進行單元測試)
Tips:

//定義一個簡單的wordcount
object WordCount extends Serializable{ def count(lines:RDD[String]): RDD[(String,Int)]={ val rdd=lines.flatMap(line=>line.split("\\s")).map(word=>(word,1)).reduceByKey(_ + _) rdd } }
//引入scalatest建立一個單元測試類,混入特質BeforeAndAfter,在before和after中分別初始化sc和停止sc, //初始化SparkContext時只需將Master設置為local(local[N],N表示線程)即可,無需本地配置或搭建集群,

class WordCountTests extends FlatSpec with BeforeAndAfter{ val master="local" //sparkcontext的運行master 
  var sc:SparkContext=_ it should("test success") in{ //其中參數為rdd或者dataframe可以通過通過簡單的手動構造即可
    val seq=Seq("the test test1","the test","the") val rdd=sc.parallelize(seq) val wordCounts=WordCount.count(rdd) wordCounts.map(p=>{ p._1 match { case "the"=> assert(p._2==3) case "test"=> assert(p._2==2) case "test1"=> assert(p._2==1) case _=> None } }).foreach(_=>()) } //這里before和after中分別進行sparkcontext的初始化和結束,如果是SQLContext也可以在這里面初始化
 before{ val conf=new SparkConf() .setAppName("test").setMaster(master) sc=new SparkContext(conf) } after{ if(sc!=null){ sc.stop() } } }

無返回值方法測試

有時候一個方法起到一個調用流程的作用,最后可能是輸出或者寫入某個文件而沒有返回值,一般單元測試可能是查看最后有沒有輸出文件,但是ide在本地可能不太好進行測試 
例如:

trait WriterHandle{ def writer(records:Seq[GenericRecord]):Unit={ val parquetWriter=... records.foreach(parquetWriter.writer(..)) } } //一個類處混入這個特質,經過一定轉換后將結果數據寫入parquet中
class ProcessHandle(objects:Iterator[T]) extends Serializable with WriterHandle{ def process():Unit={ val records:Seq[GenericRecord]=build(objects)={ ... } //這里調用了特質writer中的writer方法,實際單元測試運行到這里可能寫入的時候會出錯,不能正常測試通過
 writer(records) } }
class Writertests extends FlatSpec { it should("write success") in{ val objects=Seq(object1,object2..).toIterator //在new處理類,混入原先特質的一個子特質
    val process=new ProcessHandle(objects) with Writerhandletest } } //可以自定義一個trait繼承自原先的特質,通過將原先的方法覆蓋,然后在重寫后的方法里面的根據傳入值定義所需要斷言即可
trait Writerhandletest extends WriterHandle{ override def writer(records:Seq[GenericRecord]):Unit={ assert(records.length==N) assert(records(0).XX=="xxx") } }

如有必要也可以測試下私有方法:

理論上來說,私有方法都會被公有方法調用,調用公有方法也可以驗證私有方法,不過如果公有方法不方便測試也可以對某個私有方法進行測試,就看是否有必要 
可以測試如下:

class MyTest(s:String){ //此公有方法可能不方便測試
  def ():Unit={ ... doSth(s) } //這里私有方法,可能是邏輯關鍵所在,有必要測試
  private def doSth(s:String):String={ ... } }

編寫單元測試

//要混入PrivateMethodTester特質
class MytestTests extends FlatSpec with PrivateMethodTester{ it should("write success") in{ //首先new一個要測試的類
    val myTest=new MyTest("string") //其中通過PrivateMethod修飾,[]中為返回值, ('method)單引號后跟私有方法名 
    val dosth=PrivateMethod[String]('doSth)
       //通過invokePrivate 委托調用私有方法,注意參數要對,貌似傳null的話會找不到對應的方法
    val str=myTest invokePrivate dosth("string") //最后斷言相應的至即可
    asset(str=="string") } } 

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM