spark數據源讀取及讀數據原理


一、讀文件

1、textfile讀取不同場景文件

https://blog.csdn.net/legotime/article/details/51871724?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase

 二、讀hbase數據

https://www.bilibili.com/video/av83930526/

https://blog.csdn.net/yuanbingze/article/details/51891222

阿里雲:

https://help.aliyun.com/document_detail/28123.html?spm=5176.11065259.1996646101.searchclickresult.a5af608aantQGJ

hbase官網:

http://hbase.apache.org/book.html#spark

其中hbase-site.xml,只需要少量配置即可,不用完全將hbase集群中的文件拷貝出來

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://host:9870/hacluster/hbase</value>
        <description>The directory shared byRegionServers.
        </description>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
        <description>The mode the clusterwill be in. Possible values are
            false: standalone and pseudo-distributedsetups with managed Zookeeper
            true: fully-distributed with unmanagedZookeeper Quorum (see hbase-env.sh)
        </description>
    </property>

    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
        <description>Property fromZooKeeper's config zoo.cfg.
            The port at which the clients willconnect.
        </description>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value></value>
        <description>Comma separated listof servers in the ZooKeeper Quorum.
            For example,"host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
            By default this is set to localhost forlocal and pseudo-distributed modes
            of operation. For a fully-distributedsetup, this should be set to a full
            list of ZooKeeper quorum servers. IfHBASE_MANAGES_ZK is set in hbase-env.sh
            this is the list of servers which we willstart/stop ZooKeeper on.
        </description>
    </property>
</configuration>

三、讀寫MongoDB

val spark = SparkSession.builder()
      .appName("MongoDistribution")
      .config("spark.mongodb.input.uri", "mongodb://host:27017/database.table")
      .getOrCreate()

val frame: DataFrame = MongoSpark.load(spark)
frame.createTempView("table")
val javaDf: DataFrame = spark.sql("SELECT _id,uid,url,language,createDate,updateDate from table where language='Java'")

val javaWriteConfig = WriteConfig(Map("uri" -> "mongodb://host/database.table"))

MongoSpark.save(javaDf, javaWriteConfig)

四、讀寫hive

 

五、讀取es

1、讀取方式:調用esRDD算子。

(1)全量讀取

(2)選擇性讀取

2、讀取性能調優:

(1)合理設置es是前提。

合理設置es分片數提升性能:

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/arch.html

http://xx.xx.xx.xx:9200/codefs_parth/_search_shards

線程開多了報data too large

解決方式

https://stackoverflow.com/questions/61870751/circuit-breaking-exception-parent-data-too-large-data-for-http-request

es內存調優

https://www.cnblogs.com/Don/p/11912872.html 

 

(2)合理設置spark相關參數。

二、spark textFile讀取數據原理

在編寫spark測試應用時, 會用到sc.textFile(path, partition)

當配置為spark分布式集群時,當你讀取本地文件作為輸入時, 需要將文件存放在每台work節點上。

這時會有困惑,spark在讀取文件時,是每台worker節點都把文件讀入? 然后在進行分配? 會不會出現重復讀的情況? 文件會分為幾個partition?

轉自知乎:https://www.zhihu.com/question/36996853

 

一·是在執行action的時候再拷貝相應分區到多個worker節點進行並行計算嗎?
不是,這種讀取local file system而不是hdfs的情況,需要同一個文件存在所有的worker node上面,在讀取的時候每個worker node的task會去讀取本文件的一部分。打個比方,比如你有一個file,有一個spark集群(node1是master,node2,node3兩個是slaves),那么這個file需要在node2,node3上面都存在,這兩個節點的task會各讀一半,不然會出錯。(這里其實還有一個點注意,你的spark app所運行的節點也需要有這個file,因為需要用到file進行Partition划分)。

二·具體對應哪一段源碼。
1.由讀取文件的方法SparkContext.textFile(path)跟蹤源碼知道它利用了TextInputFormat生成了一個HadoopRDD.
def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }


def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

2.再來分析HadoopRDD,對於你的疑問來說最重要的是getPartitions方法,也就是如何划分你輸入的文件成為Partitions:

override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    val inputFormat = getInputFormat(jobConf)
    if (inputFormat.isInstanceOf[Configurable]) {
      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
    }
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  }

其中 val inputSplits = inputFormat.getSplits(jobConf, minPartitions), 是將你的輸入文件划分為多個Split,一個Split對應一個Partition,因為是本地文件系統,通過"file://"前綴可以獲取文件系統,這個源碼我就不帖了,這里minPartitions是2(如果你沒有指定的話),也就是將file划分為2部分,每個Split都有SplitLocationInfo描述該Split在哪個node上如何存儲,比如FileSplit包含了(Hosts,start, len, path),就是在哪個host上面的哪個path,從哪個起點start讀取len這么多數據就是這個Split的內容了。對於本地文件,他的Host直接指定的是 localhost,path就是你傳入的文件路徑,start和len根據2份進行簡單的計算即可,我就不贅述。有了這個信息我們可以構造每個Split的PreferLocation:
override def getPreferredLocations(split: Partition): Seq[String] = {
    val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
    val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
      case Some(c) =>
        try {
          val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
          val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
          Some(HadoopRDD.convertSplitLocationInfo(infos))
        } catch {
          case e: Exception =>
            logDebug("Failed to use InputSplitWithLocations.", e)
            None
        }
      case None => None
    }
    locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
  }

從這段代碼可以看出來,對於localhost的host,是沒有PreferredLocation的,這個會把對應於該partition的task追加到no_prefs的任務隊列中,進行相應data locality的任務調度。

3.任務調度
val taskIdToLocations = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.resultOfJob.get
          partitionsToCompute.map { id =>
            val p = job.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }
由於Spark每個partition的運算都是由一個task進行的,那么partition的preferlocation會成為task的preferLocation,這是data locality的任務調度,遵循着移動計算比移動數據更加高效的原則。
那么這樣每個task都有了自己的應該允許的Location,然而對於本地文件系統,這是一個坑爹的存在,因為getPreferredLocs這個方法返回的是Nil,是空的。如果task沒有PreferLocation,那么它如何被調度呢?答案在TaskSetManager里面:
if (tasks(index).preferredLocations == Nil) {
      addTo(pendingTasksWithNoPrefs)
    }
如何沒有preferLocation的話,那么是會把這個任務追加到pendingTasksWithNoPrefs數組里面。
該數組里面的任務是以 Round-Robin的方式分發到各個Executor里面的,到這里已經能說明問題了, 你有一個file,根據FileInputFormat生成了兩個Split,HadoopRDD據此生成了兩個Partition,兩個Partition需要兩個Task,這兩個Task會  Round-Robin 得 spread到你的node2,node3上面的executor上面,這些Task要讀取的Split的文件的host都是localhost,大小就是file的一半,到此,你應該可以理解為什么需要這個file在每個worker node都存在了,因為每個worker node的executor執行的task要讀取的Split的Location信息是localhost,他不會到master上面讀,只會在運行這個task的worker node本地讀。相對應的源碼就是上面的,細節留待你自己去再梳理一遍。

PS:
1.這種使用textFile方法讀取本地文件系統的文件的方法,只能用於debug,不用於其他任何用途,因為他會導致file的replication數與node的個數同步增長。

2.上述描述中的分成2份這種是默認值,為了方面說明,你可以自己設置partition個數。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM