關於sparksql操作hive,讀取本地csv文件並以parquet的形式裝入hive中


說明:spark版本:2.2.0

    hive版本:1.2.1

需求: 有本地csv格式的一個文件,格式為${當天日期}visit.txt,例如20180707visit.txt,現在需要將其通過spark-sql程序實現將該文件讀取並以parquet的格式通過外部表的形式保存到hive中,最終要實現通過傳參的形式,將該日期區間內的csv文件批量加載進去,方式有兩種:

  1、之傳入一個參數,說明只加載一天的數據進去

  2、傳入兩個參數,批量加載這兩個日期區間的每一天的數據

  最終打成jar包,進行運行

步驟如下:

  1、初始化配置,先創建sparkSession(spark2.0版本開始將sqlContext、hiveContext同意整合為sparkSession)

//初始化配置
  val spark = new sql.SparkSession
   .Builder()
    .enableHiveSupport()  //操作hive這一步千萬不能少
    .appName("project_1")
    .master("local[2]")
    .getOrCreate()

  2、先將文件讀進來,並轉換為DF 

val data = spark.read.option("inferSchema", "true").option("header", "false") //這里設置是否處理頭信息,false代表不處理,也就是說文件的第一行也會被加載進來,如果設置為true,那么加載進來的數據中不包含第一行,第一行被當作了頭信息,也就是表中的字段名處理了
        .csv(s"file:///home/spark/file/project/${i}visit.txt")  //這里設置讀取的文件,${i}是我引用的一個變量,如果要在雙引號之間引用變量的話,括號前面的那個s不能少
        .toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time") //將讀進來的數據轉換為DF,並為每個字段設置字段名

  3、將轉換后的DF注冊為一張臨時表

data.createTempView(s"table_${i}")

  4、通過spark-sql創建hive外部表,這里有坑

spark.sql(
        s"""
           |create external table if not exists ${i}visit
           |(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,
           |region string, screen string, stay_time int) stored as parquet
           |location 'hdfs://master:9000/project_dest/${i}'
         """.stripMargin)

  這里的見表語句需要特別注意,如果寫成如下的方式是錯誤的:

spark.sql(
        s"""
           |create external table if not exists ${i}visit
           |(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,
           |region string, screen string, stay_time int) row format delimited fields terminated by '\t' stored as parquet
           |location /project_dest/${i}'
         """.stripMargin)

    (1)對於row format delimited fields terminated by '\t'這語句只支持存儲文件格式為textFile,對於parquet文件格式不支持

    (2)對於location這里,一定要寫hdfs的全路徑,如果向上面這樣寫,系統不認識,切記

  5、通過spark-sql執行insert語句,將數據插入到hive表中

spark.sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)

  至此,即完成了將本地數據以parquet的形式加載至hive表中了,接下來既可以到hive表中進行查看數據是否成功載入

貼一下完整代碼:

package _sql.project_1

import org.apache.spark.sql

/**
  * Author Mr. Guo
  * Create 2018/9/4 - 9:04
  * ┌───┐   ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┐
  * │Esc│   │ F1│ F2│ F3│ F4│ │ F5│ F6│ F7│ F8│ │ F9│F10│F11│F12│ │P/S│S L│P/B│  ┌┐    ┌┐    ┌┐
  * └───┘   └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┘  └┘    └┘    └┘
  * ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───────┐ ┌───┬───┬───┐ ┌───┬───┬───┬───┐
  * │~ `│! 1│@ 2│# 3│$ 4│% 5│^ 6│& 7│* 8│( 9│) 0│_ -│+ =│ BacSp │ │Ins│Hom│PUp│ │N L│ / │ * │ - │
  * ├───┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─────┤ ├───┼───┼───┤ ├───┼───┼───┼───┤
  * │ Tab │ Q │ W │ E │ R │ T │ Y │ U │ I │ O │ P │{ [│} ]│ | \ │ │Del│End│PDn│ │ 7 │ 8 │ 9 │   │
  * ├─────┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴─────┤ └───┴───┴───┘ ├───┼───┼───┤ + │
  * │ Caps │ A │ S │ D │ F │ G │ H │ J │ K │ L │: ;│" '│ Enter  │               │ 4 │ 5 │ 6 │   │
  * ├──────┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴────────┤     ┌───┐     ├───┼───┼───┼───┤
  * │ Shift  │ Z │ X │ C │ V │ B │ N │ M │< ,│> .│? /│  Shift   │     │ ↑ │     │ 1 │ 2 │ 3 │   │
  * ├─────┬──┴─┬─┴──┬┴───┴───┴───┴───┴───┴──┬┴───┼───┴┬────┬────┤ ┌───┼───┼───┐ ├───┴───┼───┤ E││
  * │ Ctrl│    │Alt │         Space         │ Alt│    │    │Ctrl│ │ ← │ ↓ │ → │ │   0   │ . │←─┘│
  * └─────┴────┴────┴───────────────────────┴────┴────┴────┴────┘ └───┴───┴───┘ └───────┴───┴───┘
  **/

object Spark_Sql_Load_Data_To_Hive {
  //初始化配置
  val spark = new sql.SparkSession
  .Builder()
    .enableHiveSupport()
    .appName("project_1")
    .master("local[2]")
    .getOrCreate()

  //設置日志的級別
  spark.sparkContext.setLogLevel("WARN")

  def main(args: Array[String]): Unit = {

    try {
      if (args.length != 1) {
          data_load(args(0).toInt)
      } else if (args.length != 2) {
        for (i <- args(0).toInt to args(1).toInt) {
          data_load(i)
        }
      } else {
        System.err.println("Usage:<start_time> or <start_time> <end_time>")
        System.exit(1)
      }
    }catch {
      case ex:Exception => println("Exception")
    }finally{
      spark.stop()
    }
  }
  def data_load(i:Int): Unit = {
    println(s"*******data_${i}********")
    val data = spark.read.option("inferSchema", "true").option("header", "false")
      .csv(s"file:///home/spark/file/project/${i}visit.txt")
      .toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time")
    data.createTempView(s"table_${i}")
    spark.sql("use project_1".stripMargin)
    spark.sql(
      s"""
         |create external table if not exists ${i}visit
         |(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,
         |region string, screen string, stay_time int) stored as parquet
         |location 'hdfs://master:9000/project_dest/${i}'
         """.stripMargin)
    spark
      .sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)
  }
}

    6、打成jar包(我的IDEA版本是2017.3版本)

  如果沒有上面這一欄,點擊View,然后勾選Toolbar即可

點擊ok

此時這里會成成這么一個文件,是編譯之后的class文件

到這個目錄下會找到這么一個jar包

找到該文件夾,上傳到服務器,cd到該目錄下運行命令:

spark-submit  --class spark._sql.project_1.Conn_hive --master spark://master:7077 --executor-memory 2g --num-executors 3  /spark_maven_project.jar 20180901 20180910

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM