大數據項目之電商數倉一(用戶行為采集)


一、數據倉庫概念

數據倉庫(Data Warehouse)

  是為企業所有決策制定過程,提供所有系統數據支持的戰略集合。

二、項目需求及架構設計

2.1 項目需求分析

  1、項目需求

   1)用戶行為數據采集平台搭建

   2)業務數據采集平台搭建

   3)數據倉庫維度建模

      4)分析:用戶、流量、會員、商品、銷售、地區、活動等電商核心主題,統計的報表指標近100。

      5)采用即席查詢工具,隨時進行指標分析

      6)對集群性能進行監控,發生異常需要報警

    7)元數據管理

      8)質量監控

2.2 項目框架

2.2.1 技術選型

技術選型主要需要考慮的因素:數據量大小、業務需求、行業內經驗、技術成熟度、開發維護成本、總成本預算

  數據采集傳輸:FlumeKafkaSqoop、Logstash、DataX、

  數據存儲:MysqlHDFS、HBase、Redis、MongoDB

  數據計算:HiveTezSpark、Flink、Storm

  數據查詢:PrestoDruid、Impala、Kylin

  數據可視化:Echarts、Superset、QuickBI、DataV

  任務調度:Azkaban、Oozie

  集群監控:Zabbix

  元數據管理:Atlas

  數據質量監控:Griffin

2.2.2 系統數據流程設計

2.2.3 框架版本選型

2.2.4 服務器選型

  服務器是選擇物理機還是雲主機?

1)物理機:

  128G內存,20核物理CPU,40線程,8THDD和2TSSD硬盤,戴爾品牌單台報價4萬出頭。一般物理機壽命5年左右

2)雲主機:

  以阿里雲為例,和上面大致相同配置,每年5萬

2.2.5 集群資源規划設計

1、集群規模

1)如何確認集群規模?(按每台服務器8T磁盤,128G內存)

(1)按每天日活躍用戶100萬,每人一天平均100條:100萬*100條 = 1億條

(2)每條日志1K左右,每天1億條:100000000 / 1024 /1024 = 約100G

(3)半年內不擴容服務器來算:100G * 180 天 = 約18T

(4)保存3個副本:18T * 3 = 54T

(5)預留20%~30%Buffer=54T/0.7=77T

(6)需要約8T*10台服務器

2)如果要考慮數倉分層?數據采用壓縮?需要重新計算

2、集群服務器規划

服務名稱

子服務

服務器

hadoop102

服務器

hadoop103

服務器

hadoop104

HDFS

NameNode

 

 

DataNode

SecondaryNameNode

 

 

Yarn

NodeManager

Resourcemanager

 

 

Zookeeper

Zookeeper Server

Flume(采集日志)

Flume

 

Kafka

Kafka

Flume(消費Kafka)

Flume

 

 

Hive

Hive

 

 

MySQL

MySQL

 

 

Sqoop

Sqoop

 

 

Presto

Coordinator

 

 

Worker

 

Azkaban

AzkabanWebServer

 

 

AzkabanExecutorServer

 

 

Druid

Druid

Kylin

 

 

 

Hbase

HMaster

 

 

HRegionServer

Superset

 

 

 

Atlas

 

 

 

Solr

Jar

 

 

Griffin

 

 

 

服務數總計

 

19

9

9

三、數據生成模塊

3.1 埋點數據基本格式

公共字段:基本所有安卓手機都包含的字段

業務字段:埋點上報的字段,有具體的業務類型

下面就是一個示例,表示業務字段的上傳。

{

"ap":"xxxxx",//項目數據來源 app pc

"cm": {  //公共字段

      "mid": "",  // (String) 設備唯一標識

        "uid": "",  // (String) 用戶標識

        "vc": "1",  // (String) versionCode,程序版本號

        "vn": "1.0",  // (String) versionName,程序版本名

        "l": "zh",  // (String) language系統語言

        "sr": "",  // (String) 渠道號,應用從哪個渠道來的

        "os": "7.1.1",  // (String) Android系統版本

        "ar": "CN",  // (String) area區域

        "md": "BBB100-1",  // (String) model手機型號

        "ba": "blackberry",  // (String) brand手機品牌

        "sv": "V2.2.1",  // (String) sdkVersion

        "g": "",  // (String) gmail

        "hw": "1620x1080",  // (String) heightXwidth,屏幕寬高

        "t": "1506047606608",  // (String) 客戶端日志產生時的時間

        "nw": "WIFI",  // (String) 網絡模式

        "ln": 0,  // (double) lng經度

        "la": 0  // (double) lat 緯度

    },

"et":  [  //事件

            {

                "ett": "1506047605364",  //客戶端事件產生時間

                "en": "display",  //事件名稱

                "kv": {  //事件結果,以key-value形式自行定義

                    "goodsid": "236",

                    "action": "1",

                    "extend1": "1",

"place": "2",

"category": "75"

                }

            }

        ]

}

示例日志(服務器時間戳 | 日志):

1540934156385|{

    "ap": "gmall",

    "cm": {

        "uid": "1234",

        "vc": "2",

        "vn": "1.0",

        "la": "EN",

        "sr": "",

        "os": "7.1.1",

        "ar": "CN",

        "md": "BBB100-1",

        "ba": "blackberry",

        "sv": "V2.2.1",

        "g": "abc@gmail.com",

        "hw": "1620x1080",

        "t": "1506047606608",

        "nw": "WIFI",

        "ln": 0

    },

        "et": [

            {

                "ett": "1506047605364",  //客戶端事件產生時間

                "en": "display",  //事件名稱

                "kv": {  //事件結果,以key-value形式自行定義

                    "goodsid": "236",

                    "action": "1",

                    "extend1": "1",

"place": "2",

"category": "75"

                }

            },{

              "ett": "1552352626835",

              "en": "active_background",

              "kv": {

                   "active_source": "1"

              }

           }

        ]

    }

}

下面是各個埋點日志格式。其中商品點擊屬於信息流的范疇

3.2 事件日志數

 

 

 

3.2.1 商品列表頁(loading)

事件名稱:loading

標簽

含義

action

動作:開始加載=1,加載成功=2,加載失敗=3

loading_time

加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)

loading_way

加載類型:1-讀取緩存,2-從接口拉新數據
(加載成功才上報加載類型)

extend1

擴展字段 Extend1

extend2

擴展字段 Extend2

type

加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載)

type1

加載失敗碼:把加載失敗狀態碼報回來(報空為加載成功,沒有失敗)

 

3.2.2 商品點擊(display)

事件標簽:display

標簽

含義

action

動作:曝光商品=1,點擊商品=2,

goodsid

商品ID(服務端下發的ID)

place

順序(第幾條商品,第一條為0,第二條為1,如此類推)

extend1

曝光類型:1 - 首次曝光 2-重復曝光

category

分類ID(服務端定義的分類ID)

     

 

3.2.3 商品詳情頁(newsdetail)

事件標簽:newsdetail

標簽

含義

entry

頁面入口來源:應用首頁=1、push=2、詳情頁相關推薦=3

action

動作:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4

goodsid

商品ID(服務端下發的ID)

show_style

商品樣式:0、無圖、1、一張大圖、2、兩張圖、3、三張小圖、4、一張小圖、5、一張大圖兩張小圖

news_staytime

頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途划出的時間超過10分鍾,則本次計時作廢,不上報本次數據。如未加載成功退出,則報空。

loading_time

加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間)

type1

加載失敗碼:把加載失敗狀態碼報回來(報空為加載成功,沒有失敗)

category

分類ID(服務端定義的分類ID)

     

 

3.2.4 廣告(ad)

事件名稱:ad

標簽

含義

entry

入口:商品列表頁=1  應用首頁=2 商品詳情頁=3

action

動作: 廣告展示=1 廣告點擊=2

contentType

Type: 1 商品 2 營銷活動  

displayMills

展示時長 毫秒數

itemId

商品id

activityId

營銷活動id

 

3.2.5 消息通知(notification)

事件標簽:notification

標簽

含義

action

動作:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展示(不重復上報,一天之內只報一次)=4

type

通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4

ap_time

客戶端彈出時間

content

備用字段

3.2.6 用戶后台活躍(active_background)

事件標簽: active_background

標簽

含義

active_source

1=upgrade,2=download(下載),3=plugin_upgrade

3.2.7 評論(comment)

描述:評論表

序號

字段名稱

字段描述

字段類型

長度

允許空

缺省值

1

comment_id

評論表

int

10,0

 

 

2

userid

用戶id

int

10,0

0

3

p_comment_id

父級評論id(為0則是一級評論,不為0則是回復)

int

10,0

 

4

content

評論內容

string

1000

 

5

addtime

創建時間

string

 

 

6

other_id

評論的相關id

int

10,0

 

7

praise_count

點贊數量

int

10,0

0

8

reply_count

回復數量

int

10,0

0

 

3.2.8 收藏(favorites)

描述:收藏

序號

字段名稱

字段描述

字段類型

長度

允許空

缺省值

1

id

主鍵

int

10,0

 

 

2

course_id

商品id

int

10,0

0

3

userid

用戶ID

int

10,0

0

4

add_time

創建時間

string

 

 

3.2.9 點贊(praise)

描述:所有的點贊表

序號

字段名稱

字段描述

字段類型

長度

允許空

缺省值

1

id

主鍵id

int

10,0

 

 

2

userid

用戶id

int

10,0

 

3

target_id

點贊的對象id

int

10,0

 

4

type

點贊類型 1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊

int

10,0

 

5

add_time

添加時間

string

 

 

 

3.2.10 錯誤日志

 

errorBrief

錯誤摘要

errorDetail

錯誤詳情

3.3 啟動日志數據

事件標簽: start

標簽

含義

entry

入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5

open_ad_type

開屏廣告類型:  開屏原生廣告=1, 開屏插屏廣告=2

action

狀態:成功=1  失敗=2

loading_time

加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)

detail

失敗碼(沒有則上報空)

extend1

失敗的message(沒有則上報空)

en

日志類型start

 

{
    "action":"1",
    "ar":"MX",
    "ba":"HTC",
    "detail":"",
    "en":"start",
    "entry":"2",
    "extend1":"",
    "g":"43R2SEQX@gmail.com",
    "hw":"640*960",
    "l":"en",
    "la":"20.4",
    "ln":"-99.3",
    "loading_time":"2",
    "md":"HTC-2",
    "mid":"995",
    "nw":"4G",
    "open_ad_type":"2",
    "os":"8.1.2",
    "sr":"B",
    "sv":"V2.0.6",
    "t":"1561472502444",
    "uid":"995",
    "vc":"10",
    "vn":"1.3.4"
}

3.4 數據生成腳本

3.1.1 創建Mavne工程

1)創建 log-collector

GroupId : com.test

Project name : log-collector

2)創建一個包名:com.test.appclient

3)在com.test.appclient包下創建一個類,AppMain。

4)在pom.xml文件中添加如下內容

 

<!--版本號統一-->
<properties>
    <slf4j.version>1.7.20</slf4j.version>
    <logback.version>1.0.7</logback.version>
</properties>

<dependencies>
    <!--阿里巴巴開源json解析框架-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.51</version>
    </dependency>

    <!--日志生成框架-->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
</dependencies>

<!--編譯打包插件-->
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin </artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.test.appclient.AppMain</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意:com.test.appclient.AppMain要和自己建的全類名一致。

3.1.2 公共字段Bean

1)創建包名:com.test.bean

2)在com.test.bean包下依次創建如下bean對象

package com.test.bean;
/**
 * 公共日志
 */
public class AppBase{

    private String mid; // (String) 設備唯一標識
    private String uid; // (String) 用戶uid
    private String vc;  // (String) versionCode,程序版本號
    private String vn;  // (String) versionName,程序版本名
    private String l;   // (String) 系統語言
    private String sr;  // (String) 渠道號,應用從哪個渠道來的。
    private String os;  // (String) Android系統版本
    private String ar;  // (String) 區域
    private String md;  // (String) 手機型號
    private String ba;  // (String) 手機品牌
    private String sv;  // (String) sdkVersion
    private String g;   // (String) gmail
    private String hw;  // (String) heightXwidth,屏幕寬高
    private String t;   // (String) 客戶端日志產生時的時間
    private String nw;  // (String) 網絡模式
    private String ln;  // (double) lng經度
    private String la;  // (double) lat 緯度

    public String getMid() {
        return mid;
    }

    public void setMid(String mid) {
        this.mid = mid;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getVc() {
        return vc;
    }

    public void setVc(String vc) {
        this.vc = vc;
    }

    public String getVn() {
        return vn;
    }

    public void setVn(String vn) {
        this.vn = vn;
    }

    public String getL() {
        return l;
    }

    public void setL(String l) {
        this.l = l;
    }

    public String getSr() {
        return sr;
    }

    public void setSr(String sr) {
        this.sr = sr;
    }

    public String getOs() {
        return os;
    }

    public void setOs(String os) {
        this.os = os;
    }

    public String getAr() {
        return ar;
    }

    public void setAr(String ar) {
        this.ar = ar;
    }

    public String getMd() {
        return md;
    }

    public void setMd(String md) {
        this.md = md;
    }

    public String getBa() {
        return ba;
    }

    public void setBa(String ba) {
        this.ba = ba;
    }

    public String getSv() {
        return sv;
    }

    public void setSv(String sv) {
        this.sv = sv;
    }

    public String getG() {
        return g;
    }

    public void setG(String g) {
        this.g = g;
    }

    public String getHw() {
        return hw;
    }

    public void setHw(String hw) {
        this.hw = hw;
    }

    public String getT() {
        return t;
    }

    public void setT(String t) {
        this.t = t;
    }

    public String getNw() {
        return nw;
    }

    public void setNw(String nw) {
        this.nw = nw;
    }

    public String getLn() {
        return ln;
    }

    public void setLn(String ln) {
        this.ln = ln;
    }

    public String getLa() {
        return la;
    }

    public void setLa(String la) {
        this.la = la;
    }
}

3.1.3 啟動日志Bean

package com.test.bean;
/**
 * 啟動日志
 */
public class AppStart extends AppBase {

    private String entry;//入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5
    private String open_ad_type;//開屏廣告類型:  開屏原生廣告=1, 開屏插屏廣告=2
    private String action;//狀態:成功=1  失敗=2
    private String loading_time;//加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)
    private String detail;//失敗碼(沒有則上報空)
    private String extend1;//失敗的message(沒有則上報空)
    private String en;//啟動日志類型標記

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getOpen_ad_type() {
        return open_ad_type;
    }

    public void setOpen_ad_type(String open_ad_type) {
        this.open_ad_type = open_ad_type;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getDetail() {
        return detail;
    }

    public void setDetail(String detail) {
        this.detail = detail;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getEn() {
        return en;
    }

    public void setEn(String en) {
        this.en = en;
    }
}

3.1.4 錯誤日志Bean

package com.test.bean;
/**
 * 錯誤日志
 */
public class AppErrorLog {

    private String errorBrief;    //錯誤摘要
    private String errorDetail;   //錯誤詳情

    public String getErrorBrief() {
        return errorBrief;
    }

    public void setErrorBrief(String errorBrief) {
        this.errorBrief = errorBrief;
    }

    public String getErrorDetail() {
        return errorDetail;
    }

    public void setErrorDetail(String errorDetail) {
        this.errorDetail = errorDetail;
    }
}

3.1.5 事件日志Bean之商品詳情

package com.test.bean;
/**
 * 商品詳情
 */
public class AppNewsDetail {

    private String entry;//頁面入口來源:應用首頁=1、push=2、詳情頁相關推薦=3
    private String action;//動作:開始加載=1,加載成功=2(pv),加載失敗=3, 退出頁面=4
    private String goodsid;//商品ID(服務端下發的ID)
    private String showtype;//商品樣式:0、無圖1、一張大圖2、兩張圖3、三張小圖4、一張小圖5、一張大圖兩張小圖    來源於詳情頁相關推薦的商品,上報樣式都為0(因為都是左文右圖)
    private String news_staytime;//頁面停留時長:從商品開始加載時開始計算,到用戶關閉頁面所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途划出的時間超過10分鍾,則本次計時作廢,不上報本次數據。如未加載成功退出,則報空。
    private String loading_time;//加載時長:計算頁面開始加載到接口返回數據的時間 (開始加載報0,加載成功或加載失敗才上報時間)
    private String type1;//加載失敗碼:把加載失敗狀態碼報回來(報空為加載成功,沒有失敗)
    private String category;//分類ID(服務端定義的分類ID)

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getGoodsid() {
        return goodsid;
    }

    public void setGoodsid(String goodsid) {
        this.goodsid = goodsid;
    }

    public String getShowtype() {
        return showtype;
    }

    public void setShowtype(String showtype) {
        this.showtype = showtype;
    }

    public String getNews_staytime() {
        return news_staytime;
    }

    public void setNews_staytime(String news_staytime) {
        this.news_staytime = news_staytime;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getType1() {
        return type1;
    }

    public void setType1(String type1) {
        this.type1 = type1;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}

3.1.6 事件日志Bean之商品列表

package com.test.bean;
/**
 * 商品列表
 */
public class AppLoading {
    private String action;//動作:開始加載=1,加載成功=2,加載失敗=3
    private String loading_time;//加載時長:計算下拉開始到接口返回數據的時間,(開始加載報0,加載成功或加載失敗才上報時間)
    private String loading_way;//加載類型:1-讀取緩存,2-從接口拉新數據   (加載成功才上報加載類型)
    private String extend1;//擴展字段 Extend1
    private String extend2;//擴展字段 Extend2
    private String type;//加載類型:自動加載=1,用戶下拽加載=2,底部加載=3(底部條觸發點擊底部提示條/點擊返回頂部加載)
    private String type1;//加載失敗碼:把加載失敗狀態碼報回來(報空為加載成功,沒有失敗)

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getLoading_way() {
        return loading_way;
    }

    public void setLoading_way(String loading_way) {
        this.loading_way = loading_way;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getExtend2() {
        return extend2;
    }

    public void setExtend2(String extend2) {
        this.extend2 = extend2;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getType1() {
        return type1;
    }

    public void setType1(String type1) {
        this.type1 = type1;
    }
}

3.1.7 事件日志Bean之廣告

package com.test.bean;
/**
 * 廣告
 */
public class AppAd {

    private String entry;//入口:商品列表頁=1  應用首頁=2 商品詳情頁=3
    private String action;//動作: 廣告展示=1 廣告點擊=2
    private String contentType;//Type: 1 商品 2 營銷活動
    private String displayMills;//展示時長 毫秒數
    private String itemId; //商品id
    private String activityId; //營銷活動id


    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getActivityId() {
        return activityId;
    }

    public void setActivityId(String activityId) {
        this.activityId = activityId;
    }

    public String getContentType() {
        return contentType;
    }

    public void setContentType(String contentType) {
        this.contentType = contentType;
    }

    public String getDisplayMills() {
        return displayMills;
    }

    public void setDisplayMills(String displayMills) {
        this.displayMills = displayMills;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }
}

3.1.8 事件日志Bean之商品點擊

package com.test.bean;
/**
 * 商品點擊日志
 */
public class AppDisplay {

    private String action;//動作:曝光商品=1,點擊商品=2,
    private String goodsid;//商品ID(服務端下發的ID)
    private String place;//順序(第幾條商品,第一條為0,第二條為1,如此類推)
    private String extend1;//曝光類型:1 - 首次曝光 2-重復曝光(沒有使用)
    private String category;//分類ID(服務端定義的分類ID)

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getGoodsid() {
        return goodsid;
    }

    public void setGoodsid(String goodsid) {
        this.goodsid = goodsid;
    }

    public String getPlace() {
        return place;
    }

    public void setPlace(String place) {
        this.place = place;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}

3.1.9 事件日志Bean之消息通知

package com.test.bean;
/**
 * 消息通知日志
 */
public class AppNotification {
    private String action;//動作:通知產生=1,通知彈出=2,通知點擊=3,常駐通知展示(不重復上報,一天之內只報一次)=4
    private String type;//通知id:預警通知=1,天氣預報(早=2,晚=3),常駐=4
    private String ap_time;//客戶端彈出時間
    private String content;//備用字段

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getAp_time() {
        return ap_time;
    }

    public void setAp_time(String ap_time) {
        this.ap_time = ap_time;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

3.1.10 事件日志Bean之用戶后台活躍

package com.test.bean;
/**
 * 用戶后台活躍
 */
public class AppActive_background {
    private String active_source;//1=upgrade,2=download(下載),3=plugin_upgrade

    public String getActive_source() {
        return active_source;
    }

    public void setActive_source(String active_source) {
        this.active_source = active_source;
    }
}

3.1.11 事件日志Bean之用戶評論

package com.test.bean;
/**
 * 評論
 */
public class AppComment {

    private int comment_id;//評論表
    private int userid;//用戶id
    private  int p_comment_id;//父級評論id(為0則是一級評論,不為0則是回復)
    private String content;//評論內容
    private String addtime;//創建時間
    private int other_id;//評論的相關id
    private int praise_count;//點贊數量
    private int reply_count;//回復數量

    public int getComment_id() {
        return comment_id;
    }

    public void setComment_id(int comment_id) {
        this.comment_id = comment_id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public int getP_comment_id() {
        return p_comment_id;
    }

    public void setP_comment_id(int p_comment_id) {
        this.p_comment_id = p_comment_id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getAddtime() {
        return addtime;
    }

    public void setAddtime(String addtime) {
        this.addtime = addtime;
    }

    public int getOther_id() {
        return other_id;
    }

    public void setOther_id(int other_id) {
        this.other_id = other_id;
    }

    public int getPraise_count() {
        return praise_count;
    }

    public void setPraise_count(int praise_count) {
        this.praise_count = praise_count;
    }

    public int getReply_count() {
        return reply_count;
    }

    public void setReply_count(int reply_count) {
        this.reply_count = reply_count;
    }
}

3.1.12 事件日志Bean之用戶收藏

package com.test.bean;
/**
 * 收藏
 */
public class AppFavorites {
    private int id;//主鍵
    private int course_id;//商品id
    private int userid;//用戶ID
    private String add_time;//創建時間

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getCourse_id() {
        return course_id;
    }

    public void setCourse_id(int course_id) {
        this.course_id = course_id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public String getAdd_time() {
        return add_time;
    }

    public void setAdd_time(String add_time) {
        this.add_time = add_time;
    }
}

3.1.13 事件日志Bean之用戶點贊

package com.test.bean;
/**
 * 點贊
 */
public class AppPraise {
    private int id; //主鍵id
    private int userid;//用戶id
    private int target_id;//點贊的對象id
    private int type;//點贊類型 1問答點贊 2問答評論點贊 3 文章點贊數4 評論點贊
    private String add_time;//添加時間

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public int getTarget_id() {
        return target_id;
    }

    public void setTarget_id(int target_id) {
        this.target_id = target_id;
    }

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public String getAdd_time() {
        return add_time;
    }

    public void setAdd_time(String add_time) {
        this.add_time = add_time;
    }
}

3.1.14 主函數

在AppMain類中添加如下內容:

package com.test.appclient;

import java.io.UnsupportedEncodingException;
import java.util.Random;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.test.bean.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 日志行為數據模擬
 */
public class AppMain {

    private final static Logger logger = LoggerFactory.getLogger(AppMain.class);
    private static Random rand = new Random();

    // 設備id
    private static int s_mid = 0;

    // 用戶id
    private static int s_uid = 0;

    // 商品id
    private static int s_goodsid = 0;

    public static void main(String[] args) {

        // 參數一:控制發送每條的延時時間,默認是0
        Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

        // 參數二:循環遍歷次數
        int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

        // 生成數據
        generateLog(delay, loop_len);
    }

    private static void generateLog(Long delay, int loop_len) {

        for (int i = 0; i < loop_len; i++) {

            int flag = rand.nextInt(2);

            switch (flag) {
                case (0):
                    //應用啟動
                    AppStart appStart = generateStart();
                    String jsonString = JSON.toJSONString(appStart);

                    //控制台打印
                    logger.info(jsonString);
                    break;

                case (1):

                    JSONObject json = new JSONObject();

                    json.put("ap", "app");
                    json.put("cm", generateComFields());

                    JSONArray eventsArray = new JSONArray();

                    // 事件日志
                    // 商品點擊,展示
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateDisplay());
                        json.put("et", eventsArray);
                    }

                    // 商品詳情頁
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewsDetail());
                        json.put("et", eventsArray);
                    }

                    // 商品列表頁
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewList());
                        json.put("et", eventsArray);
                    }

                    // 廣告
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateAd());
                        json.put("et", eventsArray);
                    }

                    // 消息通知
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNotification());
                        json.put("et", eventsArray);
                    }

                    // 用戶后台活躍
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateBackground());
                        json.put("et", eventsArray);
                    }

                    //故障日志
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateError());
                        json.put("et", eventsArray);
                    }

                    // 用戶評論
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateComment());
                        json.put("et", eventsArray);
                    }

                    // 用戶收藏
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateFavorites());
                        json.put("et", eventsArray);
                    }

                    // 用戶點贊
                    if (rand.nextBoolean()) {
                        eventsArray.add(generatePraise());
                        json.put("et", eventsArray);
                    }

                    //時間
                    long millis = System.currentTimeMillis();

                    //控制台打印
                    logger.info(millis + "|" + json.toJSONString());
                    break;
            }

            // 延遲
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 公共字段設置
     */
    private static JSONObject generateComFields() {

        AppBase appBase = new AppBase();

        //設備id
        appBase.setMid(s_mid + "");
        s_mid++;

        // 用戶id
        appBase.setUid(s_uid + "");
        s_uid++;

        // 程序版本號 5,6等
        appBase.setVc("" + rand.nextInt(20));

        //程序版本名 v1.1.1
        appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // 安卓系統版本
        appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        // 語言  es,en,pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appBase.setL("es");
                break;
            case (1):
                appBase.setL("en");
                break;
            case (2):
                appBase.setL("pt");
                break;
        }

        // 渠道號   從哪個渠道來的
        appBase.setSr(getRandomChar(1));

        // 區域
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appBase.setAr("BR");
            case 1:
                appBase.setAr("MX");
        }

        // 手機品牌 ba ,手機型號 md,就取2位數字了
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setBa("Sumsung");
                appBase.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appBase.setBa("Huawei");
                appBase.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appBase.setBa("HTC");
                appBase.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // 嵌入sdk的版本
        appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appBase.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // 屏幕寬高 hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appBase.setHw("640*960");
                break;
            case 1:
                appBase.setHw("640*1136");
                break;
            case 2:
                appBase.setHw("750*1134");
                break;
            case 3:
                appBase.setHw("1080*1920");
                break;
        }

        // 客戶端產生日志時間
        long millis = System.currentTimeMillis();
        appBase.setT("" + (millis - rand.nextInt(99999999)));

        // 手機網絡模式 3G,4G,WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setNw("3G");
                break;
            case 1:
                appBase.setNw("4G");
                break;
            case 2:
                appBase.setNw("WIFI");
                break;
        }

        // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′
        // 經度
        appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // 緯度
        appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        return (JSONObject) JSON.toJSON(appBase);
    }

    /**
     * 商品展示事件
     */
    private static JSONObject generateDisplay() {

        AppDisplay appDisplay = new AppDisplay();

        boolean boolFlag = rand.nextInt(10) < 7;

        // 動作:曝光商品=1,點擊商品=2,
        if (boolFlag) {
            appDisplay.setAction("1");
        } else {
            appDisplay.setAction("2");
        }

        // 商品id
        String goodsId = s_goodsid + "";
        s_goodsid++;

        appDisplay.setGoodsid(goodsId);

        // 順序  設置成6條吧
        int flag = rand.nextInt(6);
        appDisplay.setPlace("" + flag);

        // 曝光類型
        flag = 1 + rand.nextInt(2);
        appDisplay.setExtend1("" + flag);

        // 分類
        flag = 1 + rand.nextInt(100);
        appDisplay.setCategory("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay);

        return packEventJson("display", jsonObject);
    }

    /**
     * 商品詳情頁
     */
    private static JSONObject generateNewsDetail() {

        AppNewsDetail appNewsDetail = new AppNewsDetail();

        // 頁面入口來源
        int flag = 1 + rand.nextInt(3);
        appNewsDetail.setEntry(flag + "");

        // 動作
        appNewsDetail.setAction("" + (rand.nextInt(4) + 1));

        // 商品id
        appNewsDetail.setGoodsid(s_goodsid + "");

        // 商品來源類型
        flag = 1 + rand.nextInt(3);
        appNewsDetail.setShowtype(flag + "");

        // 商品樣式
        flag = rand.nextInt(6);
        appNewsDetail.setShowtype("" + flag);

        // 頁面停留時長
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setNews_staytime(flag + "");

        // 加載時長
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setLoading_time(flag + "");

        // 加載失敗碼
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appNewsDetail.setType1("102");
                break;
            case 2:
                appNewsDetail.setType1("201");
                break;
            case 3:
                appNewsDetail.setType1("325");
                break;
            case 4:
                appNewsDetail.setType1("433");
                break;
            case 5:
                appNewsDetail.setType1("542");
                break;
            default:
                appNewsDetail.setType1("");
                break;
        }

        // 分類
        flag = 1 + rand.nextInt(100);
        appNewsDetail.setCategory("" + flag);

        JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail);

        return packEventJson("newsdetail", eventJson);
    }

    /**
     * 商品列表
     */
    private static JSONObject generateNewList() {

        AppLoading appLoading = new AppLoading();

        // 動作
        int flag = rand.nextInt(3) + 1;
        appLoading.setAction(flag + "");

        // 加載時長
        flag = rand.nextInt(10) * rand.nextInt(7);
        appLoading.setLoading_time(flag + "");

        // 失敗碼
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appLoading.setType1("102");
                break;
            case 2:
                appLoading.setType1("201");
                break;
            case 3:
                appLoading.setType1("325");
                break;
            case 4:
                appLoading.setType1("433");
                break;
            case 5:
                appLoading.setType1("542");
                break;
            default:
                appLoading.setType1("");
                break;
        }

        // 頁面  加載類型
        flag = 1 + rand.nextInt(2);
        appLoading.setLoading_way("" + flag);

        // 擴展字段1
        appLoading.setExtend1("");

        // 擴展字段2
        appLoading.setExtend2("");

        // 用戶加載類型
        flag = 1 + rand.nextInt(3);
        appLoading.setType("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading);

        return packEventJson("loading", jsonObject);
    }

    /**
     * 廣告相關字段
     */
    private static JSONObject generateAd() {

        AppAd appAd = new AppAd();

        // 入口
        int flag = rand.nextInt(3) + 1;
        appAd.setEntry(flag + "");

        // 動作
        flag = rand.nextInt(5) + 1;
        appAd.setAction(flag + "");

        // 內容類型類型
        flag = rand.nextInt(6)+1;
        appAd.setContentType(flag+ "");

        // 展示樣式
        flag = rand.nextInt(120000)+1000;
        appAd.setDisplayMills(flag+"");

        flag=rand.nextInt(1);
        if(flag==1){
            appAd.setContentType(flag+"");
            flag =rand.nextInt(6);
            appAd.setItemId(flag+ "");
        }else{
            appAd.setContentType(flag+"");
            flag =rand.nextInt(1)+1;
            appAd.setActivityId(flag+ "");
        }

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd);

        return packEventJson("ad", jsonObject);
    }

    /**
     * 啟動日志
     */
    private static AppStart generateStart() {

        AppStart appStart = new AppStart();

        //設備id
        appStart.setMid(s_mid + "");
        s_mid++;

        // 用戶id
        appStart.setUid(s_uid + "");
        s_uid++;

        // 程序版本號 5,6等
        appStart.setVc("" + rand.nextInt(20));

        //程序版本名 v1.1.1
        appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // 安卓系統版本
        appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        //設置日志類型
        appStart.setEn("start");

        //    語言  es,en,pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appStart.setL("es");
                break;
            case (1):
                appStart.setL("en");
                break;
            case (2):
                appStart.setL("pt");
                break;
        }

        // 渠道號   從哪個渠道來的
        appStart.setSr(getRandomChar(1));

        // 區域
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appStart.setAr("BR");
            case 1:
                appStart.setAr("MX");
        }

        // 手機品牌 ba ,手機型號 md,就取2位數字了
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setBa("Sumsung");
                appStart.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appStart.setBa("Huawei");
                appStart.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appStart.setBa("HTC");
                appStart.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // 嵌入sdk的版本
        appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appStart.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // 屏幕寬高 hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appStart.setHw("640*960");
                break;
            case 1:
                appStart.setHw("640*1136");
                break;
            case 2:
                appStart.setHw("750*1134");
                break;
            case 3:
                appStart.setHw("1080*1920");
                break;
        }

        // 客戶端產生日志時間
        long millis = System.currentTimeMillis();
        appStart.setT("" + (millis - rand.nextInt(99999999)));

        // 手機網絡模式 3G,4G,WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setNw("3G");
                break;
            case 1:
                appStart.setNw("4G");
                break;
            case 2:
                appStart.setNw("WIFI");
                break;
        }

        // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′
        // 經度
        appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // 緯度
        appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        // 入口
        flag = rand.nextInt(5) + 1;
        appStart.setEntry(flag + "");

        // 開屏廣告類型
        flag = rand.nextInt(2) + 1;
        appStart.setOpen_ad_type(flag + "");

        // 狀態
        flag = rand.nextInt(10) > 8 ? 2 : 1;
        appStart.setAction(flag + "");

        // 加載時長
        appStart.setLoading_time(rand.nextInt(20) + "");

        // 失敗碼
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appStart.setDetail("102");
                break;
            case 2:
                appStart.setDetail("201");
                break;
            case 3:
                appStart.setDetail("325");
                break;
            case 4:
                appStart.setDetail("433");
                break;
            case 5:
                appStart.setDetail("542");
                break;
            default:
                appStart.setDetail("");
                break;
        }

        // 擴展字段
        appStart.setExtend1("");

        return appStart;
    }
    /**
     * 消息通知
     */
    private static JSONObject generateNotification() {

        AppNotification appNotification = new AppNotification();

        int flag = rand.nextInt(4) + 1;

        // 動作
        appNotification.setAction(flag + "");

        // 通知id
        flag = rand.nextInt(4) + 1;
        appNotification.setType(flag + "");

        // 客戶端彈時間
        appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        // 備用字段
        appNotification.setContent("");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification);

        return packEventJson("notification", jsonObject);
    }

    /**
     * 后台活躍
     */
    private static JSONObject generateBackground() {

        AppActive_background appActive_background = new AppActive_background();

        // 啟動源
        int flag = rand.nextInt(3) + 1;
        appActive_background.setActive_source(flag + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background);

        return packEventJson("active_background", jsonObject);
    }

    /**
     * 錯誤日志數據
     */
    private static JSONObject generateError() {

        AppErrorLog appErrorLog = new AppErrorLog();

        String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"};        //錯誤摘要
        String[] errorDetails = {"java.lang.NullPointerException\\n    " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"};        //錯誤詳情

        //錯誤摘要
        appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]);
        //錯誤詳情
        appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog);

        return packEventJson("error", jsonObject);
    }

    /**
     * 為各個事件類型的公共字段(時間、事件類型、Json數據)拼接
     */
    private static JSONObject packEventJson(String eventName, JSONObject jsonObject) {

        JSONObject eventJson = new JSONObject();

        eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        eventJson.put("en", eventName);
        eventJson.put("kv", jsonObject);

        return eventJson;
    }

    /**
     * 獲取隨機字母組合
     *
     * @param length 字符串長度
     */
    private static String getRandomChar(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {
            // 字符串
            str.append((char) (65 + random.nextInt(26)));// 取得大寫字母
        }

        return str.toString();
    }

    /**
     * 獲取隨機字母數字組合
     * @param length 字符串長度
     */
    private static String getRandomCharAndNumr(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {

            boolean b = random.nextBoolean();

            if (b) { // 字符串
                // int choice = random.nextBoolean() ? 65 : 97; 取得65大寫字母還是97小寫字母
                str.append((char) (65 + random.nextInt(26)));// 取得大寫字母
            } else { // 數字
                str.append(String.valueOf(random.nextInt(10)));
            }
        }

        return str.toString();
    }

    /**
     * 收藏
     */
    private static JSONObject generateFavorites() {

        AppFavorites favorites = new AppFavorites();

        favorites.setCourse_id(rand.nextInt(10));
        favorites.setUserid(rand.nextInt(10));
        favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites);

        return packEventJson("favorites", jsonObject);
    }

    /**
     * 點贊
     */
    private static JSONObject generatePraise() {

        AppPraise praise = new AppPraise();

        praise.setId(rand.nextInt(10));
        praise.setUserid(rand.nextInt(10));
        praise.setTarget_id(rand.nextInt(10));
        praise.setType(rand.nextInt(4) + 1);
        praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(praise);

        return packEventJson("praise", jsonObject);
    }

    /**
     * 評論
     */
    private static JSONObject generateComment() {

        AppComment comment = new AppComment();

        comment.setComment_id(rand.nextInt(10));
        comment.setUserid(rand.nextInt(10));
        comment.setP_comment_id(rand.nextInt(5));

        comment.setContent(getCONTENT());
        comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        comment.setOther_id(rand.nextInt(10));
        comment.setPraise_count(rand.nextInt(1000));
        comment.setReply_count(rand.nextInt(200));

        JSONObject jsonObject = (JSONObject) JSON.toJSON(comment);

        return packEventJson("comment", jsonObject);
    }

    /**
     * 生成單個漢字
     */
    private static char getRandomChar() {

        String str = "";
        int hightPos; //
        int lowPos;

        Random random = new Random();

        //隨機生成漢子的兩個字節
        hightPos = (176 + Math.abs(random.nextInt(39)));
        lowPos = (161 + Math.abs(random.nextInt(93)));

        byte[] b = new byte[2];
        b[0] = (Integer.valueOf(hightPos)).byteValue();
        b[1] = (Integer.valueOf(lowPos)).byteValue();

        try {
            str = new String(b, "GBK");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            System.out.println("錯誤");
        }

        return str.charAt(0);
    }

    /**
     * 拼接成多個漢字
     */
    private static String getCONTENT() {

        StringBuilder str = new StringBuilder();

        for (int i = 0; i < rand.nextInt(100); i++) {
            str.append(getRandomChar());
        }

        return str.toString();
    }
}

3.1.15 配置日志打印Logback

Logback主要用於在磁盤和控制台打印日志

Logback具體使用:

1)在resources文件夾下創建logback.xml文件。

2)在logback.xml文件中填寫如下配置

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
   <!--定義日志文件的存儲地址 勿在 LogBack 的配置中使用相對路徑 -->
   <property name="LOG_HOME" value="/tmp/logs/" />

   <!-- 控制台輸出 -->
   <appender name="STDOUT"
      class="ch.qos.logback.core.ConsoleAppender">
      <encoder
         class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <!--格式化輸出:%d表示日期,%thread表示線程名,%-5level:級別從左顯示5個字符寬度%msg:日志消息,%n是換行符 -->
         <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
      </encoder>
   </appender>
   
   <!-- 按照每天生成日志文件。存儲事件日志 -->
   <appender name="FILE"
      class="ch.qos.logback.core.rolling.RollingFileAppender">
      <!-- <File>${LOG_HOME}/app.log</File>設置日志不超過${log.max.size}時的保存路徑,注意,如果是web項目會保存到Tomcat的bin目錄 下 -->  
      <rollingPolicy
         class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
         <!--日志文件輸出的文件名 -->
         <FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern>
         <!--日志文件保留天數 -->
         <MaxHistory>30</MaxHistory>
      </rollingPolicy>
      <encoder
         class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <pattern>%msg%n</pattern>
      </encoder>
      <!--日志文件最大的大小 -->
      <triggeringPolicy
         class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
         <MaxFileSize>10MB</MaxFileSize>
      </triggeringPolicy>
   </appender>

    <!--異步打印日志-->
    <appender name ="ASYNC_FILE" class= "ch.qos.logback.classic.AsyncAppender">
        <!-- 不丟失日志.默認的,如果隊列的80%已滿,則會丟棄TRACT、DEBUG、INFO級別的日志 -->
        <discardingThreshold >0</discardingThreshold>
        <!-- 更改默認的隊列的深度,該值會影響性能.默認值為256 -->
        <queueSize>512</queueSize>
        <!-- 添加附加的appender,最多只能添加一個 -->
        <appender-ref ref = "FILE"/>
    </appender>

    <!-- 日志輸出級別 -->
   <root level="INFO">
      <appender-ref ref="STDOUT" />
      <appender-ref ref="ASYNC_FILE" />
      <appender-ref ref="error" />
   </root>
</configuration>

3.1.16 Maven打jar包

四、數據采集模塊

4.1 Hadoop安裝

 見大數據軟件安裝之Hadoop(Apache)(數據存儲及計算)

4.1.1 項目經驗之HDFS存儲多目錄

若HDFS存儲空間緊張,需要對DataNode進行磁盤擴展。

1)在DataNode節點增加磁盤並進行掛載。

 

 

 2)在hdfs-site.xml文件中配置多目錄,注意新掛載磁盤的訪問權限問題。

<property>

    <name>dfs.datanode.data.dir</name>

    <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>

</property>

3)增加磁盤后,保證每個目錄數據均衡

  開啟數據均衡命令:bin/start-balancer.sh -threshold 10

  對於參數10,代表的是集群中各個節點的磁盤空間利用率相差不超過10%,可根據實際情況調整。

  停止數據均衡命令:bin/stop-banlancer.sh

4.1.2 項目經驗之LZO壓縮配置

1)hadoop本身並不支持壓縮,故需要使用twitter提供的hadoop-lzo開源組件。hadoop-lzo需依賴hadoop和lzo進行編譯,編譯步驟如下。

lzo需依賴hadoop和lzo進行編譯,編譯步驟如下。

Hadoop支持LZO

0. 環境准備
maven(下載安裝,配置環境變量,修改sitting.xml加阿里雲鏡像)
gcc-c++
zlib-devel
autoconf
automake
libtool
通過yum安裝即可,yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool

1. 下載、安裝並編譯LZO

wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz

tar -zxvf lzo-2.10.tar.gz

cd lzo-2.10

./configure -prefix=/usr/local/hadoop/lzo/

make

make install

2. 編譯hadoop-lzo源碼

2.1 下載hadoop-lzo的源碼,下載地址:https://github.com/twitter/hadoop-lzo/archive/master.zip
2.2 解壓之后,修改pom.xml
    <hadoop.current.version>2.7.2</hadoop.current.version>
2.3 聲明兩個臨時環境變量
     export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
     export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 
2.4 編譯
    進入hadoop-lzo-master,執行maven編譯命令
    mvn package -Dmaven.test.skip=true
2.5 進入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即編譯成功

2)將編譯好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/

[test@hadoop102 common]$ pwd

/opt/module/hadoop-2.7.2/share/hadoop/common

[test@hadoop102 common]$ ls

hadoop-lzo-0.4.20.jar

3)同步hadoop-lzo-0.4.20.jar 到hadoop103、hadoop104

[test@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar

4)core-site.xml增加配置支持LZO壓縮

<configuration>

 <property>

  <name>io.compression.codecs</name>

  <value>

  org.apache.hadoop.io.compress.GzipCodec,

  org.apache.hadoop.io.compress.DefaultCodec,

  org.apache.hadoop.io.compress.BZip2Codec,

  org.apache.hadoop.io.compress.SnappyCodec,

  com.hadoop.compression.lzo.LzoCodec,

  com.hadoop.compression.lzo.LzopCodec

  </value>

 </property>

<property>

    <name>io.compression.codec.lzo.class</name>

    <value>com.hadoop.compression.lzo.LzoCodec</value>

</property>

</configuration>

5)同步core-site.xml到hadoop103、hadoop104

[test@hadoop102 hadoop]$ xsync core-site.xml

6)啟動及查看集群

[test@hadoop102 hadoop-2.7.2]$  sbin/start-dfs.sh

[test@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh

4.1.3 項目經驗之LZO創建索引

1)創建LZO文件的索引,LZO壓縮文件的可切片特性依賴其索引,故我們需要手動為LZO壓縮文件創建索引。若無索引,則LZO文件的切片只有一個。

hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo

2)測試

(1)將bigtable.lzo(150M)上傳到集群的根目錄

[test@hadoop102 module]$ hadoop fs -mkdir /input

[test@hadoop102 module]$ hadoop fs -put bigtable.lzo /input

(2)對上傳的LZO文件建索引

[test@hadoop102 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo

4.1.4 項目經驗之基准測試

1)測試HDFS寫性能

測試內容:向HDFS集群寫10個128M的文件

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB

19/05/02 11:45:23 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write

19/05/02 11:45:23 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:45:23 CST 2019

19/05/02 11:45:23 INFO fs.TestDFSIO:        Number of files: 10

19/05/02 11:45:23 INFO fs.TestDFSIO: Total MBytes processed: 1280.0

19/05/02 11:45:23 INFO fs.TestDFSIO:      Throughput mb/sec: 10.69751115716984

19/05/02 11:45:23 INFO fs.TestDFSIO: Average IO rate mb/sec: 14.91699504852295

19/05/02 11:45:23 INFO fs.TestDFSIO:  IO rate std deviation: 11.160882132355928

19/05/02 11:45:23 INFO fs.TestDFSIO:     Test exec time sec: 52.315

2)測試HDFS讀性能

測試內容:讀取HDFS集群10個128M的文件

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB

19/05/02 11:56:36 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read

19/05/02 11:56:36 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:56:36 CST 2019

19/05/02 11:56:36 INFO fs.TestDFSIO:        Number of files: 10

19/05/02 11:56:36 INFO fs.TestDFSIO: Total MBytes processed: 1280.0

19/05/02 11:56:36 INFO fs.TestDFSIO:      Throughput mb/sec: 16.001000062503905

19/05/02 11:56:36 INFO fs.TestDFSIO: Average IO rate mb/sec: 17.202795028686523

19/05/02 11:56:36 INFO fs.TestDFSIO:  IO rate std deviation: 4.881590515873911

19/05/02 11:56:36 INFO fs.TestDFSIO:     Test exec time sec: 49.116

19/05/02 11:56:36 INFO fs.TestDFSIO:

3)刪除測試生成數據

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean

4)使用Sort程序評測MapReduce

(1)使用RandomWriter來產生隨機數,每個節點運行10個Map任務,每個Map產生大約1G大小的二進制隨機數

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar randomwriter random-data

(2)執行Sort程序

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar sort random-data sorted-data

(3)驗證數據是否真正排好序了

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

4.1.5 項目經驗之Hadoop參數調優

1)HDFS參數調優hdfs-site.xml

dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群規模為8台時,此參數設置為60

The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.

NameNode有一個工作線程池,用來處理不同DataNode的並發心跳以及客戶端並發的元數據操作。對於大集群或者有大量客戶端的集群來說,通常需要增大參數dfs.namenode.handler.count的默認值10。設置該值的一般原則是將其設置為集群大小的自然對數乘以20,即20logN,N為集群大小。

2)YARN參數調優yarn-site.xml

(1)情景描述:總共7台機器,每天幾億條數據,數據源->Flume->Kafka->HDFS->Hive

面臨問題:數據統計主要用HiveSQL,沒有數據傾斜,小文件已經做了合並處理,開啟的JVM重用,而且IO沒有阻塞,內存用了不到50%。但是還是跑的非常慢,而且數據量洪峰過來時,整個集群都會宕掉。基於這種情況有沒有優化方案。

(2)解決辦法:

內存利用率不夠。這個一般是Yarn的2個配置造成的,單個任務可以申請的最大內存大小,和Hadoop單個節點可用內存大小。調節這兩個參數能提高系統內存的利用率。

(a)yarn.nodemanager.resource.memory-mb

表示該節點上YARN可使用的物理內存總量,默認是8192(MB),注意,如果你的節點內存資源不夠8GB,則需要調減小這個值,而YARN不會智能的探測節點的物理內存總量。

(b)yarn.scheduler.maximum-allocation-mb

單個任務可申請的最多物理內存量,默認是8192(MB)。

3)Hadoop宕機

(1)如果MR造成系統宕機。此時要控制Yarn同時運行的任務數,和每個任務申請的最大內存。調整參數:yarn.scheduler.maximum-allocation-mb(單個任務可申請的最多物理內存量,默認是8192MB)

(2)如果寫入文件過量造成NameNode宕機。那么調高Kafka的存儲大小,控制從Kafka到HDFS的寫入速度。高峰期的時候用Kafka進行緩存,高峰期過去數據同步會自動跟上。

4.2 Zookeeper安裝

大數據軟件安裝之ZooKeeper監控 

集群規划

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Zookeeper

Zookeeper

Zookeeper

Zookeeper

4.2.1 ZK集群啟動停止腳本

1)在hadoop102的/home/test/bin目錄下創建腳本

[test@hadoop102 bin]$ vim zk.sh

       在腳本中編寫如下內容

#! /bin/bash

 

case $1 in

"start"){

   for i in hadoop102 hadoop103 hadoop104

   do

      ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"

   done

};;

"stop"){

   for i in hadoop102 hadoop103 hadoop104

   do

      ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"

   done

};;

"status"){

   for i in hadoop102 hadoop103 hadoop104

   do

      ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"

   done

};;

esac

2)增加腳本執行權限

[test@hadoop102 bin]$ chmod 777 zk.sh

3)Zookeeper集群啟動腳本

[test@hadoop102 module]$ zk.sh start

4)Zookeeper集群停止腳本

[test@hadoop102 module]$ zk.sh stop

4.2.2 項目經驗之Linux環境變量

1)修改/etc/profile文件:用來設置系統環境參數,比如$PATH. 這里面的環境變量是對系統內所有用戶生效。使用bash命令,需要source  /etc/profile一下。

2)修改~/.bashrc文件:針對某一個特定的用戶,環境變量的設置只對該用戶自己有效。使用bash命令,只要以該用戶身份運行命令行就會讀取該文件。

3)把/etc/profile里面的環境變量追加到~/.bashrc目錄

[test@hadoop102 ~]$ cat /etc/profile >> ~/.bashrc

[test@hadoop103 ~]$ cat /etc/profile >> ~/.bashrc

[test@hadoop104 ~]$ cat /etc/profile >> ~/.bashrc

4)說明

登錄式Shell,采用用戶名比如test登錄,會自動加載/etc/profile

非登錄式Shell,采用ssh 比如ssh hadoop103登錄,不會自動加載/etc/profile,會自動加載~/.bashrc

盡量將環境變量 部署在 /etc/profile.d/env.sh

4.3 日志生成

4.3.1 日志啟動

1)代碼參數說明

// 參數一:控制發送每條的延時時間,默認是0

Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

// 參數二:循環遍歷次數

int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

2)將生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷貝到hadoop102服務器/opt/module上,並同步到hadoop103的/opt/module路徑下,

[test@hadoop102 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

3)在hadoop102上執行jar程序

[test@hadoop102 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain  >/opt/module/test.log

說明1:

java -classpath 需要在jar包后面指定全類名;

java -jar 需要查看一下解壓的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全類名。如果有可以用java -jar,如果沒有就需要用到java -classpath

說明2:/dev/null代表linux的空設備文件,所有往這個文件里面寫入的內容都會丟失,俗稱“黑洞”。

標准輸入0:從鍵盤獲得輸入 /proc/self/fd/0

標准輸出1:輸出到屏幕(即控制台) /proc/self/fd/1

錯誤輸出2:輸出到屏幕(即控制台) /proc/self/fd/2

4)在/tmp/logs路徑下查看生成的日志文件

[test@hadoop102 module]$ cd /tmp/logs/

[test@hadoop102 logs]$ ls

app-2020-03-10.log

4.3.2 集群日志生成啟動腳本

1)在/home/test/bin目錄下創建腳本lg.sh

[test@hadoop102 bin]$ vim lg.sh

       2)在腳本中編寫如下內容

#! /bin/bash

 

   for i in hadoop102 hadoop103

   do

      ssh $i "java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain $1 $2 >/dev/null 2>&1 &"

   done

3)修改腳本執行權限

[test@hadoop102 bin]$ chmod 777 lg.sh

4)啟動腳本

[test@hadoop102 module]$ lg.sh

5)分別在hadoop102、hadoop103的/tmp/logs目錄上查看生成的數據

[test@hadoop102 logs]$ ls

app-2020-03-10.log

[test@hadoop103 logs]$ ls

app-2020-03-10.log

4.3.3 集群時間同步修改腳本

1)在/home/test/bin目錄下創建腳本dt.sh

[test@hadoop102 bin]$ vim dt.sh

       2)在腳本中編寫如下內容

#!/bin/bash

 

for i in hadoop102 hadoop103 hadoop104

do

        echo "========== $i =========="

        ssh -t $i "sudo date -s $1"

done

注意:ssh -t 通常用於ssh遠程執行sudo命令

3)修改腳本執行權限

[test@hadoop102 bin]$ chmod 777 dt.sh

4)啟動腳本

[test@hadoop102 bin]$ dt.sh 2020-03-10

4.3.4 集群所有進程查看腳本

1)在/home/test/bin目錄下創建腳本xcall.sh

[test@hadoop102 bin]$ vim xcall.sh

       2)在腳本中編寫如下內容

#! /bin/bash

 

for i in hadoop102 hadoop103 hadoop104

do

        echo --------- $i ----------

        ssh $i "$*"

done

3)修改腳本執行權限

[test@hadoop102 bin]$ chmod 777 xcall.sh

4)啟動腳本

[test@hadoop102 bin]$ xcall.sh jps

4.4 采集日志Flume

4.4.1 日志采集Flume安裝

見 大數據軟件安裝之Flume(日志采集)

集群規划:

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Flume(采集日志)

Flume

Flume

 

4.4.2 項目經驗之Flume組件

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的優勢

TailDir Source:斷點續傳、多目錄。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置,實現斷點續傳。

Exec Source可以實時搜集數據,但是在Flume不運行或者Shell命令出錯的情況下,數據將會丟失。

Spooling Directory Source監控目錄,不支持斷點續傳。

(2)batchSize大小如何設置?

答:Event 1K左右時,500-1000合適(默認為100)

2)Channel

采用Kafka Channel,省去了Sink,提高了效率。

注意在Flume1.7以前,Kafka Channel很少有人使用,因為發現parseAsFlumeEvent這個配置起不了作用。也就是無論parseAsFlumeEvent配置為true還是false,都會轉為Flume Event。

這樣的話,造成的結果是,會始終都把Flume的headers中的信息混合着內容一起寫入Kafka的消息中,這顯然不是我所需要的,我只是需要把內容寫入即可。

4.4.3 日志采集Flume配置

1)Flume 配置分析

Flume直接讀log日志的數據,log日志的格式是app-yyyy-mm-dd.log。

2)Flume的配置如下:

(1)在/opt/module/flume/conf目錄下創建file-flume-kafka.conf文件

[test@hadoop102 conf]$ vim file-flume-kafka.conf

在文件配置如下內容

# 組件定義

a1.sources=r1

a1.channels=c1 c2

 

# taildir方式數據

# configure source

a1.sources.r1.type = TAILDIR

a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json  #記錄日志讀取位置

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /tmp/logs/app.+   #讀取日志位置

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1 c2

 

#interceptor

a1.sources.r1.interceptors =  i1 i2

a1.sources.r1.interceptors.i1.type = com.test.flume.interceptor.LogETLInterceptor$Builder  #ETL攔截器

a1.sources.r1.interceptors.i2.type = com.test.flume.interceptor.LogTypeInterceptor$Builder   #日志類型攔截器

 

a1.sources.r1.selector.type = multiplexing  # 根據日志類型分數據

a1.sources.r1.selector.header = topic

a1.sources.r1.selector.mapping.topic_start = c1

a1.sources.r1.selector.mapping.topic_event = c2

 

# configure channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.channels.c1.kafka.topic = topic_start    #日志類型是start ,數據發往channel1

a1.channels.c1.parseAsFlumeEvent = false

a1.channels.c1.kafka.consumer.group.id = flume-consumer 

 

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.channels.c2.kafka.topic = topic_event    #日志類型是event,數據發往channel2

a1.channels.c2.parseAsFlumeEvent = false

a1.channels.c2.kafka.consumer.group.id = flume-consumer

            注意:com.test.flume.interceptor.LogETLInterceptor和com.test.flume.interceptor.LogTypeInterceptor是自定義的攔截器的全類名。需要根據用戶自定義的攔截器做相應修改。

4.4.4 Flume的ETL和分類型攔截器

本項目中自定義了兩個攔截器,分別是:ETL攔截器、日志類型區分攔截器。

ETL攔截器主要用於,過濾時間戳不合法和Json數據不完整的日志

日志類型區分攔截器主要用於,將啟動日志和事件日志區分開來,方便發往Kafka的不同Topic。

1)創建Maven工程flume-interceptor

2)創建包名:com.test.flume.interceptor

3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4)在com.test.flume.interceptor包下創建LogETLInterceptor類名

Flume ETL攔截器LogETLInterceptor

package com.test.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 1 獲取數據
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2 判斷數據類型並向Header中賦值
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)){
                return event;
            }
        }else {
            if (LogUtils.validateEvent(log)){
                return event;
            }
        }

        // 3 返回校驗結果
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event intercept1 = intercept(event);

            if (intercept1 != null){
                interceptors.add(intercept1);
            }
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

5)Flume日志過濾工具類

package com.test.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateEvent(String log) {
        // 服務器時間 | json
        // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}

        // 1 切割
        String[] logContents = log.split("\\|");

        // 2 校驗
        if(logContents.length != 2){
            return false;
        }

        //3 校驗服務器時間
        if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
            return false;
        }

        // 4 校驗json
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
            return false;
        }

        return true;
    }

    public static boolean validateStart(String log) {
 // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"}

        if (log == null){
            return false;
        }

        // 校驗json
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
            return false;
        }

        return true;
    }
}
5)Flume日志類型區分攔截器LogTypeInterceptor
package com.test.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 區分日志類型:   body  header
        // 1 獲取body數據
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2 獲取header
        Map<String, String> headers = event.getHeaders();

        // 3 判斷數據類型並向Header中賦值
        if (log.contains("start")) {
            headers.put("topic","topic_start");
        }else {
            headers.put("topic","topic_event");
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event intercept1 = intercept(event);

            interceptors.add(intercept1);
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements  Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

6)打包

攔截器打包之后,只需要單獨包,不需要將依賴的包上傳。打包之后要放入Flume的lib文件夾下面。

4.4.5 日志采集Flume啟動停止腳本

1)在/home/test/bin目錄下創建腳本f1.sh

[test@hadoop102 bin]$ vim f1.sh

       在腳本中填寫如下內容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------啟動 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &"
        done
};;    
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac

說明1:nohup,該命令可以在你退出帳戶/關閉終端之后繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令

說明2:awk 默認分隔符為空格

說明3:xargs 表示取出前面命令運行的結果,作為后面命令的輸入參數。

2)增加腳本執行權限

[test@hadoop102 bin]$ chmod 777 f1.sh

3)f1集群啟動腳本

[test@hadoop102 module]$ f1.sh start

4)f1集群停止腳本

[test@hadoop102 module]$ f1.sh stop

4.5 Kafka安裝

4.5.1 Kafka集群安裝

見 大數據安裝之Kafka(用於實時處理的消息隊列)

集群規划:

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Kafka

Kafka

Kafka

Kafka

4.5.2 Kafka集群啟動停止腳本

1)在/home/test/bin目錄下創建腳本kf.sh

[test@hadoop102 bin]$ vim kf.sh

       在腳本中填寫如下內容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------啟動 $i Kafka-------"
                ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
        done
};;
"stop"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------停止 $i Kafka-------"
                ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
        done
};;
esac

2)增加腳本執行權限

[test@hadoop102 bin]$ chmod 777 kf.sh

3)kf集群啟動腳本

[test@hadoop102 module]$ kf.sh start

4)kf集群停止腳本

[test@hadoop102 module]$ kf.sh stop

4.5.3 查看Kafka Topic列表

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

4.5.4 創建Kafka Topic

進入到/opt/module/kafka/目錄下分別創建:啟動日志主題、事件日志主題。

1)創建啟動日志主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions 1 --topic topic_start

2)創建事件日志主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions 1 --topic topic_event

4.5.5 刪除Kafka Topic

1)刪除啟動日志主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_start

2)刪除事件日志主題

[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_event

4.5.6 Kafka生產消息

[test@hadoop102 kafka]$ bin/kafka-console-producer.sh \

--broker-list hadoop102:9092 --topic topic_start

>hello world

>test  test

4.5.7 Kafka消費消息

[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --from-beginning --topic topic_start

--from-beginning:會把主題中以往所有的數據都讀取出來。根據業務場景選擇是否增加該配置。

4.5.8 查看Kafka Topic詳情

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \

--describe --topic topic_start

4.5.9 項目經驗之Kafka壓力測試

1)Kafka壓測

用Kafka官方自帶的腳本,對Kafka進行壓測。Kafka壓測時,可以查看到哪個地方出現了瓶頸(CPU,內存,網絡IO)。一般都是網絡IO達到瓶頸

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

2)Kafka Producer壓力測試

(1)在/opt/module/kafka/bin目錄下面有這兩個文件。我們來測試一下

[test@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

說明:

record-size是一條信息有多大,單位是字節。

num-records是總共發送多少條信息。

throughput 是每秒多少條信息,設成-1,表示不限流,可測出生產者最大吞吐量。

(2)Kafka會打印下面的信息

100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.

參數解析:本例中一共寫入10w條消息,吞吐量為9.14 MB/sec,每次寫入的平均延遲為187.68毫秒,最大的延遲為424.00毫秒。

3)Kafka Consumer壓力測試

Consumer的測試,如果這四個指標(IO,CPU,內存,網絡)都不能改變,考慮增加分區數來提升性能。

[test@hadoop102 kafka]$

bin/kafka-consumer-perf-test.sh --zookeeper hadoop102:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1

參數說明:

--zookeeper 指定zookeeper的鏈接信息

--topic 指定topic的名稱

--fetch-size 指定每次fetch的數據的大小

--messages 總共要消費的消息個數

測試結果說明:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec

2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153

開始測試時間,測試結束數據,共消費數據9.5368MB,吞吐量2.0714MB/s,共消費100010條,平均每秒消費21722.4153條。

4.5.10 項目經驗之Kafka機器數量計算

Kafka機器數量(經驗公式)=2*(峰值生產速度*副本數/100)+1

先拿到峰值生產速度,再根據設定的副本數,就能預估出需要部署Kafka的數量。

比如我們的峰值生產速度是50M/s。副本數為2。

Kafka機器數量=2*(50*2/100)+ 1=3台

 

4.6 消費Kafka數據Flume

 

集群規划

 

 

服務器hadoop102

服務器hadoop103

服務器hadoop104

Flume(消費Kafka)

 

 

Flume

4.6.1 日志消費Flume配置

1)Flume配置分析

2)Flume的具體配置如下:

       (1)在hadoop104的/opt/module/flume/conf目錄下創建kafka-flume-hdfs.conf文件

[test@hadoop104 conf]$ vim kafka-flume-hdfs.conf

在文件配置如下內容

## 組件定義
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1   #kafka start主題源數據
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2   #kafka event主題源數據
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1   
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1    #start主題數據輸出到HDFS路徑
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
 
##sink2           #event 主題數據輸出到HDFS路徑如果hadoop和flume不在一台服務器需要在路徑前邊增加hdfs://hadoop102:9000/
a1.sinks.k2.type = hdfs     
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

## 不要產生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10    #生成文件大小設定
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制輸出文件是原生文件。      
a1.sinks.k1.hdfs.fileType = CompressedStream      #支持LZO數據壓縮設置
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

4.6.2 項目經驗之Flume組件

1)FileChannel和MemoryChannel區別

MemoryChannel傳輸數據速度更快,但因為數據保存在JVM的堆內存中,Agent進程掛掉會導致數據丟失,適用於對數據質量要求不高的需求。

FileChannel傳輸速度相對於Memory慢,但數據安全保障高,Agent進程掛掉也可以從失敗中恢復數據。

2)FileChannel優化

通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。

官方說明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據

3)Sink:HDFS Sink

(1)HDFS存入大量小文件,有什么影響?

元數據層面:每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在Namenode內存中。所以小文件過多,會占用Namenode服務器大量內存,影響Namenode性能和使用壽命

計算層面:默認情況下MR會對每個小文件啟用一個Map任務計算,非常影響計算性能。同時也影響磁盤尋址時間。

       (2)HDFS小文件處理

官方默認的這三個參數配置寫入HDFS后會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合作用,效果如下:

(1)文件在達到128M時會滾動生成新文件

(2)文件創建超3600秒時會滾動生成新文件

4.6.3 日志消費Flume啟動停止腳本

1)在/home/test/bin目錄下創建腳本f2.sh

[test@hadoop102 bin]$ vim f2.sh

#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------啟動 $i 消費flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------停止 $i 消費flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac

2)增加腳本執行權限

[test@hadoop102 bin]$ chmod 777 f2.sh

3)f2集群啟動腳本

[test@hadoop102 module]$ f2.sh start

4)f2集群停止腳本

[test@hadoop102 module]$ f2.sh stop

4.6.4 項目經驗之Flume內存優化

1)問題描述:如果啟動消費Flume拋出如下異常

ERROR hdfs.HDFSEventSink: process failed

java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解決方案步驟:

(1)在hadoop102服務器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop103、hadoop104服務器

[test@hadoop102 conf]$ xsync flume-env.sh

3)Flume內存參數設置及優化

JVM heap一般設置為4G或更高,部署在單獨的服務器上(4核8線程16G內存)

-Xmx與-Xms最好設置一致,減少內存抖動帶來的性能影響,如果設置不一致容易導致頻繁fullgc。

-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大允許的尺寸,按需分配。如果不設置一致,容易在初始化時,由於內存不夠,頻繁觸發fullgc。

 

4.7 采集通道啟動/停止腳本

1)在/home/test/bin目錄下創建腳本cluster.sh

[test@hadoop102 bin]$ vim cluster.sh

#! /bin/bash

case $1 in
"start"){
    echo " -------- 啟動 集群 -------"

    echo " -------- 啟動 hadoop集群 -------"
    /opt/module/hadoop-2.7.2/sbin/start-dfs.sh 
    ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"

    #啟動 Zookeeper集群
    zk.sh start

sleep 4s;

    #啟動 Flume采集集群
    f1.sh start

    #啟動 Kafka采集集群
    kf.sh start

sleep 6s;

    #啟動 Flume消費集群
    f2.sh start

    };;
"stop"){
    echo " -------- 停止 集群 -------"


    #停止 Flume消費集群
    f2.sh stop

    #停止 Kafka采集集群
    kf.sh stop

    sleep 6s;

    #停止 Flume采集集群
    f1.sh stop

    #停止 Zookeeper集群
    zk.sh stop

    echo " -------- 停止 hadoop集群 -------"
    ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
    /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh 
};;
esac

2)增加腳本執行權限

[test@hadoop102 bin]$ chmod 777 cluster.sh

3)cluster集群啟動腳本

[test@hadoop102 module]$ cluster.sh start

4)cluster集群停止腳本

[test@hadoop102 module]$ cluster.sh stop

五、總結

5.1 數倉概念總結

數據倉庫的輸入數據源和輸出系統分別是什么?

輸入系統:埋點產生的用戶給行為數據、JavaEE后台產生的業務數據。

輸出系統:報表系統、用戶畫像系統、推薦系統

5.2 項目需求及架構總結

5.2.1 集群規模計算

 

5.2.2 框架版本選型

1)Apache:運維麻煩,組件間兼容性需要自己調研。(一般大廠使用,技術實力雄厚,有專業的運維人員)(建議使用)

2)CDH:國內使用最多的版本,但CM不開源,但其實對中、小公司使用來說沒有影響

3)HDP:開源,可以進行二次開發,但是沒有CDH穩定,國內使用較少

5.2.3 服務器選型

5.3 數據采集模塊總結

5.3.1 Linxu&Shell相關總結

1)Linux常用命令

序號

命令

命令解釋

1

top

查看內存

2

df -h

查看磁盤存儲情況

3

iotop

查看磁盤IO讀寫(yum install iotop安裝)

4

iotop -o

直接查看比較高的磁盤讀寫程序

5

netstat -tunlp | grep 端口號

查看端口占用情況

6

uptime

查看報告系統運行時長及平均負載

7

ps  aux

查看進程

2)Shell常用工具

awk、sed、cut、sort

5.3.2 Hadoop相關總結

1)Hadoop默認不支持LZO壓縮,如果需要支持LZO壓縮,需要添加jar包,並在hadoop的cores-site.xml文件中添加相關壓縮配置。

見  項目經驗之LZO創建索引

2)Hadoop常用端口號

50070 hdfs,8088 mr任務,19888 歷史服務器,9000 客戶端訪問集群

3)Hadoop配置文件以及簡單的Hadoop集群搭建

core-site.xml hadoop-env.sh
hdfs-site.xml yarn-env.sh
yarn-site.xml mapred-env.sh
mapred-site.xml slaves

4)HDFS讀流程和寫流程

5)MapReduce的Shuffle過程及Hadoop優化(包括:壓縮、小文件、集群優化)

Shuffer在map方法之后,reduce方法之前
數據出來后首先進入getpartition(),然后進入還原緩沖區,還原緩沖區一側存數據,一側存索引,到達80%進行反向溢寫,
還原緩沖區默認大小是100M。溢寫過程中(進行排序,按照快排的手段排序,對key的索引排序,按照字典順序排),溢寫 之前要進行各種排序,排完序之后把溢寫文件存進來(產生大量溢寫文件),對溢寫文件進行歸並排序,歸並完之后按照指定分 區存好數據。等待reduce端來拉去數據,拉取自己指定分區數據,拉取過來先放到內存,內存不夠溢寫到磁盤,不管事內存 還是磁盤數據都進行歸並,歸並過程當中進行分組排序,最后進入到對應的reduce方法里去。
Shuffer優化 還原緩沖區默認大小是100 調到200M ;設置到90%溢寫(減少溢寫文件個數,起到優化作用);
溢寫文件可以提前采用一次combiner(前提條件是求和);默認一次歸並個數是10個,可以調到20個-30個;
為了減少磁盤IO在map端對數據采用壓縮;有幾個地方可以壓縮Map輸入端、Map輸出端、Reduce輸出端可以進行壓縮;

6)Yarn的Job提交流程

7)Yarn的默認調度器、調度器分類、以及他們之間的區別

默認是FIFO調度器
FIFO調度器、容量調度器、公平調度器
FIFO調度器:先進先出
選型:
對並發度要求搞,且錢的公司:公平調度器(中、大公司)
對並發度要求不是太高,且不是特別錢:容量(中小公司)
容量調度器:默認只一個default隊列,在開發時會用多個隊列
技術框架:hive、spark、flink
業務創建隊列:登陸注冊、購物車、用戶行為、業務數據。。。分開放的好處是解耦、降低風險

8)HDFS存儲多目錄

9)Hadoop參數調優

10)項目經驗之基准測試

5.3.3 Zookeeper相關總結

1)選舉機制   

半數機制,安裝奇數台服務器
10台服務器安裝幾個zookeeper:3台。
20台服務器安裝幾個zookeeper:5台。
100台服務器安裝幾個zookeeper:11台。
不是越多越好,也不是越少越好。如果多,通信時間常,效率低;如太少,可靠性差。

2)常用命令

       ls、get、create

5.3.4 Flume相關總結

1)Flume組成,Put事務,Take事務

       Taildir Source:斷點續傳、多目錄。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置,實現斷點續傳。

       File Channel:數據存儲在磁盤,宕機數據可以保存。但是傳輸速率慢。適合對數據傳輸可靠性要求高的場景,比如,金融行業。

       Memory Channel:數據存儲在內存中,宕機數據丟失。傳輸速率快。適合對數據傳輸可靠性要求不高的場景,比如,普通的日志數據。

       Kafka Channel:減少了Flume的Sink階段,提高了傳輸效率。          

       Source到Channel是Put事務

       Channel到Sink是Take事務

2)Flume攔截器

       (1)攔截器注意事項

              項目中自定義了:ETL攔截器和區分類型攔截器。

采用兩個攔截器的優缺點:優點,模塊化開發和可移植性;缺點,性能會低一些

       (2)自定義攔截器步驟

a)實現 Interceptor

b)重寫四個方法

  • initialize 初始化
  • public Event intercept(Event event) 處理單個Event
  • public List<Event> intercept(List<Event> events) 處理多個Event,在這個方法中調用Event intercept(Event event)
  • close 方法

c)靜態內部類,實現Interceptor.Builder

3)Flume Channel選擇器

 

4)Flume 監控器

Ganglia

5)Flume采集數據會丟失嗎?

不會,Channel存儲可以存儲在File中,數據傳輸自身有事務。

6)Flume內存

開發中在flume-env.sh中設置JVM heap為4G或更高,部署在單獨的服務器上(4核8線程16G內存)

-Xmx與-Xms最好設置一致,減少內存抖動帶來的性能影響,如果設置不一致容易導致頻繁fullgc。

-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大允許的尺寸,按需分配。如果不設置一致,容易在初始化時,由於內存不夠,頻繁觸發fullgc。

7)FileChannel優化

通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。

官方說明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據

8)Sink:HDFS Sink小文件處理

(1)HDFS存入大量小文件,有什么影響?

元數據層面:每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在Namenode內存中。所以小文件過多,會占用Namenode服務器大量內存,影響Namenode性能和使用壽命

計算層面:默認情況下MR會對每個小文件啟用一個Map任務計算,非常影響計算性能。同時也影響磁盤尋址時間。

       (2)HDFS小文件處理

官方默認的這三個參數配置寫入HDFS后會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合作用,效果如下:

(1)文件在達到128M時會滾動生成新文件

(2)文件創建超3600秒時會滾動生成新文件

舉例:在2018-01-01 05:23的時侯sink接收到數據,那會產生如下tmp文件:

 5.3.5 Kafka相關總結

 

 

 

 

 

 

 

 

 

 

 

 

 

1)Kafka壓測

Kafka官方自帶壓力測試腳本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka壓測時,可以查看到哪個地方出現了瓶頸(CPU,內存,網絡IO)。一般都是網絡IO達到瓶頸。

2)Kafka的機器數量

Kafka機器數量=2*(峰值生產速度*副本數/100)+1

3)Kafka的日志保存時間

3天

4)Kafka的硬盤大小

每天的數據量*3天

5)Kafka監控

公司自己開發的監控器;

開源的監控器:KafkaManager、KafkaMonitor

6)Kakfa分區數。

(1)創建一個只有1個分區的topic

(2)測試這個topic的producer吞吐量和consumer吞吐量。

(3)假設他們的值分別是Tp和Tc,單位可以是MB/s。

(4)然后假設總的目標吞吐量是Tt,那么分區數=Tt / min(Tp,Tc)

例如:producer吞吐量=10m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;

分區數=100 / 10 =10分區

分區數一般設置為:3-10個

7)副本數設定

一般我們設置成2個或3個,很多企業設置為2個。

8)多少個Topic

     通常情況:多少個日志類型就多少個Topic。也有對日志類型進行合並的。

9)Kafka丟不丟數據

Ack=0,producer不等待kafka broker的ack,一直生產數據。

Ack=1,leader數據落盤就發送ack,producer收到ack才繼續生產數據。

Ack=-1,ISR中的所有副本數據羅盤才發送ack,producer收到ack才繼續生產數據。

10)Kafka的ISR副本同步隊列

ISR(In-Sync Replicas),副本同步隊列。ISR中包括Leader和Follower。如果Leader進程掛掉,會在ISR隊列中選擇一個服務作為新的Leader。有replica.lag.max.messages(延遲條數)和replica.lag.time.max.ms(延遲時間)兩個參數決定一台服務是否可以加入ISR副本隊列,在0.10版本移除了replica.lag.max.messages參數,防止服務頻繁的進去隊列。

任意一個維度超過閾值都會把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會先存放在OSR中。

11)Kafka分區分配

Range和RoundRobin

12)Kafka中數據量計算

每天總數據量100g,每天產生1億條日志, 10000萬/24/60/60=1150條/每秒鍾

平均每秒鍾:1150條

低谷每秒鍾:400條

高峰每秒鍾:1150條*(2-20倍)=2300條-23000條

每條日志大小:0.5k-2k(取1k)

每秒多少數據量:2.0M-20MB

13) Kafka掛掉

(1)Flume記錄

(2)日志有記錄

(3)短期沒事

14)Kafka消息數據積壓,Kafka消費能力不足怎么處理?

(1)如果是Kafka消費能力不足,則可以考慮增加Topic的分區數,並且同時提升消費組的消費者數量,消費者數=分區數。(兩者缺一不可)

(2)如果是下游的數據處理不及時:提高每批次拉取的數量。批次拉取數據過少(拉取數據/處理時間<生產速度),使處理的數據小於生產的數據,也會造成數據積壓。

15)Kafka冪等性

Kafka0.11版本引入了冪等性,冪等性配合at least once語義可以實現exactly once語義。但只能保證單次會話的冪等。

16)Kafka事務

Kafka0.11版本引入Kafka的事務機制,其可以保證生產者發往多個分區的一批數據的原子性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM