當數據增加,我們又無法無限制的增加硬件,我們就要利用RDD的partition。將獲取一個大表的任務拆分成多個任務,一個一個來執行,每個任務只獲取一小部分數據,這樣通過多個連接同時去取數據,速度反而更快。
我的配置目前是 master 1 8g,slave 3 8g
Dataset<Row> dataset = spark.read().format("jdbc") .option("url", JDBCUtil.getJdbcUrl(datasourceModel)) .option("dbtable", tableName) .option("user", datasourceModel.getUserName()) .option("password", datasourceModel.getPassword()) .option("partitionColumn", "ID") .option("lowerBound", 10000) .option("upperBound", 100000000) .option("numPartitions", 10000) .load();
參數具體意義:
partitionColumn, lowerBound, upperBound |
These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading. |
numPartitions |
The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing. |
partitionColumn:根據哪個字段分區,必須是數字類型,int是可以的,一般用id
lowerBound:分區下界,假如是10000,那么10000條數據之前都是在一個任務執行
upperBound:分區上屆,lowerBound和upperBound的數據會被拆分,而邊界外圍的會單獨作為分區
numPartitions:分區邊界之間的數據要分多少分區。
至於到底分了多少塊,邊界之外的數據怎么分的塊,沒必要糾結,只要知道,數據肯定是全部取回來了。
另外只需要部分數據的,可以按照sql的方式:
.option("dbtable", "test_table")
可以改寫成:
.option("dbtable", "(select * from test_table where dt >= '2017-05-01') as T")
參考:
http://spark.apache.org/docs/latest/configuration.html