I. How Spark SQL reads a data source
1. Spark currently ships with readers for jdbc, hive, text, orc, and similar sources; to read HBase or any other unsupported source, you have to implement a custom data source.
2. The read path
(1) Spark SQL starts from session.read.text("") or session.read.format("text").options(Map("a" -> "b")).load("")
The read method creates a DataFrameReader object.
The format method sets the DataFrameReader's data source type.
The options method sets the DataFrameReader's extra configuration options.
Stepping into session.read.text(), you can see that format is "text".
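For reference, this is roughly what those builder methods look like in the Spark 2.x source (paraphrased, not verbatim; the bodies vary slightly across versions). Each one just records state on the DataFrameReader and returns this:

// paraphrased from org.apache.spark.sql.DataFrameReader
def format(source: String): DataFrameReader = {
  this.source = source            // remembered as the data source type
  this
}

def options(options: scala.collection.Map[String, String]): DataFrameReader = {
  this.extraOptions ++= options   // merged into the extra configuration map
  this
}

def text(paths: String*): DataFrame = format("text").load(paths: _*)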
(2) Step into the load method
It turns out that load boils down to sparkSession.baseRelationToDataFrame; this is the method that finally creates the DataFrame.
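Paraphrased from the Spark 2.x DataFrameReader (before the DataSource V2 branch was added), load wires everything together like this:

// paraphrased, not verbatim
def load(paths: String*): DataFrame = {
  sparkSession.baseRelationToDataFrame(
    DataSource(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,           // the value recorded by format()
      options = extraOptions.toMap  // the values recorded by options()
    ).resolveRelation())
}

So read/format/options merely collect parameters; the real resolution work happens in DataSource.resolveRelation().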
(3) Step into DataSource's resolveRelation() method
This block determines which interface providingClass implements, and it branches on whether a user schema was passed in or not.
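The heart of resolveRelation is a match on (providingClass.newInstance(), userSpecifiedSchema), roughly as below (abridged and paraphrased from Spark 2.x; the FileFormat and error cases are left out):

val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
  // a schema was passed in: the source must accept it, via SchemaRelationProvider
  case (dataSource: SchemaRelationProvider, Some(schema)) =>
    dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
  // no schema passed in: the source must supply its own, via RelationProvider
  case (dataSource: RelationProvider, None) =>
    dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
  // remaining cases (FileFormat sources, schema mismatches) omitted here
}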
(4) providingClass is the data source type passed in through format, i.e. the source from step (1).
The name is resolved against the map of all the data sources Spark ships with, as sketched below.
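In Spark 2.x this resolution lives in DataSource.lookupDataSource: a backwardCompatibilityMap from legacy names to the built-in providers, plus a java.util.ServiceLoader scan over DataSourceRegister implementations whose shortName() matches (that is how "jdbc", "text", etc. resolve); if neither matches, Spark loads the given class name directly, also trying <name>.DefaultSource. A few paraphrased entries from that map:

// paraphrased from DataSource.backwardCompatibilityMap; abridged
Map(
  "org.apache.spark.sql.jdbc"    -> classOf[JdbcRelationProvider].getCanonicalName,
  "org.apache.spark.sql.json"    -> classOf[JsonFileFormat].getCanonicalName,
  "org.apache.spark.sql.parquet" -> classOf[ParquetFileFormat].getCanonicalName
  // ... more entries in the real source
)

The <name>.DefaultSource fallback is why a custom source class is conventionally named DefaultSource and addressed by its package name.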
3. Conclusion: it is enough to write a class that implements RelationProvider and the method below, returning a BaseRelation from it:
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
All we have to do is implement the logic inside that BaseRelation.
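For completeness, BaseRelation (in org.apache.spark.sql.sources) reduced to the members that matter here:

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType  // the column names and types Spark will expose
  // sizeInBytes and needConversion have defaults and can usually be ignored
}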
4. A look at Spark's JDBC reader
The relation also needs to implement one of the xxxScan traits. There are three: TableScan (full table scan), PrunedScan (column pruning), and PrunedFilteredScan (column pruning plus predicate pushdown).
Implementing the buildScan method returns an RDD of Row; combined with the schema field that BaseRelation carries, that adds up to a DataFrame. A minimal end-to-end sketch follows below.
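Putting the pieces together, a minimal hand-rolled data source could look like the sketch below. Every name in it (DefaultSource, DemoRelation, the single value column) is invented for illustration; only the traits and types come from Spark:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// entry point: Spark instantiates this class for the chosen format
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new DemoRelation(sqlContext, parameters)
}

class DemoRelation(
    override val sqlContext: SQLContext,
    parameters: Map[String, String]) extends BaseRelation with TableScan {

  // schema: what columns the relation exposes
  override def schema: StructType =
    StructType(Seq(StructField("value", StringType, nullable = true)))

  // buildScan: the data as an RDD[Row]; schema + RDD[Row] is all a DataFrame needs
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("hello"), Row("world")))
}

Assuming DefaultSource lives in the (hypothetical) package com.example.demo, it would be used as spark.read.format("com.example.demo").load().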
5. The JDBCRDD.scanTable method produces the RDD.
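The hand-off to scanTable happens in JDBCRelation.buildScan, roughly as below (paraphrased from Spark 2.x; parameter lists differ slightly across versions). JDBCRelation mixes in PrunedFilteredScan, so it receives the pruned columns and the pushed-down filters:

// paraphrased, not verbatim
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  JDBCRDD.scanTable(
    sparkSession.sparkContext,
    schema,
    requiredColumns,
    filters,
    parts,        // one JDBCPartition per generated WHERE clause
    jdbcOptions).asInstanceOf[RDD[Row]]
}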
6. JDBCRDD's compute method shows that the data is fetched by running a SQL query over JDBC.
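For context, a read like the following (all connection values are placeholders) is what ends up in this code path:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")  // placeholder connection info
  .option("dbtable", "person")
  .option("user", "root")
  .option("password", "secret")
  .load()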
RDD computation is lazy: a chain of transformations is evaluated only when an action is invoked, with the partition as the basic unit of computation. No matter how complex an RDD is, in the end it calls its internal compute function to compute the data of a single partition.
override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {
  var closed = false
  var rs: ResultSet = null
  var stmt: PreparedStatement = null
  var conn: Connection = null

  def close() {
    if (closed) return
    try {
      if (null != rs) {
        rs.close()
      }
    } catch {
      case e: Exception => logWarning("Exception closing resultset", e)
    }
    try {
      if (null != stmt) {
        stmt.close()
      }
    } catch {
      case e: Exception => logWarning("Exception closing statement", e)
    }
    try {
      if (null != conn) {
        if (!conn.isClosed && !conn.getAutoCommit) {
          try {
            conn.commit()
          } catch {
            case NonFatal(e) => logWarning("Exception committing transaction", e)
          }
        }
        conn.close()
      }
      logInfo("closed connection")
    } catch {
      case e: Exception => logWarning("Exception closing connection", e)
    }
    closed = true
  }

  context.addTaskCompletionListener{ context => close() }

  val inputMetrics = context.taskMetrics().inputMetrics
  val part = thePart.asInstanceOf[JDBCPartition]
  conn = getConnection()
  val dialect = JdbcDialects.get(url)
  import scala.collection.JavaConverters._
  dialect.beforeFetch(conn, options.asProperties.asScala.toMap)

  // H2's JDBC driver does not support the setSchema() method. We pass a
  // fully-qualified table name in the SELECT statement. I don't know how to
  // talk about a table in a completely portable way.

  // attach this partition's filter conditions as a WHERE clause
  val myWhereClause = getWhereClause(part)
  // the final query SQL
  val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
  // run the query over JDBC
  stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
  stmt.setFetchSize(options.fetchSize)
  rs = stmt.executeQuery()
  val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
  // return an iterator over the result set
  CompletionIterator[InternalRow, Iterator[InternalRow]](
    new InterruptibleIterator(context, rowsIterator), close())
}
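Note the cleanup pattern here: close() is registered both through context.addTaskCompletionListener and as the completion callback of the CompletionIterator, so the ResultSet, Statement, and Connection are released whether the iterator is drained normally or the task ends early. The per-partition WHERE clause from getWhereClause(part) is also what turns one logical table read into several parallel JDBC queries.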