Spark SQL實現日志離線批處理


一、 基本的離線數據處理架構:
 
  1. 數據采集   Flume:Web日志寫入到HDFS
  2. 數據清洗   臟數據 Spark、Hive、MR等計算框架來完成。 清洗完之后再放回HDFS
  3. 數據處理   按照需要,進行業務的統計和分析。 也通過計算框架完成
  4. 處理結果入庫   存放到RDBMS、NoSQL中
  5. 數據可視化    通過圖形化展示出來。  ECharts、HUE、Zeppelin
 
處理框圖:
 
1 2 3 4 5 6 7為離線處理,其中5不一定是Hive(還有Spark SQL等) 6不一定是RDBMS(NoSQL)
執行時,可用調度框架Oozie、Azkaban,指定任務執行的時間
 
另外一條線是實時處理
 
 
擬定項目需求:
  1. 統計某時間段最受歡迎的某項的TopN和對應的訪問次數
  2. 按地市統計最受歡迎  從IP提取城市信息
  3. 按訪問流量統計
 
 
互聯網日志一般包括有:
訪問時間  訪問URL  耗費流量   訪問IP地址
從日志里提取以上我們需要的數據
 
假設我們現在僅有一台電腦供學習作為集群使用,為了防止內存溢出,有必要進行剪切日志:
用head -10000命令截取前10000條
數據量太大的話,在IDE中可能會報錯
 
 
 
 二、日志處理過程
 
數據清洗:
 
第一步: 從原始日志提取有用信息,本例中就是拿到時間、URL、流量、IP
  1. 讀取日志文件,得到RDD,通過map方法,split成一個數組,然后選擇數組中有用的幾項(用斷點的方法分析哪幾項有用,並匹配相應的變量)
  2. 獲取到的信息有可能因為某些問題,如線程問題而導致生成了帶有錯誤的信息,第一步中一開始用了SimpleDateFormat(線程不安全)來轉變時間格式,會導致某些時間轉換錯誤。一般要改成FastDateFormat來做
 
實現代碼:
//提取有用信息,轉換格式
object SparkStatFormatJob {
  def main(args: Array[String]) = {
    val spark = SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("/Users/kingheyleung/Downloads/data/10000_access.log")
    //access.take(10).foreach(println)
    access.map(line => {
      val splits = line.split(" ")
      val ip = splits(0)
      //用斷點的方法,觀察splits數組,找出時間、url、流量對應哪一個字段
      //創建時間類DateUtils,轉換成常用的時間表達方式
      //把url多余的""引號清除掉
      val time = splits(3) + " " + splits(4)
      val url = splits(11).replaceAll("\"", "")
      val traffic = splits(9)
      //(ip, DateUtils.parse(time), url, traffic)  用來測試輸出是否正常
      //把裁剪好的數據重新組合,用Tab分割
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).saveAsTextFile("file:///usr/local/mycode/immooclog/")
    spark.stop()
  }
}

 

//日期解析
object DateUtils {
  //輸入格式
  val ORIGINAL_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:sss Z", Locale.ENGLISH)
  //輸出格式
  val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
 
  def parse(time:String) = {
    TARGET_TIME_FORMAT.format(new Date(getTime(time)))
  }
  def getTime(time:String) = {
    try {
      ORIGINAL_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch { 
      case e : Exception => {
        0l
      }
    }
  }

 

一般日志處理需要進行分區

本例中按照日志中的訪問時間進行分區
 
 
第二步:解析上一步得到的有用信息,我把它稱為解析日志
 
其實就是把較為整潔的數據日志,解析出每個字段的含義,並把RDD轉成DF
在此案例中,完成的是:
輸入:訪問時間  訪問URL  耗費流量   訪問IP地址  =>轉變為輸出:url、類型(本例中url的后綴有article還是video)、對應ID號、流量、ip、城市、時間、天(用於分組)
並且創建DataFrame(也就是定義Row和StructType,其中Row要和原日志的每個字段對應,而StructType是根據所需要的輸出來定義就行)
 
實現代碼:
//解析日志
object SparkStatCleanJob {
  def main(args: Array[String]) = {
    val spark = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("file:///Users/kingheyleung/Downloads/data/access_10000.log")
    //RDD convert to DF, define Row and StructType
    val accessDF = spark.createDataFrame(accessRDD.map(line => LogConvertUtils.convertToRow(line)), LogConvertUtils.struct)
    //accessDF.printSchema()
    //accessDF.show(false)
    spark.stop()
  }
}

 

//RDD轉換成DF的工具類
object LogConvertUtils {
  //構建Struct
  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )
  //提取信息,構建Row
  def convertToRow(line:String) = {
 
    try {
      val splits = line.split("\t")
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)
      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length())
      val cmsSplits = cms.split("/")
 
      var cmsType = ""
      var cmsId = 0l
      //判斷是否存在
      if (cmsSplits.length > 1) {
        cmsType = cmsSplits(0)
        cmsId = cmsSplits(1).toLong
      }
      val city = IpUtils.getCity(ip)     //通過Ip解析工具傳進,具體看下面
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")
 
      //定義Row,與Struct一樣
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0)
    }
  }
}

注意:轉換時一定要記得類型轉換!!!!

 
進一步解析:對IP地址解析來獲得城市信息
 
在這里,為了讓IP地址轉換成直觀的城市信息,我使用了GitHub上的開源項目來實現:
用Maven編譯下載的項目
mvn clean package -DskipTests
 
安裝jar包到自己的Maven倉庫中: 
mvn install:install-file -Dfile=路徑.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
 
在IDE里面的pom.xml添加dependency,參照GitHub主頁上的pom.xml中的dependency
但是出現報錯了:
java.io.FileNotFoundException:
file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
根據提示,我們需要在項目源碼中找到相應的文件拷進去IDE中的main/resources中!
 
 存儲清洗后的數據:
按day分區來進行存儲  partitionBy
存儲模式:mode(SaveMode.Overwrite)  覆蓋存儲
coalesce:據說生產中經常用,是項目的調優點,控制文件的輸出大小,個數
 
 
三、統計功能實現
 
功能實現一:統計TopN視頻  
 
第一步:讀取數據,read.format().load
第二步:
  1. 使用DataFrame API統計分析
  2. SQL API
最后把統計結果保存在MySQL數據庫中
 
調優點:
讀取parquet文件時,系統會默認解析各字段相應的數據類型,但有時候我們就只需要它是String類型,需要在SparkSession定義時添加:
config("spark.sql.sources.partitionColumnTypeInference.enabled, "false"")
變成只會按照原類型讀入
 
兩種方法:
若使用DataFrame API來做:
用$號時候需要導入 隱式轉換(這里是列名轉換成列)!spark.implicits._
用到dataframe的count()函數要導入包:org.apache.spark.sql.functions._
 
若使用SQL API來做:
創建臨時表createTempView
小心寫SQL語句換行時不注意而忽略空格
 
實現代碼:
 
//完成統計操作
object TopNStatJob {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()
    val accessDF = spark.read.format("parquet").load("/Users/kingheyleung/Downloads/data/clean/")
    dfCountTopNVideo(spark, accessDF)
    sqlCountTopNVideo(spark, accessDF)
    //accessDF.printSchema()
 
 
    spark.stop()
  }
  def dfCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    /*
    * DF API
    * */
 
    //導入隱式轉換, 留意$號的使用, 並且導入functions包,使agg聚合函數count能夠使用,此處若不用$的話,就無法讓times進行desc排序了
    import spark.implicits._
    val topNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
    topNDF.show(false)
  }
  def sqlCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    /*
    * SQL API
    * */
 
    //創建臨時表access_view,注意換行時,很容易忽略掉空格
    accessDF.createTempView("access_view")
    val topNDF = spark.sql("select day, cmsId, count(1) as times from access_view " +
      "where day == '20170511' and cmsType == 'video' " +
      "group by day, cmsId " +
      "order by times desc")
    topNDF.show(false)
  }
}
 
 
在保存數據之前,需要寫連接MySQL數據庫的工具類,用到java.sql包
  1. 使用DriverManager,連接到mysql 3306
  2. 釋放資源,connection和preparedstatement都要,注意處理異常
 
注意:若測試時拿不到連接,出現以下報錯,那就是沒有在dependency中添加或者選對mysql-connetor包
java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/imooc_project?user=root&password=666
Error:scalac: error while loading <root>, Error accessing /Users/kingheyleung/.m2/repository/mysql/mysql-connector-java/5.0.8/mysql-connector-java-5.0.8.jar
 
我最終選的是5.1.40版本才對了
 
 
實現代碼:
/*
* 連接MySQL數據庫
* 操作工具類
* */
object MySQLUtils {
  //獲得連接
  def getConnection(): Unit = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=666")
  }
  //釋放資源
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      connection.close()
    }
  }
}

 

 
把統計數據保存到MySQL
  1. 在mysql中創建一張表,包含day,cms_Id,times三個字段(注意各自的數據類型,以及定義不允許為NULL,並把day和cms_Id作為PRI KEY)
  2. 創建模型類case class,三個輸入參數,day、cms_Id,times
  3. 創建操作數據庫DAO類,輸入的參數是一個list,list裝的是上面的模型類,目的是插入insert記錄到數據庫中,DAO中分以下幾步:
  4. 首先,做jdbc連接的准備,創建connection和prepareStatement,把關閉連接也寫好,用try catch finally拋出異常;
  5. 然后寫sql語句,preparestatement需要賦值的地方用占位符放着;
  6. 進行對list遍歷,把每個對象都放進pstmt中
  7. 調優點!!!遍歷前把自動提交關掉,遍歷中把pstmt加入批處理中,遍歷完后執行批處理操作!最后手工提交連接
 
實現代碼:
//課程訪問次數實體類
case class VideoAccessStat(day: String, cmsId:Long, times: Long)
 
 
/*
* 各個維度統計的DAO操作
* */
object StatDAO {
  /*
  * 批量保存VideoAccessStat到數據庫
  * */
  def insertDayAccessTopN(list: ListBuffer[VideoAccessStat]): Unit = {
 
    var connection: Connection = null  //jdbc的准備工作, 定義連接
    var pstmt: PreparedStatement = null
 
    try {
      connection = MySQLUtils.getConnection() //真正獲取連接
 
      connection.setAutoCommit(false)   //為了實現批處理,要關掉默認的自動提交
 
      val sql = "insert into day_topn_video(day, cms_id, times) values (?, ?, ?)"  //占位符
      pstmt = connection.prepareStatement(sql)  //把SQL語句生成pstmt對象,后面才可以填充占位符中的數據
 
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
 
        pstmt.addBatch()   //加入批處理
      }
 
      pstmt.execute()    //執行批量處理
      connection.commit()    //手工提交
 
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}

 

 
為了對應以上的第3步,要把統計記錄的DF生成一個個對象,放進list中:
  1. 創建模型類對應的list
  2. 對記錄進行遍歷,把記錄的每個字段當做參數,創建模型類對象
  3. 把每個對象添加到list中
  4. 把list傳進DAO類中 
 
以下代碼添加到上面的TopNJob類里面中就可以把之前生成到的topDF的結果記錄保存到MySQL當中了:
try {
  topNDF.foreachPartition(partitionOfRecords => { //
    val list = new ListBuffer[VideoAccessStat]  //創建list來裝統計記錄
 
    //遍歷每一條記錄,取出來上面對應的三個字段day,cmsId,times
    partitionOfRecords.foreach(info => {
      val day = info.getAs[String]("day")   //后面的就是取出來的記錄的每個字段
      val cmsId = info.getAs[Long]("cmsId")
      val times = info.getAs[Long]("times")
 
      //每一次循環創建一個VideoAccessStat對象,添加一次進入list中
      list.append(VideoAccessStat(day, cmsId, times))
    })
    //把list傳進DAO類
    StatDAO.insertDayAccessTopN(list)
  })
} catch {
  case e: Exception => e.printStackTrace()
}

 

 到此為止已經把項目需求一完成。
 
 
 功能實現二:按照城市來找出topN視頻
 
在功能一的基礎上,運用row_number函數來實現
 
具體的實現代碼:
 
  //先計算訪問次數,並按照day,cmsId,city分組
  val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === "20170511" && accessDF.col("cmsType") === "video")
    .groupBy("day", "cmsId", "city").agg(count("cmsId").as("times"))
 
  //進行分地市排序,使用到row_number函數,生成一個排名,定義為time_rank, 並且取排名前3
  cityAccessTopNDF.select(
    cityAccessTopNDF.col("day"),
    cityAccessTopNDF.col("cmsId"),
    cityAccessTopNDF.col("times"),
    cityAccessTopNDF.col("city"),
    row_number().over(Window.partitionBy(cityAccessTopNDF.col("city"))
      .orderBy(cityAccessTopNDF.col("times").desc)
    ).as("times_rank")
  ).filter("times_rank <= 3").show(false)
}

 

 其他步驟和功能一一樣,但是 插入Mysql的時候報錯,原因是MySQL不支持插入中文!!!!
首先可以在mysql命令行中用SET character來改:
SET character_set_client = utf8
 
可通過
show variables like 'character_set_%’;
查看當前的character編碼設置
 
然后在jdbc連接時,加上:
useUnicode=true&characterEncoding=utf8
 
改了之后,雖然能夠導入MySQL了,而且不出現亂碼,但只有一部分數據,並且在控制台報錯:
 
com.mysql.jdbc.PreparedStatement.fillSendPacket
com.mysql.jdbc.PreparedStatement.execute
 
后來把批處理刪掉竟然就可以把所有數據導入了:
 
pstmt.executeUpdate  //不使用批處理的pstmt插入
 
 
功能三:按流量來排序topN視頻
和功能一幾乎完全一樣,只不過計算流量總和時用的不是count函數而是要用sum函數
 
為了代碼的復用性,防止生成重復的數據,在StatDAO定義刪除的函數:
 
def deleteDayData(day: String) = {
 
  var connection: Connection = null
  var pstmt: PreparedStatement = null
  var tables = Array("day_topn_video",
    "day_city_topn_video",
    "traffic_topn_video"
  )
 
  try {
    connection = MySQLUtils.getConnection()
 
    for (table <- tables) {
      val deleteSql = s"delete from $table where day = ?”  //Scala特殊處理
      pstmt = connection.prepareStatement(deleteSql)
      pstmt.setString(1, table)
      pstmt.setString(2, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}

 

 
需要注意的是,table在pstmt中的特殊用法!!
 
 
后續會對以上內容進行可視化處理、跑在YARN上的修改、性能調優


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM