實時--1.1 日志數據分析


 

1. App產生日志數據,發送web請求:

  gmall-mock模塊

//啟動日志 
upload{"area":"heilongjiang","uid":"2","os":"andriod","ch":"huawei","appid":"gmall","mid":"mid_71","type":"startup","vs":"1.1.2"} //事件日志
upload{"area":"heilongjiang","uid":"2","itemid":24,"npgid":41,"evid":"clickItem","os":"andriod","pgid":32,"appid":"gmall","mid":"mid_71","type":"event"}

 

2. springboot接收日志落盤發送給kafka

gmall-logger模塊--SpringBoot的部署

日志前加一個ts時間戳;org.slf4j.LoggerFactory,slf4j是一個接口,它會去找實現類;LoggeerFactory默認的會在jar包中找實現類;

logging(它是LoggeFactory默認使用的)和log4j是競爭關系,所以要在gmall-logger.pom.xml文件中加入exclusions把logging給排除了

  <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter</artifactId>
       <exclusions>
         <exclusion>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-logging</artifactId>
         </exclusion>
       </exclusions>
    </dependency>
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-log4j</artifactId>
       <version>1.3.8.RELEASE</version>
    </dependency>
  </dependencies>

 

com.atguigu.gmall.logger.controller.LoggerController

@RestController
public class LoggerController {
    @Autowired
    KafkaTemplate kafkaTemplate;
    private static final Logger logger = LoggerFactory.getLogger(LoggerController.class);
    @PostMapping("log")
    public String doLog(@RequestParam("log") String log){

        JSONObject jsonObject = JSON.parseObject(log);
        jsonObject.put("ts", System.currentTimeMillis());

        //System.out.println(log);

        // 1. 落盤成為日志文件     // log4j
        logger.info(jsonObject.toJSONString());
        //2. 發送kafka
        if ("startup".equals(jsonObject.getString("type"))){
            kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_STARTUP, jsonObject.toJSONString());
        }else {
            kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_EVENT, jsonObject.toJSONString());
        }
        return "success";
    }
}

利用resources/ log4j.properties進行log日志的落盤:

log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender  //怎么寫這個日志;類型--控制台
log4j.appender.atguigu.MyConsole.target=System.err  //控制台有兩種:System.out日志顏色黑色和System.err日志是紅色的
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout    //自定義的,除了要打印的日志級別,還要打印什么
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n  //格式,p是日志級別,%m輸出的內容,%n是換行;

log4j.appender.atguigu.File=org.apache.log4j.DailyRollingFileAppender  //每日滾動文件,每天產生一個文件;
log4j.appender.atguigu.File.file=/opt/module/applog/gmall/log/app.log  //輸出的文件路徑,linux中的路徑
log4j.appender.atguigu.File.DatePattern='.'yyyy-MM-dd        //輸出文件的后綴; 當天的日志是沒有后綴的,一旦過了12點,就有后綴.'yyyy-MM-dd',后綴是日志時間
log4j.appender.atguigu.File.layout=org.apache.log4j.PatternLayout //自定義格式
log4j.appender.atguigu.File.layout.ConversionPattern=%m%n  //要干干凈凈的打印信息;

log4j.logger.com.atguigu.gmall.logger.controller.LoggerController=info,atguigu.File  
//某一個類的路徑,只監控某個類所產生的日志;log4j.rootLogger=error,atguigu.Myconsole表示根底的,除了上邊指定的都是它,首先是精確匹配到info就日志輸出就按照它的打印,它們后邊的.File或.Myconsole都會輸出

日志級別有:級別從低到高 trace、debug、info、warn、error、fatal,如果寫info,從低到高比它高的都可以輸出出來;

在linux中,log4j要有權限才能創建applog/gmall/log/app.log的目錄,也可以提前創建好這個目錄;

在linux中,非root賬號,不能使用1024以下的端口號;   Tomcat中 server.port=80

 

把日志采集模塊打包部署到Linux中

 

在idea中的maven執行package,把打好的jar包拷貝到Linux 路徑下,啟動jar包:

     可臨時指定端口號,后台執行使用 &,控制台日志不輸出使用/dev/null/ 2>&1 輸入日志到黑洞 也可以輸出到指定目錄 >./app.err

          java -jar  /app/gmall/dw-logger-0.0.1-SNAPSHOT.jar  --server.port=8080  >/dev/null  2>&1  &

  測試 由windows發送日志數據(gmall-mock/ JsonMocker類)到linux 日志落盤

在三台系統同時部署日志采集系統的jar包,分別把/applog/目錄拷貝到三台虛擬機上

java -jar /opt/module/applog/gmall/gmall-logger-0.0.1-SNAPSHOT.jar --server.port=8080 >./app.error 2>&1 & [kris@hadoop101 log]$ tail -10f app.log //監控文件測試數據是否寫入

 

 

 

3. 搭建nginx

  https://www.cnblogs.com/shengyang17/p/10836168.html ,只需一台部署nginx即可;

由windows發送模擬日志,nginx負責路由,日志服務負責接收。

window發送日志------>>niginx路由---> linux中接收日志的jar存儲日志文件並發給kafka--->kafka

更新集群啟動腳本 ,加入nginx操作: ./logger-cluster.sh start 啟動nginx路由,路由三台虛擬機給接收日志服務的jar包 ,並發給fakfa;

logger-cluster.sh

[kris@hadoop101 gmall]$ vim logger-cluster.sh    
#!/bin/bash
JAVA_BIN=/opt/module/jdk1.8.0_144/bin/java
PROJECT=gmall
APPNAME=gmall-logger-0.0.1-SNAPSHOT.jar
SERVER_PORT=8080
 
case $1 in
"start"){
    for i in hadoop101 hadoop102 hadoop103
    do
        echo "========啟動日志服務: $i==============="
        ssh $i  "$JAVA_BIN -Xms32m -Xmx64m -jar /opt/module/applog/$PROJECT/$APPNAME --server.port=$SERVER_PORT >./app.error 2>&1  &"
    done
        echo "==============啟動NGINX==============="
        /opt/module/nginx/sbin/nginx
  };;

"stop"){
        echo "=============關閉NGINX================="
        /opt/module/nginx/sbin/nginx -s stop
    for i in  hadoop101 hadoop102 hadoop103
    do
        echo "========關閉日志服務: $i==============="
        ssh $i "ps -ef|grep $APPNAME |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1
    done
 
  };;
esac
View Code

ssh之間的互相連接:三種聯通方式: ①source /etc/profile;  ②ssh 會讀.bashrc cat /etc/profile>>.brashrc   ③$JAVA_BIN

netstart -anp | more 查看端口

當前日志模塊:

                       

 

4. 日活DAU

搭建實時處理模塊gmall-realtime:

  消費kafka;利用redis過濾當日已經計入的日活設備;把每批次新增的當日日活信息保存到ES中;從ES中查出數據,發布成數據接口

 消費kafka& 利用redis去重

  1、把今日新增的活躍用戶保存到redis中;   2、每條數據經過過濾,去掉redis中的已有的用戶

  設計Redis的kv; Key:dau:2019-01-22, value: 設備id

  業務類開發

 DauApp.scala    消費kafka中數據(通過MyKafkaUtil獲取) --->>利用redis去重---->>保存到ES(通過MyEsUtil工具類)中;

object DauApp {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]")
        val streamingContext: StreamingContext = new StreamingContext(new SparkContext(conf),Seconds(5))

        val inputStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstant.KAFKA_TOPIC_STARTUP, streamingContext)
        //  1 把當日已訪問過的用戶保存起來 redis
        //  2  以當日已訪問用戶清單為依據 ,過濾掉再次訪問的請求

        // 轉換case class  補全日期格式
        val startupLogDStream: DStream[StartUpLog] = inputStream.map { record =>
          val jsonStr: String = record.value()
          val startUpLog: StartUpLog = JSON.parseObject(jsonStr, classOf[StartUpLog])
          //把日期進行補全
          val dateTimeString: String = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(startUpLog.ts))
          val dateTimeArray: Array[String] = dateTimeString.split(" ")
          startUpLog.logDate = dateTimeArray(0)
          startUpLog.logHour = dateTimeArray(1).split(":")(0)
          startUpLog.logHourMinute = dateTimeArray(1)
          startUpLog
        }
        // 去重操作
        val filterDStream: DStream[StartUpLog] = startupLogDStream.transform { rdd =>
          //driver 每時間間隔執行一次
          println("過濾前:" + rdd.count())
          val jedis: Jedis = RedisUtil.getJedisClient
          val curDate: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
          val key: String = "dau:" + curDate
          val dauSet: util.Set[String] = jedis.smembers(key) //當日日活用戶清單
        //使用廣播變量
        val dauBC: Broadcast[util.Set[String]] = streamingContext.sparkContext.broadcast(dauSet)

          val filterRDD: RDD[StartUpLog] = rdd.filter {
            startuplog =>
              !dauBC.value.contains(startuplog.mid)
          }
          println("過濾后:" + filterRDD.count())
          filterRDD
        }

        // 考慮到 新的訪問可能會出現重復 ,所以以mid為key進行去重,每個mid為小組 每組取其中一個
        val startupLogGroupDStream: DStream[(String, Iterable[StartUpLog])] = filterDStream.map{startuplog => (startuplog.mid, startuplog)}.groupByKey()

        val startupLogFilterDistinctDStream: DStream[StartUpLog] = startupLogGroupDStream.flatMap {
          case (mid, startupLogIter) =>
            val startupLogOneIter: Iterable[StartUpLog] = startupLogIter.take(1)
            startupLogOneIter
        }

        //  1 把當日已訪問過的用戶保存到 redis
        startupLogFilterDistinctDStream.foreachRDD{rdd =>
          rdd.foreachPartition{startupLogItr =>
            val jedis: Jedis = RedisUtil.getJedisClient
            val startupList: List[StartUpLog] = startupLogItr.toList
            for (elem <- startupList) {
              val key: String = "dau:" + elem.logDate
              jedis.sadd(key, elem.mid)
            }
            jedis.close()
            //保存到ES
            MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_DAU, startupList)
          }
    }
    streamingContext.start()
    streamingContext.awaitTermination()

  }

}

4. ES

     綜上 ,在實際環境中,需要一種能夠容納較大規模數據切交互性好的數據庫。mysql雖然交互性好,但是容量擴展性有限。

   hbase雖然能夠支持海量數據,但是查詢的靈活度不足。所以ES在容量及交互性上達到一個非常不錯的平衡,而且還能支持全文檢索。

搭建es集群 https://www.cnblogs.com/shengyang17/p/10597841.html

ES& kibana的啟動腳本: ./ek.sh start 

[kris@hadoop101 gmall]$ cat ek.sh    
#!/bin/bash
es_home=/opt/module/elasticsearch
kibana_home=/opt/module/kibana/
case $1 in
"start"){
        echo "=============啟動ES集群============="
        for i in hadoop101 hadoop102 hadoop103
        do
            ssh $i "source /etc/profile;${es_home}/bin/elasticsearch >/dev/null 2>&1 &"
        done
        echo "=============啟動kibana============="
        nohup ${kibana_home}/bin/kibana >/opt/module/kibana/kibana.log 2>&1 &
};;
"stop"){
        echo "=============關閉kibana============="
        ps -ef | grep ${kibana_home} | grep -v grep | awk '{print $2}'|xargs kill
        echo "=============關閉ES集群============="
        for i in hadoop101 hadoop102 hadoop103
        do
            ssh $i "ps -ef | grep $es_home | grep -v grep | awk '{print \$2}'|xargs kill" >/dev/null 2>&1
        done

};;
esac
View Code

設計es索引結構

case class startup

case class Startup(mid:String,
                   uid:String,
                   appid:String,
                   area:String,
                   os:String,
                   ch:String,
                   logType:String,
                   vs:String,
                   var logDate:String,
                   var logHour:String,
                   var logHourMinute:String,
                   var ts:Long
                  ) {

}
 
View Code

  text 支持分詞; keyword 只能全部內容匹配
保存數據之前一定要先定義好mapping:  每個字段的類型 ; 分清楚索引類型

1、需要索引 也需要分詞:標題,商品名稱,分類名稱,  type:“text”

2、需要索引,但不需要分詞:類型id , 日期,數量 ,年齡 ,各種id, type:"keyword";

    mid, uid,area,os ,ch ,vs,logDate,logHourMinute,ts

3、既不需要索引,也不需要分詞: 不被會用於條件過濾,經過脫敏的字段,138****0101  index:false

##############在ES中創建index
PUT gmall_dau
{
  "mappings": {
    "_doc":{
      "properties":{
         "mid":{
           "type":"keyword" ,
         },
         "uid":{
           "type":"keyword"
         },
         "area":{
           "type":"keyword"
         },
         "os":{
           "type":"keyword"
         },
         "ch":{
           "type":"keyword"
         },
         "vs":{
           "type":"keyword"
         },
         "logDate":{
           "type":"keyword"
         },
         "logHour":{
           "type":"keyword"
         },
         "logHourMinute":{
           "type":"keyword"
         },
         "ts":{
           "type":"long"
         } 
      }
    }
  }
}

 在Kibana中進行查詢

   如果在在保存| 插入數據的時候,沒有先建立mapping的數據結構,則ES是會自動推斷;當你再去聚合aggs時,text的字段是不能進行聚合的(如果想要聚合要加 字段.keyword,如下所示),但是好一點的是ES給保存了兩份,一個是text類型的字段、另外一個是keyword類型的;浪費了空間,在實際生產環境中是不能使用這種方式的;

GET /gmall_dau/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "logDate": "2019-05-04"
        }
      }
    }
  }
}

######groupby操作  聚合aggregation
GET /gmall_dau/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "logDate": "2019-04-30"
        }
      }
    }
  },
  "aggs": {
    "groupby_logHour": {
      "terms": {
        "field": "logHour.keyword",
        "size": 24
      }
    }
  }
}

保存到es中; 關於es java客戶端的選擇,目前市面上有兩類客戶端:

  一類是TransportClient 為代表的ES原生客戶端,不能執行原生dsl語句必須使用它的Java api方法。
  另外一種是以Rest Api為主的missing client,最典型的就是jest。 這種客戶端可以直接使用dsl語句拼成的字符串,直接傳給服務端,然后返回json字符串再解析。
兩種方式各有優劣,但是最近elasticsearch官網,宣布計划在7.0以后的版本中廢除TransportClient。以RestClient為主。

所以在官方的RestClient 基礎上,進行了簡單包裝的Jest客戶端,就成了首選,而且該客戶端也與springboot完美集成。

5. 數據發布接口

詳細見代碼

通過gmall-mock模塊的類JsonMocker發送數據--->nginx路由--->三台虛擬機的gmall-logger的接收數據並轉發給kafka(用的是SpringBoot)--->

 啟動:gmall-publisher--springBoot的主類: com.atguigu.gmall.publisher.GmallPublisherApplication,給chart的接口,啟動

 啟動:gmall--dw-chart---com.demo.DemoApplication的主類; 接接口展示數據的動態變化

 啟動:[kris@hadoop101 ~]$ redis-server myredis/redis.conf 

啟動:gmall-realtime的com.atguigu.gmall.realtime.app.DauApp類,

啟動:gmall-mock模塊的類JsonMocker發送數據

http://127.0.0.1:8070/realtime-total?date=2019-04-30
[{"name":"新增日活","id":"dau","value":761},{"name":"新增設備","id":"new_mid","value":233}]

http://127.0.0.1:8070/realtime-hour?id=dau&&date=2019-05-04
{"yesterday":{},"today":{"20":26,"21":96}}

通過前端頁面展示: http://localhost:8089/index

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM