sparksql系列(四) sparksql 操作數據庫


一:SparkSql操作mysql

老規矩:先抽出來公共的方法:

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList


object WordCount {

  def dataAndJdbcoption() = {  

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD1 = javasc.parallelize(Arrays.asList("{'id':'7'}","{'id':'8'}","{'id':'9'}"));
    val nameRDD1df = sparkSession.read.json(nameRDD1)

    val prop = new java.util.Properties
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    prop.setProperty("dbtable","blog")
    prop.setProperty("url","jdbc:mysql://127.0.0.1:3306/test")

    (nameRDD1df,prop)

  }

}

讀mysql

    val df = dataAndJdbcoption()._1
    val prop = dataAndJdbcoption()._2

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val data = sparkSession.read.format("jdbc").option("user","root").option("password","123456")
      .option("driver","com.mysql.jdbc.Driver")
.      option("url","jdbc:mysql://127.0.0.1:3306/test").option("dbtable", "blog")
      .load()
    data.show(100)

寫mysql

  val df = dataAndJdbcoption()._1
  val prop = dataAndJdbcoption()._2
  df.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), prop.getProperty("dbtable"), prop)

二:SparkSql操作Hive

公司讀Hive數據

                     其實是讀Hive表的location的文件,生成最終的文件。

公司寫Hive數據

                     生成文件后將數據load進Hive

直接使用Sql操作Hive的數據       

    val conf = new SparkConf().setAppName("WordCount")
    //合並小文件,sparksql默認有200個task執行文件,會生成很多小文件。其實有很多參數可以優化詳見sparkSession.sql("SET -v")

    conf.set("mapreduce.input.fileinputformat.split.minsize","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.minsize.per.node","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize.per.node","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize.per.rack","1024000000")
    val sparkSession= SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

      sparkSession.sql("insert into table table1 select aa from sparksqlTempTable")

 

    除了上述方法可以合並文件之外,還有一種方法可以合並文件:

    val dataFrame = sparkSession.sql("select aa from table ").coalesce(3);//日志看task數量3
    dataFrame.createOrReplaceTempView("sparksqlTempTable")
    sparkSession.sql("insert into table table1 select aa from sparksqlTempTable")

 

    但是這種方法並不實用,因為大部分操作的Sql操作是需要insert。

 

    網上說還有第三種方法:

    即在Sql中插入一個REPARTITION(4),但是我在實驗過程中並沒有作用,可能這種語法只是針對於HiveSql本身,使用SparkSql並沒有作用。

    栗子:select /*+ REPARTITION(4) */ aa from table 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM