一、 基本的離線數據處理架構:
- 數據采集 Flume:Web日志寫入到HDFS
- 數據清洗 臟數據 Spark、Hive、MR等計算框架來完成。 清洗完之后再放回HDFS
- 數據處理 按照需要,進行業務的統計和分析。 也通過計算框架完成
- 處理結果入庫 存放到RDBMS、NoSQL中
- 數據可視化 通過圖形化展示出來。 ECharts、HUE、Zeppelin
處理框圖:

1 2 3 4 5 6 7為離線處理,其中5不一定是Hive(還有Spark SQL等) 6不一定是RDBMS(NoSQL)
執行時,可用調度框架Oozie、Azkaban,指定任務執行的時間
另外一條線是實時處理
擬定項目需求:
- 統計某時間段最受歡迎的某項的TopN和對應的訪問次數
- 按地市統計最受歡迎 從IP提取城市信息
- 按訪問流量統計
互聯網日志一般包括有:
訪問時間 訪問URL 耗費流量 訪問IP地址
從日志里提取以上我們需要的數據
假設我們現在僅有一台電腦供學習作為集群使用,為了防止內存溢出,有必要進行剪切日志:
用head -10000命令截取前10000條
數據量太大的話,在IDE中可能會報錯
二、日志處理過程
數據清洗:
第一步: 從原始日志提取有用信息,本例中就是拿到時間、URL、流量、IP
- 讀取日志文件,得到RDD,通過map方法,split成一個數組,然后選擇數組中有用的幾項(用斷點的方法分析哪幾項有用,並匹配相應的變量)
- 獲取到的信息有可能因為某些問題,如線程問題而導致生成了帶有錯誤的信息,第一步中一開始用了SimpleDateFormat(線程不安全)來轉變時間格式,會導致某些時間轉換錯誤。一般要改成FastDateFormat來做
實現代碼:
//提取有用信息,轉換格式 object SparkStatFormatJob { def main(args: Array[String]) = { val spark = SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate() val access = spark.sparkContext.textFile("/Users/kingheyleung/Downloads/data/10000_access.log") //access.take(10).foreach(println) access.map(line => { val splits = line.split(" ") val ip = splits(0) //用斷點的方法,觀察splits數組,找出時間、url、流量對應哪一個字段 //創建時間類DateUtils,轉換成常用的時間表達方式 //把url多余的""引號清除掉 val time = splits(3) + " " + splits(4) val url = splits(11).replaceAll("\"", "") val traffic = splits(9) //(ip, DateUtils.parse(time), url, traffic) 用來測試輸出是否正常 //把裁剪好的數據重新組合,用Tab分割 DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip }).saveAsTextFile("file:///usr/local/mycode/immooclog/") spark.stop() } }
//日期解析 object DateUtils { //輸入格式 val ORIGINAL_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:sss Z", Locale.ENGLISH) //輸出格式 val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") def parse(time:String) = { TARGET_TIME_FORMAT.format(new Date(getTime(time))) } def getTime(time:String) = { try { ORIGINAL_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime } catch { case e : Exception => { 0l } } }
一般日志處理需要進行分區
本例中按照日志中的訪問時間進行分區
第二步:解析上一步得到的有用信息,我把它稱為解析日志
其實就是把較為整潔的數據日志,解析出每個字段的含義,並把RDD轉成DF
在此案例中,完成的是:
輸入:訪問時間 訪問URL 耗費流量 訪問IP地址 =>轉變為輸出:url、類型(本例中url的后綴有article還是video)、對應ID號、流量、ip、城市、時間、天(用於分組)
並且創建DataFrame(也就是定義Row和StructType,其中Row要和原日志的每個字段對應,而StructType是根據所需要的輸出來定義就行)
實現代碼:
//解析日志 object SparkStatCleanJob { def main(args: Array[String]) = { val spark = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]").getOrCreate() val accessRDD = spark.sparkContext.textFile("file:///Users/kingheyleung/Downloads/data/access_10000.log") //RDD convert to DF, define Row and StructType val accessDF = spark.createDataFrame(accessRDD.map(line => LogConvertUtils.convertToRow(line)), LogConvertUtils.struct) //accessDF.printSchema() //accessDF.show(false) spark.stop() } }
//RDD轉換成DF的工具類 object LogConvertUtils { //構建Struct val struct = StructType( Array( StructField("url", StringType), StructField("cmsType", StringType), StructField("cmsId", LongType), StructField("traffic", LongType), StructField("ip", StringType), StructField("city", StringType), StructField("time", StringType), StructField("day", StringType) ) ) //提取信息,構建Row def convertToRow(line:String) = { try { val splits = line.split("\t") val url = splits(1) val traffic = splits(2).toLong val ip = splits(3) val domain = "http://www.imooc.com/" val cms = url.substring(url.indexOf(domain) + domain.length()) val cmsSplits = cms.split("/") var cmsType = "" var cmsId = 0l //判斷是否存在 if (cmsSplits.length > 1) { cmsType = cmsSplits(0) cmsId = cmsSplits(1).toLong } val city = IpUtils.getCity(ip) //通過Ip解析工具傳進,具體看下面 val time = splits(0) val day = time.substring(0, 10).replaceAll("-", "") //定義Row,與Struct一樣 Row(url, cmsType, cmsId, traffic, ip, city, time, day) } catch { case e: Exception => Row(0) } } }
注意:轉換時一定要記得類型轉換!!!!
進一步解析:對IP地址解析來獲得城市信息
在這里,為了讓IP地址轉換成直觀的城市信息,我使用了GitHub上的開源項目來實現:
用Maven編譯下載的項目
mvn clean package -DskipTests
安裝jar包到自己的Maven倉庫中:
mvn install:install-file -Dfile=路徑.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
在IDE里面的pom.xml添加dependency,參照GitHub主頁上的pom.xml中的dependency
但是出現報錯了:
java.io.FileNotFoundException:
file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
根據提示,我們需要在項目源碼中找到相應的文件拷進去IDE中的main/resources中!
存儲清洗后的數據:
按day分區來進行存儲 partitionBy
存儲模式:mode(SaveMode.Overwrite) 覆蓋存儲
coalesce:據說生產中經常用,是項目的調優點,控制文件的輸出大小,個數
三、統計功能實現
功能實現一:統計TopN視頻
第一步:讀取數據,read.format().load
第二步:
- 使用DataFrame API統計分析
- SQL API
最后把統計結果保存在MySQL數據庫中
調優點:
讀取parquet文件時,系統會默認解析各字段相應的數據類型,但有時候我們就只需要它是String類型,需要在SparkSession定義時添加:
config("spark.sql.sources.partitionColumnTypeInference.enabled, "false"")
變成只會按照原類型讀入
兩種方法:
若使用DataFrame API來做:
用$號時候需要導入
隱式轉換(這里是列名轉換成列)!spark.implicits._
用到dataframe的count()函數要導入包:org.apache.spark.sql.functions._
若使用SQL API來做:
創建臨時表createTempView
小心寫SQL語句換行時不注意而忽略空格
實現代碼:
//完成統計操作 object TopNStatJob { def main(args: Array[String]) { val spark = SparkSession.builder().appName("TopNStatJob") .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false") .master("local[2]").getOrCreate() val accessDF = spark.read.format("parquet").load("/Users/kingheyleung/Downloads/data/clean/") dfCountTopNVideo(spark, accessDF) sqlCountTopNVideo(spark, accessDF) //accessDF.printSchema() spark.stop() } def dfCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = { /* * DF API * */ //導入隱式轉換, 留意$號的使用, 並且導入functions包,使agg聚合函數count能夠使用,此處若不用$的話,就無法讓times進行desc排序了 import spark.implicits._ val topNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video") .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc) topNDF.show(false) } def sqlCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = { /* * SQL API * */ //創建臨時表access_view,注意換行時,很容易忽略掉空格 accessDF.createTempView("access_view") val topNDF = spark.sql("select day, cmsId, count(1) as times from access_view " + "where day == '20170511' and cmsType == 'video' " + "group by day, cmsId " + "order by times desc") topNDF.show(false) } }
在保存數據之前,需要寫連接MySQL數據庫的工具類,用到java.sql包
- 使用DriverManager,連接到mysql 3306
- 釋放資源,connection和preparedstatement都要,注意處理異常
注意:若測試時拿不到連接,出現以下報錯,那就是沒有在dependency中添加或者選對mysql-connetor包
java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/imooc_project?user=root&password=666
Error:scalac: error while loading <root>, Error accessing /Users/kingheyleung/.m2/repository/mysql/mysql-connector-java/5.0.8/mysql-connector-java-5.0.8.jar
我最終選的是5.1.40版本才對了
實現代碼:
/* * 連接MySQL數據庫 * 操作工具類 * */ object MySQLUtils { //獲得連接 def getConnection(): Unit = { DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=666") } //釋放資源 def release(connection: Connection, pstmt: PreparedStatement): Unit = { try { if (pstmt != null) { pstmt.close() } } catch { case e: Exception => e.printStackTrace() } finally { connection.close() } } }
把統計數據保存到MySQL
- 在mysql中創建一張表,包含day,cms_Id,times三個字段(注意各自的數據類型,以及定義不允許為NULL,並把day和cms_Id作為PRI KEY)
- 創建模型類case class,三個輸入參數,day、cms_Id,times
- 創建操作數據庫DAO類,輸入的參數是一個list,list裝的是上面的模型類,目的是插入insert記錄到數據庫中,DAO中分以下幾步:
- 首先,做jdbc連接的准備,創建connection和prepareStatement,把關閉連接也寫好,用try catch finally拋出異常;
- 然后寫sql語句,preparestatement需要賦值的地方用占位符放着;
- 進行對list遍歷,把每個對象都放進pstmt中
- 調優點!!!遍歷前把自動提交關掉,遍歷中把pstmt加入批處理中,遍歷完后執行批處理操作!最后手工提交連接
實現代碼:
//課程訪問次數實體類 case class VideoAccessStat(day: String, cmsId:Long, times: Long) /* * 各個維度統計的DAO操作 * */ object StatDAO { /* * 批量保存VideoAccessStat到數據庫 * */ def insertDayAccessTopN(list: ListBuffer[VideoAccessStat]): Unit = { var connection: Connection = null //jdbc的准備工作, 定義連接 var pstmt: PreparedStatement = null try { connection = MySQLUtils.getConnection() //真正獲取連接 connection.setAutoCommit(false) //為了實現批處理,要關掉默認的自動提交 val sql = "insert into day_topn_video(day, cms_id, times) values (?, ?, ?)" //占位符 pstmt = connection.prepareStatement(sql) //把SQL語句生成pstmt對象,后面才可以填充占位符中的數據 for (ele <- list) { pstmt.setString(1, ele.day) pstmt.setLong(2, ele.cmsId) pstmt.setLong(3, ele.times) pstmt.addBatch() //加入批處理 } pstmt.execute() //執行批量處理 connection.commit() //手工提交 } catch { case e: Exception => e.printStackTrace() } finally { MySQLUtils.release(connection, pstmt) } } }
為了對應以上的第3步,要把統計記錄的DF生成一個個對象,放進list中:
- 創建模型類對應的list
- 對記錄進行遍歷,把記錄的每個字段當做參數,創建模型類對象
- 把每個對象添加到list中
- 把list傳進DAO類中
以下代碼添加到上面的TopNJob類里面中就可以把之前生成到的topDF的結果記錄保存到MySQL當中了:
try { topNDF.foreachPartition(partitionOfRecords => { // val list = new ListBuffer[VideoAccessStat] //創建list來裝統計記錄 //遍歷每一條記錄,取出來上面對應的三個字段day,cmsId,times partitionOfRecords.foreach(info => { val day = info.getAs[String]("day") //后面的就是取出來的記錄的每個字段 val cmsId = info.getAs[Long]("cmsId") val times = info.getAs[Long]("times") //每一次循環創建一個VideoAccessStat對象,添加一次進入list中 list.append(VideoAccessStat(day, cmsId, times)) }) //把list傳進DAO類 StatDAO.insertDayAccessTopN(list) }) } catch { case e: Exception => e.printStackTrace() }
到此為止已經把項目需求一完成。
功能實現二:按照城市來找出topN視頻
在功能一的基礎上,運用row_number函數來實現
具體的實現代碼:
//先計算訪問次數,並按照day,cmsId,city分組 val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === "20170511" && accessDF.col("cmsType") === "video") .groupBy("day", "cmsId", "city").agg(count("cmsId").as("times")) //進行分地市排序,使用到row_number函數,生成一個排名,定義為time_rank, 並且取排名前3 cityAccessTopNDF.select( cityAccessTopNDF.col("day"), cityAccessTopNDF.col("cmsId"), cityAccessTopNDF.col("times"), cityAccessTopNDF.col("city"), row_number().over(Window.partitionBy(cityAccessTopNDF.col("city")) .orderBy(cityAccessTopNDF.col("times").desc) ).as("times_rank") ).filter("times_rank <= 3").show(false) }
其他步驟和功能一一樣,但是
插入Mysql的時候報錯,原因是MySQL不支持插入中文!!!!
首先可以在mysql命令行中用SET character來改:
SET character_set_client = utf8
可通過
show variables like 'character_set_%’;
查看當前的character編碼設置
然后在jdbc連接時,加上:
useUnicode=true&characterEncoding=utf8
改了之后,雖然能夠導入MySQL了,而且不出現亂碼,但只有一部分數據,並且在控制台報錯:
com.mysql.jdbc.PreparedStatement.fillSendPacket
com.mysql.jdbc.PreparedStatement.execute
后來把批處理刪掉竟然就可以把所有數據導入了:
pstmt.executeUpdate //不使用批處理的pstmt插入
功能三:按流量來排序topN視頻
和功能一幾乎完全一樣,只不過計算流量總和時用的不是count函數而是要用sum函數
為了代碼的復用性,防止生成重復的數據,在StatDAO定義刪除的函數:
def deleteDayData(day: String) = { var connection: Connection = null var pstmt: PreparedStatement = null var tables = Array("day_topn_video", "day_city_topn_video", "traffic_topn_video" ) try { connection = MySQLUtils.getConnection() for (table <- tables) { val deleteSql = s"delete from $table where day = ?” //Scala特殊處理 pstmt = connection.prepareStatement(deleteSql) pstmt.setString(1, table) pstmt.setString(2, day) pstmt.executeUpdate() } } catch { case e: Exception => e.printStackTrace() } finally { MySQLUtils.release(connection, pstmt) } }
需要注意的是,table在pstmt中的特殊用法!!
后續會對以上內容進行可視化處理、跑在YARN上的修改、性能調優