Requirement overview:
As shown in the diagram below, Flume collects user behavior logs into an HDFS directory, which in turn feeds the Hive data warehouse with user behavior data.
The data flow is roughly:
1) The web page or app front end captures user behavior (clicking a product, browsing products, which page the user lingered on) and sends it via page JavaScript
to the back-end log servers; the log servers form a cluster fronted by an nginx proxy.
2) Each log server takes the messages sent by the page and writes them, through the logging framework (slf4j with logback, configured below), to a directory on the server (one file per day, e.g. app-2020-01-02.log).
3) Flume is deployed on each log server to monitor and collect the logs (via Flume's TailDir source) and writes them to two Kafka topics: topic_log_start for start logs
and topic_log_event for event logs. The reason for going through Kafka instead of writing the logs straight to HDFS is that, when the log volume is high, Kafka absorbs the peaks.
4) A second Flume agent then consumes topic_log_event and topic_log_start from Kafka; it is configured with a KafkaSource, a FileChannel, and an HDFSSink.
- Step 1: build a program that simulates the log server writing logs
The log simulator is shown below.
AppMain is the main class that drives the run:
package com.computech.appclient;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.computech.bean.AppActive_background;
import com.computech.bean.AppAd;
import com.computech.bean.AppBase;
import com.computech.bean.AppStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

public class AppMain {

    private final static Logger log = LoggerFactory.getLogger(AppMain.class);
    private static int s_mid = 0;
    private static int s_uid = 100;
    private static Random rand = new Random();

    public static void main(String[] args) {
        // Two optional arguments:
        //   args[0] - delay in milliseconds between two records (default 0)
        //   args[1] - number of records to generate (default 1000)
        long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
        int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

        for (int i = 0; i < loop_len; i++) {
            int flag = rand.nextInt(2);
            switch (flag) {
                case 0:
                    // start log
                    AppStart start = generateStartLog();
                    String s_start = JSON.toJSONString(start);
                    log.info(s_start);
                    break;
                case 1:
                    // event log, two parts: 1. common fields 2. event array
                    JSONObject jsobj = new JSONObject();
                    jsobj.put("ap", "app");
                    jsobj.put("cm", generateCommfield());
                    JSONArray jsonarray = new JSONArray();
                    // randomly add an ad event
                    if (rand.nextBoolean()) {
                        jsonarray.add(generateAppAd());
                    }
                    // randomly add a background-activity event
                    if (rand.nextBoolean()) {
                        jsonarray.add(generateActiveBackground());
                    }
                    jsobj.put("et", jsonarray);
                    long currtime = System.currentTimeMillis();
                    log.info(currtime + "|" + jsobj.toJSONString());
                    break;
            }
            // the sleep belongs inside the loop so that "delay" really is
            // the interval between two records (it sat outside the loop before)
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // generate a background-activity event
    private static JSONObject generateActiveBackground() {
        AppActive_background appActive_background = new AppActive_background();
        // 1 = upgrade, 2 = download, 3 = plugin upgrade
        int f = rand.nextInt(3) + 1;
        appActive_background.setActive_source(f + "");
        return packEventObj("activebk", (JSONObject) JSON.toJSON(appActive_background));
    }

    // generate an ad event
    private static JSONObject generateAppAd() {
        AppAd appAd = new AppAd();
        // entry: product list page = 1, app home page = 2, product detail = 3
        appAd.setEntry(rand.nextInt(3) + 1 + "");
        // action: ad shown = 1, ad clicked = 2
        appAd.setAction(rand.nextInt(2) + 1 + "");
        // content type: 1 = product, 2 = activity
        appAd.setContentType(rand.nextInt(2) + 1 + "");
        // display duration
        appAd.setDisplayMill(rand.nextInt(100) + "");
        // product id
        appAd.setItemId("item" + rand.nextInt(1000));
        // activity id
        appAd.setActivityId(rand.nextInt(20) + "");
        return packEventObj("appad", (JSONObject) JSON.toJSON(appAd));
    }

    // generate the common fields
    private static JSONObject generateCommfield() {
        AppBase appBase = new AppBase();
        appBase.setUid("" + s_uid);
        s_uid++;
        appBase.setMid("" + s_mid);
        s_mid++;
        // version code
        appBase.setVc("" + rand.nextInt(20));
        // version name, e.g. v1.1
        appBase.setVn("v" + rand.nextInt(4) + "." + rand.nextInt(10));
        // Android version
        appBase.setOs("o" + rand.nextInt(3) + "." + rand.nextInt(10));
        // language: en / ch / pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case 0: appBase.setL("en"); break;
            case 1: appBase.setL("ch"); break;
            case 2: appBase.setL("pt"); break;
        }
        // channel
        appBase.setSc("SC-001");
        // area
        flag = rand.nextInt(2);
        switch (flag) {
            case 0: appBase.setAr("BR"); break;
            case 1: appBase.setAr("MX"); break;
        }
        // phone brand and model
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appBase.setBa("sumsung"); appBase.setMd("sumsung-" + rand.nextInt(20)); break;
            case 1: appBase.setBa("HUWEI"); appBase.setMd("HUWEI-" + rand.nextInt(20)); break;
            case 2: appBase.setBa("IPHONE"); appBase.setMd("IPHONE-" + rand.nextInt(20)); break;
        }
        // sdk version
        appBase.setSv("v2." + rand.nextInt(10) + "." + rand.nextInt(20));
        // email
        appBase.setG("" + rand.nextInt(100) + "@gomo.com");
        // screen size
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appBase.setHw("640*900"); break;
            case 1: appBase.setHw("640*1130"); break;
            case 2: appBase.setHw("750*1130"); break;
        }
        // client-side log creation time, a random point in the past
        long mills = System.currentTimeMillis();
        appBase.setT("" + (mills - rand.nextInt(999999999)));
        // network type (the original called setHw in two branches here,
        // overwriting the screen size; it must be setNw)
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appBase.setNw("3G"); break;
            case 1: appBase.setNw("4G"); break;
            case 2: appBase.setNw("wifi"); break;
        }
        // longitude and latitude
        appBase.setLn(rand.nextInt(60) + "");
        appBase.setLa(rand.nextInt(60) + "");
        return (JSONObject) JSON.toJSON(appBase);
    }

    // wrap an event body with its event timestamp and event name
    private static JSONObject packEventObj(String eventname, JSONObject jsonObject) {
        JSONObject jsnobj = new JSONObject();
        jsnobj.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        jsnobj.put("en", eventname);
        jsnobj.put("kv", jsonObject);
        return jsnobj;
    }

    // generate a start log
    private static AppStart generateStartLog() {
        AppStart appStart = new AppStart();
        // the original called setMd here, which the brand switch below then
        // overwrote; the counter is the device id, so it must be setMid
        appStart.setMid(s_mid + "");
        s_mid++;
        appStart.setUid(s_uid + "");
        s_uid++;
        // version code
        appStart.setVc("" + rand.nextInt(20));
        // version name, e.g. v1.1
        appStart.setVn("v" + rand.nextInt(4) + "." + rand.nextInt(10));
        // Android version
        appStart.setOs("o" + rand.nextInt(3) + "." + rand.nextInt(10));
        // log type marker: start log
        appStart.setEn("start");
        // language: en / ch / pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setL("en"); break;
            case 1: appStart.setL("ch"); break;
            case 2: appStart.setL("pt"); break;
        }
        // channel
        appStart.setSc("SC-001");
        // area
        flag = rand.nextInt(2);
        switch (flag) {
            case 0: appStart.setAr("BR"); break;
            case 1: appStart.setAr("MX"); break;
        }
        // phone brand and model
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setBa("sumsung"); appStart.setMd("sumsung-" + rand.nextInt(20)); break;
            case 1: appStart.setBa("HUWEI"); appStart.setMd("HUWEI-" + rand.nextInt(20)); break;
            case 2: appStart.setBa("IPHONE"); appStart.setMd("IPHONE-" + rand.nextInt(20)); break;
        }
        // sdk version
        appStart.setSv("v2." + rand.nextInt(10) + "." + rand.nextInt(20));
        // email
        appStart.setG("" + rand.nextInt(100) + "@gomo.com");
        // screen size
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setHw("640*900"); break;
            case 1: appStart.setHw("640*1130"); break;
            case 2: appStart.setHw("750*1130"); break;
        }
        // client-side log creation time, a random point in the past
        long mills = System.currentTimeMillis();
        appStart.setT("" + (mills - rand.nextInt(999999999)));
        // network type (setNw, not setHw -- same fix as above)
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setNw("3G"); break;
            case 1: appStart.setNw("4G"); break;
            case 2: appStart.setNw("wifi"); break;
        }
        // longitude and latitude
        appStart.setLn(rand.nextInt(60) + "");
        appStart.setLa(rand.nextInt(60) + "");
        // entry: 1..5 (the original rand.nextInt(5) could produce 0, which has no defined meaning)
        appStart.setEntry(rand.nextInt(5) + 1 + "");
        // splash ad type: native = 1, interstitial = 2
        appStart.setOpen_ad_type(rand.nextInt(2) + 1 + "");
        // status: success = 1, failure = 2
        appStart.setAction(rand.nextInt(2) + 1 + "");
        // loading time
        appStart.setLoading_time(rand.nextInt(20) + "");
        // failure code
        flag = rand.nextInt(3);
        switch (flag) {
            case 0: appStart.setDetail("103"); break;
            case 1: appStart.setDetail("203"); break;
            case 2: appStart.setDetail("301"); break;
        }
        appStart.setExtend1("");
        return appStart;
    }
}
The remaining event bean classes:
package com.computech.bean;

/**
 * Common log fields.
 */
public class AppBase {
    private String mid; // unique device id
    private String uid; // user id
    private String vc;  // version code
    private String vn;  // version name
    private String l;   // system language
    private String sc;  // channel
    private String os;  // Android version
    private String ar;  // area
    private String md;  // phone model
    private String ba;  // phone brand
    private String sv;  // sdk version
    private String g;   // email
    private String hw;  // screen size
    private String t;   // client-side log creation time
    private String nw;  // network type
    private String ln;  // longitude
    private String la;  // latitude

    public String getMid() { return mid; }
    public void setMid(String mid) { this.mid = mid; }
    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
    public String getVc() { return vc; }
    public void setVc(String vc) { this.vc = vc; }
    public String getVn() { return vn; }
    public void setVn(String vn) { this.vn = vn; }
    public String getL() { return l; }
    public void setL(String l) { this.l = l; }
    public String getSc() { return sc; }
    public void setSc(String sc) { this.sc = sc; }
    public String getOs() { return os; }
    public void setOs(String os) { this.os = os; }
    public String getAr() { return ar; }
    public void setAr(String ar) { this.ar = ar; }
    public String getMd() { return md; }
    public void setMd(String md) { this.md = md; }
    public String getBa() { return ba; }
    public void setBa(String ba) { this.ba = ba; }
    public String getSv() { return sv; }
    public void setSv(String sv) { this.sv = sv; }
    public String getG() { return g; }
    public void setG(String g) { this.g = g; }
    public String getHw() { return hw; }
    public void setHw(String hw) { this.hw = hw; }
    public String getT() { return t; }
    public void setT(String t) { this.t = t; }
    public String getNw() { return nw; }
    public void setNw(String nw) { this.nw = nw; }
    public String getLn() { return ln; }
    public void setLn(String ln) { this.ln = ln; }
    public String getLa() { return la; }
    public void setLa(String la) { this.la = la; }
}
package com.computech.bean;

/**
 * Start log: extends the common fields with the attributes below.
 */
public class AppStart extends AppBase {
    private String entry;        // entry: push=1 widget=2 icon=3 notification=4 lockscreen_widget=5
    private String open_ad_type; // splash ad type: native splash ad=1 interstitial ad=2
    private String action;       // status: success=1, failure=2
    private String loading_time; // loading time
    private String detail;       // failure code
    private String extend1;      // failure message
    private String en;           // log-type marker for start logs

    public String getEntry() { return entry; }
    public void setEntry(String entry) { this.entry = entry; }
    public String getOpen_ad_type() { return open_ad_type; }
    public void setOpen_ad_type(String open_ad_type) { this.open_ad_type = open_ad_type; }
    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
    public String getLoading_time() { return loading_time; }
    public void setLoading_time(String loading_time) { this.loading_time = loading_time; }
    public String getDetail() { return detail; }
    public void setDetail(String detail) { this.detail = detail; }
    public String getExtend1() { return extend1; }
    public void setExtend1(String extend1) { this.extend1 = extend1; }
    public String getEn() { return en; }
    public void setEn(String en) { this.en = en; }
}
package com.computech.bean;

/**
 * Background-activity event.
 */
public class AppActive_background {
    private String active_source; // 1 = upgrade, 2 = download, 3 = plugin upgrade

    public String getActive_source() { return active_source; }
    public void setActive_source(String active_source) { this.active_source = active_source; }
}
package com.computech.bean;

/**
 * Ad event.
 */
public class AppAd {
    private String entry;       // entry: product list page=1 app home page=2 product detail=3
    private String action;      // action: ad shown=1 ad clicked=2
    private String contentType; // content type: 1 = product, 2 = activity
    private String displayMill; // display duration in ms
    private String itemId;      // product id
    private String activityId;  // activity id

    public String getEntry() { return entry; }
    public void setEntry(String entry) { this.entry = entry; }
    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
    public String getContentType() { return contentType; }
    public void setContentType(String contentType) { this.contentType = contentType; }
    public String getDisplayMill() { return displayMill; }
    public void setDisplayMill(String displayMill) { this.displayMill = displayMill; }
    public String getItemId() { return itemId; }
    public void setItemId(String itemId) { this.itemId = itemId; }
    public String getActivityId() { return activityId; }
    public void setActivityId(String activityId) { this.activityId = activityId; }
}
The logback.xml configuration:

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <!-- logger context name, "default" if not set -->
    <contextName>logback</contextName>
    <!-- directory the log files are written to -->
    <property name="LOG_HOME" value="/tmp/logs"/>

    <!-- writes records to the console in the given format -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoder: converts an event to a string (defaults to PatternLayoutEncoder).
             An encoder replaces and extends Layout: a Layout only converts the event
             to a string, while an encoder also controls when the record is written
             to its destination. -->
        <encoder>
            <!-- %d{}: timestamp; %thread: thread name; %-5level: level padded to 5 chars;
                 %logger{36}: logger name capped at 36 chars ({0} = shortest name, no package) -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{0} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- rolls the log file daily -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- rolling policy -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- log file name pattern -->
            <fileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- keep at most 30 days of files -->
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- the file only gets the raw message -->
            <pattern>%msg%n</pattern>
        </encoder>
        <!-- maximum file size -->
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>20MB</maxFileSize>
        </triggeringPolicy>
    </appender>

    <!-- asynchronous writes -->
    <appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>512</queueSize>
        <appender-ref ref="FILE"/>
    </appender>

    <!-- level: the level assigned to the root logger -->
    <root level="INFO">
        <!-- ref: the appenders attached to the root logger -->
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="ASYNC_FILE"/>
    </root>
</configuration>
Package the finished program into a jar and copy it to the designated directory on the two log servers.
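Before scripting the run, the jar can be smoke-tested by hand. The path below follows the one used in log_collect.sh further down; the two arguments are the per-record delay in milliseconds and the record count, as documented in AppMain:

java -cp /home/hadoop/jars/elehshop/logcollect.jar com.computech.appclient.AppMain 500 100
# Given the code above, a start log is a bare JSON object containing "en":"start",
# while an event log has the shape <epoch-millis>|{"ap":"app","cm":{...},"et":[...]}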
Write the log generation script (log_collect.sh):

#!/bin/bash
for i in dtinone20 dtinone21
do
  echo "--- generating logs on $i -------"
  ssh $i "source /etc/profile; java -cp /home/hadoop/jars/elehshop/logcollect.jar com.computech.appclient.AppMain > /dev/null 2>&1"
done
After the script runs, daily log files such as app-2020-01-02.log are produced under /tmp/logs.
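A quick sanity check that the files are being written where Flume will later tail them; the path matches the logback fileNamePattern above, with today's date filling the %d:

tail -f /tmp/logs/app-$(date +%Y-%m-%d).log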
- Step 2: configure the Flume producer and write the interceptors
After Flume reads the log files it must separate them by type: event logs go to the topic_log_event topic and start logs to the topic_log_start topic. To do this, each Flume event has to be tagged: an interceptor inspects every record and writes a topic marker into the event's header, which the multiplexing selector then uses to route the two types to their respective Kafka channels.
This means writing Flume interceptors, packaging them, and placing the jar under Flume's lib directory.
The code is as follows:
LogInterceptor:

package com.computech.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

// Custom interceptor: drops malformed records.
public class LogInterceptor implements Interceptor {

    public void initialize() {
    }

    // Single-event interception: discard events whose body is empty or blank
    // (the original only null-checked the decoded string, which new String()
    // never returns, so nothing was actually filtered).
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        try {
            String s_body = new String(body, "UTF-8");
            if (s_body == null || s_body.trim().isEmpty()) {
                return null;
            }
            return event;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> outlist = new ArrayList<Event>();
        for (Event e : events) {
            Event ev = intercept(e);
            if (ev != null) {
                outlist.add(ev);
            }
        }
        return outlist;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new LogInterceptor();
        }

        public void configure(Context context) {
            // no configuration parameters
        }
    }
}
LogTypeInterceptor:

package com.computech.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Type interceptor: distinguishes start logs from event logs and puts a
 * "topic" attribute into the event header, so that the selector that
 * follows can route each type to its own channel.
 */
public class LogTypeInterceptor implements Interceptor {

    public void initialize() {
    }

    public Event intercept(Event event) {
        byte[] body = event.getBody();
        try {
            String s_body = new String(body, "UTF-8");
            if (s_body != null) {
                // Write into the event's own header map. (The original shared
                // one HashMap across all events via setHeaders(map), so a later
                // event in the same batch would mutate the headers of earlier ones.)
                Map<String, String> headers = event.getHeaders();
                if (s_body.contains("start")) {
                    headers.put("topic", "topic_start"); // start log
                } else {
                    headers.put("topic", "topic_event"); // event log
                }
                return event;
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> outlist = new ArrayList<Event>();
        for (Event e : events) {
            Event ev = intercept(e);
            if (ev != null) {
                outlist.add(ev);
            }
        }
        return outlist;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        public void configure(Context context) {
        }
    }
}
Write the Flume configuration file: log-flume-kafka.conf

a1.sources = r1
a1.channels = c1 c2

## taildir source
a1.sources.r1.type = TAILDIR
### position file
a1.sources.r1.positionFile = /home/hadoop/data/flumedata/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true

### interceptors
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.computech.flume.LogInterceptor$Builder
## type interceptor: adds the topic header to each event
a1.sources.r1.interceptors.i2.type = com.computech.flume.LogTypeInterceptor$Builder

### source selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

## kafka channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.channels.c1.kafka.topic = topic_log_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.channels.c2.kafka.topic = topic_log_event
# (the original set parseAsFlumeEvent on c1 twice; this line belongs to c2)
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

# bindings
a1.sources.r1.channels = c1 c2
After finishing the interceptor code, package it into a jar and put it in the lib folder under the Flume home directory.
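A minimal packaging sketch, assuming the interceptors live in a Maven project; the artifact name flume-interceptor-1.0.jar is illustrative, while the hadoop user and the Flume path follow the ones used elsewhere in this post:

mvn clean package
# copy the jar into Flume's lib directory on both log servers
for i in dtinone20 dtinone21; do
  scp target/flume-interceptor-1.0.jar hadoop@$i:/home/hadoop/apps/flume/lib/
done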
- Step 3: create the two Kafka topics
kafka-topics.sh --create --zookeeper dtinone20:2181,dtinone21:2181/kafka --topic topic_log_event --partitions 2 --replication-factor 2
kafka-topics.sh --create --zookeeper dtinone20:2181,dtinone21:2181/kafka --topic topic_log_start --partitions 2 --replication-factor 2
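To confirm the topics exist and, once the producer agent is running, that data actually arrives, the stock Kafka CLI tools can be used (servers as configured above):

kafka-topics.sh --list --zookeeper dtinone20:2181,dtinone21:2181/kafka
kafka-console-consumer.sh --bootstrap-server dtinone20:9092 --topic topic_log_start --from-beginning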
- Step 4: configure the Flume agent that consumes records from Kafka into HDFS
The Flume configuration file (kafka-flume-hdfs.conf, the name referenced by the start script in step five):

# component names
a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2

# source 1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
# maximum time per consume batch from Kafka
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.sources.r1.kafka.topics = topic_log_start
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# source 2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
# maximum time per consume batch from Kafka
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = dtinone20:9092,dtinone21:9092,dtinone22:9092
a1.sources.r2.kafka.topics = topic_log_event
a1.sources.r2.kafka.consumer.group.id = custom.g.id

# channel c1
a1.channels.c1.type = file
# checkpoint (index) directory
a1.channels.c1.checkpointDir = /home/hadoop/data/flumedata/checkpoint01
# data directory
a1.channels.c1.dataDirs = /home/hadoop/data/flumedata/behavoir01
# how long a put from the source waits when the channel is full
a1.channels.c1.keep-alive = 6

# channel c2
a1.channels.c2.type = file
# checkpoint (index) directory
a1.channels.c2.checkpointDir = /home/hadoop/data/flumedata/checkpoint02
# data directory
a1.channels.c2.dataDirs = /home/hadoop/data/flumedata/behavoir02
# how long a put from the source waits when the channel is full
a1.channels.c2.keep-alive = 6

# sink k1: hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /orgin_data/gmail/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

# sink k2: hdfs sink
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /orgin_data/gmail/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

# in case the events carry no timestamp header, let the %Y-%m-%d escape
# fall back to the local time (an addition; the original omitted this)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.useLocalTimeStamp = true

# file rolling: rollInterval is the rolling period in seconds
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k2.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollInterval = 3600
# roll at 128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollSize = 134217728
# never roll based on event count (the original dropped the hdfs. prefix here)
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollCount = 0

# compressed output format (optional)
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k2.hdfs.fileType = CompressedStream
# compression codec (optional)
#a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.codeC = lzop

# bindings (the original also set k2.channel = c1 inline above, which these
# lines corrected; k2 must read from c2)
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sources.r1.channels = c1
a1.sources.r2.channels = c2
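Once this agent has been consuming for a while, the dated partition directories should appear in HDFS; a quick check for today's partitions:

hdfs dfs -ls /orgin_data/gmail/log/topic_start/$(date +%Y-%m-%d)
hdfs dfs -ls /orgin_data/gmail/log/topic_event/$(date +%Y-%m-%d)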
- Step 5: write the start/stop scripts
Script 1: flume01.sh starts/stops the Flume agents that collect the logs into Kafka
#!/bin/bash
# start/stop the Flume log-collection agents
FLUME_HOME=/home/hadoop/apps/flume
case $1 in
"start"){
        for i in dtinone20 dtinone21
        do
                echo "---- starting the Flume agent on $i -----"
                ssh $i "source /etc/profile; nohup $FLUME_HOME/bin/flume-ng agent -n a1 --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/log-flume-kafka.conf -Dflume.root.logger=INFO,console > nohup.out 2>&1 &"
        done
};;
"stop"){
        for i in dtinone20 dtinone21
        do
                echo "---- stopping the Flume agent on $i -----"
                ssh $i "source /etc/profile; ps -ef | grep flume | grep -v grep | awk '{print \$2}' | xargs kill -9"
                echo "--- end -------------"
        done
};;
esac
Script 2: flume02.sh starts/stops the Flume agent that consumes the Kafka data into HDFS
#!/bin/bash
# start/stop the agent on dtinone22 that moves the Kafka data into HDFS
FLUME_HOME=/home/hadoop/apps/flume
case $1 in
"start"){
        for i in dtinone22
        do
                echo "--- starting the Flume agent on $i ------"
                ssh $i "source /etc/profile; nohup $FLUME_HOME/bin/flume-ng agent -n a1 --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kafka-flume-hdfs.conf -Dflume.root.logger=INFO,console > nohup.out 2>&1 &"
        done
};;
"stop"){
        for i in dtinone22
        do
                echo "---- stopping the Flume agent on $i -----"
                ssh $i "source /etc/profile; ps -ef | grep flume | grep -v grep | awk '{print \$2}' | xargs kill -9"
        done
};;
esac
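With both scripts installed, a sensible end-to-end order (assuming HDFS, ZooKeeper, and Kafka are already up) is to start the consumer side first, so nothing lingers in Kafka longer than necessary:

sh flume02.sh start    # Kafka -> HDFS agent on dtinone22
sh flume01.sh start    # TailDir -> Kafka agents on dtinone20/21
sh log_collect.sh      # generate a batch of test logs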