Flume-NG啟動過程源碼分析(一)(原創)


  從bin/flume 這個shell腳本可以看到Flume的起始於org.apache.flume.node.Application類,這是flume的main函數所在。

  main方法首先會先解析shell命令,如果指定的配置文件不存在就甩出異常。

  根據命令中含有"no-reload-conf"參數,決定采用那種加載配置文件方式:一、沒有此參數,會動態加載配置文件,默認每30秒加載一次配置文件,因此可以動態修改配置文件;二、有此參數,則只在啟動時加載一次配置文件。實現動態加載功能采用了發布訂閱模式,使用guava中的EventBus實現。

     EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30); //這里是發布事件的類,這里的30則是動態加載配置文件時間間隔,單位是s
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);    //將訂閱類注冊到Bus中

  訂閱類是application = new Application(components);發布代碼在PollingPropertiesFileConfigurationProvider中的FileWatcherRunnable.run方法中。在這只是先構建一個PollingPropertiesFileConfigurationProvider對象,PollingPropertiesFileConfigurationProvider extends  PropertiesFileConfigurationProvider implements LifecycleAware,繼續跟蹤PropertiesFileConfigurationProvider extends AbstractConfigurationProvider,再跟蹤AbstractConfigurationProvider implements  ConfigurationProvider可以看到這些類的構造方法都是初始化,AbstractConfigurationProvid的構造方法初始化了sink、channel、source的工廠類。

  Application.handleConfigurationEvent(MaterializedConfiguration conf)有@Subscribe注解,是訂閱方法,當eventBus.post(MaterializedConfiguration conf)執行時,會觸發執行handleConfigurationEvent方法。

  new Application(components)時,會構建一個對象supervisor = new LifecycleSupervisor()會啟動10個線程用來執行配置文件中的各個組件,並監控組件的整個運行過程。

  application.start()方法會啟動配置文件的加載過程supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //LifecycleState.START開始運行,在這的component就是上面的PollingPropertiesFileConfigurationProvider對象。supervise方法會對component創建一個MonitorRunnable進程,並放入默認有10個線程的monitorService去執行

 

    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;//組件
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);
    //創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。如果任務的任一執行遇到異常,就會取消后續執行。
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);  //啟動MonitorRunnable,結束之后3秒再重新啟動,可以用於重試
    monitorFutures.put(lifecycleAware, future);

 

  看MonitorRunnable類,其run方法主要是根據supervisoree.status.desiredState的值執行對應的操作。這里的lifecycleAware就是上面supervise方法中的componentlifecycleAware在構造之初將lifecycleState=IDLE,application.start()方法通過supervisor.supervise方法將supervisoree.status.desiredState=START。所以在run方法中會執行lifecycleAware.start(),也就是PollingPropertiesFileConfigurationProvider.start()方法。

  PollingPropertiesFileConfigurationProvider.start()方法會啟動一個單線程FileWatcherRunnable每隔30s去加載一次配置文件(如果配置文件有修改):eventBus.post(getConfiguration())。getConfiguration()是AbstractConfigurationProvider.getConfiguration()這個方法解析了配置文件獲取了所有組件及其配置屬性。這個方法較為復雜,放在后續再講解。

  待eventBus.post(getConfiguration())之后會觸發Application.handleConfigurationEvent方法:

  @Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

 

  stopAllComponents()方法會依次stop各個組件的運行,順序是:source、sink、channel。之所以有順序是因為:一、source是不停的讀數據放入channel的;二、sink是不停的從channel拿數據的,channel兩頭都在使用應該最后停止,停止向channel發送數據后sink停止才不會丟數據。stop是通過supervisor.unsupervise方法來完成的。

  startAllComponents(conf)是啟動各個組件的,順序正好和stopAllComponents()停止順序相反,相信大伙很容易理解。是通過supervisor.supervise啟動組件的。另外需要注意的是啟動channel組件后需要等待一定時間,是為了讓所有channel全部啟動。

  另外為什么要先stop再start呢?因為考慮到要動態加載配置文件啊,加載配置文件后就需要重新啟動所有組件,所以先停止所有的,再重新啟動所有的。

  main方法的最后還有一個鈎子函數Runtime.getRuntime().addShutdownHook,主要是用來進行內存清理、對象銷毀等操作。

 

   

歡迎大伙交流


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM