SparkSQL之讀取數據庫的並行度分析


JDBC的API

SparkSQL提供聯系數據庫的APi有以下三個:

//構建一個DataFrame,通過JDBC的連接屬性和URL訪問數據庫的表table.
Dataset<Row> jdbc(String url, String table, java.util.Properties properties)
//構建一個DataFrame,通過JDBC的連接屬性和URL訪問數據庫的表table.predicates參數提供了在where的條件語句,每個條件為1個分區
Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)
//構建一個DataFrame,通過JDBC的連接屬性和URL訪問數據庫的表table.
Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)

API解析

Dataset<Row> jdbc(String url, String table, java.util.Properties properties)

 實例:

SQLContext sqlContext = new SQLContext(javaSparkContext);
// 配置連接屬性
Properties dbProps = new Properties();
dbProps.put("user","user_name");
dbProps.put("password","password");
dbProps.put("driver","org.postgresql.Driver");
// 連接數據庫 獲取數據 要使用自己的數據庫連接串
DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://ip/database_name", "table_name", dbProps);
Partition[] partitions = dataFrame.rdd().getPartitions();
System.out.println(partitions.length);
List<Row> collect = dataFrame.toJavaRDD().collect();
System.out.println(collect.size());

測試結果:
-------------------------
1
32444 由於這里沒添加限制條件,所以這里是全表的數據量的大小

Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)

SQLContext sqlContext = new SQLContext(javaSparkContext);
// 配置連接屬性
Properties dbProps = new Properties();
dbProps.put("user","postgres");
dbProps.put("password","postgres");
dbProps.put("driver","org.postgresql.Driver");
//where的查詢條件
String[] strings = {"id >=26602657","id<=26610000"};
// 連接數據庫 獲取數據 要使用自己的數據庫連接串
DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics",strings, dbProps);
Partition[] partitions = dataFrame.repartition(100).rdd().getPartitions();
System.out.println(partitions.length);
List<Row> collect = dataFrame.toJavaRDD().collect();
System.out.println(collect.size());

測試結果:
-------------------------
2      這里是根據where的查詢條件對應起多少個連接分區
37321  這里的數據量大於全表的數據量,因為這個兩個查詢條件的並集union all

Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)

SQLContext sqlContext = new SQLContext(javaSparkContext);
// 配置連接屬性
Properties dbProps = new Properties();
dbProps.put("user","postgres");
dbProps.put("password","postgres");
dbProps.put("driver","org.postgresql.Driver");
// 連接數據庫 獲取數據 要使用自己的數據庫連接串  
//columnName "id"是用於分區的列,該列必須是數字列
//lowerBound 分區列的最小值 必須為整數
//upperBound 分區列的最大值 必須為整數
//分區數量,必須為整數,當為0或負整數時,實際的分區數為1;並不一定是最終的分區數量,例如“upperBound - lowerBound< numPartitions”時,實際的分區數量是“upperBound - lowerBound”;
DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics","id",0,1500,105, dbProps);
Partition[] partitions = dataFrame.rdd().getPartitions();
System.out.println(partitions.length);
List<Row> collect = dataFrame.toJavaRDD().collect();
System.out.println(collect.size());

測試結果:
-------------------------
105 32444 這里的數據量也是全表的數據量,上述不會進行條件過濾

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM