1.四個需求
需求一:求contentsize的平均值、最小值、最大值
需求二:請各個不同返回值的出現的數據 ===> wordCount程序
需求三:獲取訪問次數超過N次的IP地址
需求四:獲取訪問次數最多的前K個endpoint的值 ==> TopN
2.主程序LogAnalyzer.scala
1 package com.ibeifeng.bigdata.spark.core.log 2 3 import org.apache.spark.rdd.RDD 4 import org.apache.spark.{SparkConf, SparkContext} 5 6 /** 7 * Apache日志分析 8 * Created by ibf on 01/15. 9 */ 10 object LogAnalyzer { 11 def main(args: Array[String]): Unit = { 12 val conf = new SparkConf() 13 .setAppName("log-analyzer") 14 .setMaster("local[*]") 15 .set("spark.eventLog.enabled", "true") 16 .set("spark.eventLog.dir", "hdfs://hadoop-senior01:8020/spark-history") 17 val sc = SparkContext.getOrCreate(conf) 18 19 // ================日志分析具體代碼================== 20 // HDFS上日志存儲路徑 21 val path = "/beifeng/spark/access/access.log" 22 23 // 創建rdd 24 val rdd = sc.textFile(path) 25 26 // rdd轉換,返回進行后續操作 27 val apacheAccessLog: RDD[ApacheAccessLog] = rdd 28 // 過濾數據 29 .filter(line => ApacheAccessLog.isValidateLogLine(line)) 30 .map(line => { 31 // 對line數據進行轉換操作 32 ApacheAccessLog.parseLogLine(line) 33 }) 34 35 // 對多次時候用的rdd進行cache 36 apacheAccessLog.cache() 37 38 // 需求一:求contentsize的平均值、最小值、最大值 39 /* 40 * The average, min, and max content size of responses returned from the server. 41 * */ 42 val contentSizeRDD: RDD[Long] = apacheAccessLog 43 // 提取計算需要的字段數據 44 .map(log => (log.contentSize)) 45 46 // 對重復使用的RDD進行cache 47 contentSizeRDD.cache() 48 49 // 開始計算平均值、最小值、最大值 50 val totalContentSize = contentSizeRDD.sum() 51 val totalCount = contentSizeRDD.count() 52 val avgSize = 1.0 * totalContentSize / totalCount 53 val minSize = contentSizeRDD.min() 54 val maxSize = contentSizeRDD.max() 55 56 // 當RDD不使用的時候,進行unpersist 57 contentSizeRDD.unpersist() 58 59 // 結果輸出 60 println(s"ContentSize Avg:${avgSize}, Min: ${minSize}, Max: ${maxSize}") 61 62 // 需求二:請各個不同返回值的出現的數據 ===> wordCount程序 63 /* 64 * A count of response code's returned. 65 * */ 66 val responseCodeResultRDD = apacheAccessLog 67 // 提取需要的字段數據, 轉換為key/value鍵值對,方便進行reduceByKey操作 68 // 當連續出現map或者flatMap的時候,將多個map/flatMap進行合並 69 .map(log => (log.responseCode, 1)) 70 // 使用reduceByKey函數,按照key進行分組后,計算每個key出現的次數 71 .reduceByKey(_ + _) 72 73 // 結果輸出 74 println(s"""ResponseCode :${responseCodeResultRDD.collect().mkString(",")}""") 75 76 // 需求三:獲取訪問次數超過N次的IP地址 77 // 需求三額外:對IP地址進行限制,部分黑名單IP地址不統計 78 /* 79 * All IPAddresses that have accessed this server more than N times. 80 * 1. 計算IP地址出現的次數 ===> WordCount程序 81 * 2. 數據過濾 82 * */ 83 val blackIP = Array("200-55-104-193.dsl.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net") 84 // 由於集合比較大,將集合的內容廣播出去 85 val broadCastIP = sc.broadcast(blackIP) 86 val N = 10 87 val ipAddressRDD = apacheAccessLog 88 // 過濾IP地址在黑名單中的數據 89 .filter(log => !broadCastIP.value.contains(log.ipAddress)) 90 // 獲取計算需要的IP地址數據,並將返回值轉換為Key/Value鍵值對類型 91 .map(log => (log.ipAddress, 1L)) 92 // 使用reduceByKey函數進行聚合操作 93 .reduceByKey(_ + _) 94 // 過濾數據,要求IP地址必須出現N次以上 95 .filter(tuple => tuple._2 > N) 96 // 獲取滿足條件IP地址, 為了展示方便,將下面這行代碼注釋 97 // .map(tuple => tuple._1) 98 99 // 結果輸出 100 println(s"""IP Address :${ipAddressRDD.collect().mkString(",")}""") 101 102 // 需求四:獲取訪問次數最多的前K個endpoint的值 ==> TopN 103 /* 104 * The top endpoints requested by count. 105 * 1. 先計算出每個endpoint的出現次數 106 * 2. 再進行topK的一個獲取操作,獲取出現次數最多的前K個值 107 * */ 108 val K = 10 109 val topKValues = apacheAccessLog 110 // 獲取計算需要的字段信息,並返回key/value鍵值對 111 .map(log => (log.endpoint, 1)) 112 // 獲取每個endpoint對應的出現次數 113 .reduceByKey(_ + _) 114 // 獲取前10個元素, 而且使用我們自定義的排序類 115 .top(K)(LogSortingUtil.TupleOrdering) 116 // 如果只需要endpoint的值,不需要出現的次數,那么可以通過map函數進行轉換 117 // .map(_._1) 118 119 // 結果輸出 120 println(s"""TopK values:${topKValues.mkString(",")}""") 121 122 123 // 對不在使用的rdd,去除cache 124 apacheAccessLog.unpersist() 125 126 // ================日志分析具體代碼================== 127 128 sc.stop() 129 } 130 }
3.需要的輔助類一(返回匹配的日志)
1 package com.ibeifeng.bigdata.spark.core.log 2 3 import scala.util.matching.Regex 4 5 /** 6 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 7 * Created by ibf on 01/15. 8 */ 9 case class ApacheAccessLog( 10 ipAddress: String, // IP地址 11 clientId: String, // 客戶端唯一標識符 12 userId: String, // 用戶唯一標識符 13 serverTime: String, // 服務器時間 14 method: String, // 請求類型/方式 15 endpoint: String, // 請求的資源 16 protocol: String, // 請求的協議名稱 17 responseCode: Int, // 請求返回值:比如:200、401 18 contentSize: Long // 返回的結果數據大小 19 ) 20 21 /** 22 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 23 * on 01/15. 24 * 提供一些操作Apache Log的工具類供SparkCore使用 25 */ 26 object ApacheAccessLog { 27 // Apache日志的正則 28 val PARTTERN: Regex = 29 """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r 30 31 /** 32 * 驗證一下輸入的數據是否符合給定的日志正則,如果符合返回true;否則返回false 33 * 34 * @param line 35 * @return 36 */ 37 def isValidateLogLine(line: String): Boolean = { 38 val options = PARTTERN.findFirstMatchIn(line) 39 40 if (options.isEmpty) { 41 false 42 } else { 43 true 44 } 45 } 46 47 /** 48 * 解析輸入的日志數據 49 * 50 * @param line 51 * @return 52 */ 53 def parseLogLine(line: String): ApacheAccessLog = { 54 if (!isValidateLogLine(line)) { 55 throw new IllegalArgumentException("參數格式異常") 56 } 57 58 // 從line中獲取匹配的數據 59 val options = PARTTERN.findFirstMatchIn(line) 60 61 // 獲取matcher 62 val matcher = options.get 63 64 // 構建返回值 65 ApacheAccessLog( 66 matcher.group(1), // 獲取匹配字符串中第一個小括號中的值 67 matcher.group(2), 68 matcher.group(3), 69 matcher.group(4), 70 matcher.group(5), 71 matcher.group(6), 72 matcher.group(7), 73 matcher.group(8).toInt, 74 matcher.group(9).toLong 75 ) 76 } 77 }
4.需要的輔助類二(自定義的一個二元組的比較器,方便進行TopN)
1 package com.ibeifeng.bigdata.spark.core.log 2 3 /** 4 * Created by ibf on 01/15. 5 */ 6 object LogSortingUtil { 7 8 /** 9 * 自定義的一個二元組的比較器 10 */ 11 object TupleOrdering extends scala.math.Ordering[(String, Int)] { 12 override def compare(x: (String, Int), y: (String, Int)): Int = { 13 // 按照出現的次數進行比較,也就是按照二元組的第二個元素進行比較 14 x._2.compare(y._2) 15 } 16 } 17 18 }