Using JdbcRDD:
Definition of JdbcRDD:
/**
 * An RDD that executes an SQL query on a JDBC connection and reads results.
 * For usage example, see test case JdbcRDDSuite.
 *
 * @param getConnection a function that returns an open Connection.
 *   The RDD takes care of closing the connection.
 * @param sql the text of the query.
 *   The query must contain two ? placeholders for parameters used to partition the results.
 *   E.g. "select title, author from books where ? <= id and id <= ?"
 * @param lowerBound the minimum value of the first placeholder
 * @param upperBound the maximum value of the second placeholder
 *   The lower and upper bounds are inclusive.
 * @param numPartitions the number of partitions.
 *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
 *   the query would be executed twice, once with (1, 10) and once with (11, 20)
 * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
 *   This should only call getInt, getString, etc; the RDD takes care of calling next.
 *   The default maps a ResultSet to an array of Object.
 */
class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
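To make the lowerBound / upperBound / numPartitions description concrete, here is a small sketch in plain Scala (no Spark needed) that splits an inclusive range into per-partition ranges. The helper name partitionBounds is hypothetical and not part of the Spark API, and the arithmetic is an assumption chosen so that it reproduces the (1, 10) and (11, 20) example from the Scaladoc.

```scala
// Sketch only: partitionBounds is a hypothetical helper, not a Spark API.
object PartitionBoundsSketch {
  // Splits the inclusive range [lowerBound, upperBound] into numPartitions
  // consecutive inclusive sub-ranges, one (start, end) pair per partition.
  def partitionBounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    // Both bounds are inclusive, hence the + 1 here and the - 1 on each range end.
    val length = BigInt(upperBound) - BigInt(lowerBound) + BigInt(1)
    val parts = BigInt(numPartitions)
    (0 until numPartitions).map { i =>
      val start = BigInt(lowerBound) + (length * BigInt(i)) / parts
      val end = BigInt(lowerBound) + (length * BigInt(i + 1)) / parts - BigInt(1)
      (start.toLong, end.toLong)
    }
  }

  def main(args: Array[String]): Unit = {
    // Each pair would fill the two ? placeholders of one query execution.
    partitionBounds(1, 20, 2).foreach { case (lo, hi) => println(s"($lo, $hi)") }
  }
}
```

With a lowerBound of 1, an upperBound of 20 and 2 partitions this yields (1, 10) and (11, 20), matching the Scaladoc. With 1, 2 and a single partition it yields the one range (1, 2).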
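Note that the second parameter (getConnection) and the last parameter (mapRow) are both functions. The following code has been run successfully: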
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD
object startScala {
  val driver_str = "oracle.jdbc.driver.OracleDriver"
  // Oracle database IP (221.181.117.106), port (1521), instance name (orcl)
  val conn_str = "jdbc:oracle:thin:@221.181.117.106:1521:orcl"
  val user_str = "channel" // database user name
  val pwd_str = "channel" // database password

  // Opens a connection to the Oracle database
  def createConnection(): Connection = {
    Class.forName(driver_str).newInstance()
    DriverManager.getConnection(conn_str, user_str, pwd_str)
  }

  // Maps the current ResultSet row to a pair built from its first two columns
  def extractValues(r: ResultSet): (String, String) = {
    (r.getString(1), r.getString(2))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("CONN_ORACLE_TEST")
      .setMaster("local") // "local" runs on this machine; on a cluster, put the cluster master address here
    val sc = new SparkContext(conf)
    // Note: the JdbcRDD Scaladoc says the query must contain two ? placeholders
    // for the partition bounds; this query has none.
    val data = new JdbcRDD(sc, createConnection, "select * from tbl_address",
      lowerBound = 1, upperBound = 2, numPartitions = 1, mapRow = extractValues)
    println(data.collect().toList)
    sc.stop()
  }
}
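The Scaladoc quoted earlier states that the query must contain two ? placeholders, while the query in the listing has none. Whether a given driver tolerates that is not something this post settles, so a small guard may be useful before handing a query to JdbcRDD. The sketch below is plain Scala. The names countPlaceholders and requireTwoPlaceholders are hypothetical helpers, not part of Spark, and the count is deliberately naive.

```scala
// Sketch only: these helpers are hypothetical and not part of the Spark API.
object PlaceholderCheckSketch {
  // Counts '?' characters in the query text. This is a naive count: it does not
  // skip question marks that appear inside string literals or SQL comments.
  def countPlaceholders(sql: String): Int = sql.count(_ == '?')

  // Returns the query unchanged if it has exactly two placeholders;
  // otherwise require throws an IllegalArgumentException.
  def requireTwoPlaceholders(sql: String): String = {
    val n = countPlaceholders(sql)
    require(n == 2, s"expected 2 '?' placeholders, found $n in: $sql")
    sql
  }
}
```

The example query from the Scaladoc, "select title, author from books where ? <= id and id <= ?", passes this check, while "select * from tbl_address" is rejected.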
Note: if the JDBC driver has not been added to the project, the following error may occur:
java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver
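One quick way to confirm that a missing driver is the cause is to try loading the driver class before starting the Spark job. This is a minimal sketch in plain Scala; driverAvailable is a hypothetical helper name, and it only reports whether Class.forName can load the class from the current classpath.

```scala
import scala.util.Try

// Sketch only: driverAvailable is a hypothetical helper, not a Spark or JDBC API.
object DriverCheckSketch {
  // Returns true if Class.forName can load the named class from the current
  // classpath, and false if loading fails with a non-fatal exception such as
  // ClassNotFoundException.
  def driverAvailable(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  def main(args: Array[String]): Unit = {
    val driver = "oracle.jdbc.driver.OracleDriver"
    if (driverAvailable(driver)) println(s"$driver found on the classpath")
    else println(s"$driver not found; the driver jar is probably missing from the classpath")
  }
}
```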
Solution: add ojdbc6.jar to the project. In IDEA, this is done as follows: