Several ways to connect Spark to a MySQL database


1. The first way to connect Spark to MySQL:

 
         
import java.util.Properties

import org.apache.spark.sql.{DataFrame, DataFrameReader, SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val spark: SparkSession = SparkSession.builder().master("local").appName("createdataframefrommysql")
    .config("spark.sql.shuffle.partitions", 1).getOrCreate()

  /**
    * The first way to read from MySQL.
    */
  val properties = new Properties()
  properties.setProperty("user", "root")
  properties.setProperty("password", "123")
  val person: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark", "person", properties)

  person.show()

  // A subquery can also be read as a table, as long as it is given an alias:
  spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark",
    "(select person.id,person.name,person.age,score.score from person,score where person.id = score.id) T",
    properties).show()


2. The second way to read MySQL data

  val map: Map[String, String] = Map[String, String](
    "url" -> "jdbc:mysql://192.168.126.111:3306/spark",
    "driver" -> "com.mysql.jdbc.Driver",
    "user" -> "root",
    "password" -> "123",
    "dbtable" -> "score"
  )

  val score: DataFrame = spark.read.format("jdbc").options(map).load()

  score.show()

3. The third way to read from MySQL

  val reader: DataFrameReader = spark.read.format("jdbc")
    .option("url", "jdbc:mysql://192.168.126.111:3306/spark")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root")
    .option("password", "123")
    .option("dbtable", "score")

  val source2: DataFrame = reader.load()

  source2.show()

4. Writing data from Spark back to MySQL

  // Register the two tables above as temporary views and run a join query
  person.createOrReplaceTempView("person")
  score.createOrReplaceTempView("score")

  val result = spark.sql("select person.id,person.name,person.age,score.score from person,score where person.id=score.id")

  result.show()

  // Save the query result into a MySQL table
  result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.126.111:3306/spark", "result", properties)
}

An important parameter:

spark.sql.shuffle.partitions controls how many partitions are used when a SQL statement is executed as a Spark job, i.e. the number of partitions produced by shuffle operations (the default is 200).
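
For illustration, a minimal sketch (the session and application names below are placeholders of my own) showing the two usual ways to set this parameter:

import org.apache.spark.sql.SparkSession

// Set the shuffle partition count when building the session ...
val sparkDemo = SparkSession.builder()
  .master("local")
  .appName("shufflePartitionsDemo")
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()

// ... or change it at runtime, before the query is executed:
sparkDemo.conf.set("spark.sql.shuffle.partitions", "4")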

 

Spark SQL: loading Hive data as a DataFrame

Configure Spark so that it can locate Hive:

1. Start the Hive metastore:  hive --service metastore &

2. Copy the Hive configuration file from node mynode3 to the Spark node:  scp ./hive-site.xml mynode4:/software/spark-2.3.1/conf/

3. Edit hive-site.xml: delete all the other settings and keep only

<configuration>
 <property>
  <name>hive.metastore.uris</name>
  <value>thrift://mynode1:9083</value>
 </property>
</configuration>
The purpose of this file: it lets Spark find Hive's metadata through the metastore; once Spark can reach the metastore, it can access Hive's tables.

------------------------------------------------------------------------------------------------------------------------

Querying Hive data with spark-sql:

1. Start Hadoop   2. Start Hive   3. Start Spark: run ./start-all.sh in /software/spark-2.3.1/sbin/

./spark-shell --master spark://mynode1:7077,mynode2:7077   -- launch the Spark shell against the cluster with this command
spark.sql("show databases").show()


Compare the query speed of MapReduce and Spark SQL on the same batch of data.

When Spark code runs locally, the master must be set on SparkSession.builder(); without it the job fails at startup.
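
A minimal sketch (illustrative only): set the master explicitly for local runs, and leave it out of the code when the job is submitted to a cluster via spark-submit --master:

import org.apache.spark.sql.SparkSession

// Local run: master must be set here, otherwise startup fails with
// "A master URL must be set in your configuration".
val localSpark = SparkSession.builder()
  .master("local")
  .appName("localExample")
  .getOrCreate()

// Cluster run: omit .master(...) in the code and pass
// --master spark://mynode1:7077,mynode2:7077 to spark-submit instead.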

Reading Hive data with Spark SQL:

 val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate()
    spark.sql("use spark")
    spark.sql("drop table if exists student_infos")
    spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
    spark.sql("load data local inpath '/root/test/student_infos' into table student_infos")
    spark.sql("drop table if exists student_scores")
    spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
    spark.sql("load data local inpath '/root/test/student_scores' into table student_scores")
    val frame: DataFrame = spark.table("student_infos")

    frame.show()

    val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
    df.show()

    spark.sql("drop table if exists good_student_infos")

    /**
      * Save the result into a Hive table.
      */
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

Package the Spark-reads-Hive job with Maven: first run clean (the project's previous target directory is removed), then run package to build the jar.

Upload the built jar to the Linux server.

Use the following command to read the data in Hive and create or drop a table in Hive.

From the Spark bin directory:

./spark-submit --master spark://mynode1:7077,mynode2:7077 --class <packageName>.<ClassName> <path to the jar on the Linux server>

UDF: user-defined function

A UDF is one-to-one: for each input row it reads one value and produces one value.

Register a UDF:  spark.udf.register("udfName", function)

Use a UDF:  sparkSession.sql("select xx, udfName(xx) from tableName ...")

Example code:

 val spark: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()

    val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhouyu", "lili")

    import spark.implicits._

    val nameDF: DataFrame = nameList.toDF("name")
    nameDF.createOrReplaceTempView("students")
    nameDF.show()

    /**
      * Register the user-defined function.
      */
    spark.udf.register("STRLEN", (name: String) => name.length)

    spark.sql("select name,STRLEN(name) as length from students order by length desc").show(100)

 

 

UDAF: user-defined aggregate function

       It is implemented by writing a class that extends UserDefinedAggregateFunction.

       Extending this class requires implementing eight methods; what each method does:

       inputSchema: the data types of the UDAF's input columns

       initialize: 1. on the map side, within each RDD partition, rows are grouped by the group-by columns and each group gets an initial value;
                   2. on the reduce side, each group-by group likewise gets an initial value

       update: when a new row arrives for a group, update that group's aggregate value (map side, per partition)

       merge: in the reduce stage, when new partial aggregates arrive, merge them into the group's aggregate

       bufferSchema: the types of the intermediate data handled during aggregation (the aggregation buffer)

       dataType: the data type of the function's final return value

       deterministic: whether the same input always produces the same output

       evaluate: returns the final aggregate value, whose type must match dataType

    UDAF (user-defined aggregate function): its main purpose is to give you full control over how the aggregation is carried out; depending on the business requirement, you override the eight methods of the inherited class, as in the sketch below.
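
A minimal sketch (assuming Spark 2.x; the class and column names here are my own, not from the original code) of a UDAF that counts the rows in each group:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyCount extends UserDefinedAggregateFunction {
  // data types of the input columns
  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  // types of the aggregation buffer
  override def bufferSchema: StructType = StructType(StructField("count", LongType) :: Nil)
  // data type of the final result
  override def dataType: DataType = LongType
  // the same input always produces the same output
  override def deterministic: Boolean = true
  // initial value for each group
  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L }
  // called for every new row of a group within a partition
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + 1L
  }
  // merges partial aggregates coming from different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  }
  // final result; its type must match dataType
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// Register and use it, for example:
// spark.udf.register("mycount", new MyCount)
// spark.sql("select name, mycount(name) from students group by name").show()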

    Window functions:

   

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("over").enableHiveSupport().getOrCreate()
    spark.sql("use spark")
    spark.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '\t'")
    spark.sql("load data local inpath '/root/test/sales' into table sales")

    /**
      * The ranking starts at 1 within each group, e.g.:
      *   5 A 200   --- 1
      *   3 A 100   --- 2
      *   4 A 80    --- 3
      *   7 A 60    --- 4
      *
      *   1 B 100   --- 1
      *   8 B 90    --- 2
      *   6 B 80    --- 3
      *   1 B 70    --- 4
      */
    val result = spark.sql(
      "select"
        +" riqi,leibie,jine "
        + "from ("
        + "select "
        +"riqi,leibie,jine,row_number() over (partition by leibie order by jine desc) rank "
        + "from sales) t "
        + "where t.rank<=3")
    result.write.mode(SaveMode.Append).saveAsTable("salesResult")
    result.show(100)

 

Reading MySQL data in Java:

SparkConf conf =  new SparkConf();
        conf.setMaster("local").setAppName("mysql");
        conf.set("spark.sql.shuffle.partitions","1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Map<String,String> map = new HashMap<String,String>();
        map.put("driver","com.mysql.jdbc.Driver");
        map.put("url","jdbc:mysql://192.168.126.111:3306/spark");
        map.put("user","root");
        map.put("password","123");
        map.put("dbtable","person");

        Dataset<Row> df = sqlContext.read().options(map).format("jdbc").load();
        //df.show();
        df.registerTempTable("person1");

        /**
         * The second way to connect via JDBC.
         */

        DataFrameReader read = sqlContext.read();
        read.option("driver","com.mysql.jdbc.Driver");
        read.option("url","jdbc:mysql://192.168.126.111:3306/spark");
        read.option("user","root");
        read.option("password","123");
        read.option("dbtable","score");
        Dataset<Row> jdbc = read.format("jdbc").load();
        jdbc.registerTempTable("score1");
        Dataset<Row> result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score from person1 join  score1 on  person1.name = score1.name ");
        result.show();
        Properties prop = new Properties();
        prop.put("user","root");
        prop.put("password","123");

        result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result1",prop);
        //jdbc.show();
        sc.stop();

Reading Hive data in Java:

SparkConf conf = new SparkConf();
        conf.setAppName("hive");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // A HiveContext (not a plain SQLContext) is needed to access Hive tables
        HiveContext hc = new HiveContext(sc);
        // create the tables and load the data
        hc.sql("use spark");
        hc.sql("create table student_info(name string,age int) row format delimited fields terminated by ','");
        hc.sql("load data local inpath '/root/data/student_infos' into table student_info");

        hc.sql("create table student_scores(name string,score int) row format delimited fields terminated by ','");
        hc.sql("load data local inpath '/root/data/student_scores' into table student_scores");
        // get the join result
        Dataset<Row> sql = hc.sql("select t1.name,t1.age,t2.score from student_info t1 join student_scores t2 on t1.name = t2.name");
        // write the result back to Hive
        sql.write().mode(SaveMode.Overwrite).saveAsTable("student_result");

 

 

 

 

  

 

