Requirements:
Use Flume to collect user-behavior logs into an HDFS directory, where they will feed the Hive data warehouse.
The overall data flow:
1) The web page or app front end captures user behavior (which product was clicked, which products were browsed, which page the user lingered on) and sends it via page JavaScript
to the back-end log servers; the log servers run as a cluster behind an nginx proxy.
2) Each log server writes the incoming messages to a directory on disk through the logging framework (logback, as configured below), rolling one file per day (e.g. app-2020-01-02.log).
3) Flume runs on each log server, monitors and collects the logs (via Flume's TaildirSource), and writes them into two Kafka topics: topic_log_start for start logs
and topic_log_event for event logs. The reason for writing to Kafka rather than directly to HDFS is that Kafka absorbs the peaks when log volume spikes (peak shaving).
4) A second Flume agent consumes topic_log_start and topic_log_event from Kafka, configured with a KafkaSource, a FileChannel, and an HDFSSink.
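For reference, the two record shapes that flow through the pipeline look roughly like this (illustrative values only; the exact fields are defined by the bean classes in step one):

```
# start log: one JSON object per line
{"en":"start","mid":"0","uid":"100","vc":"5","action":"1","detail":"203",...}

# event log: epoch-millis timestamp, a pipe, then the JSON payload
1577977297000|{"ap":"app","cm":{...},"et":[{"en":"appad","ett":"...","kv":{...}}]}
```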

- Step 1: write the program that simulates the log servers' log output
The log simulator is shown below.

AppMain, the main class that drives a run:
```java
package com.computech.appclient;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.computech.bean.AppActive_background;
import com.computech.bean.AppAd;
import com.computech.bean.AppBase;
import com.computech.bean.AppStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

public class AppMain {

    private final static Logger log = LoggerFactory.getLogger(AppMain.class);
    private static int s_mid = 0;
    private static int s_uid = 100;
    private static Random rand = new Random();

    public static void main(String[] args) {
        // Two optional arguments: 1) delay in ms between records, 2) number of records.
        // Defaults are 0 and 1000.
        long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
        int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

        for (int i = 0; i < loop_len; i++) {
            // Randomly emit either a start log or an event log
            int flag = rand.nextInt(2);
            switch (flag) {
                case 0:
                    // Start log: a single JSON object
                    AppStart start = generateStartLog();
                    String s_start = JSON.toJSONString(start);
                    log.info(s_start);
                    break;
                case 1:
                    // Event log, two parts: 1) common fields, 2) event array
                    JSONObject jsobj = new JSONObject();
                    jsobj.put("ap", "app");
                    jsobj.put("cm", generateCommfield());

                    JSONArray jsonarray = new JSONArray();
                    // Randomly include an ad event
                    if (rand.nextBoolean()) {
                        jsonarray.add(generateAppAd());
                    }
                    // Randomly include a background-activity event
                    if (rand.nextBoolean()) {
                        jsonarray.add(generateActiveBackground());
                    }
                    jsobj.put("et", jsonarray);

                    // Event logs are prefixed with a timestamp and a pipe
                    long currtime = System.currentTimeMillis();
                    log.info(currtime + "|" + jsobj.toJSONString());
                    break;
            }
            // Pause between records
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Generate a background-activity event
    private static JSONObject generateActiveBackground() {
        AppActive_background appActive_background = new AppActive_background();
        // 1=upgrade, 2=download, 3=plugin_upgrade
        int f = rand.nextInt(3) + 1;
        appActive_background.setActive_source(f + "");
        return packEventObj("activebk", (JSONObject) JSON.toJSON(appActive_background));
    }

    // Generate an ad event
    private static JSONObject generateAppAd() {
        AppAd appAd = new AppAd();
        // Entry: product list page=1, app home page=2, product detail=3
        appAd.setEntry(rand.nextInt(3) + 1 + "");
        // Action: ad shown=1, ad clicked=2
        appAd.setAction(rand.nextInt(2) + 1 + "");
        // Content type: 1=product, 2=activity
        appAd.setContentType(rand.nextInt(2) + 1 + "");
        // Display duration
        appAd.setDisplayMill(rand.nextInt(100) + "");
        // Product id
        appAd.setItemId("item" + rand.nextInt(1000));
        // Activity id
        appAd.setActivityId(rand.nextInt(20) + "");
        return packEventObj("appad", (JSONObject) JSON.toJSON(appAd));
    }

    // Generate the common fields
    private static JSONObject generateCommfield() {
        AppBase appBase = new AppBase();
        appBase.setUid("" + s_uid);
        s_uid++;
        appBase.setMid("" + s_mid);
        s_mid++;
        // Version code
        appBase.setVc("" + rand.nextInt(20));
        // Version name, e.g. v1.1
        appBase.setVn("v" + rand.nextInt(4) + "." + rand.nextInt(10));
        // Android version
        appBase.setOs("o" + rand.nextInt(3) + "." + rand.nextInt(10));
        // Language: en / ch / pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case 0: appBase.setL("en"); break;
            case 1: appBase.setL("ch"); break;
            case 2: appBase.setL("pt"); break;
        }
        // Channel
        appBase.setSc("SC-001");
        // Area
        flag = rand.nextInt(2);
        switch (flag) {
            case 0: appBase.setAr("BR"); break;
            case 1: appBase.setAr("MX"); break;
        }
        // Phone brand and model
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setBa("sumsung");
                appBase.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appBase.setBa("HUWEI");
                appBase.setMd("HUWEI-" + rand.nextInt(20));
                break;
            case 2:
                appBase.setBa("IPHONE");
                appBase.setMd("IPHONE-" + rand.nextInt(20));
                break;
        }
        // SDK version
        appBase.setSv("v2." + rand.nextInt(10) + "." + rand.nextInt(20));
        // Email
        appBase.setG(rand.nextInt(100) + "@gomo.com");
        // Screen size
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appBase.setHw("640*900"); break;
            case 1: appBase.setHw("640*1130"); break;
            case 2: appBase.setHw("750*1130"); break;
        }
        // Client-side timestamp
        long mills = System.currentTimeMillis();
        appBase.setT("" + (mills - rand.nextInt(999999999)));
        // Network mode
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appBase.setNw("3G"); break;
            case 1: appBase.setNw("4G"); break;
            case 2: appBase.setNw("wifi"); break;
        }
        // Longitude and latitude
        appBase.setLn(rand.nextInt(60) + "");
        appBase.setLa(rand.nextInt(60) + "");
        return (JSONObject) JSON.toJSON(appBase);
    }

    // Wrap an event payload with its name and an event timestamp
    private static JSONObject packEventObj(String eventname, JSONObject jsonObject) {
        JSONObject jsnobj = new JSONObject();
        jsnobj.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        jsnobj.put("en", eventname);
        jsnobj.put("kv", jsonObject);
        return jsnobj;
    }

    // Generate a start log
    private static AppStart generateStartLog() {
        AppStart appStart = new AppStart();
        appStart.setMid(s_mid + "");
        s_mid++;
        appStart.setUid(s_uid + "");
        s_uid++;
        // Version code
        appStart.setVc("" + rand.nextInt(20));
        // Version name, e.g. v1.1
        appStart.setVn("v" + rand.nextInt(4) + "." + rand.nextInt(10));
        // Android version
        appStart.setOs("o" + rand.nextInt(3) + "." + rand.nextInt(10));
        // Log type marker: this is what the type interceptor keys on
        appStart.setEn("start");
        // Language: en / ch / pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setL("en"); break;
            case 1: appStart.setL("ch"); break;
            case 2: appStart.setL("pt"); break;
        }
        // Channel
        appStart.setSc("SC-001");
        // Area
        flag = rand.nextInt(2);
        switch (flag) {
            case 0: appStart.setAr("BR"); break;
            case 1: appStart.setAr("MX"); break;
        }
        // Phone brand and model
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setBa("sumsung");
                appStart.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appStart.setBa("HUWEI");
                appStart.setMd("HUWEI-" + rand.nextInt(20));
                break;
            case 2:
                appStart.setBa("IPHONE");
                appStart.setMd("IPHONE-" + rand.nextInt(20));
                break;
        }
        // SDK version
        appStart.setSv("v2." + rand.nextInt(10) + "." + rand.nextInt(20));
        appStart.setG(rand.nextInt(100) + "@gomo.com");
        // Screen size
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setHw("640*900"); break;
            case 1: appStart.setHw("640*1130"); break;
            case 2: appStart.setHw("750*1130"); break;
        }
        // Client-side timestamp
        long mills = System.currentTimeMillis();
        appStart.setT("" + (mills - rand.nextInt(999999999)));
        // Network mode
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setNw("3G"); break;
            case 1: appStart.setNw("4G"); break;
            case 2: appStart.setNw("wifi"); break;
        }
        // Longitude and latitude
        appStart.setLn(rand.nextInt(60) + "");
        appStart.setLa(rand.nextInt(60) + "");
        // Entry point
        appStart.setEntry(rand.nextInt(5) + "");
        // Splash ad type
        appStart.setOpen_ad_type(rand.nextInt(2) + 1 + "");
        // State: success=1, failure=2
        appStart.setAction(rand.nextInt(2) + 1 + "");
        // Loading time
        appStart.setLoading_time(rand.nextInt(20) + "");
        // Failure code
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setDetail("103"); break;
            case 1: appStart.setDetail("203"); break;
            case 2: appStart.setDetail("301"); break;
        }
        appStart.setExtend1("");
        return appStart;
    }
}
```
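Once packaged, the simulator can also be run by hand with the two optional arguments, e.g. `java -cp logcollect.jar com.computech.appclient.AppMain 0 1000` to emit 1000 records with no delay (jar name as used by the collection script below).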
The supporting bean classes:
```java
package com.computech.bean;
/**
 * Common log fields shared by all log types
 */
public class AppBase {
private String mid;//unique device id
private String uid;//user id
private String vc;//version code
private String vn;//version name
private String l;//system language
private String sc;//channel id
private String os;//Android OS version
private String ar;//area
private String md;//phone model
private String ba;//phone brand
private String sv;//sdk version
private String g;//email
private String hw;//screen size
private String t;//client-side timestamp of the log
private String nw;//network mode
private String ln;//longitude
private String la;//latitude
public String getMid() {
return mid;
}
public void setMid(String mid) {
this.mid = mid;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getVc() {
return vc;
}
public void setVc(String vc) {
this.vc = vc;
}
public String getVn() {
return vn;
}
public void setVn(String vn) {
this.vn = vn;
}
public String getL() {
return l;
}
public void setL(String l) {
this.l = l;
}
public String getSc() {
return sc;
}
public void setSc(String sc) {
this.sc = sc;
}
public String getOs() {
return os;
}
public void setOs(String os) {
this.os = os;
}
public String getAr() {
return ar;
}
public void setAr(String ar) {
this.ar = ar;
}
public String getMd() {
return md;
}
public void setMd(String md) {
this.md = md;
}
public String getBa() {
return ba;
}
public void setBa(String ba) {
this.ba = ba;
}
public String getSv() {
return sv;
}
public void setSv(String sv) {
this.sv = sv;
}
public String getG() {
return g;
}
public void setG(String g) {
this.g = g;
}
public String getHw() {
return hw;
}
public void setHw(String hw) {
this.hw = hw;
}
public String getT() {
return t;
}
public void setT(String t) {
this.t = t;
}
public String getNw() {
return nw;
}
public void setNw(String nw) {
this.nw = nw;
}
public String getLn() {
return ln;
}
public void setLn(String ln) {
this.ln = ln;
}
public String getLa() {
return la;
}
public void setLa(String la) {
this.la = la;
}
}
```
```java
package com.computech.bean;
/**
 * Start log: extends the base log with the fields below
 */
public class AppStart extends AppBase{
private String entry;//entry point: push=1 widget=2 icon=3 notification=4 lockscreen_widget=5
private String open_ad_type;//splash ad type: native splash ad=1 interstitial splash ad=2
private String action;//state: success=1, failure=2
private String loading_time;//loading time
private String detail;//failure code
private String extend1;//failure message
private String en;//log type marker ("start")
public String getEntry() {
return entry;
}
public void setEntry(String entry) {
this.entry = entry;
}
public String getOpen_ad_type() {
return open_ad_type;
}
public void setOpen_ad_type(String open_ad_type) {
this.open_ad_type = open_ad_type;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getLoading_time() {
return loading_time;
}
public void setLoading_time(String loading_time) {
this.loading_time = loading_time;
}
public String getDetail() {
return detail;
}
public void setDetail(String detail) {
this.detail = detail;
}
public String getExtend1() {
return extend1;
}
public void setExtend1(String extend1) {
this.extend1 = extend1;
}
public String getEn() {
return en;
}
public void setEn(String en) {
this.en = en;
}
}
```
```java
package com.computech.bean;
/**
 * Background-activity event
 */
public class AppActive_background {
private String active_source;//1=upgrade, 2=download, 3=plugin_upgrade
public String getActive_source() {
return active_source;
}
public void setActive_source(String active_source) {
this.active_source = active_source;
}
}
```
```java
package com.computech.bean;
/**
 * Ad event
 */
public class AppAd {
private String entry;//entry: product list page=1 app home page=2 product detail=3
private String action;//action: ad shown=1 ad clicked=2
private String contentType;//type: 1=product 2=activity
private String displayMill;//display duration in ms
private String itemId;//product id
private String activityId;//activity id
public String getEntry() {
return entry;
}
public void setEntry(String entry) {
this.entry = entry;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getContentType() {
return contentType;
}
public void setContentType(String contentType) {
this.contentType = contentType;
}
public String getDisplayMill() {
return displayMill;
}
public void setDisplayMill(String displayMill) {
this.displayMill = displayMill;
}
public String getItemId() {
return itemId;
}
public void setItemId(String itemId) {
this.itemId = itemId;
}
public String getActivityId() {
return activityId;
}
public void setActivityId(String activityId) {
this.activityId = activityId;
}
}
```
The logback.xml configuration:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <!-- Logger context name; defaults to "default" -->
    <contextName>logback</contextName>
    <!-- Directory where log files are written -->
    <property name="LOG_HOME" value="/tmp/logs"/>

    <!-- Writes records to the console in the given pattern -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoder: converts an event to a string (PatternLayoutEncoder by default).
             It replaces Layout: a Layout only formats the event into a string,
             while an encoder also controls when the bytes are written out. -->
        <encoder>
            <!-- %d{}: timestamp; %thread: thread name; %-5level: level padded to 5 chars;
                 %logger{36}: logger name capped at 36 chars ({0} = shortest name, no package) -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{0} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Rolls a new log file every day -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Rolling policy -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Log file name pattern -->
            <fileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- Max days to keep -->
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- Raw message only: the file is consumed by Flume downstream -->
            <pattern>%msg%n</pattern>
        </encoder>
        <!-- Max file size -->
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>20MB</maxFileSize>
        </triggeringPolicy>
    </appender>

    <!-- Asynchronous writing -->
    <appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>512</queueSize>
        <appender-ref ref="FILE"/>
    </appender>

    <!-- level: level assigned to the root logger -->
    <root level="INFO">
        <!-- ref: appenders attached to the root logger -->
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="ASYNC_FILE"/>
    </root>
</configuration>
```
Package the finished program as a jar and copy it to the designated directory on the two log servers.
Write the log-generation script (log_collect.sh):
```bash
#!/bin/bash
for i in dtinone20 dtinone21
do
    echo "--- generating logs on $i -------"
    ssh $i "source /etc/profile; java -cp /home/hadoop/jars/elehshop/logcollect.jar com.computech.appclient.AppMain > /dev/null 2>&1"
done
```
After the script runs, daily log files such as app-2020-01-02.log are produced under /tmp/logs.
- Step 2: write the Flume producer configuration and the interceptors
Flume reads the log files and must split them by type: event logs go to the topic_log_event topic and start logs to the topic_log_start topic. To make that possible, each Flume event is tagged by an interceptor, which inspects the payload and writes a topic marker into the event's header; the multiplexing selector in the agent configuration then routes on that header.
The interceptors have to be written, packaged as a jar, and placed in Flume's lib directory.
The code:

LogInterceptor:
```java
package com.computech.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

// Custom interceptor: drops malformed events before they reach the channel
public class LogInterceptor implements Interceptor {

    public void initialize() {
    }

    public Event intercept(Event event) {
        // Single-event interception: keep only events whose body is usable
        byte[] body = event.getBody();
        try {
            String s_body = new String(body, "UTF-8");
            // Discard empty bodies; a stricter version would also verify
            // that the JSON part of the record is wrapped in { }
            if (s_body == null || s_body.trim().isEmpty()) {
                return null;
            }
            return event;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> outlist = new ArrayList<Event>();
        for (Event e : events) {
            Event ev = intercept(e);
            if (ev != null) {
                outlist.add(ev);
            }
        }
        return outlist;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new LogInterceptor();
        }

        public void configure(Context context) {
            // no configurable parameters
        }
    }
}
```
LogTypeInterceptor:
```java
package com.computech.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Type interceptor: inspects each event and writes a "topic" header,
 * so the multiplexing selector downstream can route start logs and
 * event logs to different channels.
 */
public class LogTypeInterceptor implements Interceptor {

    public void initialize() {
    }

    public Event intercept(Event event) {
        byte[] body = event.getBody();
        try {
            String s_body = new String(body, "UTF-8");
            if (s_body != null) {
                // Write into this event's own headers map; sharing one map
                // across events would let later events overwrite earlier headers
                Map<String, String> headers = event.getHeaders();
                if (s_body.contains("start")) {
                    // start log
                    headers.put("topic", "topic_start");
                } else {
                    // event log
                    headers.put("topic", "topic_event");
                }
                return event;
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> outlist = new ArrayList<Event>();
        for (Event e : events) {
            Event ev = intercept(e);
            if (ev != null) {
                outlist.add(ev);
            }
        }
        return outlist;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        public void configure(Context context) {
        }
    }
}
```
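The routing logic can be sanity-checked locally with in-memory events before deploying. A minimal sketch, assuming flume-ng-core is on the classpath; LogTypeInterceptorTest is a hypothetical helper, not part of the deployed jar:

```java
package com.computech.flume;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

// Quick local check of the topic-tagging logic
public class LogTypeInterceptorTest {
    public static void main(String[] args) {
        LogTypeInterceptor interceptor = new LogTypeInterceptor();
        interceptor.initialize();

        Event startEvent = EventBuilder.withBody(
                "{\"en\":\"start\",\"action\":\"1\"}", StandardCharsets.UTF_8);
        Event eventLog = EventBuilder.withBody(
                "1577977297000|{\"ap\":\"app\",\"et\":[]}", StandardCharsets.UTF_8);

        // Expect {topic=topic_start} for the first and {topic=topic_event} for the second
        System.out.println(interceptor.intercept(startEvent).getHeaders());
        System.out.println(interceptor.intercept(eventLog).getHeaders());
    }
}
```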
The Flume configuration file, log-flume-kafka.conf:
```properties
a1.sources = r1
a1.channels = c1 c2

## taildir source
a1.sources.r1.type = TAILDIR
### position file: records how far each file has been read
a1.sources.r1.positionFile = /home/hadoop/data/flumedata/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true

### interceptors
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.computech.flume.LogInterceptor$Builder
## type interceptor: stamps the topic header on each event
a1.sources.r1.interceptors.i2.type = com.computech.flume.LogTypeInterceptor$Builder

### source selector: routes on the topic header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

## kafka channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.channels.c1.kafka.topic = topic_log_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.channels.c2.kafka.topic = topic_log_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

# bindings
a1.sources.r1.channels = c1 c2
```
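A design note on this agent: it deliberately has no sinks. Flume's KafkaChannel supports running with a source and no sink, in which case the channel itself is the terminal destination and every event goes straight to its configured topic, saving a sink stage. Setting parseAsFlumeEvent=false writes the raw log line as the Kafka record value rather than an Avro-wrapped Flume event, so plain Kafka consumers downstream can read it directly.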
After the interceptor code is finished, it too must be packaged as a jar and placed in the lib folder under the Flume home directory.
- Step 3: create the two Kafka topics

```bash
kafka-topics.sh --create --zookeeper dtinone20:2181,dtinone21:2181/kafka --topic topic_log_event --partitions 2 --replication-factor 2
kafka-topics.sh --create --zookeeper dtinone20:2181,dtinone21:2181/kafka --topic topic_log_start --partitions 2 --replication-factor 2
```
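Before wiring up the downstream agent, it is worth confirming that records actually land in the topics. A minimal sketch of a verification consumer, assuming the kafka-clients 2.x library is available (TopicCheck is a hypothetical helper; the stock kafka-console-consumer.sh tool works just as well):

```java
package com.computech.check;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Prints a few records from topic_log_start to confirm the producer-side agent works
public class TopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "dtinone20:9092,dtinone21:9092,dtinone22:9092");
        props.put("group.id", "topic-check");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("topic_log_start"));
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> r : records) {
                System.out.println(r.value());
            }
        } finally {
            consumer.close();
        }
    }
}
```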
- Step 4: configure the Flume agent that consumes from Kafka into HDFS
The Flume configuration file (kafka-flume-hdfs.conf):
```properties
# component names
a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2

# source 1: consumes the start-log topic
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
# max time per consume batch
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.sources.r1.kafka.topics = topic_log_start
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# source 2: consumes the event-log topic
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.sources.r2.kafka.topics = topic_log_event
a1.sources.r2.kafka.consumer.group.id = custom.g.id

# channel c1 (file channel)
a1.channels.c1.type = file
# checkpoint directory (file index cache)
a1.channels.c1.checkpointDir = /home/hadoop/data/flumedata/checkpoint01
# data directory
a1.channels.c1.dataDirs = /home/hadoop/data/flumedata/behavoir01
# how long a put waits when the channel is full
a1.channels.c1.keep-alive = 6

# channel c2 (file channel)
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /home/hadoop/data/flumedata/checkpoint02
a1.channels.c2.dataDirs = /home/hadoop/data/flumedata/behavoir02
a1.channels.c2.keep-alive = 6

# sink k1: hdfs sink for start logs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /orgin_data/gmail/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

# sink k2: hdfs sink for event logs
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /orgin_data/gmail/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

# roll settings: large files, rolled hourly or at 128 MB
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k2.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollInterval = 3600
# roll at 128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollSize = 134217728
# never roll by event count
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollCount = 0

# compressed output (optional)
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k2.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.codeC = lzop

# bindings
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sources.r1.channels = c1
a1.sources.r2.channels = c2
```
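Once this agent is running, the landing files can be checked with `hdfs dfs -ls /orgin_data/gmail/log/topic_start/2020-01-02` (substituting the current date); files roll every hour or at 128 MB, whichever comes first.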
- Step 5: write the start/stop scripts
Script 1, flume01.sh, starts and stops the Flume agents that collect the logs into Kafka:
```bash
#!/bin/bash
# start/stop the log-collection flume agents
FLUME_HOME=/home/hadoop/apps/flume
case $1 in
"start"){
    for i in dtinone20 dtinone21
    do
        echo "---- starting the flume agent on $i -----"
        ssh $i "source /etc/profile; nohup $FLUME_HOME/bin/flume-ng agent -n a1 --conf conf -f $FLUME_HOME/conf/log-flume-kafka.conf -Dflume.root.logger=INFO,console > nohupt.out 2>&1 &"
    done
};;
"stop"){
    for i in dtinone20 dtinone21
    do
        echo "---- stopping the flume agent on $i -----"
        ssh $i "source /etc/profile; ps -ef | grep flume | grep -v grep | awk '{print \$2}' | xargs kill -9"
        echo "--- end -------------"
    done
};;
esac
```
Script 2, flume02.sh, starts and stops the Flume agent that consumes the Kafka data into HDFS:
```bash
#!/bin/bash
# start/stop the flume agent on dtinone22 that moves kafka data into hdfs
case $1 in
"start"){
    for i in dtinone22
    do
        echo "--- starting the flume agent on $i ------"
        ssh $i "source /etc/profile; nohup /home/hadoop/apps/flume/bin/flume-ng agent -n a1 --conf conf -f /home/hadoop/apps/flume/conf/kafka-flume-hdfs.conf -Dflume.root.logger=INFO,console > nohupt.out 2>&1 &"
    done
};;
"stop"){
    for i in dtinone22
    do
        echo "---- stopping the flume agent on $i -----"
        ssh $i "source /etc/profile; ps -ef | grep flume | grep -v grep | awk '{print \$2}' | xargs kill -9"
    done
};;
esac
```
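With everything deployed, bring the pipeline up with `flume01.sh start` and `flume02.sh start`, then run log_collect.sh to generate traffic; after the first roll the day's files should appear under the HDFS paths configured above.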
