1. 背景介紹
將一份數據量很大的用戶屬性文件解析成結構化的數據供查詢框架查詢剖析,其中用戶屬性包含用戶標識,平台類型,性別,年齡,學歷,興趣愛好,購物傾向等等,大概共有七百個左右的標簽屬性。為了查詢框架能夠快速查詢出有特定標簽的人群,將最終的存儲結果定義為了將七百個左右的標簽屬性展平存儲為parquet文件,這樣每個標簽屬性對於用戶而言只有存在和不存在兩種情況。
2. 第一版實現過程
第一步,將用戶所有標簽標識作為一個資源文件保存到spark中,並讀取該資源文件的標簽標識為一個標簽集合(定義為listAll),並通過sparkContext來進行廣播;
第二步,使用spark core讀取hdfs上的用戶屬性文件(其中每行是一個用戶所擁有的標簽),將單個用戶所擁有的標簽解析成一個標簽集合(定義為listUser),也就是說listUser是listAll的一個子集;
第三步,對於單個用戶而言,遍歷步驟一的結果集listAll,對於每一個標簽判斷該用戶是否存在,如果存在則將標簽設置為1(表示存在),否則設置為0(表示不存在),並將所有標簽及相應的值保存為一個Map(定義為map)
第四步,根據第三步的map構造成spark sql中的Row
第五步,依據第一步的集合listAll構造出spark sql的Schema
第六步,將第四步和第五步的結果通過spark sql的createDataFrame構造成DataFrame。
第七步,通過DataFrame.write.parquet(output)將結果保存到hdfs上
通過上述的七步,認為已經很easy的處理完了這個需求,但是真正測試時發現性能比想象的要慢的多,嚴重的達不到性能要求。對於性能影響究竟出現在什么地方?初步猜測,問題出現在第四步,第六步,第七步的可能性比較大。
經過實際的測試,發現性能主要消耗在第七步,其他步驟的執行都特別快。這樣也就定位到了問題。
而且通過測試知道,生成parquet消耗的性能最高,生成json的話很快就能完成,如果不生成任何對象,而是直接foreach執行的話,性能會更高。而且相同數據量下,如果列數在七百多個時,json寫入時間是parquet寫入時間的三分之一,如果列數在四百個時,json寫入時間是parquet寫入時間的二分之一,如果列數在五十個,json寫入時間是parquet寫入時間的三分之二。也就是列數越少,json和parquet的寫入速度越接近。至於為什么生成parquet性能很差,待后續分析spark sql的save方法。
測試的例子
private def CTRL_A = '\001' private def CTRL_B = '\002' private def CTRL_C = '\003' def main(args: Array[String]): Unit = { val resourcePath = this.getClass.getResource("/resource.txt").getFile val sourcePath = this.getClass.getResource("/*.gz").getFile val output = "/home/dev/output" val conf = new SparkConf().setAppName("user test").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true") sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy") val map: Map[String, String] = buildResource(resourcePath) val schema = buildSchema(map) val bd = sc.broadcast(map) val bdSchema = sc.broadcast(schema) val start=System.currentTimeMillis() val rdd = sc.textFile(sourcePath) .map(line => { val map = buildUser(line, bd.value) buildRow(map._3, map._1, map._2) }) // rdd.foreach(_=>()) // sqlContext.createDataFrame(rdd, bdSchema.value).write.mode(SaveMode.Overwrite).json(output) sqlContext.createDataFrame(rdd, bdSchema.value).write.mode(SaveMode.Overwrite).parquet(output) val end = System.currentTimeMillis() System.out.print(end - start) } /** * 讀取資源文件 * @param file * @return */ def buildResource(file: String): Map[String, String] = { val reader = Source.fromFile(file) val map = new mutable.HashMap[String, String]() for (line <- reader.getLines() if !Strings.isNullOrEmpty(line)) { val arr = StringUtils.splitPreserveAllTokens(line, '\t') map.+=((arr(0), "0")) } map.toMap } /** * 生成用戶屬性 * @param line * @param map * @return */ def buildUser(line: String, map: Map[String, String]): (String, Int, Map[String, String]) = { if (Strings.isNullOrEmpty(line)) { return ("", 0, Map.empty) } val array = StringUtils.splitPreserveAllTokens(line, CTRL_A) val cookie = if (Strings.isNullOrEmpty(array(0))) "-" else array(0) val platform = array(1).toInt val base = buildFeature(array(2)) val interest = buildFeature(array(3)) val buy = buildFeature(array(4)) val features = base ++ interest ++ buy val result = new mutable.HashMap[String, String]() for (pair <- map) { val value = if (features.contains(pair._1)) "1" else "0" result.+=((pair._1, value)) } (cookie, platform, result.toMap) } /** * 抽取用戶標簽 * @param expr * @return */ def buildFeature(expr: String): Array[String] = { if (Strings.isNullOrEmpty(expr)) { return Array.empty } val arr = StringUtils.splitPreserveAllTokens(expr, CTRL_B) val buffer = new ArrayBuffer[String]() for (key <- arr) { val pair = StringUtils.splitPreserveAllTokens(key, CTRL_C) buffer += (s"_${pair(0)}") } buffer.toArray } /** * 動態生成DataFrame的Schema * @param map * @return */ def buildSchema(map: Map[String, String]): StructType = { val buffer = new ArrayBuffer[StructField]() buffer += (StructField("user", StringType, false)) buffer += (StructField("platform", IntegerType, false)) for (pair <- map) { buffer += (StructField(s"_${pair._1}", IntegerType, true)) } return StructType(List(buffer: _*)) } /** * 將用戶屬性構造成Spark SQL的Row * @param map * @param user * @param platform * @return */ def buildRow(map: Map[String, String], user: String, platform: Int): Row = { val buffer = new ArrayBuffer[Any]() buffer += (user) buffer += (platform) for (pair <- map) { buffer += (pair._2.toInt) } return Row(buffer: _*) }
3. 第二版實現過程
在第一版中初步懷疑是DataFrame在生成parquet時進行了一些特殊邏輯的處理,所以決定自己實現ParquetWriter方法來測試下性能,采用了avro來向parquet中寫入數據。方法大概包含定義好avro資源文件,然后使用AvroParquetWriter類來向parquet中寫入內容,具體的寫入方法類似於https://blog.csdn.net/gg584741/article/details/51614752。通過這種方式來寫入parquet,相同數據量的情況下,性能提升了一倍多。至於為什么性能有這么大的提升,有待后續研究。到此優化就告一段落了。
在此優化期間,遇到了下列問題:
1. avro 的資源文件在生成java類時,屬性限制必須255個一下。該限制在https://issues.apache.org/jira/browse/AVRO-1642 提到。
2. java 類屬性和方法參數也需要小於255個,詳見https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.11,https://stackoverflow.com/questions/30581531/maximum-number-of-parameters-in-java-method-declaration
對於上述顯示的解決方案是在maven配置文件中不適用avro-maven-plugin插件來自動生成java類,而是在程序運行時通過
val Schema = (new Schema.Parser()).parse(new File(file))
來動態生成Schema來供后續AvroParquetWriter使用。
