SparkSQL之读取数据库的并行度分析


JDBC的API

SparkSQL提供联系数据库的APi有以下三个:

//构建一个DataFrame,通过JDBC的连接属性和URL访问数据库的表table.
Dataset<Row> jdbc(String url, String table, java.util.Properties properties)
//构建一个DataFrame,通过JDBC的连接属性和URL访问数据库的表table.predicates参数提供了在where的条件语句,每个条件为1个分区
Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)
//构建一个DataFrame,通过JDBC的连接属性和URL访问数据库的表table.
Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)

API解析

Dataset<Row> jdbc(String url, String table, java.util.Properties properties)

 实例:

SQLContext sqlContext = new SQLContext(javaSparkContext);
// 配置连接属性
Properties dbProps = new Properties();
dbProps.put("user","user_name");
dbProps.put("password","password");
dbProps.put("driver","org.postgresql.Driver");
// 连接数据库 获取数据 要使用自己的数据库连接串
DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://ip/database_name", "table_name", dbProps);
Partition[] partitions = dataFrame.rdd().getPartitions();
System.out.println(partitions.length);
List<Row> collect = dataFrame.toJavaRDD().collect();
System.out.println(collect.size());

测试结果:
-------------------------
1
32444 由于这里没添加限制条件,所以这里是全表的数据量的大小

Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)

SQLContext sqlContext = new SQLContext(javaSparkContext);
// 配置连接属性
Properties dbProps = new Properties();
dbProps.put("user","postgres");
dbProps.put("password","postgres");
dbProps.put("driver","org.postgresql.Driver");
//where的查询条件
String[] strings = {"id >=26602657","id<=26610000"};
// 连接数据库 获取数据 要使用自己的数据库连接串
DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics",strings, dbProps);
Partition[] partitions = dataFrame.repartition(100).rdd().getPartitions();
System.out.println(partitions.length);
List<Row> collect = dataFrame.toJavaRDD().collect();
System.out.println(collect.size());

测试结果:
-------------------------
2      这里是根据where的查询条件对应起多少个连接分区
37321  这里的数据量大于全表的数据量,因为这个两个查询条件的并集union all

Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)

SQLContext sqlContext = new SQLContext(javaSparkContext);
// 配置连接属性
Properties dbProps = new Properties();
dbProps.put("user","postgres");
dbProps.put("password","postgres");
dbProps.put("driver","org.postgresql.Driver");
// 连接数据库 获取数据 要使用自己的数据库连接串  
//columnName "id"是用于分区的列,该列必须是数字列
//lowerBound 分区列的最小值 必须为整数
//upperBound 分区列的最大值 必须为整数
//分区数量,必须为整数,当为0或负整数时,实际的分区数为1;并不一定是最终的分区数量,例如“upperBound - lowerBound< numPartitions”时,实际的分区数量是“upperBound - lowerBound”;
DataFrame dataFrame = sqlContext.read().jdbc("jdbc:postgresql://192.168.16.100/gx_passenger", "nil_subscriber_realtime_statistics","id",0,1500,105, dbProps);
Partition[] partitions = dataFrame.rdd().getPartitions();
System.out.println(partitions.length);
List<Row> collect = dataFrame.toJavaRDD().collect();
System.out.println(collect.size());

测试结果:
-------------------------
105 32444 这里的数据量也是全表的数据量,上述不会进行条件过滤

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM