SparkRDD編程實戰


通過spark實現點擊流日志分析案例

1. 訪問的pv

package cn.itcast

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  object PV {
  def main(args: Array[String]): Unit = {
        //todo:創建sparkconf,設置appName
        //todo:setMaster("local[2]")在本地模擬spark運行 這里的數字表示 使用2個線程
        val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")

        //todo:創建SparkContext
        val sc: SparkContext = new SparkContext(sparkConf)

        //todo:讀取數據
        val file: RDD[String] = sc.textFile("d:\\data\\access.log")

        //todo:將一行數據作為輸入,輸出("pv",1)
        val pvAndOne: RDD[(String, Int)] = file.map(x=>("pv",1))

        //todo:聚合輸出
         val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_+_)
         totalPV.foreach(println)

         sc.stop()
  }
}

 


2. 訪問的uv
 

package cn.itcast

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  object UV {
  def main(args: Array[String]): Unit = {
    //todo:構建SparkConf和 SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")

    val sc: SparkContext = new SparkContext(sparkConf)

    //todo:讀取數據
    val file: RDD[String] = sc.textFile("d:\\data\\access.log")

    //todo:對每一行分隔,獲取IP地址
    val ips: RDD[(String)] = file.map(_.split(" ")).map(x=>x(0))

    //todo:對ip地址進行去重,最后輸出格式 ("UV",1)
    val uvAndOne: RDD[(String, Int)] = ips.distinct().map(x=>("UV",1))

    //todo:聚合輸出
    val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_+_)
    totalUV.foreach(println)

    //todo:數據結果保存
    totalUV.saveAsTextFile("d:\\data\\out")

    sc.stop()
  }
}

 


3. 訪問的topN
 

package cn.itcast

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  /**
  * 求訪問的topN
  */
  object TopN {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")

    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //讀取數據
    val file: RDD[String] = sc.textFile("d:\\data\\access.log")

    //將一行數據作為輸入,輸出(來源URL,1)
    val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length>10).map(x=>(x(10),1))

    //聚合 排序-->降序
    val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_+_).sortBy(_._2,false)

    //通過take取topN,這里是取前5名
    val finalResult: Array[(String, Int)] = result.take(5)
    println(finalResult.toBuffer)

    sc.stop()
  }
}

 


通過Spark實現ip地址查詢
 

1. 需求分析

       在互聯網中,我們經常會見到城市熱點圖這樣的報表數據,例如在百度統計中,會統計今年的熱門旅游城市、熱門報考學校等,會將這樣的信息顯示在熱點圖中。

 

       因此,我們需要通過日志信息(運行商或者網站自己生成)和城市ip段信息來判斷用戶的ip段,統計熱點經緯度。

2. 技術調研

       因為我們的需求是完成一張報表信息,所以對程序的實時性沒有要求,所以可以選擇內存計算spark來實現上述功能。

3. 架構設計

搭建spark集群

4. 開發流程

4.1. 數據准備

4.2. ip日志信息

在ip日志信息中,我們只需要關心ip這一個維度就可以了,其他的不做介紹

 

4.3. 城市ip段信息

 

5. 代碼開發

5.1. 思路

1、  加載城市ip段信息,獲取ip起始數字和結束數字,經度,維度

2、  加載日志數據,獲取ip信息,然后轉換為數字,和ip段比較

3、  比較的時候采用二分法查找,找到對應的經度和維度

4、  然后對經度和維度做單詞計數

5.2. 代碼

package cn.itcast.IPlocaltion

  import java.sql.{Connection, DriverManager, PreparedStatement}
  import org.apache.spark.broadcast.Broadcast
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  /**
  * ip地址查詢
  */
  object IPLocaltion_Test {
  def main(args: Array[String]): Unit = {
      //todo:創建sparkconf 設置參數
      val sparkConf: SparkConf = new SparkConf().setAppName("IPLocaltion_Test").setMaster("local[2]")

      //todo:創建SparkContext
      val sc = new SparkContext(sparkConf) 

      //todo:讀取基站數據
      val data: RDD[String] = sc.textFile("d:\\data\\ip.txt")

      //todo:對基站數據進行切分 ,獲取需要的字段 (ipStart,ipEnd,城市位置,經度,緯度)
      val jizhanRDD: RDD[(String, String, String, String, String)] = data.map(_.split("\\|")).map(

        x => (x(2), x(3), x(4) + "-" + x(5) + "-" + x(6) + "-" + x(7) + "-" + x(8), x(13), x(14)))

      //todo:獲取RDD的數據
      val jizhanData: Array[(String, String, String, String, String)] = jizhanRDD.collect()

      //todo:廣播變量,一個只讀的數據區,所有的task都能讀到的地方
      val jizhanBroadcast: Broadcast[Array[(String, String, String, String, String)]] = sc.broadcast(jizhanData) 

      //todo:讀取目標數據
      val destData: RDD[String] = sc.textFile("d:\\data\\20090121000132.394251.http.format")

      //todo:獲取數據中的ip地址字段
      val ipData: RDD[String] = destData.map(_.split("\\|")).map(x=>x(1))

     //todo:把IP地址轉化為long類型,然后通過二分法去基站數據中查找,找到的維度做wordCount
     val result=ipData.mapPartitions(iter=>{

      //獲取廣播變量中的值
      val valueArr: Array[(String, String, String, String, String)] = jizhanBroadcast.value

      //todo:操作分區中的itertator
      iter.map(ip=>{
        //將ip轉化為數字long
        val ipNum:Long=ipToLong(ip)

        //拿這個數字long去基站數據中通過二分法查找,返回ip在valueArr中的下標
        val index:Int=binarySearch(ipNum,valueArr)

        //根據下標獲取對一個的經緯度
        val tuple = valueArr(index)

        //返回結果 ((經度,維度),1)
        ((tuple._4,tuple._5),1)
      })
    })

    //todo:分組聚合
    val resultFinal: RDD[((String, String), Int)] = result.reduceByKey(_+_)

    //todo:打印輸出
    resultFinal.foreach(println)

    //todo:將結果保存到mysql表中
  resultFinal.map(x=>(x._1._1,x._1._2,x._2)).foreachPartition(data2Mysql)
sc.stop()
  }

  //todo:ip轉為long類型
  def ipToLong(ip: String): Long = {
    //todo:切分ip地址。
    val ipArray: Array[String] = ip.split("\\.")
    var ipNum=0L

    for(i <- ipArray){
      ipNum=i.toLong | ipNum << 8L
    }
    ipNum
  }

  //todo:通過二分查找法,獲取ip在廣播變量中的下標
  def binarySearch(ipNum: Long, valueArr: Array[(String, String, String, String, String)]): Int ={
    //todo:口訣:上下循環尋上下,左移右移尋中間
    //開始下標
    var start=0

    //結束下標
    var end=valueArr.length-1

    while(start<=end){
      val middle=(start+end)/2

      if(ipNum>=valueArr(middle)._1.toLong && ipNum<=valueArr(middle)._2.toLong){
        return middle
      }
      if(ipNum > valueArr(middle)._2.toLong){
        start=middle
      }

      if(ipNum<valueArr(middle)._1.toLong){
        end=middle
      }
    }
    -1
  }

  //todo:數據保存到mysql表中
  def data2Mysql(iterator:Iterator[(String,String, Int)]):Unit = {
    //todo:創建數據庫連接Connection
    var conn:Connection=null

    //todo:創建PreparedStatement對象
    var ps:PreparedStatement=null

    //todo:采用拼占位符問號的方式寫sql語句。
    var sql="insert into iplocaltion(longitude,latitude,total_count) values(?,?,?)"

    //todo:獲取數據連接    conn=DriverManager.getConnection("jdbc:mysql://itcast01:3306/spark","root","root123")

    //todo:  選中想被try/catch包圍的語句 ctrl+alt+t 快捷鍵選中try/catch/finally
    try {
        iterator.foreach(line=> {
        //todo:預編譯sql語句
        ps = conn.prepareStatement(sql)

        //todo:對占位符設置值,占位符順序從1開始,第一個參數是占位符的位置,第二個參數是占位符的值。
        ps.setString(1, line._1)
        ps.setString(2, line._2)
        ps.setLong(3, line._3)

        //todo:執行
        ps.execute()
        })
      } catch {
        case e:Exception =>println(e)
      } finally {
        if(ps!=null){
          ps.close()
        }

        if (conn!=null){
          conn.close()
        }
      }
  }
}

 

 

 

 

 

 

 

 

 

 

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM