Application是Flume項目的入口程序了,當我們輸入
bin/flume-ng agent --conf conf --conf-file conf/kafka1.properties --name test -Dflume.root.logger=INFO,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
后,腳本會導入環境變量,並且啟動org.apache.flume.node.Application
。
# Fully qualified name of the Java class launched when running in agent mode.
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
# finally, invoke the appropriate command
# Dispatch on the first mode flag that is non-empty; in agent mode, hand
# $FLUME_AGENT_CLASS to run_flume. $args is intentionally unquoted so that it
# word-splits into separate arguments.
if [ -n "$opt_agent" ] ; then
run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
run_flume $FLUME_TOOLS_CLASS $args
else
# None of the mode flags is set; report it through the script's error helper.
error "This message should never appear" 1
fi
# Launch a Flume Java entry point.
#
# Arguments:
#   $1    fully qualified name of the Java class to run (required)
#   $2..  arguments forwarded, one word each, to that class
#
# Reads EXEC, JAVA_HOME, JAVA_OPTS, FLUME_JAVA_OPTS, arr_java_props,
# FLUME_CLASSPATH, FLUME_JAVA_LIBRARY_PATH and CLEAN_FLAG from the
# enclosing script.
run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  # Enable command tracing when CLEAN_FLAG is non-zero. An unset flag is
  # treated as 0 so the test is never malformed.
  if [ "${CLEAN_FLAG:-0}" -ne 0 ]; then
    set -x
  fi

  # EXEC, JAVA_OPTS and FLUME_JAVA_OPTS stay unquoted on purpose: each may be
  # empty or hold several words. Paths and the forwarded arguments are quoted
  # so that spaces or glob characters in them are passed through untouched.
  $EXEC "$JAVA_HOME/bin/java" $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
    -Djava.library.path="$FLUME_JAVA_LIBRARY_PATH" "$FLUME_APPLICATION_CLASS" "$@"
}
然后調用Application
類的main方法,這個方法里面加載了配置,並且啟動了每個組件。
/**
 * Command-line entry point of the agent.
 *
 * <p>Parses the command line, builds a configuration provider backed either by
 * ZooKeeper ({@code -z}) or by a properties file ({@code -f}), starts the
 * application and registers a JVM shutdown hook that stops it. Any exception
 * raised along the way is logged rather than propagated.
 *
 * @param args command-line arguments; {@code -n/--name} is required, and one of
 *     {@code -f/--conf-file} or {@code -z/--zkConnString} must be supplied
 */
public static void main(String[] args) {
  try {
    // ZooKeeper-backed configuration (described as still experimental in Flume 1.7).
    boolean isZkConfigured = false;

    // Declare the supported command-line options.
    Options options = new Options();

    Option option = new Option("n", "name", true, "the name of this agent");
    option.setRequired(true);
    options.addOption(option);

    option = new Option("f", "conf-file", true,
        "specify a config file (required if -z missing)");
    option.setRequired(false);
    options.addOption(option);

    option = new Option(null, "no-reload-conf", false,
        "do not reload config file if changed");
    options.addOption(option);

    // Options for Zookeeper
    option = new Option("z", "zkConnString", true,
        "specify the ZooKeeper connection to use (required if -f missing)");
    option.setRequired(false);
    options.addOption(option);

    option = new Option("p", "zkBasePath", true,
        "specify the base path in ZooKeeper for agent configs");
    option.setRequired(false);
    options.addOption(option);

    option = new Option("h", "help", false, "display help text");
    options.addOption(option);

    CommandLineParser parser = new GnuParser();
    CommandLine commandLine = parser.parse(options, args);

    if (commandLine.hasOption('h')) {
      new HelpFormatter().printHelp("flume-ng agent", options, true);
      return;
    }

    String agentName = commandLine.getOptionValue('n');
    boolean reload = !commandLine.hasOption("no-reload-conf");

    if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
      isZkConfigured = true;
    }

    Application application = null;
    if (isZkConfigured) {
      // get options
      String zkConnectionStr = commandLine.getOptionValue('z');
      String baseZkPath = commandLine.getOptionValue('p');

      if (reload) {
        // Reloading: the polling provider is itself a managed component and
        // the application is registered on the event bus.
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        List<LifecycleAware> components = Lists.newArrayList();
        PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
            new PollingZooKeeperConfigurationProvider(
                agentName, zkConnectionStr, baseZkPath, eventBus);
        components.add(zookeeperConfigurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        // No reloading: read the configuration once and apply it directly.
        StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
            new StaticZooKeeperConfigurationProvider(
                agentName, zkConnectionStr, baseZkPath);
        application = new Application();
        application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
      }
    } else {
      // Neither -z nor -f was given: fail with a descriptive message instead of
      // letting new File(null) raise a bare NullPointerException.
      String configurationPath = commandLine.getOptionValue('f');
      if (configurationPath == null) {
        throw new ParseException(
            "No configuration source specified: one of --conf-file or --zkConnString is required");
      }
      File configurationFile = new File(configurationPath);

      /*
       * The following is to ensure that by default the agent will fail on
       * startup if the file does not exist.
       */
      if (!configurationFile.exists()) {
        // If command line invocation, then need to fail fast
        if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
            null) {
          String path = configurationFile.getPath();
          try {
            path = configurationFile.getCanonicalPath();
          } catch (IOException ex) {
            logger.error("Failed to read canonical path for file: " + path,
                ex);
          }
          throw new ParseException(
              "The specified configuration file does not exist: " + path);
        }
      }

      List<LifecycleAware> components = Lists.newArrayList();

      // When reloading is enabled, the configuration file is re-read every 30 seconds.
      if (reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        // The polling provider is given the file, the event bus and the
        // 30-second interval, and is managed as an application component.
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(
                agentName, configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        // Load the configuration file once.
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName, configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
    }

    // Start the application (and with it, its components).
    application.start();

    // Call stop() on the application when the JVM shuts down.
    final Application appReference = application;
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
        appReference.stop();
      }
    });

  } catch (Exception e) {
    logger.error("A fatal error occurred while running. Exception follows.", e);
  }
}
在這個里面使用了PollingPropertiesFileConfigurationProvider
和 PropertiesFileConfigurationProvider
兩個類,實際作用是提供每個組件的配置。
他們的類圖如下:
ConfigurationProvider
是一個接口,所有***ConfigurationProvider
都是為了各種組件提供配置。
/**
 * Source of a {@link MaterializedConfiguration}; implemented by the various
 * {@code *ConfigurationProvider} classes.
 */
public interface ConfigurationProvider {
/**
 * Returns the materialized configuration produced by this provider.
 *
 * @return the materialized configuration
 */
MaterializedConfiguration getConfiguration();
}
中間有一個抽象類,public abstract class AbstractConfigurationProvider implements ConfigurationProvider
,它會實現getConfiguration()方法,為每一個組件添加配置。
/**
 * Builds the materialized configuration for the agent named by
 * {@code getAgentName()}.
 *
 * <p>The raw configuration comes from {@code getFlumeConfiguration()}. Channels,
 * sources and sinks are loaded from the agent's section; channels that end up
 * with no connected component are dropped (and evicted from the channel cache),
 * the rest are added to the result together with every source and sink runner.
 * An {@code InstantiationException} during loading is logged, and whatever had
 * been added to the result so far is still returned.
 *
 * @return the materialized configuration; empty when the agent has no
 *     configuration section
 */
public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration materialized = new SimpleMaterializedConfiguration();
  FlumeConfiguration flumeConfiguration = getFlumeConfiguration();
  AgentConfiguration agentConfiguration =
      flumeConfiguration.getConfigurationFor(getAgentName());

  // Nothing configured for this agent: warn and hand back the empty result.
  if (agentConfiguration == null) {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
    return materialized;
  }

  Map<String, ChannelComponent> channelsByName = Maps.newHashMap();
  Map<String, SourceRunner> sourcesByName = Maps.newHashMap();
  Map<String, SinkRunner> sinksByName = Maps.newHashMap();
  try {
    // Channels are loaded first because sources and sinks are wired against them.
    loadChannels(agentConfiguration, channelsByName);
    loadSources(agentConfiguration, channelsByName, sourcesByName);
    loadSinks(agentConfiguration, channelsByName, sinksByName);

    // Iterate over a snapshot of the names so entries can be removed from the map.
    Set<String> snapshot = new HashSet<String>(channelsByName.keySet());
    for (String name : snapshot) {
      ChannelComponent holder = channelsByName.get(name);

      if (!holder.components.isEmpty()) {
        // Connected channel: keep it in the result.
        LOGGER.info(String.format("Channel %s connected to %s",
            name, holder.components.toString()));
        materialized.addChannel(name, holder.channel);
        continue;
      }

      // Orphan channel: drop it from the working map and from the cache.
      LOGGER.warn(String.format("Channel %s has no components connected" +
          " and has been removed.", name));
      channelsByName.remove(name);
      Map<String, Channel> cachedOfSameType =
          channelCache.get(holder.channel.getClass());
      if (cachedOfSameType != null) {
        cachedOfSameType.remove(name);
      }
    }

    // Add every source runner and sink runner to the result.
    for (Map.Entry<String, SourceRunner> source : sourcesByName.entrySet()) {
      materialized.addSourceRunner(source.getKey(), source.getValue());
    }
    for (Map.Entry<String, SinkRunner> sink : sinksByName.entrySet()) {
      materialized.addSinkRunner(sink.getKey(), sink.getValue());
    }
  } catch (InstantiationException ex) {
    LOGGER.error("Failed to instantiate component", ex);
  } finally {
    channelsByName.clear();
    sourcesByName.clear();
    sinksByName.clear();
  }
  return materialized;
}
話說里面的名字起的比較奇怪,SourceRunner
,SinkRunner
,ChannelComponent
。
前面兩個都是Runner
,后面就是Component
。
接下來就是public class PropertiesFileConfigurationProvider extends AbstractConfigurationProvider
在這個類里面實現了getFlumeConfiguration()
方法。
最后就是
public class PollingPropertiesFileConfigurationProvider extends PropertiesFileConfigurationProvider implements LifecycleAware
這個類,就是實現了每隔30秒讀取一次配置文件。它的start
函數里面啟動了一個單線程的定時調度線程池,用來定期檢查配置文件是否有更新。
/**
 * Starts the configuration poller.
 *
 * <p>Creates a single-threaded scheduled executor whose threads are named
 * {@code conf-file-poller-%d} and schedules a {@code FileWatcherRunnable} on it
 * with no initial delay and a fixed delay of {@code interval} seconds between
 * runs, then marks this provider as started.
 *
 * @throws IllegalStateException if {@code file} is null
 */
@Override
public void start() {
  LOGGER.info("Configuration provider starting");

  Preconditions.checkState(file != null,
      "The parameter file must not be null");

  // Dedicated single-threaded scheduler for the file watcher.
  executorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder()
          .setNameFormat("conf-file-poller-%d")
          .build());

  // First run is immediate; later runs start `interval` seconds after the previous one ends.
  executorService.scheduleWithFixedDelay(
      new FileWatcherRunnable(file, counterGroup), 0, interval, TimeUnit.SECONDS);

  lifecycleState = LifecycleState.START;

  LOGGER.debug("Configuration provider started");
}
里面的FileWatcherRunnable
類會判斷文件是否更新,如果更新了,就重新調用getConfiguration
方法。
整個配置加載的大體流程就是這樣子,整個過程涉及到了FlumeConfiguration
,下次記錄一下Flume的配置類。
整個代碼結構寫的也很清晰,我覺得是這樣子,笑。每個類,每個函數都能看出它的作用。這是需要學習的地方。