Spark SQL External Data Sources JDBC簡易實現


在spark1.2版本中最令我期待的功能是External Data Sources,通過該API可以直接將External Data Sources注冊成一個臨時表,該表可以和已經存在的表等通過sql進行查詢操作。External Data Sources API代碼存放於org.apache.spark.sql包中。


具體的分析可參見OopsOutOfMemory的兩篇精彩博文:

http://blog.csdn.net/oopsoom/article/details/42061077

http://blog.csdn.net/oopsoom/article/details/42064075

 

自己嘗試實現了一個簡易的讀取關系型數據庫的外部數據源,代碼參見:https://github.com/luogankun/spark-jdbc

支持MySQL/Oracle/DB2,以及幾種簡單的數據類型,暫時還不支持PrunedScan、PrunedFilteredScan,僅支持TableScan,后續在接着完善。

 

使用步驟:

1、編譯spark-jdbc代碼

sbt package

2、添加jar包到spark-env.sh

export SPARK_CLASSPATH=/home/spark/software/source/spark_package/spark-jdbc/target/scala-2.10/spark-jdbc_2.10-0.1.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/ojdbc6.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/db2jcc4.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/mysql-connector-java-3.0.10.jar:$SPARK_CLASSPATH

3、SQL使用:啟動spark-sql

參數說明:

url :關系型數據庫url

user :關系型數據庫用戶名

password: 關系型數據庫密碼

sql:關系型數據庫sql查詢語句

 

MySQL: 

CREATE TEMPORARY TABLE jdbc_table
USING com.luogankun.spark.jdbc
OPTIONS (
url    'jdbc:mysql://hadoop000:3306/hive',
user    'root',
password    'root',
sql 'select TBL_ID,TBL_NAME,TBL_TYPE FROM TBLS WHERE TBL_ID < 100'
);

SELECT * FROM jdbc_table;

 

Oracle:

CREATE TEMPORARY TABLE jdbc_table
USING com.luogankun.spark.jdbc
OPTIONS (
url    'jdbc:oracle:thin:@hadoop000:1521/ora11g',
user    'coc',
password    'coc',
sql 'select HISTORY_ID, APPROVE_ROLE_ID, APPROVE_OPINION from CI_APPROVE_HISTORY'
);

SELECT * FROM jdbc_table;

 

DB2:

CREATE TEMPORARY TABLE jdbc_table
USING com.luogankun.spark.jdbc
OPTIONS (
url    'jdbc:db2://hadoop000:60000/CI',
user    'ci',
password    'ci',
sql 'select LABEL_ID from coc.CI_APPROVE_STATUS'
);

SELECT * FROM jdbc_table;

 

在測試過程中遇到的問題:

如上的代碼在連接MySQL數據庫操作時沒有問題,但是在操作Oracle或者DB2數據庫時,報錯如下:

09:56:48,302 [Executor task launch worker-0] ERROR Logging$class : Error in TaskCompletionListener
java.lang.AbstractMethodError: oracle.jdbc.driver.OracleResultSetImpl.isClosed()Z
    at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:99)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
    at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:85)
    at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:110)
    at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:108)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContext.markTaskCompleted(TaskContext.scala:108)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:64)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
09:56:48,302 [Executor task launch worker-1] ERROR Logging$class : Error in TaskCompletionListener

跟了下JdbcRDD源代碼發現,問題在於:

我在本案例中使用的oracle的驅動是ojdbc14-10.2.0.3.jar,查閱了些資料說是Oracle的實現類沒有該方法;

該issues詳見: https://issues.apache.org/jira/browse/SPARK-5239

解決辦法:

1)升級驅動包;

2)暫時屏蔽掉這兩個isClosed的判斷方法(https://github.com/apache/spark/pull/4033)

 

4、Scala API使用方式

import  com.luogankun.spark.jdbc._
val sqlContext = new HiveContext(sc)
val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root","root","select id, name from city")
cities.collect

 

后續將會繼續完善,現在的實現確實很“丑陋”,湊合着先能使用吧。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM