從bin/flume 這個shell腳本可以看到Flume的起始於org.apache.flume.node.Application類,這是flume的main函數所在。
main方法首先會先解析shell命令,如果指定的配置文件不存在就甩出異常。
根據命令中含有"no-reload-conf"參數,決定采用那種加載配置文件方式:一、沒有此參數,會動態加載配置文件,默認每30秒加載一次配置文件,因此可以動態修改配置文件;二、有此參數,則只在啟動時加載一次配置文件。實現動態加載功能采用了發布訂閱模式,使用guava中的EventBus實現。
EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); //這里是發布事件的類,這里的30則是動態加載配置文件時間間隔,單位是s components.add(configurationProvider); application = new Application(components); eventBus.register(application); //將訂閱類注冊到Bus中
訂閱類是application = new Application(components);發布代碼在PollingPropertiesFileConfigurationProvider中的FileWatcherRunnable.run方法中。在這只是先構建一個PollingPropertiesFileConfigurationProvider對象,PollingPropertiesFileConfigurationProvider extends PropertiesFileConfigurationProvider implements LifecycleAware,繼續跟蹤PropertiesFileConfigurationProvider extends AbstractConfigurationProvider,再跟蹤AbstractConfigurationProvider implements ConfigurationProvider可以看到這些類的構造方法都是初始化,AbstractConfigurationProvid的構造方法初始化了sink、channel、source的工廠類。
Application.handleConfigurationEvent(MaterializedConfiguration conf)有@Subscribe注解,是訂閱方法,當eventBus.post(MaterializedConfiguration conf)執行時,會觸發執行handleConfigurationEvent方法。
new Application(components)時,會構建一個對象supervisor = new LifecycleSupervisor()會啟動10個線程用來執行配置文件中的各個組件,並監控組件的整個運行過程。
application.start()方法會啟動配置文件的加載過程supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //LifecycleState.START開始運行,在這的component就是上面的PollingPropertiesFileConfigurationProvider對象。supervise方法會對component創建一個MonitorRunnable進程,並放入默認有10個線程的monitorService去執行
Supervisoree process = new Supervisoree(); process.status = new Status(); process.policy = policy; process.status.desiredState = desiredState; process.status.error = false; MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware;//組件 monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); //創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。如果任務的任一執行遇到異常,就會取消后續執行。 ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); //啟動MonitorRunnable,結束之后3秒再重新啟動,可以用於重試 monitorFutures.put(lifecycleAware, future);
看MonitorRunnable類,其run方法主要是根據supervisoree.status.desiredState的值執行對應的操作。這里的lifecycleAware就是上面supervise方法中的component,lifecycleAware在構造之初將lifecycleState=IDLE,application.start()方法通過supervisor.supervise方法將supervisoree.status.desiredState=START。所以在run方法中會執行lifecycleAware.start(),也就是PollingPropertiesFileConfigurationProvider.start()方法。
PollingPropertiesFileConfigurationProvider.start()方法會啟動一個單線程FileWatcherRunnable每隔30s去加載一次配置文件(如果配置文件有修改):eventBus.post(getConfiguration())。getConfiguration()是AbstractConfigurationProvider.getConfiguration()這個方法解析了配置文件獲取了所有組件及其配置屬性。這個方法較為復雜,放在后續再講解。
待eventBus.post(getConfiguration())之后會觸發Application.handleConfigurationEvent方法:
@Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
stopAllComponents()方法會依次stop各個組件的運行,順序是:source、sink、channel。之所以有順序是因為:一、source是不停的讀數據放入channel的;二、sink是不停的從channel拿數據的,channel兩頭都在使用應該最后停止,停止向channel發送數據后sink停止才不會丟數據。stop是通過supervisor.unsupervise方法來完成的。
startAllComponents(conf)是啟動各個組件的,順序正好和stopAllComponents()停止順序相反,相信大伙很容易理解。是通過supervisor.supervise啟動組件的。另外需要注意的是啟動channel組件后需要等待一定時間,是為了讓所有channel全部啟動。
另外為什么要先stop再start呢?因為考慮到要動態加載配置文件啊,加載配置文件后就需要重新啟動所有組件,所以先停止所有的,再重新啟動所有的。
main方法的最后還有一個鈎子函數Runtime.getRuntime().addShutdownHook,主要是用來進行內存清理、對象銷毀等操作。
歡迎大伙交流