Flume架构与源码分析-整体架构


最近在学习Flume源码,所以想写一份Flume源码学习的笔记供需要的朋友一起学习参考。

 

1、Flume介绍

Flume是cloudera公司开源的一款分布式、可靠地进行大量日志数据采集、聚合和并转移到存储中;通过事务机制提供了可靠的消息传输支持,自带负载均衡机制来支撑水平扩展;并且提供了一些默认组件供直接使用。

Flume目前常见的应用场景:日志--->Flume--->实时计算(如Kafka+Storm) 、日志--->Flume--->离线计算(如HDFS、HBase)、日志--->Flume--->ElasticSearch。

2、整体架构

Flume主要分为三个组件:Source、Channel、Sink;数据流如下图所示:


 

1、Source负责日志流入,比如从文件、网络、Kafka等数据源流入数据,数据流入的方式有两种轮训拉取和事件驱动;

2、Channel负责数据聚合/暂存,比如暂存到内存、本地文件、数据库、Kafka等,日志数据不会在管道停留很长时间,很快会被Sink消费掉;

3、Sink负责数据转移到存储,比如从Channel拿到日志后直接存储到HDFS、HBase、Kafka、ElasticSearch等,然后再有如Hadoop、Storm、ElasticSearch之类的进行数据分析或查询。

 

一个Agent会同时存在这三个组件,Source和Sink都是异步执行的,相互之间不会影响。

 

假设我们有采集并索引Nginx访问日志,我们可以按照如下方式部署:


 

1、Agent和Web Server是部署在同一台机器;

2、Source使用ExecSource并使用tail命令采集日志;

3、Channel使用MemoryChannel,因为日志数据丢点也不算什么大问题;

4、Sink使用ElasticSearchSink写入到ElasticSearch,此处可以配置多个ElasticSearch服务器IP:PORT列表以便提升处理能力。

 

以上介绍了日志是如何流的,对于复杂的日志采集,我们需要对Source日志进行过滤、写到多个Channel、对Sink进行失败处理/负载均衡等处理,这些Flume默认都提供了支持:


 

1、Source采集的日志会传入ChannelProcessor组件,其首先通过Interceptor进行日志过滤,如果接触过Servlet的话这个概念是类似的,可以参考《Servlet3.1规范翻译——过滤器 》 ;过滤器可以过滤掉日志,也可以修改日志内容;

2、过滤完成后接下来会交给ChannelSelector进行处理,默认提供了两种选择器:复制或多路复用选择器;复制即把一个日志复制到多个Channel;而多路复用会根据配置的选择器条件,把符合条件的路由到相应的Channel;在写多个Channel时可能存在存在失败的情况,对于失败的处理有两种:稍后重试或者忽略。重试一般采用指数级时间进行重试。

 

我们之前说过Source生产日志给Channel、Sink从Channel消费日志;它俩完全是异步的,因此Sink只需要监听自己关系的Channel变化即可。

 

到此我们可以对Source日志进行过滤/修改,把一个消息复制/路由到多个Channel,对于Sink的话也应该存在写失败的情况,Flume默认提供了如下策略:



 

默认策略就是一个Sink,失败了则这个事务就失败了,会稍后重试。

 

Flume还提供了故障转移策略:


 

Failover策略是给多个Sink定义优先级,假设其中一个失败了,则路由到下一个优先级的Sink;Sink只要抛出一次异常就会被认为是失败了,则从存活Sink中移除,然后指数级时间等待重试,默认是等待1s开始重试,最大等待重试时间是30s。

 

Flume也提供了负载均衡策略:

 

 

负载均衡算法默认提供了两种:轮训和随机;其通过抽象一个类似ChannelSelector的SinkSelector进行选择,失败补偿机制和Failover中的算法类似,但是默认是关闭失败补偿的,需要配置backoff参数为true开启。

 

到此Flume涉及的一些核心组件就介绍完了,对于Source和Sink如何异步、Channel提供的事务机制等我们后续分析组件时再讲。

 

假设我们需要采集非常多的客户端日志并对他们进行一些缓冲或集中的处理,就可以部署一个聚合层,整体架构类似于如下:


 1、首先是日志采集层,该层的Agent和应用部署在同一台机器上,负责采集如Nginx访问日志;然后通过RPC将日志流入到收集/聚合层;在这一层应该快速的采集到日志然后流入到收集/聚合层;

2、收集/聚合层进行日志的收集或聚合,并且可以进行容错处理,如故障转移或负载均衡,以提升可靠性;另外可以在该层开启文件Channel,做数据缓冲区;

3、收集/聚合层对数据进行过滤或修改然后进行存储或处理;比如存储到HDFS,或者流入Kafka然后通过Storm对数据进行实时处理。

 

 

到此从Flume核心组件到一般的部署架构我们就大体了解了,而涉及的一些实现细节在接下来的部分进行详细介绍。

 

Flume架构与源码分析-核心组件分析-1

博客分类:

 

首先所有核心组件都会实现org.apache.flume.lifecycle.LifecycleAware接口:

Java代码  
  1. public interface LifecycleAware {  
  2.   public void start();  
  3.   public void stop();  
  4.   public LifecycleState getLifecycleState();  
  5. }  

start方法在整个Flume启动时或者初始化组件时都会调用start方法进行组件初始化,Flume组件出现异常停止时会调用stop,getLifecycleState返回组件的生命周期状态,有IDLE, START, STOP, ERROR四个状态。

 

如果开发的组件需要配置,如设置一些属性;可以实现org.apache.flume.conf.Configurable接口: 

Java代码  
  1. public interface Configurable {  
  2.    public void configure(Context context);  
  3. }  

Flume在启动组件之前会调用configure来初始化组件一些配置。

 

1、Source

Source用于采集日志数据,有两种实现方式:轮训拉取和事件驱动机制;Source接口如下:

Java代码  
  1. public interface Source extends LifecycleAware, NamedComponent {  
  2.   public void setChannelProcessor(ChannelProcessor channelProcessor);  
  3.   public ChannelProcessor getChannelProcessor();  
  4. }   

Source接口首先继承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口,也就是说它的的所有逻辑的实现应该在LifecycleAware接口的start和stop中实现;ChannelProcessor之前介绍过用来进行日志流的过滤和Channel的选择及调度。

 

而Source是通过SourceFactory工厂创建,默认提供了DefaultSourceFactory,其首先通过Enum类型org.apache.flume.conf.source.SourceType查找默认实现,如exec,则找到org.apache.flume.source.ExecSource实现,如果找不到直接Class.forName(className)创建。 

 

Source提供了两种机制: PollableSource(轮训拉取)和EventDrivenSource(事件驱动):


   

PollableSource默认提供了如下实现:

 

比如JMSSource实现使用javax.jms.MessageConsumer.receive(pollTimeout)主动去拉取消息。

 

EventDrivenSource默认提供了如下实现:


  

比如NetcatSource、HttpSource就是事件驱动,即被动等待;比如HttpSource就是内部启动了一个内嵌的Jetty启动了一个Servlet容器,通过FlumeHTTPServlet去接收消息。

 

Flume提供了SourceRunner用来启动Source的流转:

 

 

Java代码  
  1. public class EventDrivenSourceRunner extends SourceRunner {  
  2.   private LifecycleState lifecycleState;  
  3.   public EventDrivenSourceRunner() {  
  4.       lifecycleState = LifecycleState.IDLE; //启动之前是空闲状态  
  5.   }  
  6.   
  7.   @Override  
  8.   public void start() {  
  9.     Source source = getSource(); //获取Source  
  10.     ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器  
  11.     cp.initialize(); //初始化Channel处理器  
  12.     source.start();  //启动Source  
  13.     lifecycleState = LifecycleState.START; //本组件状态改成启动状态  
  14.   }  
  15.   @Override  
  16.   public void stop() {  
  17.     Source source = getSource(); //先停Source  
  18.     source.stop();  
  19.     ChannelProcessor cp = source.getChannelProcessor();  
  20.     cp.close();//再停Channel处理器  
  21.     lifecycleState = LifecycleState.STOP; //本组件状态改成停止状态  
  22.   }  
  23. }   

从本组件也可以看出:1、首先要初始化ChannelProcessor,其实现时初始化过滤器链;2、接着启动Source并更改本组件的状态。

 

Java代码  
  1. public class PollableSourceRunner extends SourceRunner {  
  2.  @Override  
  3.  public void start() {  
  4.   PollableSource source = (PollableSource) getSource();  
  5.   ChannelProcessor cp = source.getChannelProcessor();  
  6.   cp.initialize();  
  7.   source.start();  
  8.   
  9.   runner = new PollingRunner();  
  10.   runner.source = source;  
  11.   runner.counterGroup = counterGroup;  
  12.   runner.shouldStop = shouldStop;  
  13.   
  14.   runnerThread = new Thread(runner);  
  15.   runnerThread.setName(getClass().getSimpleName() + "-" +   
  16.       source.getClass().getSimpleName() + "-" + source.getName());  
  17.   runnerThread.start();   
  18.   
  19.   lifecycleState = LifecycleState.START;  
  20.  }  
  21. }   

 

而PollingRunner首先初始化组件,但是又启动了一个线程PollingRunner,其作用就是轮训拉取数据: 

Java代码  
  1.   @Override  
  2.   public void run() {  
  3.     while (!shouldStop.get()) { //如果没有停止,则一直在死循环运行  
  4.       counterGroup.incrementAndGet("runner.polls");  
  5.   
  6.       try {  
  7.         //调用PollableSource的process方法进行轮训拉取,然后判断是否遇到了失败补偿  
  8.         if (source.process().equals(PollableSource.Status.BACKOFF)) {/  
  9.           counterGroup.incrementAndGet("runner.backoffs");  
  10.   
  11.           //失败补偿时暂停线程处理,等待超时时间之后重试  
  12.           Thread.sleep(Math.min(  
  13.               counterGroup.incrementAndGet("runner.backoffs.consecutive")  
  14.               * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));  
  15.         } else {  
  16.           counterGroup.set("runner.backoffs.consecutive", 0L);  
  17.         }  
  18.       } catch (InterruptedException e) {  
  19.                 }  
  20.       }  
  21.     }  
  22.   }  
  23. }   

Flume在启动时会判断Source是PollableSource还是EventDrivenSource来选择使用PollableSourceRunner还是EventDrivenSourceRunner。

 

 

比如HttpSource实现,其通过FlumeHTTPServlet接收消息然后: 

Java代码  
  1. List<Event> events = Collections.emptyList(); //create empty list  
  2. //首先从请求中获取Event  
  3. events = handler.getEvents(request);  
  4. //然后交给ChannelProcessor进行处理  
  5. getChannelProcessor().processEventBatch(events);   

到此基本的Source流程就介绍完了,其作用就是监听日志,采集,然后交给ChannelProcessor进行处理。

 

 

2、Channel

Channel用于连接Source和Sink,Source生产日志发送到Channel,Sink从Channel消费日志;也就是说通过Channel实现了Source和Sink的解耦,可以实现多对多的关联,和Source、Sink的异步化。     

 

之前Source采集到日志后会交给ChannelProcessor处理,那么接下来我们先从ChannelProcessor入手,其依赖三个组件: 

Java代码  
  1. private final ChannelSelector selector;  //Channel选择器  
  2. private final InterceptorChain interceptorChain; //拦截器链  
  3. private ExecutorService execService; //用于实现可选Channel的ExecutorService,默认是单线程实现   

 

接下来看下其是如何处理Event的: 

Java代码  
  1. public void processEvent(Event event) {  
  2.   event = interceptorChain.intercept(event); //首先进行拦截器链过滤  
  3.   if (event == null) {  
  4.     return;  
  5.   }  
  6.   List<Event> events = new ArrayList<Event>(1);  
  7.   events.add(event);  
  8.   
  9.   //通过Channel选择器获取必须成功处理的Channel,然后事务中执行  
  10.   List<Channel> requiredChannels = selector.getRequiredChannels(event);  
  11.   for (Channel reqChannel : requiredChannels) {   
  12.     executeChannelTransaction(reqChannel, events, false);  
  13.   }  
  14.   
  15.   //通过Channel选择器获取可选的Channel,这些Channel失败是可以忽略,不影响其他Channel的处理  
  16.   List<Channel> optionalChannels = selector.getOptionalChannels(event);  
  17.   for (Channel optChannel : optionalChannels) {  
  18.     execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));  
  19.   }  
  20. }   

 

另外内部还提供了批处理实现方法processEventBatch;对于内部事务实现的话可以参考executeChannelTransaction方法,整体事务机制类似于JDBC:

Java代码  
  1. private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {  
  2.   //1、获取Channel上的事务  
  3.   Transaction tx = channel.getTransaction();  
  4.   Preconditions.checkNotNull(tx, "Transaction object must not be null");  
  5.   try {  
  6.     //2、开启事务  
  7.     tx.begin();  
  8.     //3、在Channel上执行批量put操作  
  9.     for (Event event : batch) {  
  10.       channel.put(event);  
  11.     }  
  12.     //4、成功后提交事务  
  13.     tx.commit();  
  14.   } catch (Throwable t) {  
  15.     //5、异常后回滚事务  
  16.     tx.rollback();  
  17.     if (t instanceof Error) {  
  18.        LOG.error("Error while writing to channel: " +  
  19.            channel, t);  
  20.        throw (Error) t;  
  21.     } else if(!isOptional) {//如果是可选的Channel,异常忽略  
  22.        throw new ChannelException("Unable to put batch on required " +  
  23.              "channel: " + channel, t);  
  24.     }  
  25.   } finally {  
  26.     //最后关闭事务  
  27.     tx.close();  
  28.   }  
  29. }  

 

Interceptor用于过滤Event,即传入一个Event然后进行过滤加工,然后返回一个新的Event,接口如下:   

Java代码  
  1. public interface Interceptor {  
  2.     public void initialize();  
  3.     public Event intercept(Event event);  
  4.     public List<Event> intercept(List<Event> events);  
  5.     public void close();  
  6. }   

可以看到其提供了initialize和close方法用于启动和关闭;intercept方法用于过滤或加工Event。比如HostInterceptor拦截器用于获取本机IP然后默认添加到Event的字段为host的Header中。

  

接下来就是ChannelSelector选择器了,其通过如下方式创建: 

Java代码  
  1. //获取ChannelSelector配置,比如agent.sources.s1.selector.type = replicating  
  2. ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();  
  3. //使用Source关联的Channel创建,比如agent.sources.s1.channels = c1 c2  
  4. ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);   

 

ChannelSelector默认提供了两种实现:复制和多路复用:


默认实现是复制选择器ReplicatingChannelSelector,即把接收到的消息复制到每一个Channel;多路复用选择器MultiplexingChannelSelector会根据Event Header中的参数进行选择,以此来选择使用哪个Channel。

 

而Channel是Event中转的地方,Source发布Event到Channel,Sink消费Channel的Event;Channel接口提供了如下接口用来实现Event流转:  

Java代码  
  1. public interface Channel extends LifecycleAware, NamedComponent {  
  2.   public void put(Event event) throws ChannelException;  
  3.   public Event take() throws ChannelException;  
  4.   public Transaction getTransaction();  
  5. }   

put用于发布Event,take用于消费Event,getTransaction用于事务支持。默认提供了如下Channel的实现: 



 对于Channel的实现我们后续单独章节介绍。

 

3、Sink

Sink从Channel消费Event,然后进行转移到收集/聚合层或存储层。Sink接口如下所示: 

Java代码  
  1. public interface Sink extends LifecycleAware, NamedComponent {  
  2.   public void setChannel(Channel channel);  
  3.   public Channel getChannel();  
  4.   public Status process() throws EventDeliveryException;  
  5.   public static enum Status {  
  6.     READY, BACKOFF  
  7.   }  
  8. }   

类似于Source,其首先继承了LifecycleAware,然后提供了Channel的getter/setter方法,并提供了process方法进行消费,此方法会返回消费的状态,READY或BACKOFF。

 

Sink也是通过SinkFactory工厂来创建,其也提供了DefaultSinkFactory默认工厂,比如传入hdfs,会先查找Enum org.apache.flume.conf.sink.SinkType,然后找到相应的默认处理类org.apache.flume.sink.hdfs.HDFSEventSink,如果没找到默认处理类,直接通过Class.forName(className)进行反射创建。  

 

我们知道Sink还提供了分组功能,用于把多个Sink聚合为一组进行使用,内部提供了SinkGroup用来完成这个事情。此时问题来了,如何去调度多个Sink,其内部使用了SinkProcessor来完成这个事情,默认提供了故障转移和负载均衡两个策略。

 

首先SinkGroup就是聚合多个Sink为一组,然后将多个Sink传给SinkProcessorFactory进行创建SinkProcessor,而策略是根据配置文件中配置的如agent.sinkgroups.g1.processor.type = load_balance来选择的。

 

SinkProcessor提供了如下实现:

 

DefaultSinkProcessor:默认实现,用于单个Sink的场景使用。

FailoverSinkProcessor:故障转移实现: 

Java代码  
  1. public Status process() throws EventDeliveryException {  
  2.   Long now = System.currentTimeMillis();  
  3.     //1、首先检查失败队列的头部的Sink是否已经过了失败补偿等待时间了  
  4.   while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {  
  5.     //2、如果可以使用了,则从失败Sink队列获取队列第一个Sink  
  6.     FailedSink cur = failedSinks.poll();  
  7.     Status s;  
  8.     try {  
  9.       s = cur.getSink().process(); //3、使用此Sink进行处理  
  10.       if (s  == Status.READY) { //4、如果处理成功  
  11.         liveSinks.put(cur.getPriority(), cur.getSink()); //4.1、放回存活Sink队列  
  12.         activeSink = liveSinks.get(liveSinks.lastKey());  
  13.       } else {  
  14.         failedSinks.add(cur); //4.2、如果此时不是READY,即BACKOFF期间,再次放回失败队列  
  15.       }  
  16.       return s;  
  17.     } catch (Exception e) {  
  18.       cur.incFails(); //5、如果遇到异常了,则增加失败次数,并放回失败队列  
  19.       failedSinks.add(cur);  
  20.     }  
  21.   }  
  22.   
  23.   Status ret = null;  
  24.   while(activeSink != null) { //6、此时失败队列中没有Sink能处理了,那么需要使用存活Sink队列进行处理  
  25.     try {  
  26.       ret = activeSink.process();  
  27.       return ret;  
  28.     } catch (Exception e) { //7、处理失败进行转移到失败队列  
  29.       activeSink = moveActiveToDeadAndGetNext();  
  30.     }  
  31.   }  
  32.   
  33.   throw new EventDeliveryException("All sinks failed to process, " +  
  34.       "nothing left to failover to");  
  35. }  

 

失败队列是一个优先级队列,使用refresh属性排序,而refresh是通过如下机制计算的: 

Java代码  
  1. refresh = System.currentTimeMillis()  
  2.         + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);   

其中maxPenalty是最大等待时间,默认30s,而(1 << sequentialFailures) * FAILURE_PENALTY)用于实现指数级等待时间递增, FAILURE_PENALTY是1s。

 

LoadBalanceSinkProcessor:用于实现Sink的负载均衡,其通过SinkSelector进行实现,类似于ChannelSelector。LoadBalanceSinkProcessor在启动时会根据配置,如agent.sinkgroups.g1.processor.selector = random进行选择,默认提供了两种选择器:


  

LoadBalanceSinkProcessor使用如下机制进行负载均衡: 

Java代码  
  1. public Status process() throws EventDeliveryException {  
  2.   Status status = null;  
  3.   //1、使用选择器创建相应的迭代器,也就是用来选择Sink的迭代器  
  4.   Iterator<Sink> sinkIterator = selector.createSinkIterator();  
  5.   while (sinkIterator.hasNext()) {  
  6.     Sink sink = sinkIterator.next();  
  7.     try {  
  8.       //2、选择器迭代Sink进行处理,如果成功直接break掉这次处理,此次负载均衡就算完成了  
  9.       status = sink.process();  
  10.       break;  
  11.     } catch (Exception ex) {  
  12.       //3、失败后会通知选择器,采取相应的失败退避补偿算法进行处理  
  13.       selector.informSinkFailed(sink);  
  14.       LOGGER.warn("Sink failed to consume event. "  
  15.           + "Attempting next sink if available.", ex);  
  16.     }  
  17.   }  
  18.   if (status == null) {  
  19.     throw new EventDeliveryException("All configured sinks have failed");  
  20.   }  
  21.   return status;  
  22. }   

 

如上的核心就是怎么创建迭代器,如何进行失败退避补偿处理,首先我们看下RoundRobinSinkSelector实现,其内部是通过通用的RoundRobinOrderSelector选择器实现: 

Java代码  
  1. public Iterator<T> createIterator() {  
  2.   //1、获取存活的Sink索引,  
  3.   List<Integer> activeIndices = getIndexList();  
  4.   int size = activeIndices.size();  
  5.   //2、如果上次记录的下一个存活Sink的位置超过了size,那么从队列头重新开始计数  
  6.   if (nextHead >= size) {  
  7.     nextHead = 0;  
  8.   }  
  9.   //3、获取本次使用的起始位置  
  10.   int begin = nextHead++;  
  11.   if (nextHead == activeIndices.size()) {  
  12.     nextHead = 0;  
  13.   }  
  14.   //4、从该位置开始迭代,其实现类似于环形队列,比如整个队列是5,起始位置是3,则按照 3、4、0、1、2的顺序进行轮训,实现了轮训算法   
  15.   int[] indexOrder = new int[size];  
  16.   for (int i = 0; i < size; i++) {  
  17.     indexOrder[i] = activeIndices.get((begin + i) % size);  
  18.   }  
  19.   //indexOrder是迭代顺序,getObjects返回相关的Sinks;  
  20.   return new SpecificOrderIterator<T>(indexOrder, getObjects());  
  21. }   

 

getIndexList实现如下: 

Java代码  
  1. protected List<Integer> getIndexList() {  
  2.   long now = System.currentTimeMillis();  
  3.   List<Integer> indexList = new ArrayList<Integer>();  
  4.   int i = 0;  
  5.   for (T obj : stateMap.keySet()) {  
  6.     if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {  
  7.       indexList.add(i);  
  8.     }  
  9.     i++;  
  10.   }  
  11.   return indexList;  
  12. }  

isShouldBackOff()表示是否开启退避算法支持,如果不开启,则认为每个Sink都是存活的,每次都会重试,通过agent.sinkgroups.g1.processor.backoff = true配置开启,默认false;restoreTime和之前介绍的refresh一样,是退避补偿等待时间,算法类似,就不多介绍了。 

 

那么什么时候调用Sink进行消费呢?其类似于SourceRunner,Sink提供了SinkRunner进行轮训拉取处理,SinkRunner会轮训调度SinkProcessor消费Channel的消息,然后调用Sink进行转移。SinkProcessor之前介绍过,其负责消息复制/路由。

 

SinkRunner实现如下: 

Java代码  
  1. public void start() {  
  2.   SinkProcessor policy = getPolicy();  
  3.   policy.start();  
  4.   runner = new PollingRunner();  
  5.   runner.policy = policy;  
  6.   runner.counterGroup = counterGroup;  
  7.   runner.shouldStop = new AtomicBoolean();  
  8.   runnerThread = new Thread(runner);  
  9.   runnerThread.setName("SinkRunner-PollingRunner-" +  
  10.       policy.getClass().getSimpleName());  
  11.   runnerThread.start();  
  12.   lifecycleState = LifecycleState.START;  
  13. }   

 

即获取SinkProcessor然后启动它,接着启动轮训线程去处理。PollingRunner线程负责轮训消息,核心实现如下: 

Java代码  
  1. public void run() {  
  2.   while (!shouldStop.get()) { //如果没有停止  
  3.     try {  
  4.       if (policy.process().equals(Sink.Status.BACKOFF)) {//如果处理失败了,进行退避补偿处理  
  5.         counterGroup.incrementAndGet("runner.backoffs");  
  6.         Thread.sleep(Math.min(  
  7.             counterGroup.incrementAndGet("runner.backoffs.consecutive")  
  8.             * backoffSleepIncrement, maxBackoffSleep)); //暂停退避补偿设定的超时时间  
  9.       } else {  
  10.         counterGroup.set("runner.backoffs.consecutive", 0L);  
  11.       }  
  12.     } catch (Exception e) {  
  13.       try {  
  14.         Thread.sleep(maxBackoffSleep); //如果遇到异常则等待最大退避时间  
  15.       } catch (InterruptedException ex) {  
  16.         Thread.currentThread().interrupt();  
  17.       }  
  18.     }  
  19.   }  
  20. }   

 

整体实现类似于PollableSourceRunner实现,整体处理都是交给SinkProcessor完成的。SinkProcessor会轮训Sink的process方法进行处理;此处以LoggerSink为例:

Java代码  
  1. @Override  
  2. public Status process() throws EventDeliveryException {  
  3.   Status result = Status.READY;  
  4.   Channel channel = getChannel();  
  5.   //1、获取事务  
  6.   Transaction transaction = channel.getTransaction();  
  7.   Event event = null;  
  8.   
  9.   try {  
  10.     //2、开启事务  
  11.     transaction.begin();  
  12.     //3、从Channel获取Event  
  13.     event = channel.take();  
  14.     if (event != null) {  
  15.       if (logger.isInfoEnabled()) {  
  16.         logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));  
  17.       }  
  18.     } else {//4、如果Channel中没有Event,则默认进入故障补偿机制,即防止死循环造成CPU负载高  
  19.       result = Status.BACKOFF;  
  20.     }  
  21.     //5、成功后提交事务  
  22.     transaction.commit();  
  23.   } catch (Exception ex) {  
  24.     //6、失败后回滚事务  
  25.     transaction.rollback();  
  26.     throw new EventDeliveryException("Failed to log event: " + event, ex);  
  27.   } finally {  
  28.     //7、关闭事务  
  29.     transaction.close();  
  30.   }  
  31.   return result;  
  32. }   

 

Sink中一些实现是支持批处理的,比如RollingFileSink:

Java代码  
  1. //1、开启事务  
  2. //2、批处理  
  3. for (int i = 0; i < batchSize; i++) {  
  4.   event = channel.take();  
  5.   if (event != null) {  
  6.     sinkCounter.incrementEventDrainAttemptCount();  
  7.     eventAttemptCounter++;  
  8.     serializer.write(event);  
  9.   }  
  10. }  
  11. //3、提交/回滚事务、关闭事务  

 

定义一个批处理大小然后在事务中执行批处理。 

 

Flume架构与源码分析-核心组件分析-2

博客分类:

 

4、整体流程

从以上部分我们可以看出,不管是Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动Source或Sink即可。

 

Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,此处我们已Application分析为主。

 

首先进入org.apache.flume.node.Application的main方法启动:

Java代码  
  1. //1、设置默认值启动参数、参数是否必须的  
  2. Options options = new Options();  
  3. Option option = new Option("n", "name", true, "the name of this agent");  
  4. option.setRequired(true);  
  5. options.addOption(option);  
  6.   
  7. option = new Option("f", "conf-file", true,  
  8. "specify a config file (required if -z missing)");  
  9. option.setRequired(false);  
  10. options.addOption(option);  
  11.   
  12. //2、接着解析命令行参数  
  13. CommandLineParser parser = new GnuParser();  
  14. CommandLine commandLine = parser.parse(options, args);  
  15.   
  16. String agentName = commandLine.getOptionValue('n');  
  17. boolean reload = !commandLine.hasOption("no-reload-conf");  
  18.   
  19. if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {  
  20.   isZkConfigured = true;  
  21. }  
  22.   
  23. if (isZkConfigured) {  
  24.     //3、如果是通过ZooKeeper配置,则使用ZooKeeper参数启动,此处忽略,我们以配置文件讲解  
  25. } else {  
  26.   //4、打开配置文件,如果不存在则快速失败  
  27.   File configurationFile = new File(commandLine.getOptionValue('f'));  
  28.   if (!configurationFile.exists()) {  
  29.          throw new ParseException(  
  30.         "The specified configuration file does not exist: " + path);  
  31.   }  
  32.   List<LifecycleAware> components = Lists.newArrayList();  
  33.   
  34.   if (reload) { //5、如果需要定期reload配置文件,则走如下方式  
  35.     //5.1、此处使用Guava提供的事件总线  
  36.     EventBus eventBus = new EventBus(agentName + "-event-bus");  
  37.     //5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次  
  38.     PollingPropertiesFileConfigurationProvider configurationProvider =  
  39.         new PollingPropertiesFileConfigurationProvider(  
  40.           agentName, configurationFile, eventBus, 30);  
  41.     components.add(configurationProvider);  
  42.     application = new Application(components); //5.3、向Application注册组件  
  43.     //5.4、向事件总线注册本应用,EventBus会自动注册Application中使用@Subscribe声明的方法  
  44.     eventBus.register(application);  
  45.   
  46.   } else { //5、配置文件不支持定期reload  
  47.     PropertiesFileConfigurationProvider configurationProvider =  
  48.         new PropertiesFileConfigurationProvider(  
  49.           agentName, configurationFile);  
  50.     application = new Application();  
  51.     //6.2、直接使用配置文件初始化Flume组件  
  52.     application.handleConfigurationEvent(configurationProvider  
  53.       .getConfiguration());  
  54.   }  
  55. }  
  56. //7、启动Flume应用  
  57. application.start();  
  58.   
  59. //8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止  
  60. final Application appReference = application;  
  61. Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {  
  62.   @Override  
  63.   public void run() {  
  64.     appReference.stop();  
  65.   }  
  66. });  

 

以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:

1、读取命令行参数;

2、读取配置文件;

3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;

4、启动Application,并注册虚拟机关闭钩子。

 

handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件: 

Java代码  
  1. @Subscribe  
  2. public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {  
  3.   stopAllComponents();  
  4.   startAllComponents(conf);  
  5. }   

MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。

 

对于startAllComponents实现大体如下: 

Java代码  
  1. //1、首先启动Channel  
  2. supervisor.supervise(Channels,  
  3.       new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);  
  4. //2、确保所有Channel是否都已启动  
  5. for(Channel ch: materializedConfiguration.getChannels().values()){  
  6.   while(ch.getLifecycleState() != LifecycleState.START  
  7.       && !supervisor.isComponentInErrorState(ch)){  
  8.     try {  
  9.       Thread.sleep(500);  
  10.     } catch (InterruptedException e) {  
  11.         Throwables.propagate(e);  
  12.     }  
  13.   }  
  14. }  
  15. //3、启动SinkRunner  
  16. supervisor.supervise(SinkRunners,    
  17. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);  
  18. //4、启动SourceRunner  
  19. supervisor.supervise(SourceRunner,  
  20. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);  
  21. //5、初始化监控服务  
  22. this.loadMonitoring();   

从如下代码中可以看到,首先要准备好Channel,因为Source和Sink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisor和MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。

 

对于stopAllComponents实现大体如下:

Java代码  
  1. //1、首先停止SourceRunner  
  2. supervisor.unsupervise(SourceRunners);  
  3. //2、接着停止SinkRunner  
  4. supervisor.unsupervise(SinkRunners);  
  5. //3、然后停止Channel  
  6. supervisor.unsupervise(Channels);  
  7. //4、最后停止MonitorService  
  8. monitorServer.stop();   

此处可以看出,停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。

 

Application中的start方法代码实现如下:

Java代码  
  1. public synchronized void start() {  
  2.   for(LifecycleAware component : components) {  
  3.     supervisor.supervise(component,  
  4.         new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);  
  5.   }  
  6. }   

其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。

 

而Application关闭执行了如下动作:    

Java代码  
  1. public synchronized void stop() {  
  2.   supervisor.stop();  
  3.   if(monitorServer != null) {  
  4.     monitorServer.stop();  
  5.   }  
  6. }   

即关闭守护哨兵和监控服务。

 

到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。 

 

整体流程可以总结为:

1、首先初始化命令行配置;

2、接着读取配置文件;

3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;

4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;

5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;

6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。

 

 

轮训实现的SourceRunner 和SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。

 

首先创建LifecycleSupervisor:

Java代码  
  1. //1、用于存放被守护的组件  
  2. supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();  
  3. //2、用于存放正在被监控的组件  
  4. monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();  
  5. //3、创建监控服务线程池  
  6. monitorService = new ScheduledThreadPoolExecutor(10,  
  7.     new ThreadFactoryBuilder().setNameFormat(  
  8.         "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")  
  9.         .build());  
  10. monitorService.setMaximumPoolSize(20);  
  11. monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);  
  12. //4、定期清理被取消的组件  
  13. purger = new Purger();  
  14. //4.1、默认不进行清理  
  15. needToPurge = false;   

LifecycleSupervisor启动时会进行如下操作:

Java代码  
  1. public synchronized void start() {  
  2.   monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);  
  3.   lifecycleState = LifecycleState.START;  
  4. }   

首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP:

Java代码  
  1. //1、首先停止守护监控服务  
  2. if (monitorService != null) {  
  3.   monitorService.shutdown();  
  4.   try {  
  5.     monitorService.awaitTermination(10, TimeUnit.SECONDS);  
  6.   } catch (InterruptedException e) {  
  7.     logger.error("Interrupted while waiting for monitor service to stop");  
  8.   }  
  9. }  
  10. //2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止  
  11. for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {  
  12.   if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {  
  13.     entry.getValue().status.desiredState = LifecycleState.STOP;  
  14.     entry.getKey().stop();  
  15.   }  
  16. }  
  17. //3、更新本组件状态  
  18. if (lifecycleState.equals(LifecycleState.START)) {  
  19.   lifecycleState = LifecycleState.STOP;  
  20. }  
  21. //4、最后的清理  
  22. supervisedProcesses.clear();  
  23. monitorFutures.clear();   

 

接下来就是调用supervise进行组件守护了:

Java代码  
  1.   if(this.monitorService.isShutdown() || this.monitorService.isTerminated()  
  2.   || this.monitorService.isTerminating()){  
  3.     //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护  
  4.   }  
  5.   //2、初始化守护组件  
  6.   Supervisoree process = new Supervisoree();  
  7.   process.status = new Status();  
  8.   //2.1、默认策略是失败重启  
  9.   process.policy = policy;  
  10.   //2.2、初始化组件默认状态,大多数组件默认为START  
  11.   process.status.desiredState = desiredState;  
  12.   process.status.error = false;  
  13.   //3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件  
  14.   MonitorRunnable monitorRunnable = new MonitorRunnable();  
  15.   monitorRunnable.lifecycleAware = lifecycleAware;  
  16.   monitorRunnable.supervisoree = process;  
  17.   monitorRunnable.monitorService = monitorService;  
  18.   
  19.   supervisedProcesses.put(lifecycleAware, process);  
  20.   //4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件  
  21.   ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(  
  22.       monitorRunnable, 0, 3, TimeUnit.SECONDS);  
  23.   monitorFutures.put(lifecycleAware, future);  
  24. }  

 

如果不需要守护了,则需要调用unsupervise:

Java代码  
  1. public synchronized void unsupervise(LifecycleAware lifecycleAware) {  
  2.   synchronized (lifecycleAware) {  
  3.     Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);  
  4.     //1.1、设置守护组件的状态为被丢弃  
  5.     supervisoree.status.discard = true;  
  6.     //1.2、设置组件盼望的最新生命周期状态为STOP  
  7.     this.setDesiredState(lifecycleAware, LifecycleState.STOP);  
  8.     //1.3、停止组件  
  9.     lifecycleAware.stop();  
  10.   }  
  11.   //2、从守护组件中移除  
  12.   supervisedProcesses.remove(lifecycleAware);  
  13.   //3、取消定时监控组件服务  
  14.   monitorFutures.get(lifecycleAware).cancel(false);  
  15.   //3.1、通知Purger需要进行清理,Purger会定期的移除cancel的组件  
  16.   needToPurge = true;  
  17.   monitorFutures.remove(lifecycleAware);  
  18. }  

 

接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:

Java代码  
  1. public void run() {  
  2.   long now = System.currentTimeMillis();  
  3.   try {  
  4.     if (supervisoree.status.firstSeen == null) {  
  5.         supervisoree.status.firstSeen = now; //1、记录第一次状态查看时间  
  6.     }  
  7.     supervisoree.status.lastSeen = now; //2、记录最后一次状态查看时间  
  8.     synchronized (lifecycleAware) {  
  9.         //3、如果守护组件被丢弃或出错了,则直接返回  
  10.         if (supervisoree.status.discard || supervisoree.status.error) {  
  11.           return;  
  12.         }  
  13.         //4、更新最后一次查看到的状态  
  14.         supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();  
  15.         //5、如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化  
  16.         if (!lifecycleAware.getLifecycleState().equals(  
  17.             supervisoree.status.desiredState)) {  
  18.           switch (supervisoree.status.desiredState) {   
  19.             case START: //6、如果是启动状态,则启动组件  
  20.              try {  
  21.                 lifecycleAware.start();  
  22.               } catch (Throwable e) {  
  23.                 if (e instanceof Error) {  
  24.                   supervisoree.status.desiredState = LifecycleState.STOP;  
  25.                   try {  
  26.                     lifecycleAware.stop();  
  27.                   } catch (Throwable e1) {  
  28.                     supervisoree.status.error = true;  
  29.                     if (e1 instanceof Error) {  
  30.                       throw (Error) e1;  
  31.                     }  
  32.                   }  
  33.                 }  
  34.                 supervisoree.status.failures++;  
  35.               }  
  36.               break;  
  37.             case STOP: //7、如果是停止状态,则停止组件  
  38.               try {  
  39.                 lifecycleAware.stop();  
  40.               } catch (Throwable e) {  
  41.                 if (e instanceof Error) {  
  42.                   throw (Error) e;  
  43.                 }  
  44.                 supervisoree.status.failures++;  
  45.               }  
  46.               break;  
  47.             default:  
  48.           }  
  49.     } catch(Throwable t) {  
  50.     }  
  51.   }  
  52. }  

 

如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。 

 

 

日志是系统数据的基石,对于系统的安全来说非常重要,它记录了系统每天发生的各种各样的事情,用户可以通过它来检查错误发生的原因,或者寻找受到攻击时攻击者留下的痕迹。日志主要的功能是审计和监测。它还可以实时地监测系统状态,监测和追踪侵入者。现在互联网上存在的日志组件各种各样,我们这里主要讲的是Flume。

Flume 发展历史

    Cloudera 开发的分布式日志收集系统 Flume,是 hadoop 周边组件之一。其可以实时的将分布在不同节点、机器上的日志收集到 hdfs 中。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,这点可以在 BigInsights 产品文档的 troubleshooting 板块发现。为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。当然我们现在用的是Flume NG,所以不再讲Flume OG的内容。

Flume定义

    Flume是一个高可用,高可靠,分布式海量日志采集、聚合和传输系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume架构介绍

  Flume日志收集结构图
 

 

 




    Flume 的核心是把数据从数据源收集过来,再送到目的地。
    为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
    Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。
    Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。
    Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。
    Flume 运行的核心是 Agent。它是一个完整的数据收集工具,含有三个核心组件,分别是 Source、Channel、Sink。
    Source 可以接收外部源发送过来的数据。不同的 Source,可以接受不同的数据格式。比如有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容。
    Channel 是一个存储地,接收 Source 的输出,直到有 Sink 消费掉 Channel 中的数据。
    Channel 中的数据直到进入到下一个Channel中或者进入终端才会被删除。
    当 Sink 写入失败后,可以自动重启,不会造成数据丢失,因此很可靠。
    Sink 会消费 Channel 中的数据,然后送给外部源或者其他 Source。如数据可以写入到 HDFS 或者 HBase 中。

Flume 核心概念整理

Agent Agent中包含多个sources和sinks。
Client 生产数据,运行在一个独立的线程。
Source 从Client收集数据,传递给Channel。用来消费传递到该组件的Event。
Sink 从Channel收集数据,将Event传递到Flow Pipeline中的下一个Agent。
Channel 中转Event临时存储,保存Source传递过来Event,连接 sources 和 sinks 。
Events 一个数据单元,带有一个可选的消息头。可以是日志记录、 avro 对象等。

Flume 特点

    flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

   Agent是Flume中最小的运行单位,一个Agent中由Source、Sink和Channel三个组件构成。
Event是Flume中基本数据单位,Event中包含有传输数据及数据头数据包


如下图所示:


 

 




值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。
比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。

如下图所示:


 

 




Flume 整体架构总结

    Flume架构整体上看就是 source-->channel-->sink 的三层架构,类似生成者和消费者的架构,他们之间通过queue(channel)传输,解耦。
    Source:完成对日志数据的收集,分成 transtion 和 event 打入到channel之中。
    Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

    对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。

Flume 下载、安装

安装JDK

1.将下载好的JDK包解压,比如我的解压到 /home/liuqing/jdk1.7.0_72 目录下
2.配置环境变量
   在/etc/profile 文件中添加
  

  1. export JAVA_HOME=/home/liuqing/jdk1.7.0_72  
  2. export PATH=$JAVA_HOME/bin:$PATH  
  3. export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$CLASS_PATH  

 
3.执行source profile
4.在命令行输入 java -version
   出现:
   java version "1.7.0_72"
   Java(TM) SE Runtime Environment (build 1.7.0_72-b14)
   Java HotSpot(TM) 64-Bit Server VM (build 24.72-b04, mixed mode)
   表示安装成功

安装Flume

  1. 从官网 http://flume.apache.org/download.html 下载最新的安装包
  2. 解压缩,比如我的解压到 /home/liuqing/hadoop/flume目录
  3. 修改 flume-env.sh 配置文件,主要是JAVA_HOME变量设置
     JAVA_HOME=/home/liuqing/jdk1.7.0_72
  4. 验证是否安装成功
     root@ubuntu:/home/liuqing/hadoop/flume/bin# ./flume-ng version
     出现:
     Flume 1.6.0
     Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
     表示安装成功




案例

案例1. 单节点 Flume配置

1. 新建配置文件,配置文件示例

  1. # example.conf: A single-node Flume configuration  
  2. # agent组件名称  
  3. a1.sources = r1  
  4. a1.sinks = k1  
  5. a1.channels = c1  
  6.   
  7. # source 配置  
  8. a1.sources.r1.type = netcat  
  9. a1.sources.r1.bind = localhost  
  10. a1.sources.r1.port = 44444  
  11.   
  12. # sink 配置  
  13. a1.sinks.k1.type = logger  
  14.   
  15. # 使用内存中Buffer Event Channel  
  16. a1.channels.c1.type = memory  
  17. a1.channels.c1.capacity = 1000  
  18. a1.channels.c1.transactionCapacity = 100  
  19.   
  20. # 绑定 source 和 sink 到channel  
  21. a1.sources.r1.channels = c1  
  22. a1.sinks.k1.channel = c1  

 

将上述配置存为:/home/liuqing/hadoop/flume/conf/example.conf

2. 然后我们就可以启动 Flume 了:
在/home/liuqing/hadoop/flume路径下运行:

  1. bin/flume-ngagent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console  

 

其中 -c/--conf 后跟配置目录,-f/--conf-file 后跟具体的配置文件,-n/--name 指定agent的名称

3. 然后我们再开一个 shell 终端窗口,telnet 上配置中侦听的端口,就可以发消息看到效果了:

  1. $ telnet localhost 44444  
  2. Trying 127.0.0.1...  
  3. Connected to localhost.localdomain (127.0.0.1).  
  4. Escape character is '^]'.  
  5. Hello world! <ENTER>  
  6. OK  

 


4.Flume 终端窗口此时会打印出如下信息,就表示成功了

  1. 12/06/1915:32:19 INFO source.NetcatSource: Source starting  
  2. 12/06/1915:32:19 INFO source.NetcatSource: Created  serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]  
  3. 12/06/1915:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }  

 

至此,咱们的第一个 Flume Agent 算是部署成功了!

案例2. 结合实际项目

参考:https://github.com/gilt/logback-flume-appender

1. 在/home/liuqing/hadoop/flume/conf/下新建配置文件 test.conf

  1. agent1.sources = source1  
  2. agent1.sinks = sink1  
  3. agent1.channels = channel1  
  4. # Describe/configure source1  
  5. agent1.sources.source1.type = avro  
  6. agent1.sources.source1.bind = 0.0.0.0  
  7. agent1.sources.source1.port = 44444  
  8.   
  9. # Describe sink1  
  10. #日志文件按时间生成  
  11. #agent1.sinks.sink1.type = FILE_ROLL  
  12. #agent1.sinks.sink1.sink.directory = /home/liuqing/hadoop/flume/flume-out  
  13. #agent1.sinks.sink1.sink.rollInterval = 1800  
  14. #agent1.sinks.sink1.batchSize = 5  
  15.   
  16. #日志文件根据大小生成  
  17. #生成目录在conf文件夹下的log4j.properties可以配置  
  18. agent1.sinks.sink1.type = logger  
  19.   
  20. # Use a channel which buffers events in memory  
  21. agent1.channels.channel1.type = memory  
  22. agent1.channels.channel1.capacity = 1000  
  23. agent1.channels.channel1.transactionCapactiy = 100  
  24.   
  25. # Bind the source and sink to the channel  
  26. agent1.sources.source1.channels = channel1  
  27. agent1.sinks.sink1.channel = channel1  

 


2. 项目已经配好了logback.xml 文件
   在logback.xml文件中添加
  

  1.  <appender name="flumeApplender"  
  2. class="com.xxx.hd.extended.log.flume.FlumeLogstashV1Appender">  
  3. <flumeAgents>  
  4. 192.168.23.235:44444,  
  5. </flumeAgents>  
  6. <flumeProperties>  
  7. connect-timeout=4000;  
  8. request-timeout=8000  
  9. </flumeProperties>  
  10. <batchSize>2048</batchSize>  
  11. <reportingWindow>20480</reportingWindow>  
  12. <additionalAvroHeaders>  
  13. myHeader=myValue  
  14. </additionalAvroHeaders>  
  15. <application>ProjectName</application>  
  16. <layout class="ch.qos.logback.classic.PatternLayout">  
  17. <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %msg%n%ex</pattern>  
  18. </layout>  
  19. </appender>  

 

3. 然后我们就可以启动 Flume 了:
  在/home/liuqing/hadoop/flume路径下运行:

  1. bin/flume-ng agent --conf ./conf/ -f conf/lqtest.conf -n agent1  

 

4. 现在日志会打印到/home/liuqing/hadoop/flume/logs目录下
   日志文件满128M就会自动建一个新的


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM