scala使用jdbc連接oracle數據庫


使用JdbcRDD方法:

jdbcRDD定義:

  1.  * An RDD that executes an SQL query on a JDBC connection and reads results.  
  2.  * For usage example, see test case JdbcRDDSuite.  
  3.  *  
  4.  * @param getConnection a function that returns an open Connection.  
  5.  *   The RDD takes care of closing the connection.  
  6.  * @param sql the text of the query.  
  7.  *   The query must contain two ? placeholders for parameters used to partition the results.  
  8.  *   E.g. "select title, author from books where ? <= id and id <= ?"  
  9.  * @param lowerBound the minimum value of the first placeholder  
  10.  * @param upperBound the maximum value of the second placeholder  
  11.  *   The lower and upper bounds are inclusive.  
  12.  * @param numPartitions the number of partitions.  
  13.  *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,  
  14.  *   the query would be executed twice, once with (1, 10) and once with (11, 20)  
  15.  * @param mapRow a function from a ResultSet to a single row of the desired result type(s).  
  16.  *   This should only call getInt, getString, etc; the RDD takes care of calling next.  
  17.  *   The default maps a ResultSet to an array of Object.  
  18.  */  
  19. class JdbcRDD[T: ClassTag](  
  20.     sc: SparkContext,  
  21.     getConnection: () => Connection,  
  22.     sql: String,  
  23.     lowerBound: Long,  
  24.     upperBound: Long,  
  25.     numPartitions: Int,  
  26.     mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)  

注意第二個參數,最后一個參數是一個函數,下面是跑通的代碼:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD
object startScala {
val driver_str = "oracle.jdbc.driver.OracleDriver"
//oracle數據庫IP(221.181.117.106),端口號(1521),數據庫實例名(orcl)
val conn_str = "jdbc:oracle:thin:@221.181.117.106:1521:orcl"
val user_str = "channel" //連接數據庫用戶名
val pwd_str = "channel" //連接數據庫密碼
//連接ORACLE數據庫函數
def createConnection() = {
Class.forName(driver_str).newInstance()
DriverManager.getConnection(conn_str,user_str,pwd_str)
}
//處理結果集函數
def extractValues(r: ResultSet) = {
(r.getString(1), r.getString(2))
}
def main(args: Array[String]): Unit = {
   val conf = new SparkConf()
        .setAppName("CONN_ORACLE_TEST")
        .setMaster("local") //local是本地跑,如是集群環境就寫環境IP
val sc = new SparkContext(conf)
val data = new JdbcRDD(sc,createConnection,"select * from tbl_address", lowerBound = 1, upperBound = 2, numPartitions = 1,mapRow = extractValues )
println(data.collect().toList)
    sc.stop()
}
}

注意可能出現沒有加驅動會報如下錯:

java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver異常的解決辦法

注意要在程序里面添加ojdbc6.jar,在IDEA里面方法如下:

 



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM