024 關於spark中日志分析案例


1.四個需求

  需求一:求contentsize的平均值、最小值、最大值

  需求二:請各個不同返回值的出現的數據 ===> wordCount程序

  需求三:獲取訪問次數超過N次的IP地址

  需求四:獲取訪問次數最多的前K個endpoint的值 ==> TopN

 

2.主程序LogAnalyzer.scala

  1 package com.ibeifeng.bigdata.spark.core.log
  2 
  3 import org.apache.spark.rdd.RDD
  4 import org.apache.spark.{SparkConf, SparkContext}
  5 
  6 /**
  7   * Apache日志分析
  8   * Created by ibf on 01/15.
  9   */
 10 object LogAnalyzer {
 11   def main(args: Array[String]): Unit = {
 12     val conf = new SparkConf()
 13       .setAppName("log-analyzer")
 14       .setMaster("local[*]")
 15       .set("spark.eventLog.enabled", "true")
 16       .set("spark.eventLog.dir", "hdfs://hadoop-senior01:8020/spark-history")
 17     val sc = SparkContext.getOrCreate(conf)
 18 
 19     // ================日志分析具體代碼==================
 20     // HDFS上日志存儲路徑
 21     val path = "/beifeng/spark/access/access.log"
 22 
 23     // 創建rdd
 24     val rdd = sc.textFile(path)
 25 
 26     // rdd轉換,返回進行后續操作
 27     val apacheAccessLog: RDD[ApacheAccessLog] = rdd
 28       // 過濾數據
 29       .filter(line => ApacheAccessLog.isValidateLogLine(line))
 30       .map(line => {
 31         // 對line數據進行轉換操作
 32         ApacheAccessLog.parseLogLine(line)
 33       })
 34 
 35     // 對多次時候用的rdd進行cache
 36     apacheAccessLog.cache()
 37 
 38     // 需求一:求contentsize的平均值、最小值、最大值
 39     /*
 40     * The average, min, and max content size of responses returned from the server.
 41     * */
 42     val contentSizeRDD: RDD[Long] = apacheAccessLog
 43       // 提取計算需要的字段數據
 44       .map(log => (log.contentSize))
 45 
 46     // 對重復使用的RDD進行cache
 47     contentSizeRDD.cache()
 48 
 49     // 開始計算平均值、最小值、最大值
 50     val totalContentSize = contentSizeRDD.sum()
 51     val totalCount = contentSizeRDD.count()
 52     val avgSize = 1.0 * totalContentSize / totalCount
 53     val minSize = contentSizeRDD.min()
 54     val maxSize = contentSizeRDD.max()
 55 
 56     // 當RDD不使用的時候,進行unpersist
 57     contentSizeRDD.unpersist()
 58 
 59     // 結果輸出
 60     println(s"ContentSize Avg:${avgSize}, Min: ${minSize}, Max: ${maxSize}")
 61 
 62     // 需求二:請各個不同返回值的出現的數據 ===> wordCount程序
 63     /*
 64     * A count of response code's returned.
 65     * */
 66     val responseCodeResultRDD = apacheAccessLog
 67       // 提取需要的字段數據, 轉換為key/value鍵值對,方便進行reduceByKey操作
 68       // 當連續出現map或者flatMap的時候,將多個map/flatMap進行合並
 69       .map(log => (log.responseCode, 1))
 70       // 使用reduceByKey函數,按照key進行分組后,計算每個key出現的次數
 71       .reduceByKey(_ + _)
 72 
 73     // 結果輸出
 74     println(s"""ResponseCode :${responseCodeResultRDD.collect().mkString(",")}""")
 75 
 76     // 需求三:獲取訪問次數超過N次的IP地址
 77     // 需求三額外:對IP地址進行限制,部分黑名單IP地址不統計
 78     /*
 79     * All IPAddresses that have accessed this server more than N times.
 80     * 1. 計算IP地址出現的次數 ===> WordCount程序
 81     * 2. 數據過濾
 82     * */
 83     val blackIP = Array("200-55-104-193.dsl.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
 84     // 由於集合比較大,將集合的內容廣播出去
 85     val broadCastIP = sc.broadcast(blackIP)
 86     val N = 10
 87     val ipAddressRDD = apacheAccessLog
 88       // 過濾IP地址在黑名單中的數據
 89       .filter(log => !broadCastIP.value.contains(log.ipAddress))
 90       // 獲取計算需要的IP地址數據,並將返回值轉換為Key/Value鍵值對類型
 91       .map(log => (log.ipAddress, 1L))
 92       // 使用reduceByKey函數進行聚合操作
 93       .reduceByKey(_ + _)
 94       // 過濾數據,要求IP地址必須出現N次以上
 95       .filter(tuple => tuple._2 > N)
 96     // 獲取滿足條件IP地址, 為了展示方便,將下面這行代碼注釋
 97     //      .map(tuple => tuple._1)
 98 
 99     // 結果輸出
100     println(s"""IP Address :${ipAddressRDD.collect().mkString(",")}""")
101 
102     // 需求四:獲取訪問次數最多的前K個endpoint的值 ==> TopN
103     /*
104     * The top endpoints requested by count.
105     * 1. 先計算出每個endpoint的出現次數
106     * 2. 再進行topK的一個獲取操作,獲取出現次數最多的前K個值
107     * */
108     val K = 10
109     val topKValues = apacheAccessLog
110       // 獲取計算需要的字段信息,並返回key/value鍵值對
111       .map(log => (log.endpoint, 1))
112       // 獲取每個endpoint對應的出現次數
113       .reduceByKey(_ + _)
114       // 獲取前10個元素, 而且使用我們自定義的排序類
115       .top(K)(LogSortingUtil.TupleOrdering)
116     // 如果只需要endpoint的值,不需要出現的次數,那么可以通過map函數進行轉換
117     //      .map(_._1)
118 
119     // 結果輸出
120     println(s"""TopK values:${topKValues.mkString(",")}""")
121 
122 
123     // 對不在使用的rdd,去除cache
124     apacheAccessLog.unpersist()
125 
126     // ================日志分析具體代碼==================
127 
128     sc.stop()
129   }
130 }

 

3.需要的輔助類一(返回匹配的日志)

 1 package com.ibeifeng.bigdata.spark.core.log
 2 
 3 import scala.util.matching.Regex
 4 
 5 /**
 6   * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
 7   * Created by ibf on 01/15.
 8   */
 9 case class ApacheAccessLog(
10                             ipAddress: String, // IP地址
11                             clientId: String, // 客戶端唯一標識符
12                             userId: String, // 用戶唯一標識符
13                             serverTime: String, // 服務器時間
14                             method: String, // 請求類型/方式
15                             endpoint: String, // 請求的資源
16                             protocol: String, // 請求的協議名稱
17                             responseCode: Int, // 請求返回值:比如:200、401
18                             contentSize: Long // 返回的結果數據大小
19                           )
20 
21 /**
22   * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
23   * on 01/15.
24   * 提供一些操作Apache Log的工具類供SparkCore使用
25   */
26 object ApacheAccessLog {
27   // Apache日志的正則
28   val PARTTERN: Regex =
29   """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r
30 
31   /**
32     * 驗證一下輸入的數據是否符合給定的日志正則,如果符合返回true;否則返回false
33     *
34     * @param line
35     * @return
36     */
37   def isValidateLogLine(line: String): Boolean = {
38     val options = PARTTERN.findFirstMatchIn(line)
39 
40     if (options.isEmpty) {
41       false
42     } else {
43       true
44     }
45   }
46 
47   /**
48     * 解析輸入的日志數據
49     *
50     * @param line
51     * @return
52     */
53   def parseLogLine(line: String): ApacheAccessLog = {
54     if (!isValidateLogLine(line)) {
55       throw new IllegalArgumentException("參數格式異常")
56     }
57 
58     // 從line中獲取匹配的數據
59     val options = PARTTERN.findFirstMatchIn(line)
60 
61     // 獲取matcher
62     val matcher = options.get
63 
64     // 構建返回值
65     ApacheAccessLog(
66       matcher.group(1), // 獲取匹配字符串中第一個小括號中的值
67       matcher.group(2),
68       matcher.group(3),
69       matcher.group(4),
70       matcher.group(5),
71       matcher.group(6),
72       matcher.group(7),
73       matcher.group(8).toInt,
74       matcher.group(9).toLong
75     )
76   }
77 }

 

4.需要的輔助類二(自定義的一個二元組的比較器,方便進行TopN)

 1 package com.ibeifeng.bigdata.spark.core.log
 2 
 3 /**
 4   * Created by ibf on 01/15.
 5   */
 6 object LogSortingUtil {
 7 
 8   /**
 9     * 自定義的一個二元組的比較器
10     */
11   object TupleOrdering extends scala.math.Ordering[(String, Int)] {
12     override def compare(x: (String, Int), y: (String, Int)): Int = {
13       // 按照出現的次數進行比較,也就是按照二元組的第二個元素進行比較
14       x._2.compare(y._2)
15     }
16   }
17 
18 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM