1. App產生日志數據,發送web請求:
gmall-mock模塊
//啟動日志
upload{"area":"heilongjiang","uid":"2","os":"andriod","ch":"huawei","appid":"gmall","mid":"mid_71","type":"startup","vs":"1.1.2"} //事件日志
upload{"area":"heilongjiang","uid":"2","itemid":24,"npgid":41,"evid":"clickItem","os":"andriod","pgid":32,"appid":"gmall","mid":"mid_71","type":"event"}
2. springboot接收日志落盤並發送給kafka:
gmall-logger模塊--SpringBoot的部署
日志前加一個ts時間戳;org.slf4j.LoggerFactory,slf4j是一個接口,它會去找實現類;LoggeerFactory默認的會在jar包中找實現類;
logging(它是LoggeFactory默認使用的)和log4j是競爭關系,所以要在gmall-logger.pom.xml文件中加入exclusions把logging給排除了
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j</artifactId> <version>1.3.8.RELEASE</version> </dependency> </dependencies>
com.atguigu.gmall.logger.controller.LoggerController @RestController public class LoggerController { @Autowired KafkaTemplate kafkaTemplate; private static final Logger logger = LoggerFactory.getLogger(LoggerController.class); @PostMapping("log") public String doLog(@RequestParam("log") String log){ JSONObject jsonObject = JSON.parseObject(log); jsonObject.put("ts", System.currentTimeMillis()); //System.out.println(log); // 1. 落盤成為日志文件 // log4j logger.info(jsonObject.toJSONString()); //2. 發送kafka if ("startup".equals(jsonObject.getString("type"))){ kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_STARTUP, jsonObject.toJSONString()); }else { kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_EVENT, jsonObject.toJSONString()); } return "success"; } }
利用resources/ log4j.properties進行log日志的落盤:
log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender //怎么寫這個日志;類型--控制台 log4j.appender.atguigu.MyConsole.target=System.err //控制台有兩種:System.out日志顏色黑色和System.err日志是紅色的 log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout //自定義的,除了要打印的日志級別,還要打印什么 log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n //格式,p是日志級別,%m輸出的內容,%n是換行; log4j.appender.atguigu.File=org.apache.log4j.DailyRollingFileAppender //每日滾動文件,每天產生一個文件; log4j.appender.atguigu.File.file=/opt/module/applog/gmall/log/app.log //輸出的文件路徑,linux中的路徑 log4j.appender.atguigu.File.DatePattern='.'yyyy-MM-dd //輸出文件的后綴; 當天的日志是沒有后綴的,一旦過了12點,就有后綴.'yyyy-MM-dd',后綴是日志時間 log4j.appender.atguigu.File.layout=org.apache.log4j.PatternLayout //自定義格式 log4j.appender.atguigu.File.layout.ConversionPattern=%m%n //要干干凈凈的打印信息; log4j.logger.com.atguigu.gmall.logger.controller.LoggerController=info,atguigu.File
//某一個類的路徑,只監控某個類所產生的日志;log4j.rootLogger=error,atguigu.Myconsole表示根底的,除了上邊指定的都是它,首先是精確匹配到info就日志輸出就按照它的打印,它們后邊的.File或.Myconsole都會輸出
日志級別有:級別從低到高 trace、debug、info、warn、error、fatal,如果寫info,從低到高比它高的都可以輸出出來;
在linux中,log4j要有權限才能創建applog/gmall/log/app.log的目錄,也可以提前創建好這個目錄;
在linux中,非root賬號,不能使用1024以下的端口號; Tomcat中 server.port=80
把日志采集模塊打包部署到Linux中
在idea中的maven執行package,把打好的jar包拷貝到Linux 路徑下,啟動jar包:
可臨時指定端口號,后台執行使用 &,控制台日志不輸出使用/dev/null/ 2>&1 輸入日志到黑洞 也可以輸出到指定目錄 >./app.err
java -jar /app/gmall/dw-logger-0.0.1-SNAPSHOT.jar --server.port=8080 >/dev/null 2>&1 &
測試 由windows發送日志數據(gmall-mock/ JsonMocker類)到linux 日志落盤
在三台系統同時部署日志采集系統的jar包,分別把/applog/目錄拷貝到三台虛擬機上
java -jar /opt/module/applog/gmall/gmall-logger-0.0.1-SNAPSHOT.jar --server.port=8080 >./app.error 2>&1 & [kris@hadoop101 log]$ tail -10f app.log //監控文件測試數據是否寫入
3. 搭建nginx
https://www.cnblogs.com/shengyang17/p/10836168.html ,只需一台部署nginx即可;
由windows發送模擬日志,nginx負責路由,日志服務負責接收。
window發送日志------>>niginx路由---> linux中接收日志的jar存儲日志文件並發給kafka--->kafka
更新集群啟動腳本 ,加入nginx操作: ./logger-cluster.sh start 啟動nginx路由,路由三台虛擬機給接收日志服務的jar包 ,並發給fakfa;
logger-cluster.sh

[kris@hadoop101 gmall]$ vim logger-cluster.sh #!/bin/bash JAVA_BIN=/opt/module/jdk1.8.0_144/bin/java PROJECT=gmall APPNAME=gmall-logger-0.0.1-SNAPSHOT.jar SERVER_PORT=8080 case $1 in "start"){ for i in hadoop101 hadoop102 hadoop103 do echo "========啟動日志服務: $i===============" ssh $i "$JAVA_BIN -Xms32m -Xmx64m -jar /opt/module/applog/$PROJECT/$APPNAME --server.port=$SERVER_PORT >./app.error 2>&1 &" done echo "==============啟動NGINX===============" /opt/module/nginx/sbin/nginx };; "stop"){ echo "=============關閉NGINX=================" /opt/module/nginx/sbin/nginx -s stop for i in hadoop101 hadoop102 hadoop103 do echo "========關閉日志服務: $i===============" ssh $i "ps -ef|grep $APPNAME |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1 done };; esac
ssh之間的互相連接:三種聯通方式: ①source /etc/profile; ②ssh 會讀.bashrc cat /etc/profile>>.brashrc ③$JAVA_BIN
netstart -anp | more 查看端口
當前日志模塊:
4. 日活DAU
搭建實時處理模塊gmall-realtime:
消費kafka;利用redis過濾當日已經計入的日活設備;把每批次新增的當日日活信息保存到ES中;從ES中查出數據,發布成數據接口
消費kafka& 利用redis去重
1、把今日新增的活躍用戶保存到redis中; 2、每條數據經過過濾,去掉redis中的已有的用戶
設計Redis的kv; Key:dau:2019-01-22, value: 設備id
業務類開發
DauApp.scala 消費kafka中數據(通過MyKafkaUtil獲取) --->>利用redis去重---->>保存到ES(通過MyEsUtil工具類)中;
object DauApp { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]") val streamingContext: StreamingContext = new StreamingContext(new SparkContext(conf),Seconds(5)) val inputStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstant.KAFKA_TOPIC_STARTUP, streamingContext) // 1 把當日已訪問過的用戶保存起來 redis // 2 以當日已訪問用戶清單為依據 ,過濾掉再次訪問的請求 // 轉換case class 補全日期格式 val startupLogDStream: DStream[StartUpLog] = inputStream.map { record => val jsonStr: String = record.value() val startUpLog: StartUpLog = JSON.parseObject(jsonStr, classOf[StartUpLog]) //把日期進行補全 val dateTimeString: String = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(startUpLog.ts)) val dateTimeArray: Array[String] = dateTimeString.split(" ") startUpLog.logDate = dateTimeArray(0) startUpLog.logHour = dateTimeArray(1).split(":")(0) startUpLog.logHourMinute = dateTimeArray(1) startUpLog } // 去重操作 val filterDStream: DStream[StartUpLog] = startupLogDStream.transform { rdd => //driver 每時間間隔執行一次 println("過濾前:" + rdd.count()) val jedis: Jedis = RedisUtil.getJedisClient val curDate: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) val key: String = "dau:" + curDate val dauSet: util.Set[String] = jedis.smembers(key) //當日日活用戶清單 //使用廣播變量 val dauBC: Broadcast[util.Set[String]] = streamingContext.sparkContext.broadcast(dauSet) val filterRDD: RDD[StartUpLog] = rdd.filter { startuplog => !dauBC.value.contains(startuplog.mid) } println("過濾后:" + filterRDD.count()) filterRDD } // 考慮到 新的訪問可能會出現重復 ,所以以mid為key進行去重,每個mid為小組 每組取其中一個 val startupLogGroupDStream: DStream[(String, Iterable[StartUpLog])] = filterDStream.map{startuplog => (startuplog.mid, startuplog)}.groupByKey() val startupLogFilterDistinctDStream: DStream[StartUpLog] = startupLogGroupDStream.flatMap { case (mid, startupLogIter) => val startupLogOneIter: Iterable[StartUpLog] = startupLogIter.take(1) startupLogOneIter } // 1 把當日已訪問過的用戶保存到 redis startupLogFilterDistinctDStream.foreachRDD{rdd => rdd.foreachPartition{startupLogItr => val jedis: Jedis = RedisUtil.getJedisClient val startupList: List[StartUpLog] = startupLogItr.toList for (elem <- startupList) { val key: String = "dau:" + elem.logDate jedis.sadd(key, elem.mid) } jedis.close() //保存到ES MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_DAU, startupList) } } streamingContext.start() streamingContext.awaitTermination() } }
4. ES
綜上 ,在實際環境中,需要一種能夠容納較大規模數據切交互性好的數據庫。mysql雖然交互性好,但是容量擴展性有限。
hbase雖然能夠支持海量數據,但是查詢的靈活度不足。所以ES在容量及交互性上達到一個非常不錯的平衡,而且還能支持全文檢索。
搭建es集群 https://www.cnblogs.com/shengyang17/p/10597841.html
ES& kibana的啟動腳本: ./ek.sh start

[kris@hadoop101 gmall]$ cat ek.sh #!/bin/bash es_home=/opt/module/elasticsearch kibana_home=/opt/module/kibana/ case $1 in "start"){ echo "=============啟動ES集群=============" for i in hadoop101 hadoop102 hadoop103 do ssh $i "source /etc/profile;${es_home}/bin/elasticsearch >/dev/null 2>&1 &" done echo "=============啟動kibana=============" nohup ${kibana_home}/bin/kibana >/opt/module/kibana/kibana.log 2>&1 & };; "stop"){ echo "=============關閉kibana=============" ps -ef | grep ${kibana_home} | grep -v grep | awk '{print $2}'|xargs kill echo "=============關閉ES集群=============" for i in hadoop101 hadoop102 hadoop103 do ssh $i "ps -ef | grep $es_home | grep -v grep | awk '{print \$2}'|xargs kill" >/dev/null 2>&1 done };; esac
設計es索引結構
case class startup

case class Startup(mid:String, uid:String, appid:String, area:String, os:String, ch:String, logType:String, vs:String, var logDate:String, var logHour:String, var logHourMinute:String, var ts:Long ) { }
text 支持分詞; keyword 只能全部內容匹配
保存數據之前一定要先定義好mapping: 每個字段的類型 ; 分清楚索引類型
1、需要索引 也需要分詞:標題,商品名稱,分類名稱, type:“text”
2、需要索引,但不需要分詞:類型id , 日期,數量 ,年齡 ,各種id, type:"keyword";
mid, uid,area,os ,ch ,vs,logDate,logHourMinute,ts
3、既不需要索引,也不需要分詞: 不被會用於條件過濾,經過脫敏的字段,138****0101 index:false
##############在ES中創建index PUT gmall_dau { "mappings": { "_doc":{ "properties":{ "mid":{ "type":"keyword" , }, "uid":{ "type":"keyword" }, "area":{ "type":"keyword" }, "os":{ "type":"keyword" }, "ch":{ "type":"keyword" }, "vs":{ "type":"keyword" }, "logDate":{ "type":"keyword" }, "logHour":{ "type":"keyword" }, "logHourMinute":{ "type":"keyword" }, "ts":{ "type":"long" } } } } }
在Kibana中進行查詢
如果在在保存| 插入數據的時候,沒有先建立mapping的數據結構,則ES是會自動推斷;當你再去聚合aggs時,text的字段是不能進行聚合的(如果想要聚合要加 字段.keyword,如下所示),但是好一點的是ES給保存了兩份,一個是text類型的字段、另外一個是keyword類型的;浪費了空間,在實際生產環境中是不能使用這種方式的;
GET /gmall_dau/_search { "query": { "bool": { "filter": { "term": { "logDate": "2019-05-04" } } } } } ######groupby操作 聚合aggregation GET /gmall_dau/_search { "query": { "bool": { "filter": { "term": { "logDate": "2019-04-30" } } } }, "aggs": { "groupby_logHour": { "terms": { "field": "logHour.keyword", "size": 24 } } } }
保存到es中; 關於es java客戶端的選擇,目前市面上有兩類客戶端:
一類是TransportClient 為代表的ES原生客戶端,不能執行原生dsl語句必須使用它的Java api方法。
另外一種是以Rest Api為主的missing client,最典型的就是jest。 這種客戶端可以直接使用dsl語句拼成的字符串,直接傳給服務端,然后返回json字符串再解析。
兩種方式各有優劣,但是最近elasticsearch官網,宣布計划在7.0以后的版本中廢除TransportClient。以RestClient為主。
所以在官方的RestClient 基礎上,進行了簡單包裝的Jest客戶端,就成了首選,而且該客戶端也與springboot完美集成。
5. 數據發布接口
詳細見代碼
通過gmall-mock模塊的類JsonMocker發送數據--->nginx路由--->三台虛擬機的gmall-logger的接收數據並轉發給kafka(用的是SpringBoot)--->
啟動:gmall-publisher--springBoot的主類: com.atguigu.gmall.publisher.GmallPublisherApplication,給chart的接口,啟動
啟動:gmall--dw-chart---com.demo.DemoApplication的主類; 接接口展示數據的動態變化
啟動:[kris@hadoop101 ~]$ redis-server myredis/redis.conf
啟動:gmall-realtime的com.atguigu.gmall.realtime.app.DauApp類,
啟動:gmall-mock模塊的類JsonMocker發送數據
http://127.0.0.1:8070/realtime-total?date=2019-04-30
[{"name":"新增日活","id":"dau","value":761},{"name":"新增設備","id":"new_mid","value":233}]
http://127.0.0.1:8070/realtime-hour?id=dau&&date=2019-05-04
{"yesterday":{},"today":{"20":26,"21":96}}
通過前端頁面展示: http://localhost:8089/index