SparkSql自定義數據源之讀取的實現


一.sparksql讀取數據源的過程

1.spark目前支持讀取jdbc,hive,text,orc等類型的數據,如果要想支持hbase或者其他數據源,就必須自定義

 2.讀取過程

(1)sparksql進行 session.read.text()或者 session.read .format("text") .options(Map("a"->"b")).load("")

 

 

 

 

read.方法:創建DataFrameReader對象

format方法:賦值DataFrameReade數據源類型

options方法:賦值DataFrameReade額外的配置選項

 進入 session.read.text()方法內,可以看到format為“text”

(2)進入load方法

 load原來是:sparkSession.baseRelationToDataFrame這個方法最終創建dataframe

(3進入DataSource的resolveRelation()方法

 

 此段就是:providingClass這個類是哪一個接口的實現類,分為有shema與沒有傳入schema的兩種

(3)providingClass是format傳入的數據源類型,也就是前面的source

 

 

 spark提供的所有數據源的map

 4.得出結論只要寫一個類,實現RelationProvider下面這個方法,在方法里面返回一個baserelation

def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

 我們在實現baserelation里面的邏輯就可以了

 5.看看spark讀取jdbc類

需要一個類,實現xxxScan這中類,這種類有三種,全局掃描tableScan,PrunedFilteredScan(列裁剪與謂詞下推),PrunedScan ,

實現buildscan方法返回row類型rdd,結合baserelation有shcame這個變量 ,就湊成了dataframe

 6.jdbcRdd.scanTable方法,得到RDD

7.查看jdbcRDD的compute方法,是通過jdbc查詢sql的方式獲取數據

RDD的計算是惰性的,一系列轉換操作只有在遇到動作操作是才會去計算數據,而分區作為數據計算的基本單位。在計算鏈中,無論一個RDD有多么復雜,其最終都會調用內部的compute函數來計算一個分區的數據。

override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {
    var closed = false
    var rs: ResultSet = null
    var stmt: PreparedStatement = null
    var conn: Connection = null

    def close() {
      if (closed) return
      try {
        if (null != rs) {
          rs.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing resultset", e)
      }
      try {
        if (null != stmt) {
          stmt.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing statement", e)
      }
      try {
        if (null != conn) {
          if (!conn.isClosed && !conn.getAutoCommit) {
            try {
              conn.commit()
            } catch {
              case NonFatal(e) => logWarning("Exception committing transaction", e)
            }
          }
          conn.close()
        }
        logInfo("closed connection")
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
      closed = true
    }

    context.addTaskCompletionListener{ context => close() }

    val inputMetrics = context.taskMetrics().inputMetrics
    val part = thePart.asInstanceOf[JDBCPartition]
    conn = getConnection()
    val dialect = JdbcDialects.get(url)
    import scala.collection.JavaConverters._
    dialect.beforeFetch(conn, options.asProperties.asScala.toMap)

    // H2's JDBC driver does not support the setSchema() method.  We pass a
    // fully-qualified table name in the SELECT statement.  I don't know how to
    // talk about a table in a completely portable way.

//坐上每個分區的Filter條件
    val myWhereClause = getWhereClause(part)

  //最終查詢sql語句
    val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
//jdbc查詢
    stmt = conn.prepareStatement(sqlText,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    stmt.setFetchSize(options.fetchSize)
    rs = stmt.executeQuery()

    val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
//返回迭代器
    CompletionIterator[InternalRow, Iterator[InternalRow]](
      new InterruptibleIterator(context, rowsIterator), close())
  }

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM