FLUME學習筆記--模擬Flume用戶行為日志采集


需求說明:

如下圖:要用Flume進行用戶行為日志數據采集到Hdfs目錄下,以便為hive數據倉庫提供用戶行為數據

大致數據流程如下:

    1)頁面或者app前端,通過采集用戶的頁面行為(如點擊某商品,瀏覽了什么商品,停留了在那個頁面這些行為),通過頁面JS發送數據

     到后台的日志服務器,日志服務器為集群結構,通過nginx做集群代理

    2)日志服務器,根據頁面發送的消息數據,將日志通過log4j方式寫入服務器目錄文件夾(按天生成日志文件,格式如2020-01-02.log)

    3)  在日志服務器配置FLUME進行日志監控,采集(通過Flumed的TailDir),將采集的日志寫入到Kafka的兩個topic,一個是topic_start(放啟動日志)

    另外一個放topic_event(事件日志),這里為啥用flume,而沒有直接把日志寫入hDFS,原因是,如果日志數據量大,kafka起到削峰的作用

    4) 在啟動一個FLUME進行消費kafka里的topic_event,topic_start ,Flume配置KafkaSource,FileChanle,HDFSSink

 

  •      完成步驟一:模擬日志服務器的寫日志的程序

          日志的模擬程序如下

         

 

         其中AppMain:啟動運行主類

        

package com.computech.appclient;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.computech.bean.AppActive_background;
import com.computech.bean.AppAd;
import com.computech.bean.AppBase;
import com.computech.bean.AppStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

public class AppMain {

    private final static Logger log=LoggerFactory.getLogger(AppMain.class);

    private  static  int s_mid=0;
    private static   int s_uid=100;
    private  static  Random rand=new Random();

    public static void main(String[] args) {
        //接收2個參數 1:表示格多少秒生成一條  2:表示一次生成多少條記錄
        //默認是 0 和1000
        //參數一:
        long delay=args.length>0?Long.parseLong(args[0]):0L;
        //參數二:循環次數
        int loop_len=args.length>1?Integer.parseInt(args[1]):1000;
        for(int i=0;i<loop_len;i++){
            //循環生成數據
            int flage=rand.nextInt(2);
            switch (flage){
                case (0):
                    //todo 生成啟動日志
                    AppStart start= generateStartLog();
                    String s_start = JSON.toJSONString(start);
                    //控制台打印
                    log.info(s_start);
                    break;
                case(1):
                    //todo 事件日志 2部分 1:公共信息  2 事件數組
                    JSONObject jsobj = new JSONObject();
                    jsobj.put("ap","app");
                    jsobj.put("cm",generateCommfield());
                    //事件日志
                    JSONArray jsonarray = new JSONArray();
                    //添加廣告事件
                    if(rand.nextBoolean()){
                        jsonarray.add(generateAppAd());
                    }
                    //添加后台活動事件
                    if(rand.nextBoolean()){
                        jsonarray.add(generateActiveBackGorupd());
                    }
                    jsobj.put("et",jsonarray);
                    long currtime = System.currentTimeMillis();
                    log.info(currtime+"|"+jsobj.toJSONString());
                    break;
            }


        }
        ///
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }
    //生成后台活躍事件
    private static JSONObject generateActiveBackGorupd() {
        AppActive_background appActive_background = new AppActive_background();
        //1=upgroup,2=download(下載) 3=plung_upgrade
        int f=rand.nextInt(3)+1;
        appActive_background.setActive_source(f+"");
        return packEventObj("activebk",(JSONObject) JSON.toJSON(appActive_background));
    }

    // 生成廣告事件
    private static JSONObject generateAppAd() {
        AppAd appAd = new AppAd();
        //入口 商品列表頁=1 應用首頁=2 商品詳情=3
          appAd.setEntry(rand.nextInt(3)+1+"");
          ////動作 廣告展示=1 廣告展示=2
          appAd.setAction(rand.nextInt(2)+1+"");
          ////type:1 商品 2:活動
           appAd.setContentType(rand.nextInt(2)+1+"");
           //展示時長
           appAd.setDisplayMill(rand.nextInt(100)+"");
           //商品id
            appAd.setItemId("item"+rand.nextInt(1000)+"");
            //活動id
            appAd.setActivityId(rand.nextInt(20)+"");
        return packEventObj("appad",(JSONObject) JSON.toJSON(appAd));
    }

    //生成公共字段信息
    private static JSONObject generateCommfield() {
        AppBase appBase = new AppBase();
         appBase.setUid(""+s_uid);
         s_uid++;
         appBase.setMid(""+s_mid);
         s_mid++;
        //設置版本序號
        appBase.setVc(""+rand.nextInt(20));
        //設置版本名稱 v1.1.1
        appBase.setVn("v"+rand.nextInt(4)+"."+rand.nextInt(10));
        //設置安卓的版本
        appBase.setOs("o"+rand.nextInt(3)+"."+rand.nextInt(10));

        //生成語言 en ch pt
        int flage=rand.nextInt(3);
        switch (flage){
            case (0):
                appBase.setL("en");
                break;
            case (1):
                appBase.setL("ch");
                break;
            case (2):
                appBase.setL("pt");
                break;
        }
        //生成渠道
        appBase.setSc("SC-001");
        //區域
        flage=rand.nextInt(2);
        switch (flage){
            case (0):
                appBase.setAr("BR");
                break;
            case (1):
                appBase.setAr("MX");
                break;
        }
        //手機品牌
        flage=rand.nextInt(3);
        switch (flage){
            case 0:
                appBase.setBa("sumsung");
                appBase.setMd("sumsung-"+rand.nextInt(20));
                break;
            case 1:
                appBase.setBa("HUWEI");
                appBase.setMd("HUWEI-"+rand.nextInt(20));
                break;
            case 2:
                appBase.setBa("IPHONE");
                appBase.setMd("IPHONE-"+rand.nextInt(20));
                break;
        }
        //設置sdk版本
        appBase.setSv("v2."+rand.nextInt(10)+"."+rand.nextInt(20));
        appBase.setG(""+rand.nextInt(100)+"@gomo.com");
        //設置屏幕寬度
        flage=rand.nextInt(3);
        switch (flage){
            case 0:
                appBase.setHw("640*900");
                break;
            case 1:
                appBase.setHw("640*1130");
                break;
            case 2:
                appBase.setHw("750*1130");
                break;
        }
        //客戶端生成的時間
        long mills = System.currentTimeMillis();
        appBase.setT(""+(mills-rand.nextInt(999999999)));
        //設置手機網絡
        flage=rand.nextInt(3);
        switch (flage){
            case 0:
                appBase.setNw("3");
                break;
            case 1:
                appBase.setHw("4G");
                break;
            case 2:
                appBase.setHw("wifi");
                break;
        }
        //設置經緯度
        appBase.setLn(rand.nextInt(60)+"");
        appBase.setLa(rand.nextInt(60)+"");
        return  (JSONObject)JSON.toJSON(appBase);
    }
     //封裝返回
    private static JSONObject packEventObj(String eventname,JSONObject jsonObject) {
        JSONObject jsnobj = new JSONObject();
        jsnobj.put("ett",(System.currentTimeMillis()-rand.nextInt(99999999)+""));
        jsnobj.put("en",eventname);
        jsnobj.put("kv",jsonObject);
        return jsnobj;
    }

    //生成啟動日志
    private static AppStart generateStartLog() {
        AppStart appStart = new AppStart();
         appStart.setMd(s_mid+"");
         s_mid++;
         appStart.setUid(s_uid+"");
         s_uid++;
         //設置版本序號
         appStart.setVc(""+rand.nextInt(20));
         //設置版本名稱 v1.1.1
         appStart.setVn("v"+rand.nextInt(4)+"."+rand.nextInt(10));
         //設置安卓的版本
         appStart.setOs("o"+rand.nextInt(3)+"."+rand.nextInt(10));
         //設置日志類別
          appStart.setEn("start");//類別為啟動日志
          //生成語言 en ch pt
           int flage=rand.nextInt(3);
           switch (flage){
               case (0):
                   appStart.setL("en");
                   break;
               case (1):
                    appStart.setL("ch");
                    break;
               case (2):
                    appStart.setL("pt");
                    break;
           }
           //生成渠道
           appStart.setSc("SC-001");
           //區域
           flage=rand.nextInt(2);
            switch (flage){
                case (0):
                    appStart.setAr("BR");
                    break;
                case (1):
                    appStart.setAr("MX");
                    break;
            }
            //手機品牌
           flage=rand.nextInt(3);
            switch (flage){
                case 0:
                    appStart.setBa("sumsung");
                    appStart.setMd("sumsung-"+rand.nextInt(20));
                    break;
                case 1:
                    appStart.setBa("HUWEI");
                    appStart.setMd("HUWEI-"+rand.nextInt(20));
                    break;
                case 2:
                    appStart.setBa("IPHONE");
                    appStart.setMd("IPHONE-"+rand.nextInt(20));
                    break;
            }
            //設置sdk版本
             appStart.setSv("v2."+rand.nextInt(10)+"."+rand.nextInt(20));
             appStart.setG(""+rand.nextInt(100)+"@gomo.com");
            //設置屏幕寬度
            flage=rand.nextInt(3);
            switch (flage){
                case 0:
                    appStart.setHw("640*900");
                    break;
                case 1:
                    appStart.setHw("640*1130");
                    break;
                case 2:
                    appStart.setHw("750*1130");
                    break;
            }
            //客戶端生成的時間
           long mills = System.currentTimeMillis();
            appStart.setT(""+(mills-rand.nextInt(999999999)));
            //設置手機網絡
            flage=rand.nextInt(3);
            switch (flage){
                case 0:
                    appStart.setNw("3");
                    break;
                case 1:
                    appStart.setHw("4G");
                    break;
                case 2:
                    appStart.setHw("wifi");
                    break;
            }
            //設置經緯度
           appStart.setLn(rand.nextInt(60)+"");
           appStart.setLa(rand.nextInt(60)+"");
           //入口
           appStart.setEntry(rand.nextInt(5)+"");
           //開屏廣告類型
           appStart.setOpen_ad_type(rand.nextInt(2)+1+"");
           //狀態
           appStart.setAction(rand.nextInt(2)+1+"");
           //加載時常
           appStart.setLoading_time(rand.nextInt(20)+"");
           //失敗代碼
            flage=rand.nextInt(3);
            switch (flage){
                case 0:
                    appStart.setDetail("103");
                    break;
                case 1:
                    appStart.setDetail("203");
                    break;
                case 2:
                    appStart.setDetail("301");
                    break;
            }
            appStart.setExtend1("");

        return appStart;
    }
}

 

 

其他 事件類Bean

package com.computech.bean;

/**
 * 公共日志類
 */
public class AppBase {
    private String mid;//設備唯一編號
    private String uid;//用戶編號
    private String vc;//版本號
    private String vn;//版本名稱
    private String l;//系統語言
    private String sc;//渠道號
    private String os;//Andord系統版本
    private String ar;//區域
    private String md;//手機型號
    private String ba;//手機品牌
    private String sv;//sdkVersion
    private String g;//gomil
    private String hw;//屏幕的寬度
    private String t;//客戶日志生成的時間
    private String nw;//網絡模式
    private String ln;//經度
    private String la;//維度

    public String getMid() {
        return mid;
    }

    public void setMid(String mid) {
        this.mid = mid;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getVc() {
        return vc;
    }

    public void setVc(String vc) {
        this.vc = vc;
    }

    public String getVn() {
        return vn;
    }

    public void setVn(String vn) {
        this.vn = vn;
    }

    public String getL() {
        return l;
    }

    public void setL(String l) {
        this.l = l;
    }

    public String getSc() {
        return sc;
    }

    public void setSc(String sc) {
        this.sc = sc;
    }

    public String getOs() {
        return os;
    }

    public void setOs(String os) {
        this.os = os;
    }

    public String getAr() {
        return ar;
    }

    public void setAr(String ar) {
        this.ar = ar;
    }

    public String getMd() {
        return md;
    }

    public void setMd(String md) {
        this.md = md;
    }

    public String getBa() {
        return ba;
    }

    public void setBa(String ba) {
        this.ba = ba;
    }

    public String getSv() {
        return sv;
    }

    public void setSv(String sv) {
        this.sv = sv;
    }

    public String getG() {
        return g;
    }

    public void setG(String g) {
        this.g = g;
    }

    public String getHw() {
        return hw;
    }

    public void setHw(String hw) {
        this.hw = hw;
    }

    public String getT() {
        return t;
    }

    public void setT(String t) {
        this.t = t;
    }

    public String getNw() {
        return nw;
    }

    public void setNw(String nw) {
        this.nw = nw;
    }

    public String getLn() {
        return ln;
    }

    public void setLn(String ln) {
        this.ln = ln;
    }

    public String getLa() {
        return la;
    }

    public void setLa(String la) {
        this.la = la;
    }
}

  

package com.computech.bean;

/**
 * 啟動日志 在 base日志的基礎上多了下面的屬性
 */
public class AppStart  extends AppBase{
    private String entry;//入口 push=1 widget=2 icon=3 nofiyication=4 lockscreem_widge=5
    private String open_ad_type;//開啟廣告類型: 開屏原生廣告=1 開啟插播廣告=2
    private String action;//狀態:成功=1,失敗=2
    private String loading_time;//加載時間
    private String detail;//失敗碼
    private String extend1;//失敗消息
    private String en;//啟動日志類標記

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getOpen_ad_type() {
        return open_ad_type;
    }

    public void setOpen_ad_type(String open_ad_type) {
        this.open_ad_type = open_ad_type;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getDetail() {
        return detail;
    }

    public void setDetail(String detail) {
        this.detail = detail;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getEn() {
        return en;
    }

    public void setEn(String en) {
        this.en = en;
    }
}

  

package com.computech.bean;

/**
 * 后台活躍事件
 */
public class AppActive_background {
    private  String active_source;//1=upgroup,2=download(下載) 3=plung_upgrade

    public String getActive_source() {
        return active_source;
    }

    public void setActive_source(String active_source) {
        this.active_source = active_source;
    }
}
package com.computech.bean;

/**
 * 廣告事件
 */
public class AppAd {
    private String entry;//入口 商品列表頁=1 應用首頁=2 商品詳情=3
    private String action;//動作 廣告展示=1 廣告展示=2
    private String contentType;//type:1 商品 2:活動
    private String displayMill;//展示時常
    private String itemId;// 商品id
    private String activityId;//網絡活動id

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getContentType() {
        return contentType;
    }

    public void setContentType(String contentType) {
        this.contentType = contentType;
    }

    public String getDisplayMill() {
        return displayMill;
    }

    public void setDisplayMill(String displayMill) {
        this.displayMill = displayMill;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public String getActivityId() {
        return activityId;
    }

    public void setActivityId(String activityId) {
        this.activityId = activityId;
    }
}

  

  

  

logback.xml文件配置

<?xmlversion="1.0"encoding="UTF-8"?><configurationdebug="true"><!--用於指定logger上下文名稱,默認為default--><contextName>logback</contextName><!--用於指定logger文件存放的路徑--><propertyname="LOG_HOME"value="/tmp/logs"/><!--該appender的功能是將記錄信息以特定格式寫到控制台--><appendername="STDOUT"class="ch.qos.logback.core.ConsoleAppender"><!--encoder:將事件轉換為字符串默認配置為PatternLayoutEncoder類encoder用於替代Layout,encoder擴展了Layout功能Layout功能:只負責把事件轉換為字符串,但是不能指定何時將記錄寫入到指定目的地encoder功能:即負責把事件轉換為字符串,也可以指定何時將記錄寫入到指定目的地--><encoder><!--指定輸出格式%d{}:表示時間%thread:請求記錄所在的線程名%-5level:用5位顯示請求級別%logger{36}:輸出logger名,{36}表示logger名最大占用的字符位數,{0}表示最簡短logger名(不包含包名)。--><pattern>%d{yyyy-MM-ddHH:mm:ss.SSS}[%thread]%-5level%logger{0}-%msg%n</pattern></encoder></appender><!--按每天生產日志文件,存儲日志文件--><appendername="FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"><!--滾動日志策略--><rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!--日志文件的格式--><fileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</fileNamePattern><!--日志文件最大保留天數--><maxHistory>30</maxHistory></rollingPolicy><encoderclass="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><!--指定輸出格式--><pattern>%msg%n</pattern></encoder><!--日志文件最大大小--><triggeringPolicyclass="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"><maxFileSize>20MB</maxFileSize></triggeringPolicy></appender><!--異步打印--><appendername="ASYNC_FILE"class="ch.qos.logback.classic.AsyncAppender"><discardingThreshold>0</discardingThreshold><queueSize>512</queueSize><appender-refref="FILE"/></appender><!--level屬性:指定根logger的分配級別--><rootlevel="INFO"><!--ref屬性:指定根logger關聯的appender--><appender-refref="STDOUT"></appender-ref><appender-refref="ASYNC_FILE"></appender-ref><appender-refref="error"></appender-ref></root></configuration>
logback.xml

 對完成的程序進行打包,打jar包后放入2天服務器的指定目錄

 

編寫日志收集腳本(log_collect.sh)

 

#!/bin/bashforiindtinone20dtinone21doecho"---$i生成日志-------"ssh$i"source/etc/profile;java-cp/home/hadoop/jars/elehshop/logcollect.jarcom.computech.appclient.AppMain>/dev/null2>&1"done
log_collect.sh
 

        腳本運行后,在/tmp/logs目錄下生產 2020-01-02.log這樣的日志文件

  • 完成步驟二:完成Flume生產者配置以及編寫攔截器

            flume讀取了日志文件,需要根據日志文件的類別,進行區分,事件日志寫入topic_log_event主題,啟動日志 寫入topic_log_start主題,這就需要對flume的event數據進行標記,需要在在攔截器中根據每個數據,在event的頭部信息打上topic的區分標記

                需要編寫flume的攔截器,並打包,放入flume的lib目錄下

              代碼如下:

              

 

 

             LogIntercepotr:

package com.computech.flume;


import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

//自定義攔截器
public class LogInterceptor implements Interceptor {
    public void initialize() {

    }

    public Event intercept(Event event) {
        //單事件攔截 判斷收據的數據格式是否都是{} 這種格式
        byte[] body = event.getBody();//獲取event內容
        try {
            String s_body=new String(body,"UTF-8"); //轉成字符串
            if(s_body==null){
                return null;
            }
            return event;

        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> outlist=new ArrayList<Event>();
        for(Event e:events){
            Event ev = intercept(e);
            if(ev!=null){
                outlist.add(ev);
            }
        }
        return outlist;
    }

    public void close() {

    }

    public static  class Builder implements Interceptor.Builder{

        public Interceptor build() {
            return new LogInterceptor();
        }

        public void configure(Context context) {
          //todo 無配置參數
        }
    }
}
LogInterCeptor

 

            LogTypeInterCeoptor:

package com.computech.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * 分類攔截器,根據數據,區分出啟動日志和業務日志,在
 * event的header中加入 topic 屬性
 * 為后面的選擇器,根據不同的類型,選擇不同的channle
 */
public class LogTypeInterceptor implements Interceptor {
    private HashMap<String,String> map;
    public void initialize() {
     //todo
        map=new HashMap<String, String>();
    }

    public Event intercept(Event event) {
        byte[] body = event.getBody();
        try {
            String s_body=new String(body,"UTF-8");
            if(s_body!=null){
                if(s_body.contains("start")){
                    //啟動日志
                    map.put("topic","topic_start");
                    event.setHeaders(map);
                }
                else
                {
                    // 事件日志
                    map.put("topic","topic_event");
                    event.setHeaders(map);
                }
                return event;
            }

        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> outlist=new ArrayList<Event>();
        for(Event e:events){
            Event ev = intercept(e);
            if(ev!=null){
                outlist.add(ev);
            }
        }
        return outlist;
    }

    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        public void configure(Context context) {

        }
    }
}
LogTypeInterCeptor

        編寫Flume配置文件:log-flume-kafka.conf

        

a1.sources=r1a1.channels=c1c2##taildirsourcesa1.sources.r1.type=TAILDIR###positionlocala1.sources.r1.positionFile=/home/hadoop/data/flumedata/log_position.jsona1.sources.r1.filegroups=f1a1.sources.r1.filegroups.f1=/tmp/logs/app.+a1.sources.r1.fileHeader=true###攔截器a1.sources.r1.interceptors=i1i2a1.sources.r1.interceptors.i1.type=com.computech.flume.LogInterceptor$Builder##類型攔截器,為even加上topic頭信息a1.sources.r1.interceptors.i2.type=com.computech.flume.LogTypeInterceptor$Builder###source的選擇器a1.sources.r1.selector.type=multiplexinga1.sources.r1.selector.header=topica1.sources.r1.selector.mapping.topic_start=c1a1.sources.r1.selector.mapping.topic_event=c2##設置kafka的channela1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannela1.channels.c1.kafka.bootstrap.servers=dtinone20:9092,dtinone21:9092,dtinone22:9092a1.channels.c1.kafka.topic=topic_log_starta1.channels.c1.parseAsFlumeEvent=falsea1.channels.c1.kafka.consumer.group.id=flume-consumera1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannela1.channels.c2.kafka.bootstrap.servers=dtinone20:9092,dtinone21:9092,dtinone22:9092a1.channels.c2.kafka.topic=topic_log_eventa1.channels.c1.parseAsFlumeEvent=falsea1.channels.c2.kafka.consumer.group.id=flume-consumer#綁定關系a1.sources.r1.channels=c1c2
Log-flume-kafka.conf

 

      完成代碼后,也需要將程序打成jar包,放入 flume主目錄的lib文件夾下面

 

 

 

  • 完成步驟三:創建kafka的2個topic主題

          kafka-topics.sh --create --zookeeper dtinone20:2181,dtinone21:2181/kafka --topic topic_log_event --partitions 2 --replication-factor 2
          kafka-topics.sh --create --zookeeper dtinone20:2181,dtinone21:2181/kafka --topic topic_log_start --partitions 2 --replication-factor 2

        

  • 完成步驟四:配置從kafka消費記錄,到HDFS的FLUME

             flume的配置文件

          

#定義名稱
a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
#從kafka消費數據的批次時間
a1.sources.r1.batchDurationMillis  = 2000
a1.sources.r1.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.sources.r1.kafka.topics = topic_log_start
a1.sources.r1.kafka.consumer.group.id = custom.g.id

#配置source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
#從kafka消費數據的批次時間
a1.sources.r2.batchDurationMillis  = 2000
a1.sources.r2.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.sources.r2.kafka.topics = topic_log_event
a1.sources.r2.kafka.consumer.group.id = custom.g.id

#配置c1
a1.channels.c1.type = file
#文件索引緩存目錄
a1.channels.c1.checkpointDir = /home/hadoop/data/flumedata/checkpoint01
#文件目錄
a1.channels.c1.dataDirs = /home/hadoop/data/flumedata/behavoir01
#source從putlist 里面往channel放數據的等待時間
a1.channels.c1.keep-alive = 6

#配置c2
a1.channels.c2.type = file
#文件索引緩存目錄
a1.channels.c2.checkpointDir = /home/hadoop/data/flumedata/checkpoint02
#文件目錄
a1.channels.c2.dataDirs = /home/hadoop/data/flumedata/behavoir02
#source從putlist 里面往channel放數據的等待時間
a1.channels.c2.keep-alive = 6

#配置k1 hddfsink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /orgin_data/gmail/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

#配置k2 hddfsink
a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
a1.sinks.k2.hdfs.path = /orgin_data/gmail/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
                        
#配置生成大文件的方式  rollInterval 滾動時間
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k2.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k2.hdfs.rollInterval = 3600 
#配置文件大小128M
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollSize = 134217728
#不根據事件個數生成文件
a1.sinks.k1.rollCount = 0 
a1.sinks.k2.rollCount = 0 
#配置壓縮文件格式
#a1.sinks.k1.hdfs.fileType = CompressedStream  
#a1.sinks.k2.hdfs.fileType = CompressedStream
#配置壓縮文件編碼
#a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.codeC = lzop

#進行綁定
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sources.r1.channels=c1
a1.sources.r2.channels=c2
kafka-flume-hdfs.conf

 

 

 

  • 完成步驟五:編寫啟動腳本

       啟動腳本1:啟動fluem01.sh 啟動fulume的采集數據到kafak

       

#!/bin/bash
#啟動停止flume log 收集的服務
FLUME_HOME=/home/hadoop/apps/flume
case $1 in
"start"){
    for i in dtinone20 dtinone21
    do
        echo "----啟動 $i 的 flumle服務-----"
     ssh $i "source /etc/profile;nohup /home/hadoop/apps/flume/bin/flume-ng agent -n a1 --conf
 conf -f /home/hadoop/apps/flume/conf/log-flume-kafka.conf -Dflume.root.logger=INFO,console >n
ohupt.out 2>&1 &"
    done
};;
"stop"){
    for i in dtinone20 dtinone21
    do
        echo "----停止 $i 的 flumle服務-----"
      ssh $i "source /etc/profile;ps -ef|grep flume|grep -v grep|awk '{print \$2}'|xargs kill 
-9 "
       echo "---end-------------"
    done
};;
esac

 

   啟動Flume02.sh  消費kafka數據到hdfs

   

#!/bin/bash
#啟動 關閉 dtinone22機器上的 flume采集kafaka數據到hdfs目錄下的程序
case $1 in
"start"){
   for i in dtinone22
   do
      echo "---開啟 $i ---fluem 程序------"
      ssh $i "source /etc/profile;nohup /home/hadoop/apps/flume/bin/flume-ng agent -n a1 --con
f conf -f /home/hadoop/apps/flume/conf/kafka-flume-hdfs.conf -Dflume.root.logger=INFO,console 
>nohupt.out 2>&1 &"
   done

};;
"stop"){
    for i in dtinone22
    do
        echo "----停止 $i 的 flumle服務-----"
     ssh $i "source /etc/profile;ps -ef|grep flume|grep -v grep|awk '{print \$2}'|xargs kill -
9 "
    done
};;
esac

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM