Requirement 1: Find the region an IP address belongs to
Description
http.log: the access log produced by users visiting the site. Each record contains a timestamp, IP address, URL visited, request data, browser information, and so on.
ip.dat: IP range data; each record maps an IP range to a location.
File locations: data/http.log, data/ip.dat
~~~
# Sample http.log record. Format: timestamp | IP address | URL visited | request data | browser information
20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59http://bsalsa.com/EmbeddedWB- 14.59 from: http://bsalsa.com/ )|http://show.51.com/main.php|
# Sample ip.dat record
122.228.96.0|122.228.96.255|2061787136|2061787391|亞洲|中國|浙江|溫州||電信|330300|China|CN|120.672111|28.000575
~~~
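The third and fourth fields of an ip.dat record are the same two range boundaries expressed as base-256 integers, which is what the solution below compares against. A minimal sketch of that conversion (the helper name ipToLong is only for illustration):

~~~scala
// Convert a dotted IPv4 string to a Long, most significant octet first (base 256)
def ipToLong(ip: String): Long =
  ip.split("\\.").map(_.toLong).foldLeft(0L)((acc, octet) => acc * 256 + octet)

ipToLong("122.228.96.0")   // 2061787136, matches field 3 of the sample row
ipToLong("122.228.96.255") // 2061787391, matches field 4 of the sample row
~~~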
Task
Convert each IP address in http.log to a location, e.g. turn 122.228.96.111 into 溫州 (Wenzhou), and count the total number of visits per city.
Solution
1. Parse http.log and extract the IP address of each record.
2. Parse ip.dat and build the numeric IP range that each city covers.
3. Map every IP in http.log to its city via the range data, then aggregate the visit counts per city. The full implementation follows.
~~~scala
package com.lagou.homework

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Homework {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
    val sc = new SparkContext(conf)
    // Set the log level
    sc.setLogLevel("warn")

    // Read http.log and convert each IP address to its numeric form
    val httpData: RDD[Long] = sc.textFile("data/http.log")
      .map(x => ipConvert(x.split("\\|")(1)))

    // Read the IP range file: (range start, range end, city) -- field 7 holds the city (e.g. 溫州)
    val ipData: Array[(Long, Long, String)] = sc.textFile("data/ip.dat")
      .map { line =>
        val field: Array[String] = line.split("\\|")
        (field(2).toLong, field(3).toLong, field(7))
      }.collect()

    // Broadcast the ranges, sorted by range start so they can be binary-searched
    val ipBC: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipData.sortBy(_._1))

    // Look up the city of every record with a binary search, then count per city
    val results: Array[(String, Int)] = httpData.mapPartitions { iter =>
      val ipsInfo: Array[(Long, Long, String)] = ipBC.value
      iter.map { ip =>
        val city: String = getCityName(ip, ipsInfo)
        (city, 1)
      }
    }.reduceByKey(_ + _)
      .collect()

    results.sortBy(_._2)
      .foreach(println)

    // Release resources
    sc.stop()
  }

  // Convert a dotted IP string (e.g. 192.168.10.2) to a Long; the first octet is the most significant byte
  def ipConvert(ip: String): Long = {
    val arr: Array[Long] = ip.split("\\.").map(_.toLong)
    var ipLong: Long = 0L
    for (i <- arr.indices) {
      ipLong = ipLong * 256 + arr(i)
    }
    ipLong
  }

  // Binary-search the sorted ranges for the one that contains the IP
  def getCityName(ip: Long, ips: Array[(Long, Long, String)]): String = {
    var start = 0
    var end: Int = ips.length - 1
    var middle = 0
    while (start <= end) {
      middle = (start + end) / 2
      if (ip >= ips(middle)._1 && ip <= ips(middle)._2) return ips(middle)._3
      else if (ip < ips(middle)._1) end = middle - 1
      else start = middle + 1
    }
    "Unknown"
  }
}
~~~
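Why broadcast plus binary search: ip.dat easily fits in memory, so broadcasting the sorted ranges and doing an O(log n) lookup per record avoids shuffling the much larger http.log for a join. A quick sanity check of the lookup, assuming the object above is compiled and available (e.g. in spark-shell):

~~~scala
// One range taken verbatim from the ip.dat sample row: (start, end, city)
val ranges = Array((2061787136L, 2061787391L, "溫州"))

Homework.getCityName(Homework.ipConvert("122.228.96.111"), ranges) // "溫州"
Homework.getCityName(Homework.ipConvert("8.8.8.8"), ranges)        // "Unknown" (no matching range)
~~~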
Result verification
Requirement 2: Log analysis
Log format: IP, hit status (HIT/MISS), response time, request time, request method, request URL, request protocol, status code, response size, referer, user agent
Log file location: data/cdn.txt
~~~
100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)"
~~~
Terminology:
PV (page view): the number of page views; a metric for a whole site or a single page.
UV (unique visitor): the number of distinct IP addresses that visited a site or clicked a given item.
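Both metrics are one-liners on the raw log. A minimal sketch, assuming sc is a live SparkContext as created in the solution below:

~~~scala
import org.apache.spark.rdd.RDD

val lines: RDD[String] = sc.textFile("data/cdn.txt")
val pv: Long = lines.count()                                     // PV: one per request line
val uv: Long = lines.map(_.split("\\s+")(0)).distinct().count()  // UV: number of distinct client IPs
~~~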
Tasks
2.1 Count the number of unique IPs.
2.2 Count the number of unique IPs per video (a video is identified by a *.mp4 file name appearing in the request URL of a log line).
2.3 Compute the total traffic (response bytes) for each hour of the day.
Solution
~~~scala
package com.lagou.homework

import java.util.regex.{Matcher, Pattern}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Homework2 {
  // Regex for lines whose request URL points at a video (*.mp4): captures the IP and the video name
  val ipPattern: Pattern = Pattern.compile("""(\S+) .+/(\S+\.mp4) .*""")
  // Regex for traffic lines: captures the request time, status code (200/206/304) and response size
  val flowPattern: Pattern = Pattern.compile(""".+ \[(.+?) .+ (200|206|304) (\d+) .+""")

  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
    val sc = new SparkContext(conf)
    // Set the log level
    sc.setLogLevel("warn")

    // Read the log file
    val dataRDD: RDD[String] = sc.textFile("data/cdn.txt")

    // 1. Count the unique IPs
    val results: RDD[(String, Int)] = dataRDD.map(x => (x.split("\\s+")(0), 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false, numPartitions = 1)
    println("---------- Unique IPs ------------------")
    results.take(10).foreach(println)
    println(s"Number of unique IPs: ${results.count()}")

    // 2. Count the unique IPs per video
    // Match the regex to extract (video, ip) pairs; non-matching lines become a sentinel
    val videoRDD: RDD[((String, String), Int)] = dataRDD.map { line =>
      val matcherFlag: Matcher = ipPattern.matcher(line)
      if (matcherFlag.matches()) {
        ((matcherFlag.group(2), matcherFlag.group(1)), 1) // ((141081.mp4, 125.116.211.162), 1)
      } else {
        (("", ""), 0)
      }
    }
    // Deduplicate (ip, video) pairs, then count the distinct IPs per video
    val result2: RDD[(String, Int)] = videoRDD.filter { case ((video, ip), count) => video != "" && ip != "" && count != 0 }
      .map { case ((video, ip), _) => (ip, video) }
      .distinct()
      .map { case (_, video) => (video, 1) }
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false, numPartitions = 1)
    println("---------- Unique IPs per video ------------------")
    result2.foreach(println)

    // 3. Traffic per hour of the day
    val flowRDD: RDD[(String, Long)] = dataRDD.map { line =>
      val matchFlag: Matcher = flowPattern.matcher(line)
      if (matchFlag.matches())
        (matchFlag.group(1).split(":")(1), matchFlag.group(3).toLong)
      else ("", 0L)
    }
    println("---------- Traffic per hour ------------------")
    flowRDD.filter { case (_, flow) => flow != 0 }
      // The aggregated data is tiny: reduce into one partition, collect, and finish with local collection ops
      .reduceByKey(_ + _, 1)
      .collectAsMap()
      // Convert the response size from bytes to GB
      .mapValues(_ / 1024 / 1024 / 1024)
      .toList
      // Sort by hour
      .sortBy(_._1)
      .foreach { case (k, v) => println(s"Hour $k: CDN traffic ${v} GB") }

    // Release resources
    sc.stop()
  }
}
~~~
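A quick check of flowPattern against the sample line from cdn.txt shows which capture groups the solution relies on (ipPattern intentionally does not match this particular line, because its URL ends in video.js rather than .mp4):

~~~scala
import java.util.regex.{Matcher, Pattern}

val flowPattern: Pattern = Pattern.compile(""".+ \[(.+?) .+ (200|206|304) (\d+) .+""")
val line = """100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)""""

val m: Matcher = flowPattern.matcher(line)
if (m.matches()) {
  println(m.group(1))               // 15/Feb/2017:00:00:46 -> request time
  println(m.group(1).split(":")(1)) // 00                   -> hour bucket used for grouping
  println(m.group(2))               // 200                  -> status code
  println(m.group(3))               // 174055               -> response size in bytes
}
~~~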
Result verification
3. Spark interview question
Each line of the click log file (click.log) has the following format:
~~~
INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32
INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33
~~~
The impression log (imp.log) has the following format:
~~~
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34
~~~
Tasks
3.1 Using Spark Core, count the number of impressions and clicks per adid and write the result to an HDFS file.
The output rows should contain adid, impression count, and click count. Note: no data may be dropped (some adids have impressions but no clicks, and others have clicks but no impressions).
3.2 How many shuffles does your code trigger, and can that number be reduced?
(Hint: a single shuffle is optimal.)
Solution
~~~scala
package com.lagou.homework

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Homework3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val clickLog: RDD[String] = sc.textFile("data/click.log")
    val impLog: RDD[String] = sc.textFile("data/imp.log")

    // Click log: every line becomes (adid, (1 click, 0 impressions))
    val clkRDD: RDD[(String, (Int, Int))] = clickLog.map { line =>
      val arr: Array[String] = line.split("\\s+")
      val adid: String = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, (1, 0))
    }
    // Impression log: every line becomes (adid, (0 clicks, 1 impression))
    val impRDD: RDD[(String, (Int, Int))] = impLog.map { line =>
      val arr: Array[String] = line.split("\\s+")
      val adid: String = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, (0, 1))
    }

    // union (no shuffle) + reduceByKey (the only shuffle) keeps every adid,
    // even those that appear in only one of the two logs
    val resultRDD: RDD[(String, (Int, Int))] = clkRDD.union(impRDD)
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    // Write the result (adid, (clicks, impressions)) to HDFS
    resultRDD.saveAsTextFile("hdfs://linux121:9000/data/")

    sc.stop()
  }
}
~~~
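Regarding 3.2: union is a narrow dependency, so the only shuffle in the solution above comes from reduceByKey, which already matches the one-shuffle optimum in the hint. For comparison, a sketch of the more obvious join-based approach (illustrative variable names, reusing clickLog and impLog from above):

~~~scala
// Count clicks and impressions separately, then join. Each reduceByKey shuffles,
// and the fullOuterJoin shuffles again unless both sides already share the same
// partitioner -- up to 3 shuffles versus the single shuffle above.
val clickCnt = clickLog.map(line => (line.substring(line.lastIndexOf("=") + 1), 1)).reduceByKey(_ + _)
val impCnt   = impLog.map(line => (line.substring(line.lastIndexOf("=") + 1), 1)).reduceByKey(_ + _)

val joined = clickCnt.fullOuterJoin(impCnt)   // the full outer join keeps adids present in only one log
  .map { case (adid, (clk, imp)) => (adid, imp.getOrElse(0), clk.getOrElse(0)) }
~~~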
Result verification
Requirement 4: Spark SQL exercise
Description:
Table A has three columns (ID, startdate, enddate) and 3 rows:
1 2019-03-04 2020-02-03
2 2020-04-05 2020-08-04
3 2019-10-09 2020-06-11
Write the transformation (in both SQL and the DataFrame DSL) that turns the data above into:
2019-03-04 2019-10-09
2019-10-09 2020-02-03
2020-02-03 2020-04-05
2020-04-05 2020-06-11
2020-06-11 2020-08-04
2020-08-04 2020-08-04
Solution
The idea: flatten all six dates into a single column, deduplicate and sort them, then pair each date with the next one (the last date is paired with itself).
~~~scala
package com.lagou.homework

import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Homework4 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession and SparkContext
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("warn")

    // Spark SQL imports
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val df: DataFrame = List(
      "1 2019-03-04 2020-02-03",
      "2 2020-04-05 2020-08-04",
      "3 2019-10-09 2020-06-11"
    ).toDF()

    // DSL version: flatten all dates, sort them, and pair each date with the next one
    // (the max over a window of the current row and the row after it)
    val w1: WindowSpec = Window.orderBy($"value".asc).rowsBetween(0, 1)
    println("------------------- DSL version ----------------")
    df.as[String]
      .map(str => str.split(" ")(1) + " " + str.split(" ")(2))
      .flatMap(str => str.split("\\s+"))
      .distinct()
      .sort($"value".asc)
      .withColumn("new", max("value").over(w1))
      .show()

    // SQL version
    println("------------------- SQL version ----------------")
    df.flatMap { case Row(line: String) => line.split("\\s+").tail }
      .toDF("date")
      .createOrReplaceTempView("t1")
    spark.sql(
      """
        |select date,
        |       max(date) over(order by date rows between current row and 1 following) as date1
        |  from t1
        |""".stripMargin).show()

    // Release resources
    spark.close()
  }
}
~~~
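The window trick works because, on an ascending sort, the max over the current row and the row after it is simply the next date (or the date itself on the last row). The same intent can also be expressed with lead; a sketch under the assumption that dates is the single-column DataFrame of all dates (the same data as the t1 view above):

~~~scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.orderBy(col("date"))
dates
  .withColumn("next", lead(col("date"), 1).over(w))          // next date in sorted order, null on the last row
  .withColumn("date1", coalesce(col("next"), col("date")))   // fall back to the current date on the last row
  .drop("next")
  .orderBy(col("date"))
  .show()
~~~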
Result verification