Spark與MySQL整合


一、需求:把最終結果存儲到 MySQL 中

1、UrlGroupCount1類

import java.net.URL
import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Counts hits per URL in an access log, keeps the largest count per college
  * (the first label of each URL's host name, e.g. `java` for `http://java.example.cn/x`)
  * and stores one row per college in the MySQL table `url_data`.
  *
  * Input format, as implied by the parsing below: tab-separated lines whose second
  * field (index 1) is an absolute URL.
  */
object UrlGroupCount1 {

  /**
    * Runs the job locally.
    *
    * @param args optional; `args(0)` overrides the input log path (default `e:/access.log`)
    */
  def main(args: Array[String]): Unit = {
    // JDBC settings, kept as local vals so the executor closure only captures Strings.
    // The parameter is spelled `characterEncoding`; the former `charactorEncoding`
    // was silently ignored by the driver.
    val jdbcUrl = "jdbc:mysql://localhost:3306/urlcount?characterEncoding=utf-8"
    val jdbcUser = "root"
    val jdbcPassword = "root"
    val insertSql = "INSERT INTO url_data (xueyuan,number_one) VALUES (?,?)"

    // 1. Create the Spark entry point
    val conf: SparkConf = new SparkConf().setAppName("UrlGroupCount1").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    try {
      // 2. Load the log (path can be overridden from the command line)
      val rdd1: RDD[String] = sc.textFile(args.headOption.getOrElse("e:/access.log"))

      // 3. Split each line and emit a (url, 1) tuple
      val rdd2: RDD[(String, Int)] = rdd1.map { line =>
        val fields: Array[String] = line.split("\t")
        (fields(1), 1)
      }

      // 4. Sum the hits per URL
      val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_ + _)

      // 5. Replace each URL by its college (first label of the host name).
      // NOTE(review): the URL itself is dropped here, so the stored "top" entry only
      // carries the college and a count, not which URL it was. Confirm this is intended.
      val rdd4: RDD[(String, Int)] = rdd3.map { case (url, count) =>
        val host: String = new URL(url).getHost.split("[.]")(0)
        (host, count)
      }

      // 6. Group by college and keep the entry with the highest count (descending sort, top 1)
      val rdd5: RDD[(String, List[(String, Int)])] = rdd4.groupBy(_._1).mapValues { it =>
        it.toList.sortBy(_._2).reverse.take(1)
      }

      // 7. Save to MySQL. Use one connection and one prepared statement per partition
      //    (not per record), and close both even if an insert fails.
      rdd5.foreachPartition { rows =>
        // Skip empty partitions so they don't open a connection at all
        if (rows.hasNext) {
          val conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
          try {
            val statement = conn.prepareStatement(insertSql)
            try {
              rows.foreach { case (college, top) =>
                statement.setString(1, college)
                statement.setString(2, top.toString())
                statement.executeUpdate()
              }
            } finally {
              statement.close()
            }
          } finally {
            conn.close()
          }
        }
      }
    } finally {
      // 8. Release Spark resources even when the job fails
      sc.stop()
    }
  }
}

2、在 MySQL 中創建數據庫和表

-- Database used by both programs (UrlGroupCount1 writes, JdbcRDDDemo reads).
CREATE DATABASE urlcount;
USE urlcount;

-- One row per college:
--   xueyuan    = college name (first label of the URL host name)
--   number_one = the college's top entry, stored as its Scala toString text
CREATE TABLE url_data(
    uid INT PRIMARY KEY AUTO_INCREMENT,
    xueyuan VARCHAR(50),
    number_one VARCHAR(200)
);

3、結果

二、Spark 提供的連接 MySQL 的方式——JdbcRDD

1、JdbcRDDDemo類

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Reads rows of `url_data` back from MySQL using Spark's built-in [[JdbcRDD]].
  *
  * JdbcRDD binds the two `?` placeholders of the query to a sub-range of
  * `[lowerBound, upperBound]` (inclusive) for each partition, so the query must
  * contain exactly two placeholders bounding a numeric key (`uid` here).
  */
object JdbcRDDDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the Spark entry point
    val conf: SparkConf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    try {
      // Connection factory passed to JdbcRDD (a function value, so it can be shipped to tasks)
      val connection = () => {
        // Loading the class registers the driver; the deprecated newInstance() is unnecessary
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/urlcount?characterEncoding=utf-8", "root", "root")
      }

      // Inclusive uid range to read, split across 2 partitions (2 parallel tasks)
      val lowerBound = 1L
      val upperBound = 4L
      val numPartitions = 2

      // 2. Query the data
      val jdbcRdd: JdbcRDD[(Int, String, String)] = new JdbcRDD(
        sc,
        connection,
        // Explicit column list: the row mapper reads by position, so `SELECT *`
        // would silently break if the table's columns were added or reordered
        "SELECT uid, xueyuan, number_one FROM url_data WHERE uid >= ? AND uid <= ?",
        lowerBound,
        upperBound,
        numPartitions,
        r => (r.getInt(1), r.getString(2), r.getString(3))
      )

      // 3. Bring the rows to the driver and print them
      val result: Array[(Int, String, String)] = jdbcRdd.collect()
      println(result.toBuffer)
    } finally {
      // Release Spark resources even when the query fails
      sc.stop()
    }
  }
}

2、結果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM