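Our project recently started using Flume, so I read through part of the Flume source code.
Startup command (replace the config file name with your own):
    nohup bin/flume-ng agent -n tsdbflume -c conf -f conf/your-config.conf -Dflume.root.logger=DEBUG,console &
Here -n sets the agent name (tsdbflume), -c the configuration directory, and -f the configuration file. The agent name matters later: getAgentName() uses it to pick this agent's section out of the file.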
The flume-ng script sets FLUME_AGENT_CLASS="org.apache.flume.node.Application", so the entry point is Application.
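I drew a sequence diagram of the flume agent startup:
This is a simplified startup sequence diagram; everything it shows lives in the flume-node module.
The most central step is creating the objects defined in the configuration file, which happens in AbstractConfigurationProvider.getConfiguration(). Here is the code: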
    public MaterializedConfiguration getConfiguration() {
      MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
      // getFlumeConfiguration() is the real core: it loads the whole
      // configuration file (walked through below).
      FlumeConfiguration fconfig = getFlumeConfiguration();
      AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
      if (agentConf != null) {
        Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
        Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
        Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
        try {
          loadChannels(agentConf, channelComponentMap);
          loadSources(agentConf, channelComponentMap, sourceRunnerMap);
          loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
          Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
          for (String channelName : channelNames) {
            ChannelComponent channelComponent = channelComponentMap.get(channelName);
            if (channelComponent.components.isEmpty()) {
              LOGGER.warn(String.format("Channel %s has no components connected" +
                  " and has been removed.", channelName));
              channelComponentMap.remove(channelName);
              Map<String, Channel> nameChannelMap =
                  channelCache.get(channelComponent.channel.getClass());
              if (nameChannelMap != null) {
                nameChannelMap.remove(channelName);
              }
            } else {
              LOGGER.info(String.format("Channel %s connected to %s",
                  channelName, channelComponent.components.toString()));
              conf.addChannel(channelName, channelComponent.channel);
            }
          }
          for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
            conf.addSourceRunner(entry.getKey(), entry.getValue());
          }
          for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
            conf.addSinkRunner(entry.getKey(), entry.getValue());
          }
        } catch (InstantiationException ex) {
          LOGGER.error("Failed to instantiate component", ex);
        } finally {
          channelComponentMap.clear();
          sourceRunnerMap.clear();
          sinkRunnerMap.clear();
        }
      } else {
        LOGGER.warn("No configuration found for this host:{}", getAgentName());
      }
      return conf;
    }
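The channel-pruning loop in the middle of getConfiguration() is easy to lose among the logging calls. The following is a minimal Java sketch of just that step, assuming a channel can be modeled by its name plus the list of source/sink names wired to it. ChannelPruneSketch and pruneUnconnected are hypothetical names, not part of Flume.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the channel-pruning loop in getConfiguration().
// Assumption: a channel is reduced to its name and the names of the
// sources/sinks connected to it (Flume's ChannelComponent also holds the Channel).
class ChannelPruneSketch {

  // Removes channels with no connected components from the input map and
  // returns the names of the channels that would be registered.
  static List<String> pruneUnconnected(Map<String, List<String>> channelComponents) {
    List<String> kept = new ArrayList<>();
    // Iterate over a copy of the keys so entries can be removed safely,
    // just as the Flume code copies channelComponentMap.keySet().
    Set<String> names = new HashSet<>(channelComponents.keySet());
    for (String name : names) {
      if (channelComponents.get(name).isEmpty()) {
        channelComponents.remove(name); // unconnected: drop it
      } else {
        kept.add(name); // connected: would go to conf.addChannel(...)
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, List<String>> channels = new LinkedHashMap<>();
    channels.put("c1", List.of("r1", "k1"));
    channels.put("c2", List.of());
    System.out.println(pruneUnconnected(channels)); // [c1]
    System.out.println(channels.keySet());          // [c1]
  }
}
```

Copying the key set before iterating is the design point worth noticing: removing from the map while iterating its own key set would risk a ConcurrentModificationException.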
Next, let's look at how this configuration is actually loaded.
PropertiesFileConfigurationProvider.getFlumeConfiguration():
    @Override
    public FlumeConfiguration getFlumeConfiguration() {
      BufferedReader reader = null;
      try {
        reader = new BufferedReader(new FileReader(file));
        Properties properties = new Properties();
        properties.load(reader);
        return new FlumeConfiguration(toMap(properties));
      } catch (IOException ex) {
        LOGGER.error("Unable to load file:" + file
            + " (I/O failure) - Exception follows.", ex);
      } finally {
        if (reader != null) {
          try {
            reader.close();
          } catch (IOException ex) {
            LOGGER.warn(
                "Unable to close file reader for file: " + file, ex);
          }
        }
      }
      return new FlumeConfiguration(new HashMap<String, String>());
    }
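As a rough illustration of what getFlumeConfiguration() does before handing off to FlumeConfiguration, this sketch loads properties text through java.util.Properties and copies it into a Map<String, String>. It rests on assumptions: a StringReader replaces the FileReader, the tsdbflume keys are made up, and the toMap helper is our own rather than Flume's private method.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of "config text -> Properties -> Map<String, String>".
class PropertiesLoadSketch {

  static Map<String, String> load(String configText) {
    Properties properties = new Properties();
    try {
      // Properties.load(Reader) parses "key = value" lines and skips # comments.
      properties.load(new StringReader(configText));
    } catch (IOException e) {
      // A StringReader will not really fail; rethrow unchecked just in case.
      throw new UncheckedIOException(e);
    }
    return toMap(properties);
  }

  // Our own stand-in for a toMap helper: copy every string property.
  static Map<String, String> toMap(Properties properties) {
    Map<String, String> result = new HashMap<>();
    for (String name : properties.stringPropertyNames()) {
      result.put(name, properties.getProperty(name));
    }
    return result;
  }

  public static void main(String[] args) {
    String config = String.join("\n",
        "# hypothetical config for the tsdbflume agent",
        "tsdbflume.sources = r1",
        "tsdbflume.channels = c1",
        "tsdbflume.sources.r1.type = netcat",
        "tsdbflume.sources.r1.channels = c1");
    Map<String, String> map = load(config);
    System.out.println(map.get("tsdbflume.sources.r1.type")); // netcat
    System.out.println(map.size());                            // 4
  }
}
```

At this stage the keys are still flat strings with the agent name as a prefix; splitting them into per-agent and per-component structures happens later in FlumeConfiguration.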
The key call here is new FlumeConfiguration(toMap(properties)). Let's start from that constructor and see how the contents get loaded:
    /**
     * Creates a populated Flume Configuration object.
     * (The argument is the key-value pairs read from the configuration file.)
     */
    public FlumeConfiguration(Map<String, String> properties) {
      agentConfigMap = new HashMap<String, AgentConfiguration>();
      errors = new LinkedList<FlumeConfigurationError>();
      // Construct the in-memory component hierarchy
      for (String name : properties.keySet()) {
        String value = properties.get(name);
        // addRawProperty initializes agentConfigMap.
        // Step 1: this fills the *ContextMap fields of each AgentConfiguration.
        if (!addRawProperty(name, value)) {
          logger.warn("Configuration property ignored: " + name + " = " + value);
        }
      }
      // Now iterate thru the agentContext and create agent configs and add them
      // to agentConfigMap

      // validate and remove improperly configured components
      // Step 2: this fills the *ConfigMap fields of each AgentConfiguration.
      validateConfiguration();
    }
Next, step into FlumeConfiguration.addRawProperty(name, value):
    private boolean addRawProperty(String name, String value) {
      // Null names and values not supported
      if (name == null || value == null) {
        errors.add(new FlumeConfigurationError("", "",
            FlumeConfigurationErrorType.AGENT_NAME_MISSING, ErrorOrWarning.ERROR));
        return false;
      }
      // Empty values are not supported
      if (value.trim().length() == 0) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, ErrorOrWarning.ERROR));
        return false;
      }
      // Remove leading and trailing spaces
      name = name.trim();
      value = value.trim();
      int index = name.indexOf('.');
      // All configuration keys must have a prefix defined as agent name
      if (index == -1) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.AGENT_NAME_MISSING, ErrorOrWarning.ERROR));
        return false;
      }
      String agentName = name.substring(0, index);
      // Agent name must be specified for all properties
      if (agentName.length() == 0) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.AGENT_NAME_MISSING, ErrorOrWarning.ERROR));
        return false;
      }
      String configKey = name.substring(index + 1);
      // Configuration key must be specified for every property
      if (configKey.length() == 0) {
        errors.add(new FlumeConfigurationError(name, "",
            FlumeConfigurationErrorType.PROPERTY_NAME_NULL, ErrorOrWarning.ERROR));
        return false;
      }
      AgentConfiguration aconf = agentConfigMap.get(agentName);
      // The AgentConfiguration is created here and put into FlumeConfiguration's
      // Map<String, AgentConfiguration> agentConfigMap
      if (aconf == null) {
        aconf = new AgentConfiguration(agentName, errors);
        agentConfigMap.put(agentName, aconf);
      }
      // Each configuration key must begin with one of the three prefixes:
      // sources, sinks, or channels.
      // Finally, the key-value pair is loaded into the AgentConfiguration
      return aconf.addProperty(configKey, value);
    }
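To make the bookkeeping in addRawProperty() concrete, here is a simplified Java sketch (the class RawPropertySketch is hypothetical). It splits a key at the first '.', uses the prefix as the agent name, and groups the rest per agent. A plain map stands in for AgentConfiguration and errors are reduced to a false return, so it only approximates the validation shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of addRawProperty(): "agent.rest.of.key" is split at the
// first '.', and the remainder is stored under that agent's entry.
class RawPropertySketch {
  // agent name -> (config key without the agent prefix -> value)
  final Map<String, Map<String, String>> agentConfigMap = new LinkedHashMap<>();

  boolean addRawProperty(String name, String value) {
    if (name == null || value == null || value.trim().isEmpty()) {
      return false; // null names/values and empty values are rejected
    }
    name = name.trim();
    value = value.trim();
    int index = name.indexOf('.');
    if (index <= 0) {
      return false; // no '.' at all, or an empty agent name such as ".sources"
    }
    String agentName = name.substring(0, index);
    String configKey = name.substring(index + 1);
    if (configKey.isEmpty()) {
      return false; // e.g. "tsdbflume."
    }
    // Create the per-agent entry on first use (like new AgentConfiguration(...)),
    // then store the stripped key in it (like aconf.addProperty(...)).
    agentConfigMap.computeIfAbsent(agentName, k -> new LinkedHashMap<>())
        .put(configKey, value);
    return true;
  }

  public static void main(String[] args) {
    RawPropertySketch fc = new RawPropertySketch();
    fc.addRawProperty("tsdbflume.sources", "r1");
    fc.addRawProperty("tsdbflume.sources.r1.type", "netcat");
    System.out.println(fc.addRawProperty("noDot", "x")); // false
    System.out.println(fc.agentConfigMap);
    // {tsdbflume={sources=r1, sources.r1.type=netcat}}
  }
}
```

Because the agent name is only the text before the first '.', one file can hold several agents; -n on the command line then selects which one is used.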
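The pair finally ends up in AgentConfiguration.addProperty(String key, String value). Below is the branch that handles sources; the other component types follow the same pattern with their own prefixes: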
    ComponentNameAndConfigKey cnck = parseConfigKey(key,
        BasicConfigurationConstants.CONFIG_SOURCES_PREFIX);
    if (cnck != null) {
      // it is a source
      String name = cnck.getComponentName();
      Context srcConf = sourceContextMap.get(name);
      if (srcConf == null) {
        srcConf = new Context();
        sourceContextMap.put(name, srcConf);
      }
      // sourceContextMap maps each source name to a Context; the
      // (cnck.getConfigKey(), value) pair is put into that Context
      srcConf.put(cnck.getConfigKey(), value);
      return true;
    }
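So each property ultimately lands in the matching *ContextMap of the AgentConfiguration (sourceContextMap, sinkContextMap, channelContextMap or sinkGroupContextMap):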
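The sketch below mimics how a key such as sources.r1.type ends up in sourceContextMap: the prefix is stripped, the next segment becomes the component name, and the remainder becomes the key inside that component's context. It assumes the prefix value is "sources." and uses a Map<String, String> in place of Flume's Context; ComponentKeySketch and addSourceProperty are illustrative names only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of parseConfigKey() plus the source branch of addProperty().
class ComponentKeySketch {
  static final String SOURCES_PREFIX = "sources."; // assumed prefix value

  // source name -> (config key -> value), playing the role of sourceContextMap
  final Map<String, Map<String, String>> sourceContextMap = new LinkedHashMap<>();

  // Returns {componentName, configKey}, or null unless the key has the form
  // <prefix><component-name>.<config-key> with both parts non-empty.
  static String[] parseConfigKey(String key, String prefix) {
    if (!key.startsWith(prefix)) {
      return null;
    }
    int index = key.indexOf('.', prefix.length());
    if (index == -1) {
      return null;
    }
    String name = key.substring(prefix.length(), index);
    String configKey = key.substring(index + 1);
    if (name.isEmpty() || configKey.isEmpty()) {
      return null;
    }
    return new String[] {name, configKey};
  }

  boolean addSourceProperty(String key, String value) {
    String[] parsed = parseConfigKey(key, SOURCES_PREFIX);
    if (parsed == null) {
      return false; // not a source property in this sketch
    }
    // Get or create this source's context, then store the entry in it.
    sourceContextMap.computeIfAbsent(parsed[0], k -> new LinkedHashMap<>())
        .put(parsed[1], value);
    return true;
  }

  public static void main(String[] args) {
    ComponentKeySketch agent = new ComponentKeySketch();
    agent.addSourceProperty("sources.r1.type", "netcat");
    agent.addSourceProperty("sources.r1.port", "44444");
    agent.addSourceProperty("sinks.k1.type", "logger"); // ignored here
    System.out.println(agent.sourceContextMap);
    // {r1={type=netcat, port=44444}}
  }
}
```

Only the first segment after the prefix is treated as the component name, so a nested key such as sources.r1.selector.type keeps selector.type intact inside r1's context.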
    public static class AgentConfiguration {
      private final String agentName;
      private String sources;
      private String sinks;
      private String channels;
      private String sinkgroups;
      private final Map<String, ComponentConfiguration> sourceConfigMap;
      private final Map<String, ComponentConfiguration> sinkConfigMap;
      private final Map<String, ComponentConfiguration> channelConfigMap;
      private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
      private Map<String, Context> sourceContextMap;
      private Map<String, Context> sinkContextMap;
      private Map<String, Context> channelContextMap;
      private Map<String, Context> sinkGroupContextMap;
      private Set<String> sinkSet;
      private Set<String> sourceSet;
      private Set<String> channelSet;
      private Set<String> sinkgroupSet;
      private final List<FlumeConfigurationError> errorList;
      // ... other code omitted
    }
Jumping back and forth like this gets confusing, so let's sort it out:
First, FlumeConfiguration's addRawProperty method performs the put:
    FlumeConfiguration.agentConfigMap.put(agentName, aconf);
This is also where the AgentConfiguration object is created:
    aconf = new AgentConfiguration(agentName, errors);
It then executes
    aconf.addProperty(configKey, value);
which inserts the key and value into the matching *ContextMap of the AgentConfiguration object. That completes saving the key-value pair.
In the end, every key-value pair is stored in an AgentConfiguration object.
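Populating an AgentConfiguration likewise happens in two steps, both inside the FlumeConfiguration constructor:
1. addRawProperty(name, value) inserts each key-value pair into the AgentConfiguration's context maps (sourceContextMap, channelContextMap and so on);
2. validateConfiguration() validates the components and fills the config maps (sourceConfigMap, channelConfigMap and so on). Let's follow the validateConfiguration() code next.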
The call chain is validateConfiguration() -> aconf.isValid() -> validateChannels(channelSet).
The code comments already explain it in considerable detail:
    /**
     * If it is a known component it will do the full validation required for
     * that component, else it will do the validation required for that class.
     */
    private Set<String> validateChannels(Set<String> channelSet) {
      Iterator<String> iter = channelSet.iterator();
      Map<String, Context> newContextMap = new HashMap<String, Context>();
      ChannelConfiguration conf = null;
      /*
       * The logic for the following code:
       *
       * Is it a known component?
       *  -Yes: Get the ChannelType and set the string name of that to
       *        config and set configSpecified to true.
       *  -No.Look for config type for the given component:
       *      -Config Found:
       *        Set config to the type mentioned, set configSpecified to true
       *      -No Config found:
       *        Set config to OTHER, configSpecified to false,
       *        do basic validation. Leave the context in the
       *        contextMap to process later. Setting it to other returns
       *        a vanilla configuration(Source/Sink/Channel Configuration),
       *        which does basic syntactic validation. This object is not
       *        put into the map, so the context is retained which can be
       *        picked up - this is meant for older classes which don't
       *        implement ConfigurableComponent.
       */
      while (iter.hasNext()) {
        String channelName = iter.next();
        Context channelContext = channelContextMap.get(channelName);
        // Context exists in map.
        if (channelContext != null) {
          // Get the configuration object for the channel:
          ChannelType chType = getKnownChannel(channelContext.getString(
              BasicConfigurationConstants.CONFIG_TYPE));
          boolean configSpecified = false;
          String config = null;
          // Not a known channel - cannot do specific validation to this channel
          if (chType == null) {
            config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG);
            if (config == null || config.isEmpty()) {
              config = "OTHER";
            } else {
              configSpecified = true;
            }
          } else {
            config = chType.toString().toUpperCase(Locale.ENGLISH);
            configSpecified = true;
          }

          try {
            conf = (ChannelConfiguration) ComponentConfigurationFactory.create(
                channelName, config, ComponentType.CHANNEL);
            logger.debug("Created channel " + channelName);
            if (conf != null) {
              conf.configure(channelContext);
            }
            if ((configSpecified && conf.isNotFoundConfigClass()) || !configSpecified) {
              newContextMap.put(channelName, channelContext);
            } else if (configSpecified) {
              channelConfigMap.put(channelName, conf);
            }
            if (conf != null) {
              errorList.addAll(conf.getErrors());
            }
          } catch (ConfigurationException e) {
            // Could not configure channel - skip it.
            // No need to add to error list - already added before exception is
            // thrown
            if (conf != null) errorList.addAll(conf.getErrors());
            iter.remove();
            logger.warn("Could not configure channel " + channelName
                + " due to: " + e.getMessage(), e);
          }
        } else {
          iter.remove();
          errorList.add(new FlumeConfigurationError(agentName, channelName,
              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
        }
      }
      channelContextMap = newContextMap;
      Set<String> tempchannelSet = new HashSet<String>();
      tempchannelSet.addAll(channelConfigMap.keySet());
      tempchannelSet.addAll(channelContextMap.keySet());
      channelSet.retainAll(tempchannelSet);
      return channelSet;
    }
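In validateChannels, a ChannelConfiguration object is created according to the channel type given in the configuration file and then put into the AgentConfiguration's channelConfigMap. Channels with no specified configuration, or whose configuration class is not found, are not put there; they keep their raw Context in channelContextMap for later processing.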
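Here is a simplified sketch of the branch logic in validateChannels(): it decides which configuration name a channel gets, whether that counts as configSpecified, and whether the channel would land in channelConfigMap or keep its raw context. The set of known types and the literal keys "type" and "config" are assumptions standing in for ChannelType and BasicConfigurationConstants; ChannelConfigChoiceSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the config-selection branch in validateChannels().
class ChannelConfigChoiceSketch {
  // Illustrative subset of known channel types (assumption, not Flume's full list).
  static final Set<String> KNOWN_TYPES = Set.of("MEMORY", "FILE", "JDBC");

  static final class Choice {
    final String config;
    final boolean configSpecified;

    Choice(String config, boolean configSpecified) {
      this.config = config;
      this.configSpecified = configSpecified;
    }

    @Override
    public String toString() {
      return config + (configSpecified ? " (specified)" : " (not specified)");
    }
  }

  static Choice choose(Map<String, String> channelContext) {
    String type = channelContext.get("type");
    if (type != null && KNOWN_TYPES.contains(type.toUpperCase(Locale.ENGLISH))) {
      // Known component: its upper-cased type name becomes the config.
      return new Choice(type.toUpperCase(Locale.ENGLISH), true);
    }
    // Unknown type (e.g. a custom class name): look for an explicit "config" key.
    String config = channelContext.get("config");
    if (config == null || config.isEmpty()) {
      return new Choice("OTHER", false); // basic validation only
    }
    return new Choice(config, true);
  }

  // Only a specified config whose configuration class was found goes to
  // channelConfigMap; anything else keeps its raw context in channelContextMap.
  static boolean goesToConfigMap(Choice choice, boolean configClassFound) {
    return choice.configSpecified && configClassFound;
  }

  public static void main(String[] args) {
    Map<String, String> memory = new HashMap<>();
    memory.put("type", "memory");
    Map<String, String> custom = new HashMap<>();
    custom.put("type", "com.example.MyChannel"); // hypothetical custom class
    System.out.println(choose(memory)); // MEMORY (specified)
    System.out.println(choose(custom)); // OTHER (not specified)
    System.out.println(goesToConfigMap(choose(custom), true)); // false
  }
}
```

The fallback to OTHER is what lets older channel classes without their own configuration class still load: their context survives untouched for the component to read when it is instantiated.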
At this point, a complete FlumeConfiguration object has been fully loaded. On to the next step.