關於flume配置加載


最近項目在用到flume,因此翻了下flume的代碼,

啟動腳本:

  nohup bin/flume-ng agent -n tsdbflume -c conf -f conf/配置文件.conf -Dflume.root.logger=DEBUG,console &

翻下flume-ng的腳本,  FLUME_AGENT_CLASS="org.apache.flume.node.Application"

從Application進入;

 

寫了下flume-agent啟動的時序圖:

這是簡單的啟動時序圖,時序圖里畫的都在flume-node這個項目里;

里邊其實最核心的是創建配置文件里的對象,這個工作是在AbstractConfigurationProvider.getConfiguration()這個方法里做的。先上代碼

  public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
   // getFlumeConfiguration()方法,是關鍵核心,負責整個配置加載,下邊代碼說明 FlumeConfiguration fconfig
= getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap); loadSources(agentConf, channelComponentMap, sourceRunnerMap); loadSinks(agentConf, channelComponentMap, sinkRunnerMap); Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet()); for (String channelName : channelNames) { ChannelComponent channelComponent = channelComponentMap.get(channelName); if (channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); Map<String, Channel> nameChannelMap = channelCache.get(channelComponent.channel.getClass()); if (nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString())); conf.addChannel(channelName, channelComponent.channel); } } for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { channelComponentMap.clear(); sourceRunnerMap.clear(); sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); } return conf; }

下面說下這個類的具體加載過程。

PropertiesFileConfigurationProvider.getFlumeConfiguration()

  /**
   * Loads the properties file referenced by {@code file} and wraps its
   * key-value pairs in a {@link FlumeConfiguration}.
   *
   * @return the configuration built from the file, or a configuration built
   *         from an empty map when the file cannot be read
   */
  @Override
  public FlumeConfiguration getFlumeConfiguration() {
    FlumeConfiguration result = null;
    BufferedReader input = null;
    try {
      input = new BufferedReader(new FileReader(file));
      Properties loaded = new Properties();
      loaded.load(input);
      // Built while the reader is still open, exactly as before.
      result = new FlumeConfiguration(toMap(loaded));
    } catch (IOException ioe) {
      String message = "Unable to load file:" + file
          + " (I/O failure) - Exception follows.";
      LOGGER.error(message, ioe);
    } finally {
      if (input != null) {
        try {
          input.close();
        } catch (IOException closeFailure) {
          // A failed close is only worth a warning; the result is unaffected.
          LOGGER.warn(
              "Unable to close file reader for file: " + file, closeFailure);
        }
      }
    }
    if (result == null) {
      // Reading failed: fall back to an empty configuration.
      result = new FlumeConfiguration(new HashMap<String, String>());
    }
    return result;
  }

new FlumeConfiguration(toMap(properties)),代碼在下邊:

下邊從代碼入手寫下怎么加載內容:

  /**
   * 創建一個FlumeConfiguration對象,參數為:配置文件的key-value對
   * Creates a populated Flume Configuration object.
   */
  public FlumeConfiguration(Map<String, String> properties) {
    agentConfigMap = new HashMap<String, AgentConfiguration>();
    errors = new LinkedList<FlumeConfigurationError>();
    // Construct the in-memory component hierarchy
    for (String name : properties.keySet()) {
      String value = properties.get(name);

      // addRawProperty里對agentConfigMap初始化,  
// 1:這里插入的是agentConfiguration對象里的contextMap
if (!addRawProperty(name, value)) { logger.warn("Configuration property ignored: " + name + " = " + value); } } // Now iterate thru the agentContext and create agent configs and add them // to agentConfigMap // validate and remove improperly configured components
// 2:這里插入的是agentConfiguration對象里的configMap
validateConfiguration(); }

然后,進入FlumeConfiguration.addRawProperty(name,value):

  /**
   * Checks one raw {@code agentName.configKey = value} property and, when it
   * is well formed, forwards it to the {@code AgentConfiguration} registered
   * under that agent name, creating and registering that object on first use.
   *
   * @param name  the full property name, expected to start with the agent
   *              name followed by a dot
   * @param value the property value
   * @return {@code false} when the property is rejected (an error is added to
   *         {@code errors}); otherwise whatever
   *         {@code AgentConfiguration.addProperty} returns
   */
  private boolean addRawProperty(String name, String value) {
    // Null names and values not supported
    if (name == null || value == null) {
      return rejectRawProperty("",
          FlumeConfigurationErrorType.AGENT_NAME_MISSING);
    }

    // Empty values are not supported (reported with the untrimmed name)
    if (value.trim().isEmpty()) {
      return rejectRawProperty(name,
          FlumeConfigurationErrorType.PROPERTY_VALUE_NULL);
    }

    // Work on copies without leading and trailing spaces
    String key = name.trim();
    String trimmedValue = value.trim();

    // All configuration keys must have a non-empty agent name as prefix:
    // no dot at all (-1) or a leading dot (0) both mean it is missing.
    int dot = key.indexOf('.');
    if (dot < 1) {
      return rejectRawProperty(key,
          FlumeConfigurationErrorType.AGENT_NAME_MISSING);
    }
    String agentName = key.substring(0, dot);

    // Configuration key must be specified for every property
    String configKey = key.substring(dot + 1);
    if (configKey.isEmpty()) {
      return rejectRawProperty(key,
          FlumeConfigurationErrorType.PROPERTY_NAME_NULL);
    }

    // Create the AgentConfiguration on first sight of this agent name and
    // register it in agentConfigMap.
    AgentConfiguration aconf = agentConfigMap.get(agentName);
    if (aconf == null) {
      aconf = new AgentConfiguration(agentName, errors);
      agentConfigMap.put(agentName, aconf);
    }

    // Each configuration key must begin with one of the three prefixes:
    // sources, sinks, or channels.
    // In the end the key-value pair is stored in the AgentConfiguration.
    return aconf.addProperty(configKey, trimmedValue);
  }

  /** Records an ERROR-level configuration error and signals rejection. */
  private boolean rejectRawProperty(String componentName,
      FlumeConfigurationErrorType errorType) {
    errors.add(new FlumeConfigurationError(componentName, "", errorType,
        ErrorOrWarning.ERROR));
    return false;
  }

最終被加載到:agentConfiguration.addProperty(String key, String value)中

      // Excerpt from AgentConfiguration.addProperty: try to interpret the key
      // as a source property by parsing it against the sources prefix.
      ComponentNameAndConfigKey cnck = parseConfigKey(key,
          BasicConfigurationConstants.CONFIG_SOURCES_PREFIX);

      if (cnck != null) {
        // it is a source
        String name = cnck.getComponentName();
        Context srcConf = sourceContextMap.get(name);

        // First property seen for this source: create its Context and
        // register it under the source name.
        if (srcConf == null) {
          srcConf = new Context();
          sourceContextMap.put(name, srcConf);
        }
        // sourceContextMap now holds, per source name, a Context into which
        // (cnck.getConfigKey(), value) is put.
        srcConf.put(cnck.getConfigKey(), value);
        return true;
      }

所以最后是加載到AgentConfiguration中對應的*ContextMap中了;

  /**
   * Holds everything loaded for a single agent (excerpt: only the fields are
   * shown, the rest of the class is omitted).
   */
  public static class AgentConfiguration {

    // Name of the agent this configuration belongs to.
    private final String agentName;
    // NOTE(review): presumably the raw component-list values from the
    // configuration file; their use is not visible in this excerpt.
    private String sources;
    private String sinks;
    private String channels;
    private String sinkgroups;

    // Component name -> configuration object. validateChannels puts
    // successfully configured channels into channelConfigMap; the other maps
    // presumably follow the same pattern for their component kind.
    private final Map<String, ComponentConfiguration> sourceConfigMap;
    private final Map<String, ComponentConfiguration> sinkConfigMap;
    private final Map<String, ComponentConfiguration> channelConfigMap;
    private final Map<String, ComponentConfiguration> sinkgroupConfigMap;

    // Component name -> Context of raw key/value pairs. addProperty fills
    // sourceContextMap; validateChannels replaces channelContextMap with a
    // new map, which is why these fields are not final.
    private Map<String, Context> sourceContextMap;
    private Map<String, Context> sinkContextMap;
    private Map<String, Context> channelContextMap;
    private Map<String, Context> sinkGroupContextMap;

    // Sets of component names, one per component kind.
    private Set<String> sinkSet;
    private Set<String> sourceSet;
    private Set<String> channelSet;
    private Set<String> sinkgroupSet;

    // Errors collected while loading and validating this agent.
    private final List<FlumeConfigurationError> errorList;

    // ** other code omitted
    
}

這里比較崩潰,跑來跑去的我們來梳理下:

先是FlumeConfiguration的addRawProperty方法執行put動作:

  FlumeConfiguration.agentConfigMap.put(agentName, AgentConfiguration conf);

在這里創建AgentConfiguration對象

  aconf = new AgentConfiguration(agentName, errors);

然后執行

  aconf.addProperty(configKey, value), 將key,value插入到AgentConfiguration對象的*ContextMap中,至此完成鍵值對的保存工作;

最終所有的鍵值對都被保存在AgentConfiguration對象中;

 

對於AgentConfiguration的插入操作,也分為兩個部分,在FlumeConfiguration構造方法中,

1:addRawProperty(name,value);將kv鍵值對,插入到AgentConfiguration對象的contextMap中;

2:validateConfiguration;下邊跟進下validateConfiguration代碼;

 

validateConfiguration -> aconf.isValid() -> validateChannels(channelSet)

代碼注釋中,其實已經寫得比較詳細了

    /**
     * Validates every channel named in {@code channelSet}.
     *
     * <p>If it is a known component it will do the full validation required
     * for that component, else it will do the validation required for that
     * class.
     *
     * <p>Side effects: channels without a context, or whose configuration
     * throws a {@code ConfigurationException}, are removed from
     * {@code channelSet}; fully configured channels are stored in
     * {@code channelConfigMap}; {@code channelContextMap} is replaced by a map
     * holding only the contexts that still need to be processed later; and
     * configuration errors are appended to {@code errorList}.
     *
     * @param channelSet the channel names to validate; modified in place
     * @return the same {@code channelSet} instance, reduced to the names that
     *         are present in {@code channelConfigMap} or in the new
     *         {@code channelContextMap}
     */
    private Set<String> validateChannels(Set<String> channelSet) {
      Iterator<String> iter = channelSet.iterator();
      Map<String, Context> newContextMap = new HashMap<String, Context>();
      /*
       * The logic for the following code:
       *
       * Is it a known component?
       *  -Yes: Get the ChannelType and set the string name of that to
       *        config and set configSpecified to true.
       *  -No.Look for config type for the given component:
       *      -Config Found:
       *        Set config to the type mentioned, set configSpecified to true
       *      -No Config found:
       *        Set config to OTHER, configSpecified to false,
       *        do basic validation. Leave the context in the
       *        contextMap to process later. Setting it to other returns
       *        a vanilla configuration(Source/Sink/Channel Configuration),
       *        which does basic syntactic validation. This object is not
       *        put into the map, so the context is retained which can be
       *        picked up - this is meant for older classes which don't
       *        implement ConfigurableComponent.
       */
      while (iter.hasNext()) {
        String channelName = iter.next();
        Context channelContext = channelContextMap.get(channelName);
        // Context exists in map.
        if (channelContext != null) {
          // Get the configuration object for the channel:
          ChannelType chType = getKnownChannel(channelContext.getString(
              BasicConfigurationConstants.CONFIG_TYPE));
          boolean configSpecified = false;
          String config = null;
          // Not a known channel - cannot do specific validation to this channel
          if (chType == null) {
            config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG);
            if (config == null || config.isEmpty()) {
              config = "OTHER";
            } else {
              configSpecified = true;
            }
          } else {
            config = chType.toString().toUpperCase(Locale.ENGLISH);
            configSpecified = true;
          }

          // Scoped to this iteration so the catch block can never see the
          // configuration object of a previously processed channel.
          ChannelConfiguration conf = null;
          try {
            conf =
                (ChannelConfiguration) ComponentConfigurationFactory.create(
                    channelName, config, ComponentType.CHANNEL);
            logger.debug("Created channel " + channelName);
            if (conf != null) {
              conf.configure(channelContext);
            }
            // Keep the raw context for later processing when there is no
            // usable configuration object: none was created, no config was
            // specified, or the specified config class was not found.
            if (conf == null || !configSpecified
                || conf.isNotFoundConfigClass()) {
              newContextMap.put(channelName, channelContext);
            } else {
              channelConfigMap.put(channelName, conf);
            }
            if (conf != null) {
              errorList.addAll(conf.getErrors());
            }
          } catch (ConfigurationException e) {
            // Could not configure channel - skip it.
            // conf is non-null here only when it was created for THIS channel
            // and configuring it failed; record the errors it collected.
            if (conf != null) {
              errorList.addAll(conf.getErrors());
            }
            iter.remove();
            logger.warn("Could not configure channel " + channelName
                + " due to: " + e.getMessage(), e);

          }
        } else {
          // No context for a declared channel: drop it and record the error.
          iter.remove();
          errorList.add(new FlumeConfigurationError(agentName, channelName,
              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
        }
      }
      channelContextMap = newContextMap;
      // Only names that ended up in one of the two maps remain valid.
      Set<String> tempchannelSet = new HashSet<String>();
      tempchannelSet.addAll(channelConfigMap.keySet());
      tempchannelSet.addAll(channelContextMap.keySet());
      channelSet.retainAll(tempchannelSet);
      return channelSet;
    }

 

在validateChannels中,根據配置文件中的channel的類型,創建ChannelConfiguration對象,然后插入到AgentConfiguration的channelConfigMap中;

到這里,一個完整的FlumeConfiguration對象已經完全加載好了;下面繼續;

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM