JDBC的API
SparkSQL提供联系数据库的APi有以下三个:
//构建一个DataFrame,通过JDBC的连接属性和URL访问数据库的表table. Dataset<Row> jdbc(String url, String table, java.util.Properties properties) //构建一个DataFrame,通过JDBC的连接属性和URL访问数据库的表table.predicates参数提供了在where的条件语句,每个条件为1个分区 Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties) //构建一个DataFrame,通过JDBC的连接属性和URL访问数据库的表table. Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)
API解析
Dataset<Row> jdbc(String url, String table, java.util.Properties properties)
实例:
SQLContext sqlContext = new SQLContext(javaSparkContext); // 配置连接属性 Properties dbProps = new Properties(); dbProps.put("user","user_name"); dbProps.put("password","password"); dbProps.put("driver","org.postgresql.Driver"); // 连接数据库 获取数据 要使用自己的数据库连接串 DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://ip/database_name", "table_name", dbProps); Partition[] partitions = dataFrame.rdd().getPartitions(); System.out.println(partitions.length); List<Row> collect = dataFrame.toJavaRDD().collect(); System.out.println(collect.size());
测试结果:
-------------------------
1
32444 由于这里没添加限制条件,所以这里是全表的数据量的大小
Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)
SQLContext sqlContext = new SQLContext(javaSparkContext); // 配置连接属性 Properties dbProps = new Properties(); dbProps.put("user","postgres"); dbProps.put("password","postgres"); dbProps.put("driver","org.postgresql.Driver"); //where的查询条件 String[] strings = {"id >=26602657","id<=26610000"}; // 连接数据库 获取数据 要使用自己的数据库连接串 DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics",strings, dbProps); Partition[] partitions = dataFrame.repartition(100).rdd().getPartitions(); System.out.println(partitions.length); List<Row> collect = dataFrame.toJavaRDD().collect(); System.out.println(collect.size()); 测试结果: ------------------------- 2 这里是根据where的查询条件对应起多少个连接分区 37321 这里的数据量大于全表的数据量,因为这个两个查询条件的并集union all
Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)
SQLContext sqlContext = new SQLContext(javaSparkContext); // 配置连接属性 Properties dbProps = new Properties(); dbProps.put("user","postgres"); dbProps.put("password","postgres"); dbProps.put("driver","org.postgresql.Driver"); // 连接数据库 获取数据 要使用自己的数据库连接串 //columnName "id"是用于分区的列,该列必须是数字列 //lowerBound 分区列的最小值 必须为整数 //upperBound 分区列的最大值 必须为整数 //分区数量,必须为整数,当为0或负整数时,实际的分区数为1;并不一定是最终的分区数量,例如“upperBound - lowerBound< numPartitions”时,实际的分区数量是“upperBound - lowerBound”; DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics","id",0,1500,105, dbProps); Partition[] partitions = dataFrame.rdd().getPartitions(); System.out.println(partitions.length); List<Row> collect = dataFrame.toJavaRDD().collect(); System.out.println(collect.size());
测试结果:
-------------------------
105 32444 这里的数据量也是全表的数据量,上述不会进行条件过滤