spark練習--mysql的讀取


前面我們一直操作的是,通過一個文件來讀取數據,這個里面不涉及數據相關的只是,今天我們來介紹一下spark操作中存放與讀取
  1.首先我們先介紹的是把數據存放進入mysql中,今天介紹的這個例子是我們前兩篇介紹的統計IP的次數的一篇內容,最后的返回值類型是List((String,Int))類型的,其內容是為:

  

  此時,我們只需要在寫一個與數據庫相連接,把數據放入里面即可,這個方法為data2Mysql

  val data2MySQL = (iterator:Iterator[(String,Int)]) =>{
    var conn:Connection = null
    var ps:PreparedStatement = null
    val sql = "INSERT INTO location_info1 (location,counts,accesse_date) VALUES(?,?,?)"
    try{
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "root")
      iterator.foreach(line =>{
        ps = conn.prepareStatement(sql)
        ps.setString(1,line._1)
        ps.setInt(2,line._2)
        ps.setDate(3,new Date(System.currentTimeMillis()))
        ps.executeUpdate()
      })
    }catch{
      case e:Exception => println("Mysql Exception")
    }finally{
      if(ps != null)
          ps.close()
      if(conn != null)
        conn.close()
    }
  }

則此時整體代碼為:

package cn.wj.spark.day06

import java.sql.{Connection, Date, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by WJ on 2017/1/4.
  */
object IPLocation {

  val data2MySQL = (iterator:Iterator[(String,Int)]) =>{
    var conn:Connection = null
    var ps:PreparedStatement = null
    val sql = "INSERT INTO location_info1 (location,counts,accesse_date) VALUES(?,?,?)"
    try{
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "root")
      iterator.foreach(line =>{
        ps = conn.prepareStatement(sql)
        ps.setString(1,line._1)
        ps.setInt(2,line._2)
        ps.setDate(3,new Date(System.currentTimeMillis()))
        ps.executeUpdate()
      })
    }catch{
      case e:Exception => println("Mysql Exception")
    }finally{
      if(ps != null)
          ps.close()
      if(conn != null)
        conn.close()
    }
  }

  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length){
      ipNum =  fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

   def binarySearch(lines:Array[(String,String,String)],ip:Long) :Int ={
     var low = 0
     var high = lines.length -1
     while(low <= high){
       val middle = (low + high) / 2
       if((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong))
         return middle
       if(ip < lines(middle)._1.toLong)
         high = middle -1
       else{
         low = middle + 1
       }
     }
      -1
   }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ipRulesRdd = sc.textFile("e://Test/ip.txt").map(lines =>{
      val fields = lines.split("\\|")
      val start_num = fields(2)
      val end_num = fields(3)
      val province = fields(6)
      (start_num,end_num,province)
    })
    //全部的IP映射規則
      val ipRulesArrary = ipRulesRdd.collect()

    //廣播規則,這個是由Driver向worker中廣播規則
      val ipRulesBroadcast = sc.broadcast(ipRulesArrary)

    //加載要處理的數據
      val ipsRdd = sc.textFile("e://Test/access_log").map(line =>{
        val fields = line.split("\\|")
        fields(1)
      })

    val result = ipsRdd.map(ip =>{
      val ipNum = ip2Long(ip)
      val index = binarySearch(ipRulesBroadcast.value,ipNum)
      val info = ipRulesBroadcast.value(index)
      info
    }).map(t => {(t._3,1)}).reduceByKey(_+_)

    //將數據寫入數據庫中
    result.foreachPartition(data2MySQL)

    println(result.collect().toBuffer)
    sc.stop()

  }


}

我們查詢數據庫,我們就可以看見

  

  2.說完了把數據放入到數據庫中,但是我跟傾向於從數據庫中讀取數據,然后在進行操作

  例如,我們就把上面存入數據庫中數據讀取出來吧,主要比較懶,就是想這個樣子用現成的數據庫

  

package cn.wj.spark.day07

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by WJ on 2017/1/5.
*/
object JdbcRDDDemo_3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("JdbcRDDDemo_3").setMaster("local[2]")
val sc = new SparkContext(conf)

val connection =() =>{
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata","root","root")
}
val jdbcRDD = new JdbcRDD(
sc,
connection,
"SELECT * from location_info where id >= ? AND id <= ?",
1,5,2,
r =>{
val id = r.getInt(1)
val location = r.getString(2)
val counts = r.getInt(3)
val access_date = r.getDate(4)
(id,location,counts,access_date)
}
)
val jdbcRDDC = jdbcRDD.collect()
jdbcRDDC.map(line =>{
println("id:"+line._1)
println("location:"+line._2)
println("counts:"+line._3)
println("date:"+line._4)
println("------------------------------")
})
sc.stop
}
}

  其中這個里面比較難以就是在於new JdbcRDD(),我們為什么這樣寫,其實當我們進入這個源碼的時候,它就已經規定了這個里面寫的是什么

  1.sc,

  2.connection,

  3.sql語句

  4.查詢出的數據的lowereBound,upperBound,已經線程數(其實可以簡單理解為分區數),這個里面我們可能回想,我就想查詢出所有,為什么還要傳入參數,能不能不傳輸上下界的參數,其實不行的,這個是代碼都已近提前規定好的,就算你想全部查詢完成這個整個表,你也應該讓id覆蓋上着整個的范圍,

  5.Set,其實就是一個元祖,也可以是返回來的值

  則最后的輸出結果為:

  

  最后再說一點,我們可以看到有用到foreachPartition(),這個和foreach()的區別是什么

    spark操作mysql的數據庫,此時如果對於foreach(),其實我們可以選擇foreachPartition(),因為當我們選擇foreachPartition(),這個可以拿取一整個分區的數據然后再把他放入到數據庫中,如果使用foreach()的話,則是拿取一個數據放入到數據庫中,建立連接,在拿取一個數據,建立連接,再放入數據庫中


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM