1.寫在前面
Spark是專為大規模數據處理而設計的快速通用的計算引擎,在計算能力上優於MapReduce,被譽為第二代大數據計算框架引擎。Spark采用的是內存計算方式。Spark的四大核心是Spark RDD(Spark
core),SparkSQL,Spark Streaming,Spark ML。而SparkSQL在基於Hive數倉數據的分布式計算上尤為廣泛。本編博客主要介紹基於Java API的SparkSQL的一些用法建議和利用Spark處理各種大數據計算的性能優化建議
2.SparkSQL的一些用法及建議
實例化SparkSSql的SparkSession,Spark2.0之后都是利用SparkSession來進行SparkSQL,2.0以前是利用SparkSQLContext
SparkConf conf = new SparkConf().setAppName("sql-app");
setSparkConf(parameterParse, conf);
JavaSparkContext jsc = new JavaSparkContext(conf);
SparkSession.clearDefaultSession();
SparkSession session = SparkSession.builder().appName(parameterParse.getSpark_app_name())
.config("hive.metastore.uris", parameterParse.getHive_metastore_uris())
.config("spark.sql.warehouse.dir", parameterParse.getHive_metastore_warehouse_dir())
.enableHiveSupport().getOrCreate();
實例化之后直接執行Hive查詢語句
session.sql("select name,age,job from employee").foreachPartition(iterator -> {
while (iterator.hasNext()) {
Row row = iterator.next();
//邏輯處理
}
});
注意:
a.這里的查詢語句只要在hive命令能正確執行的都可以
b.這里的foreachPartition相當於對數據的一個遍歷,iterator得到對數據rdd集遍歷的一個迭代器,Row就是hive的每行數據,例如['Tom','25','軟件工程師'],遍歷之后可以對數據做任何的邏輯操作,也可以寫到其他組件如hbase,mysql中。
建議:
a.假設將hive數據利用SparkSQL查詢出來寫入mysql中,對於小部分數據不建議在遍歷rdd遍歷,創建連接寫入mysql,類似如下寫法
session.sql("select min(age),max(age)from employee").foreachPartition(iterator -> {
Connection conn = MysqlConnectionPool.getConnection(mysqlConnectBroadcast.getValue(), mysqlUserBroadcast.getValue(), mysqlPasswordBroadcast.getValue());
String maxRecordPositionValue = "";
while (iterator.hasNext()) {
Row row = iterator.next();
min = row.get(0).toString();
max = row.get(1).toString();
}
String sql = "update employee_age set max_age=? where min_age=?";
PreparedStatement ps = conn.prepareStatement(sql);
ps.setInt(1, Integer.parseInt(min));
ps.setString(2, Integer.parseInt(max));
ps.executeUpdate();
MysqlConnectionPool.release(ps, conn);
});
這樣寫法會導致每個task創建mysql連接,如果在session.sql("select min(age),max(age)from employee").foreachPartition()外部也有寫入mysql的邏輯,這樣會造成mysql連接過多,從而導致在PreparedStatement,ResultSet錯亂甚至會出現Sql異常。建議利用collectAsList()
這個方法來實現,類似如下寫法
List<Row> minAndMaxAgeList = session.sql("select min(age),max(age)from employee").collectAsList()
min = minAndMaxAgeList.get(0).get(0).toString();
max = minAndMaxAgeList.row.get(1).toString();
這里得到的值只有兩個,對於session.sql()
查大數據,如查詢幾億條數據,首先第一種方法更加不合適,會造成不斷的建立連接,關閉連接,同時寫入太過頻繁導致mysql崩潰。其次大數據量應該避免這樣寫到mysql,不管是第一種還是第二種方法。因為第二種方法可能導致GC崩潰。
b.如果實在要在foreachPartition
里寫,建議mysql的鏈接方式應該通過廣播遍歷傳入
final Broadcast<String> mysqlConnectBroadcast = jsc.broadcast(parameterParse.getMysql_conn());
final Broadcast<String> mysqlUserBroadcast = jsc.broadcast(parameterParse.getMysql_user());
final Broadcast<String> mysqlPasswordBroadcast = jsc.broadcast(parameterParse.getMysql_password());
c.對於Row這個SparkSQL特殊的對應,很多新手會直接row.toString()
得到一個數組字符串,再對數組字符串做數組切割,這種用法在邏輯上不存在問題,但是在大數據中,數據可能是多種多樣的,這樣寫法會造成數據錯亂問題,應該用如下寫法
min = row.get(0).toString();
max = row.get(1).toString();
row本質上是一個集合對象,所以提供類似集合操作的方法。
3.Spark的性能優化
關於這個網上很多配置參數的建議,讓開發者看的很繚亂,以下是精簡的,可以應用於常規開發中
a.參數級的優化
spark_driver_memory=4g
spark_num_executors=6
spark_executor_memory=4g
spark_executor_cores=1
spark_executor_memory_over_head=1024
spark_sql_shuffle_partitions=18
spark.default.parallelism=18
主要是這六個參數,這七個個參數的說明如下
spark_driver_memory設置driver的內存大小
spark_num_executors設置executors的個數
spark_executor_memory設置每個spark_executor_cores的內存大小
spark_executor_cores設置每個executor的cores數目
spark_executor_memory_over_head設置executor執行的時候,用的內存可能會超過executor-memoy,所以會為executor額外預留一部分內存。該參數代表了這部分內存
spark_sql_shuffle_partitions設置executor的partitions個數,注意這個參數只對SparkSQL有用
spark.default.parallelism設置executor的partitions個數,注意這個參數只對SparkRDD有用
對於這七個參數,需要充分理解Spark執行的邏輯才能明白並合適的配置,Spark的執行邏輯如下(這里不再細講),可以參照這邊博客https://www.cnblogs.com/cxxjohnson/p/8909578.html 或官方API
其中關於內存的配置要結合hadoop yarn的集群的資源情況而定,不是越大越好。而對於spark_num_executors
,spark_executor_cores
,spark_sql_shuffle_partitions
這三個參數,根據實際的經驗需滿足spark_sql_shuffle_partitions=spark_num_executorsspark_executor_cores3,而spark_executor_cores一般保持在1
再提交任務時:
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors $spark_num_executors \
--driver-memory $spark_driver_memory \
--executor-memory $spark_executor_memory \
--executor-cores $spark_executor_cores \
--queue yarn_queue_test \
--conf spark.app.name=spark_name_test \
--conf spark.yarn.executor.memoryOverhead=$spark_executor_memory_over_head \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.dynamicAllocation.enabled=false \
--jars test.jar
b.Task數據分布的優化
在一般情況下Task數據分配是隨機默認的,這樣會帶來一個問題,如果多大的Task,而只是部分的Task數據處理量大,大部分很小,那么如果能做到將小部分的Task數據處理量優化到和大部分的大致相等,那么性能自然就提升上去了。這樣優化分為兩步:
a.在執行的Java代碼中獲取num_executors參數的值,上面的例子是spark_num_executors=6
int rddPartition = Integer.parseInt(parameterParse.getNum_executors()) * 3;
b.不管是rdd的遍歷還是直接的session.sql("sql").foreachPartition()
在遍歷之前加上一個方法repartition(partition)
session.sql(sqlStr).repartition(partition).foreachPartition(iterator -> {
while (iterator.hasNext()) {
Row row = iterator.next();
//邏輯處理
}
});
這樣做后,在任務的管理頁面看到的executor數據分布式非常均勻的,從而提高性能
c.分而治之
分而治之是貫穿整個大數據計算的核心,不管是MapReduce,Spark,Flink等等,而這里要說的分而治之可以初略的物理流程上的分而治之,而不是對Spark的driver,executor,Task分而治之,因為本身就是分布式的分而治之。假設經過反復的性能壓力測試,得出Spark在現有規定資源上只有1000000條/s的性能,而現在的數據有一億條。現在不做任何處理提交session.sql("sql").foreachPartition()
或rdd.foreachPartition(),雖然最終會處理完,但發現時間是比預定的100000000/1000000s多得多,這樣會拖累整體性能,這個時候是可以對現有的一億條數據做以1000000條為組的組合切割分配成100000000/1000000個集合,對集合數據依次執行,這樣性能上會有所提升。當然這種優化方式還是需要跟實際業務邏輯來定