Scala: connecting Spark to SQL and Hive, and operating HDFS from IDEA


1. Connecting to SQL (MySQL)

Method 1: java.util.Properties + spark.read.jdbc

package com.njbdqn.linkSql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object LinkSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("apptest").master("local[2]").getOrCreate()
    // 1. connection properties
    val prop = new Properties()
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    prop.setProperty("user","root")
    prop.setProperty("password","root")
    // 2. read the table over JDBC and show it
    val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.111:3306/test","studentInfo",prop)
    jdbcDF.show(false)
    // 3. append two rows
    val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq((90, "抖抖抖", "男", 23, "sdf", "sdfg@dfg"),(8, "抖33", "男", 23, "s444f", "sdfg@dfg"))))
      .toDF("sid","sname","sgender","sage","saddress","semail")
  //  df.show(false)
    df.write.mode("append").jdbc("jdbc:mysql://192.168.56.111:3306/test","studentInfo",prop)

    spark.stop()
  }
}
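As a side note, the same URL and Properties object can also drive a partitioned read for larger tables. A minimal sketch continuing from the spark and prop values above; the partition column "sid", the bounds and the partition count are illustrative assumptions, not values from the original post:

    // partitioned JDBC read reusing the prop object defined above
    val partitionedDF = spark.read.jdbc(
      "jdbc:mysql://192.168.56.111:3306/test",
      "studentInfo",
      "sid",   // numeric column to partition on (assumed)
      1L,      // lowerBound (assumed)
      1000L,   // upperBound (assumed)
      4,       // numPartitions
      prop
    )
    partitionedDF.show(false)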

Method 2: an options Map with spark.read.format("jdbc")

package com.njbdqn

import org.apache.spark.sql.{DataFrame, SparkSession}

object KMeansTest {
  def readMySQL(spark:SparkSession):DataFrame ={
    val map: Map[String, String] = Map[String, String](
      "url" -> "jdbc:mysql://192.168.56.111:3306/myshops",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" -> "root",
      "password" -> "root",
      "dbtable" -> "customs"
    )
    spark.read.format("jdbc").options(map).load()
  }
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("db").master("local[*]").getOrCreate()
    readMySQL(spark).select("cust_id","company","province_id","city_id","district_id","membership_level","create_at","last_login_time","idno","biz_point","sex","marital_status","education_id","login_count","vocation","post")
      .show(20)
    spark.stop()

  }
}

Method 3: reading the JDBC settings from a .properties file in resources:

https://www.cnblogs.com/sabertobih/p/13874061.html
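The linked post has the full write-up; below is only a rough sketch of the idea, assuming a db.properties file on the classpath whose keys (url, driver, user, password, dbtable) mirror the Map used in Method 2. The file name and object name are illustrative assumptions:

package com.njbdqn.linkSql

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.sql.{DataFrame, SparkSession}

object LinkSqlFromProps {
  // load a .properties file from src/main/resources (the file name is an assumption)
  def loadOptions(name: String): Map[String, String] = {
    val prop = new Properties()
    val in = getClass.getClassLoader.getResourceAsStream(name)
    prop.load(in)
    in.close()
    prop.asScala.toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("propsdb").master("local[*]").getOrCreate()
    // db.properties is expected to define url, driver, user, password and dbtable
    val df: DataFrame = spark.read.format("jdbc").options(loadOptions("db.properties")).load()
    df.show(false)
    spark.stop()
  }
}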

2. Connecting to Hive

(1) Written back in August/September, before I really understood it; not well written

1. Add the Hive/Hadoop XML configuration files to the project's resources directory

2. Code

package com.njbdqn.linkSql

import org.apache.spark.sql.SparkSession

object LinkHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("apptest").master("local[2]")
      .enableHiveSupport()
      .getOrCreate()
    spark
       // .sql("show databases")
      .sql("select * from storetest.testhive")
      .show(false)
  }
}

Note: if the XML configuration in resources points at the cluster, then
val df = spark.read.format("csv").load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
fails, because:

>>> Spark can read a local data file, but the file must exist on every node. (Verified on a three-node cluster: with the file only on the master, textFile kept reporting that the file could not be found; after copying the file to the other two workers it could be read.)

>>> Fix: remove the cluster configuration (run purely locally), or upload the file to HDFS (run against the cluster), as in the sketch below.
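A minimal sketch of the HDFS variant. The namenode address and the /data/orders.csv path are assumptions, and the file would need to be uploaded first (e.g. with hdfs dfs -put orders.csv /data/):

import org.apache.spark.sql.SparkSession

object ReadCsvFromHdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("csvFromHdfs").master("local[*]").getOrCreate()
    // read the file from HDFS so that every executor can reach it;
    // the namenode address and path below are assumptions, not values from the original post
    val df = spark.read.format("csv")
      .load("hdfs://192.168.56.115:9000/data/orders.csv")
    df.show(5, truncate = false)
    spark.stop()
  }
}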


(2) Written on December 25

pom.xml:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>

Add the hive.metastore.uris setting in the code to enable shared access to the Hive metastore.

For the configuration changes, see: https://www.pianshen.com/article/8993307375/

For the reasoning behind this change, see: https://www.cnblogs.com/sabertobih/p/13772933.html

import org.apache.spark.sql.SparkSession

object EventTrans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
     .config("hive.metastore.uris","thrift://192.168.56.115:9083") # 配置metastore server的訪問地址,該server必須開啟服務
      .appName("test")
      .enableHiveSupport().getOrCreate()
    spark.sql("select * from dm_events.dm_final limit 3")
      .show(false)

    spark.close()

  }
}

1) The metastore service must be started on 192.168.56.115:

hive --service metastore

If the service is not started, the following error appears when the Spark thriftServer is started:

org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

2) 192.168.56.115 must be configured to connect directly to the MySQL database backing the metastore (in hive-site.xml, via javax.jdo.option.ConnectionURL and the related connection settings)

Verification:
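For example, a quick check from Spark that the remote metastore is reachable; a minimal sketch reusing the settings from the code above (only the app name is new):

import org.apache.spark.sql.SparkSession

object MetastoreCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .config("hive.metastore.uris", "thrift://192.168.56.115:9083")
      .appName("metastoreCheck")
      .enableHiveSupport()
      .getOrCreate()
    // if the metastore service and its MySQL backend are set up correctly,
    // this lists the Hive databases instead of failing with the HiveException above
    spark.sql("show databases").show(false)
    spark.close()
  }
}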

3. Operating HDFS: deleting a path

    import java.net.URI

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("app").getOrCreate()
    /**
     * delete the intermediate data left behind by checkpointing
     */
    // HDFSConnection.paramMap is a project-specific helper holding the hdfs:// URL of the cluster
    val path = new Path(HDFSConnection.paramMap("hadoop_url") + "/checkpoint") // the HDFS path to operate on (delete)
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url") + "/checkpoint"), hadoopConf)
    if (hdfs.exists(path)) {
      // the second argument enables recursive deletion; set it to false otherwise
      hdfs.delete(path, true) // this is intermediate checkpoint data, so recursive deletion is safe
    }

Problem encountered:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://h1:9000/out, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:381)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:55)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:393)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:452)
at mapreduce.WordCountApp.main(WordCountApp.java:36)

Solution: build the FileSystem from the HDFS URI explicitly rather than from the default configuration (which resolves to the local file system), so that the hdfs:// scheme of the path matches the file system:

  val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url")+"/checkpoint"),hadoopConf)

 

