JDBC的API
SparkSQL提供聯系數據庫的APi有以下三個:
//構建一個DataFrame,通過JDBC的連接屬性和URL訪問數據庫的表table. Dataset<Row> jdbc(String url, String table, java.util.Properties properties) //構建一個DataFrame,通過JDBC的連接屬性和URL訪問數據庫的表table.predicates參數提供了在where的條件語句,每個條件為1個分區 Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties) //構建一個DataFrame,通過JDBC的連接屬性和URL訪問數據庫的表table. Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)
API解析
Dataset<Row> jdbc(String url, String table, java.util.Properties properties)
實例:
SQLContext sqlContext = new SQLContext(javaSparkContext); // 配置連接屬性 Properties dbProps = new Properties(); dbProps.put("user","user_name"); dbProps.put("password","password"); dbProps.put("driver","org.postgresql.Driver"); // 連接數據庫 獲取數據 要使用自己的數據庫連接串 DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://ip/database_name", "table_name", dbProps); Partition[] partitions = dataFrame.rdd().getPartitions(); System.out.println(partitions.length); List<Row> collect = dataFrame.toJavaRDD().collect(); System.out.println(collect.size());
測試結果:
-------------------------
1
32444 由於這里沒添加限制條件,所以這里是全表的數據量的大小
Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)
SQLContext sqlContext = new SQLContext(javaSparkContext); // 配置連接屬性 Properties dbProps = new Properties(); dbProps.put("user","postgres"); dbProps.put("password","postgres"); dbProps.put("driver","org.postgresql.Driver"); //where的查詢條件 String[] strings = {"id >=26602657","id<=26610000"}; // 連接數據庫 獲取數據 要使用自己的數據庫連接串 DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics",strings, dbProps); Partition[] partitions = dataFrame.repartition(100).rdd().getPartitions(); System.out.println(partitions.length); List<Row> collect = dataFrame.toJavaRDD().collect(); System.out.println(collect.size()); 測試結果: ------------------------- 2 這里是根據where的查詢條件對應起多少個連接分區 37321 這里的數據量大於全表的數據量,因為這個兩個查詢條件的並集union all
Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)
SQLContext sqlContext = new SQLContext(javaSparkContext); // 配置連接屬性 Properties dbProps = new Properties(); dbProps.put("user","postgres"); dbProps.put("password","postgres"); dbProps.put("driver","org.postgresql.Driver"); // 連接數據庫 獲取數據 要使用自己的數據庫連接串 //columnName "id"是用於分區的列,該列必須是數字列 //lowerBound 分區列的最小值 必須為整數 //upperBound 分區列的最大值 必須為整數 //分區數量,必須為整數,當為0或負整數時,實際的分區數為1;並不一定是最終的分區數量,例如“upperBound - lowerBound< numPartitions”時,實際的分區數量是“upperBound - lowerBound”; DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics","id",0,1500,105, dbProps); Partition[] partitions = dataFrame.rdd().getPartitions(); System.out.println(partitions.length); List<Row> collect = dataFrame.toJavaRDD().collect(); System.out.println(collect.size());
測試結果:
-------------------------
105 32444 這里的數據量也是全表的數據量,上述不會進行條件過濾