通過spark實現點擊流日志分析案例
1. 訪問的pv
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object PV { def main(args: Array[String]): Unit = { //todo:創建sparkconf,設置appName //todo:setMaster("local[2]")在本地模擬spark運行 這里的數字表示 使用2個線程 val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]") //todo:創建SparkContext val sc: SparkContext = new SparkContext(sparkConf) //todo:讀取數據 val file: RDD[String] = sc.textFile("d:\\data\\access.log") //todo:將一行數據作為輸入,輸出("pv",1) val pvAndOne: RDD[(String, Int)] = file.map(x=>("pv",1)) //todo:聚合輸出 val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_+_) totalPV.foreach(println) sc.stop() } }
2. 訪問的uv
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object UV { def main(args: Array[String]): Unit = { //todo:構建SparkConf和 SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]") val sc: SparkContext = new SparkContext(sparkConf) //todo:讀取數據 val file: RDD[String] = sc.textFile("d:\\data\\access.log") //todo:對每一行分隔,獲取IP地址 val ips: RDD[(String)] = file.map(_.split(" ")).map(x=>x(0)) //todo:對ip地址進行去重,最后輸出格式 ("UV",1) val uvAndOne: RDD[(String, Int)] = ips.distinct().map(x=>("UV",1)) //todo:聚合輸出 val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_+_) totalUV.foreach(println) //todo:數據結果保存 totalUV.saveAsTextFile("d:\\data\\out") sc.stop() } }
3. 訪問的topN
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 求訪問的topN */ object TopN { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]") val sc: SparkContext = new SparkContext(sparkConf) sc.setLogLevel("WARN") //讀取數據 val file: RDD[String] = sc.textFile("d:\\data\\access.log") //將一行數據作為輸入,輸出(來源URL,1) val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length>10).map(x=>(x(10),1)) //聚合 排序-->降序 val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_+_).sortBy(_._2,false) //通過take取topN,這里是取前5名 val finalResult: Array[(String, Int)] = result.take(5) println(finalResult.toBuffer) sc.stop() } }
通過Spark實現ip地址查詢
1. 需求分析
在互聯網中,我們經常會見到城市熱點圖這樣的報表數據,例如在百度統計中,會統計今年的熱門旅游城市、熱門報考學校等,會將這樣的信息顯示在熱點圖中。
因此,我們需要通過日志信息(運行商或者網站自己生成)和城市ip段信息來判斷用戶的ip段,統計熱點經緯度。
2. 技術調研
因為我們的需求是完成一張報表信息,所以對程序的實時性沒有要求,所以可以選擇內存計算spark來實現上述功能。
3. 架構設計
搭建spark集群
4. 開發流程
4.1. 數據准備
4.2. ip日志信息
在ip日志信息中,我們只需要關心ip這一個維度就可以了,其他的不做介紹
4.3. 城市ip段信息
5. 代碼開發
5.1. 思路
1、 加載城市ip段信息,獲取ip起始數字和結束數字,經度,維度
2、 加載日志數據,獲取ip信息,然后轉換為數字,和ip段比較
3、 比較的時候采用二分法查找,找到對應的經度和維度
4、 然后對經度和維度做單詞計數
5.2. 代碼
package cn.itcast.IPlocaltion import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * ip地址查詢 */ object IPLocaltion_Test { def main(args: Array[String]): Unit = { //todo:創建sparkconf 設置參數 val sparkConf: SparkConf = new SparkConf().setAppName("IPLocaltion_Test").setMaster("local[2]") //todo:創建SparkContext val sc = new SparkContext(sparkConf) //todo:讀取基站數據 val data: RDD[String] = sc.textFile("d:\\data\\ip.txt") //todo:對基站數據進行切分 ,獲取需要的字段 (ipStart,ipEnd,城市位置,經度,緯度) val jizhanRDD: RDD[(String, String, String, String, String)] = data.map(_.split("\\|")).map( x => (x(2), x(3), x(4) + "-" + x(5) + "-" + x(6) + "-" + x(7) + "-" + x(8), x(13), x(14))) //todo:獲取RDD的數據 val jizhanData: Array[(String, String, String, String, String)] = jizhanRDD.collect() //todo:廣播變量,一個只讀的數據區,所有的task都能讀到的地方 val jizhanBroadcast: Broadcast[Array[(String, String, String, String, String)]] = sc.broadcast(jizhanData) //todo:讀取目標數據 val destData: RDD[String] = sc.textFile("d:\\data\\20090121000132.394251.http.format") //todo:獲取數據中的ip地址字段 val ipData: RDD[String] = destData.map(_.split("\\|")).map(x=>x(1)) //todo:把IP地址轉化為long類型,然后通過二分法去基站數據中查找,找到的維度做wordCount val result=ipData.mapPartitions(iter=>{ //獲取廣播變量中的值 val valueArr: Array[(String, String, String, String, String)] = jizhanBroadcast.value //todo:操作分區中的itertator iter.map(ip=>{ //將ip轉化為數字long val ipNum:Long=ipToLong(ip) //拿這個數字long去基站數據中通過二分法查找,返回ip在valueArr中的下標 val index:Int=binarySearch(ipNum,valueArr) //根據下標獲取對一個的經緯度 val tuple = valueArr(index) //返回結果 ((經度,維度),1) ((tuple._4,tuple._5),1) }) }) //todo:分組聚合 val resultFinal: RDD[((String, String), Int)] = result.reduceByKey(_+_) //todo:打印輸出 resultFinal.foreach(println) //todo:將結果保存到mysql表中 resultFinal.map(x=>(x._1._1,x._1._2,x._2)).foreachPartition(data2Mysql) sc.stop() } //todo:ip轉為long類型 def ipToLong(ip: String): Long = { //todo:切分ip地址。 val ipArray: Array[String] = ip.split("\\.") var ipNum=0L for(i <- ipArray){ ipNum=i.toLong | ipNum << 8L } ipNum } //todo:通過二分查找法,獲取ip在廣播變量中的下標 def binarySearch(ipNum: Long, valueArr: Array[(String, String, String, String, String)]): Int ={ //todo:口訣:上下循環尋上下,左移右移尋中間 //開始下標 var start=0 //結束下標 var end=valueArr.length-1 while(start<=end){ val middle=(start+end)/2 if(ipNum>=valueArr(middle)._1.toLong && ipNum<=valueArr(middle)._2.toLong){ return middle } if(ipNum > valueArr(middle)._2.toLong){ start=middle } if(ipNum<valueArr(middle)._1.toLong){ end=middle } } -1 } //todo:數據保存到mysql表中 def data2Mysql(iterator:Iterator[(String,String, Int)]):Unit = { //todo:創建數據庫連接Connection var conn:Connection=null //todo:創建PreparedStatement對象 var ps:PreparedStatement=null //todo:采用拼占位符問號的方式寫sql語句。 var sql="insert into iplocaltion(longitude,latitude,total_count) values(?,?,?)" //todo:獲取數據連接 conn=DriverManager.getConnection("jdbc:mysql://itcast01:3306/spark","root","root123") //todo: 選中想被try/catch包圍的語句 ctrl+alt+t 快捷鍵選中try/catch/finally try { iterator.foreach(line=> { //todo:預編譯sql語句 ps = conn.prepareStatement(sql) //todo:對占位符設置值,占位符順序從1開始,第一個參數是占位符的位置,第二個參數是占位符的值。 ps.setString(1, line._1) ps.setString(2, line._2) ps.setLong(3, line._3) //todo:執行 ps.execute() }) } catch { case e:Exception =>println(e) } finally { if(ps!=null){ ps.close() } if (conn!=null){ conn.close() } } } }