flume【源碼分析】分析Flume的啟動過程


前言

之前一直在用flume收集數據,也做了一些插件開發,但是一直沒整理相關的知識,最近感覺老是有一種知其然不知其所以然的感覺,所以從源碼入手希望能更透徹一點吧,越來越感覺會用不能掌握啊!別人幾個為啥就low了!

1.啟動入口

估計沒人關注過啟動入口在什么地方吧?啟動不報錯就可以直接去用了吧!

從這里可以看出flume的啟動入口是:org.apache.flume.node.Application 注意:記得用maven 安裝flume-ng-node 不然你找不到!因為有的開發用不到也就不裝了!

下面我們就來看該入口程序是如何來運行的:

try {

      boolean isZkConfigured = false;

      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);

      option = new Option("f", "conf-file", true,
          "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option(null, "no-reload-conf", false,
          "do not reload config file if changed");
      options.addOption(option);

      // Options for Zookeeper
      option = new Option("z", "zkConnString", true,
          "specify the ZooKeeper connection to use (required if -f missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("p", "zkBasePath", true,
          "specify the base path in ZooKeeper for agent configs");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("h", "help", false, "display help text");
      options.addOption(option);

      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);

      if (commandLine.hasOption('h')) {
        new HelpFormatter().printHelp("flume-ng agent", options, true);
        return;
      }

      String agentName = commandLine.getOptionValue('n');
      boolean reload = !commandLine.hasOption("no-reload-conf");

      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
        isZkConfigured = true;
      }
      Application application = null;
      if (isZkConfigured) {
        // get options
        String zkConnectionStr = commandLine.getOptionValue('z');
        String baseZkPath = commandLine.getOptionValue('p');

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          List<LifecycleAware> components = Lists.newArrayList();
          PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
            new PollingZooKeeperConfigurationProvider(
              agentName, zkConnectionStr, baseZkPath, eventBus);
          components.add(zookeeperConfigurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
            new StaticZooKeeperConfigurationProvider(
              agentName, zkConnectionStr, baseZkPath);
          application = new Application();
          application.handleConfigurationEvent(zookeeperConfigurationProvider
            .getConfiguration());
        }
      } else {
        File configurationFile = new File(commandLine.getOptionValue('f'));

        /*
         * The following is to ensure that by default the agent will fail on
         * startup if the file does not exist.
         */
        if (!configurationFile.exists()) {
          // If command line invocation, then need to fail fast
          if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
            null) {
            String path = configurationFile.getPath();
            try {
              path = configurationFile.getCanonicalPath();
            } catch (IOException ex) {
              logger.error("Failed to read canonical path for file: " + path,
                ex);
            }
            throw new ParseException(
              "The specified configuration file does not exist: " + path);
          }
        }
        List<LifecycleAware> components = Lists.newArrayList();

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(
              agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(
              agentName, configurationFile);
          application = new Application();
          application.handleConfigurationEvent(configurationProvider
            .getConfiguration());
        }
      }
      application.start();

      final Application appReference = application;
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
        @Override
        public void run() {
          appReference.stop();
        }
      });

    } catch (Exception e) {
      logger.error("A fatal error occurred while running. Exception follows.",
          e);
    }
  }
啟動main方法

附:flume每次啟動都會先判斷有沒有與當前配置的三大組件同名的組件存在,存在的話先停掉該組件,順序為source,sink,channel

其次是啟動所有當前配置的組件,啟動順序為channel,sink,source

以上啟動順序來源如下:

public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {  
    stopAllComponents();  
    startAllComponents(conf);  
  } 

這個地方說幾句:

1.前面一堆就是啟動命令中一些參數的解析,如果真想了解自己去看看源碼吧!

2.這里面有兩種形式配置文件,一種是連接zk讀取配置文件的,一種是讀取配置文件,反正我經常用的也是讀取配置文件的方式-f 那就說配置文件吧!

3.這里有一個機制,如果不帶--no-reload-conf這個參數,flume會自動加載配置參數 默認是30秒,現在不用再傻傻的修改完配置文件去重啟flume了吧!

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(
              agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(
              agentName, configurationFile);
          application = new Application();
          application.handleConfigurationEvent(configurationProvider
            .getConfiguration());
        }

PollingPropertiesFileConfigurationProvider該類是一個輪詢操作,每隔30秒會去檢查conf配置文件。

這個地方如果不是輪訓的方式,那么需要殺掉所有組件,在重啟所有組件。調用這兩個方法 stopAllComponents(); startAllComponents(conf);

configurationProvider.getConfiguration() 這個是重點好多配置,source 類型,source 和channel對接都在這個里面

1.請注意重點看一下loadxx方法

2.loadSources里面有個 SourceRunner.forSource(source)是指定source類型的:PollableSource,EventDrivenSourceRunner這個需要你在自己開發的時候根據需求自己繼承吧!

  public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }

    return runner;
  }
forSource
public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames =
            new HashSet<String>(channelComponentMap.keySet());
        for(String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.
              get(channelName);
          if(channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap = channelCache.
                get(channelComponent.channel.getClass());
            if(nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }
getConfiguration

這里通過文件修改時間來判斷是否配置文件被修改了,然后通過事件總線的post調用EventHandler,也就是被@Subscribe注解的方法:這個地方只需要添加這個注解就可以就會指定調用方法執行了。

@Subscribe  
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {  
    stopAllComponents();  
    startAllComponents(conf);  
  }

2.前面配置准備完后啟動程序

啟動程序:application.start();

  public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }

這是對所有組件進行監督supervise,只有在flume啟動或者配置發生更改的時候會調用此監督方法

MonitorRunnable monitorRunnable = new MonitorRunnable();  
    monitorRunnable.lifecycleAware = lifecycleAware;  
    monitorRunnable.supervisoree = process;  
    monitorRunnable.monitorService = monitorService;  
  
    supervisedProcesses.put(lifecycleAware, process);  
  
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(  
        monitorRunnable, 0, 3, TimeUnit.SECONDS);  
    monitorFutures.put(lifecycleAware, future); 

方法里將每個組件納入了生命周期的管理中,每隔3秒會執行以下方法【在停止組件的時候,會調用unsupervisor方法,會給各個組件狀態賦值】:

1、判斷組件狀態

2、如果組件當前狀態不是組件預期的狀態,那么就要對預期狀態按照switch分支來執行相應的邏輯

MonitorRunnable 繼承了Runnable接口,重寫了run方法!

    @Override
    public void run() {
      logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
          supervisoree);

      long now = System.currentTimeMillis();

      try {
        if (supervisoree.status.firstSeen == null) {
          logger.debug("first time seeing {}", lifecycleAware);

          supervisoree.status.firstSeen = now;
        }

        supervisoree.status.lastSeen = now;
        synchronized (lifecycleAware) {
          if (supervisoree.status.discard) {
            // Unsupervise has already been called on this.
            logger.info("Component has already been stopped {}", lifecycleAware);
            return;
          } else if (supervisoree.status.error) {
            logger.info("Component {} is in error state, and Flume will not"
                + "attempt to change its state", lifecycleAware);
            return;
          }

          supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

          if (!lifecycleAware.getLifecycleState().equals(
              supervisoree.status.desiredState)) {

            logger.debug("Want to transition {} from {} to {} (failures:{})",
                new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                    supervisoree.status.desiredState,
                    supervisoree.status.failures });

            switch (supervisoree.status.desiredState) {
              case START:
                try {
                  lifecycleAware.start();
                } catch (Throwable e) {
                  logger.error("Unable to start " + lifecycleAware
                      + " - Exception follows.", e);
                  if (e instanceof Error) {
                    // This component can never recover, shut it down.
                    supervisoree.status.desiredState = LifecycleState.STOP;
                    try {
                      lifecycleAware.stop();
                      logger.warn("Component {} stopped, since it could not be"
                          + "successfully started due to missing dependencies",
                          lifecycleAware);
                    } catch (Throwable e1) {
                      logger.error("Unsuccessful attempt to "
                          + "shutdown component: {} due to missing dependencies."
                          + " Please shutdown the agent"
                          + "or disable this component, or the agent will be"
                          + "in an undefined state.", e1);
                      supervisoree.status.error = true;
                      if (e1 instanceof Error) {
                        throw (Error) e1;
                      }
                      // Set the state to stop, so that the conf poller can
                      // proceed.
                    }
                  }
                  supervisoree.status.failures++;
                }
                break;
              case STOP:
                try {
                  lifecycleAware.stop();
                } catch (Throwable e) {
                  logger.error("Unable to stop " + lifecycleAware
                      + " - Exception follows.", e);
                  if (e instanceof Error) {
                    throw (Error) e;
                  }
                  supervisoree.status.failures++;
                }
                break;
              default:
                logger.warn("I refuse to acknowledge {} as a desired state",
                    supervisoree.status.desiredState);
            }

            if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
              logger.error(
                  "Policy {} of {} has been violated - supervisor should exit!",
                  supervisoree.policy, lifecycleAware);
            }
          }
        }
      } catch(Throwable t) {
        logger.error("Unexpected error", t);
      }
      logger.debug("Status check complete");
    }
MonitorRunnable

注意:lifecycleAware.start(); 這個才是所有的核心,當判斷狀態后開始調用相關的start()方法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM