Flume-NG Startup Process Source Code Analysis (Part 3) (Original)


  The previous article analyzed how Flume loads its configuration file; dynamic reloading simply runs getConfiguration() again.

  This article analyzes how the individual components run once the configuration file has been loaded.

  Once the configuration has been loaded, the subscriber, the Application class, receives the event and executes:

  @Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

  MaterializedConfiguration conf is the configuration returned by getConfiguration(); it is an instance of SimpleMaterializedConfiguration.

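  handleConfigurationEvent was analyzed briefly in Part 1; it consists of stopAllComponents() and startAllComponents(conf). The materializedConfiguration field of Application holds a MaterializedConfiguration. When stopAllComponents() runs, that field still refers to the old configuration, so the old components are stopped first; startAllComponents(conf) then assigns the new configuration to materializedConfiguration and starts each component in turn.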
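
  The stop-then-start sequence can be sketched in plain Java without Flume or Guava. Everything below is a hypothetical stand-in, not Flume code: ReconfigSketch plays the role of Application, a list of names plays the role of MaterializedConfiguration, and the log field exists only so the order of operations can be inspected.

```java
import java.util.ArrayList;
import java.util.List;

class ReconfigSketch {
    // Hypothetical stand-in for the materializedConfiguration field: a list of component names.
    private List<String> current = null;
    // Records the order of operations so it can be inspected afterwards.
    final List<String> log = new ArrayList<>();

    synchronized void handleConfigurationEvent(List<String> conf) {
        stopAllComponents();      // still sees the OLD configuration
        startAllComponents(conf); // installs and starts the NEW one
    }

    private void stopAllComponents() {
        if (current != null) {    // nothing to stop on the very first event
            for (String name : current) {
                log.add("stop " + name);
            }
        }
    }

    private void startAllComponents(List<String> conf) {
        current = conf;
        for (String name : conf) {
            log.add("start " + name);
        }
    }
}
```

  Delivering two configurations in a row records a start for the first, then a stop for the first followed by a start for the second, which mirrors why the stop call must come first.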

  1. First, look at the startAllComponents(conf) method. The code is as follows:

private void startAllComponents(MaterializedConfiguration materializedConfiguration) { // starts all components, i.e. the three basic component types
    logger.info("Starting new configuration:{}", materializedConfiguration);

    this.materializedConfiguration = materializedConfiguration;

    for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
      try{
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e){
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    /*
     * Wait for all channels to start.
     */
    for(Channel ch: materializedConfiguration.getChannels().values()){
      while(ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)){
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
          Thread.sleep(500);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);
          Throwables.propagate(e);
        }
      }
    }

    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
        .entrySet()) {        // start all sinks
      try{
        logger.info("Starting Sink " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    for (Entry<String, SourceRunner> entry : materializedConfiguration
        .getSourceRunners().entrySet()) { // start all sources
      try{
        logger.info("Starting Source " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    this.loadMonitoring();
  }

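  All three component types are started through supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START). After the channels have been handed to the supervisor, the method waits until every channel has fully started (or has been marked as being in an error state) before it starts the sinks and sources. Starting the other two component types before the channels are ready would cause errors, because a sink or source begins communicating with its channel as soon as it is up. The startup of sinks and sources is analyzed separately later.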
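
  The channel-first ordering can be illustrated with a small self-contained sketch. SlowChannel, startAsync and startAll are hypothetical names invented for this example, and the 50 ms / 10 ms delays are arbitrary; the only part taken from the listing above is the idea of polling the channel state in a sleep loop before anything else is started.

```java
import java.util.ArrayList;
import java.util.List;

class StartupOrderSketch {
    enum State { IDLE, START }

    /** Hypothetical channel that reaches START only some time after being asked to start. */
    static class SlowChannel {
        volatile State state = State.IDLE;

        void startAsync(long delayMs) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    return;
                }
                state = State.START;
            });
            t.setDaemon(true);
            t.start();
        }
    }

    /** Starts the channel, waits for it, then "starts" sink and source; returns the order. */
    static List<String> startAll(SlowChannel channel) {
        List<String> order = new ArrayList<>();
        channel.startAsync(50);
        while (channel.state != State.START) {
            try {
                Thread.sleep(10); // the Flume listing sleeps 500 ms per check
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for channel", e);
            }
        }
        order.add("channel:" + channel.state);
        order.add("sink");
        order.add("source");
        return order;
    }
}
```

  The sink and source entries are only appended after the loop has observed the START state, so nothing can talk to a channel that is still idle.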

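  supervisor is an instance of LifecycleSupervisor. Its constructor creates a scheduled thread pool with 10 core threads and a configured maximum of 20 for the components to use (for a ScheduledThreadPoolExecutor the maximum has no practical effect, since it behaves as a fixed-size pool of core threads with an unbounded queue). The constructor is as follows: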

public LifecycleSupervisor() {
    lifecycleState = LifecycleState.IDLE;
    supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>(); // stores the supervised components and their supervision info
    monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
    monitorService = new ScheduledThreadPoolExecutor(10,
        new ThreadFactoryBuilder().setNameFormat(
            "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
            .build());
    monitorService.setMaximumPoolSize(20);
    monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
    purger = new Purger();
    needToPurge = false;
  }
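
  To see what this pool does with a periodic task, here is a minimal JDK-only sketch. MonitorPoolSketch and runsRepeatedly are hypothetical names; the pool settings copy the constructor above, while the 20 ms delay and the 5 second timeout are arbitrary values chosen so the example finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MonitorPoolSketch {
    /** Returns true if a fixed-delay task ran at least three times within the timeout. */
    static boolean runsRepeatedly() {
        // Same settings as the LifecycleSupervisor constructor shown above.
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(10);
        pool.setMaximumPoolSize(20);
        pool.setKeepAliveTime(30, TimeUnit.SECONDS);

        CountDownLatch threeRuns = new CountDownLatch(3);
        // The next run is scheduled 20 ms after the previous run has finished.
        ScheduledFuture<?> future = pool.scheduleWithFixedDelay(
            threeRuns::countDown, 0, 20, TimeUnit.MILLISECONDS);
        try {
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(false); // stop rescheduling the task
            pool.shutdownNow();
        }
    }
}
```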

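  supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) is the method that actually starts each component. All Flume components implement the LifecycleAware interface, which has just three methods: getLifecycleState (returns the component's running state), start (starts the component) and stop (stops the component). The supervise method is as follows: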
public synchronized void supervise(LifecycleAware lifecycleAware,
      SupervisorPolicy policy, LifecycleState desiredState) {
    // check the state of the thread pool
    if(this.monitorService.isShutdown()
        || this.monitorService.isTerminated()
        || this.monitorService.isTerminating()){
      throw new FlumeException("Supervise called on " + lifecycleAware + " " +
          "after shutdown has been initiated. " + lifecycleAware + " will not" +
          " be started");
    }

    // refuse to supervise a component that is already being supervised
    Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
        "Refusing to supervise " + lifecycleAware + " more than once");

    if (logger.isDebugEnabled()) {
      logger.debug("Supervising service:{} policy:{} desiredState:{}",
          new Object[] { lifecycleAware, policy, desiredState });
    }

    // bookkeeping entry for the new component
    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware; // the component
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    // Schedules a periodic action that first runs after the given initial delay
    // and then repeatedly, with the given delay between the end of one run and
    // the start of the next. If any run throws an exception, later runs are
    // suppressed.
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS); // run MonitorRunnable now, then again 3 seconds after each run ends; this is what allows retries
    monitorFutures.put(lifecycleAware, future);
  }

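  The method first checks whether monitorService is still running; it then constructs Supervisoree process = new Supervisoree(), fills in its fields, builds a MonitorRunnable monitoring task, and submits that task to the thread pool.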
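
  The retry behaviour that results from scheduling the monitor with a fixed delay can be sketched as follows. This is a simplified illustration under stated assumptions, not the Flume implementation: FlakyComponent is a hypothetical component that fails its first two start attempts, superviseUntilStarted is an invented helper, and the 20 ms delay replaces Flume's 3 seconds so the example runs quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SuperviseSketch {
    enum State { IDLE, START }

    /** Hypothetical component whose start() throws on the first two attempts. */
    static class FlakyComponent {
        volatile State state = State.IDLE;
        private int attempts = 0; // only touched by the single monitor thread

        void start() {
            if (++attempts <= 2) {
                throw new IllegalStateException("not ready, attempt " + attempts);
            }
            state = State.START;
        }
    }

    /** Returns the number of failed start attempts before the component reached START. */
    static int superviseUntilStarted(FlakyComponent component) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);

        Runnable monitor = () -> {
            if (component.state != State.START) { // current state differs from desired state
                try {
                    component.start();
                    started.countDown();
                } catch (RuntimeException e) {
                    failures.incrementAndGet();   // swallowed, so the next run still happens
                }
            }
        };
        pool.scheduleWithFixedDelay(monitor, 0, 20, TimeUnit.MILLISECONDS);
        try {
            started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return failures.get();
    }
}
```

  Catching the exception inside the task matters: a periodic task that lets an exception escape is not rescheduled by the executor, so the retry would silently stop.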

  The MonitorRunnable.run() method:

public void run() {
      logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
          supervisoree);

      long now = System.currentTimeMillis(); // current timestamp

      try {
        if (supervisoree.status.firstSeen == null) {
          logger.debug("first time seeing {}", lifecycleAware);
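          // first time this component is seen by the monitor
          supervisoree.status.firstSeen = now;
        }
        // updated on every check
        supervisoree.status.lastSeen = now;
        synchronized (lifecycleAware) { // lock the component
          if (supervisoree.status.discard) { // supervision of this component has been cancelled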
            // Unsupervise has already been called on this.
            logger.info("Component has already been stopped {}", lifecycleAware);
            return; // return immediately
          } else if (supervisoree.status.error) { // the component is in an error state
            logger.info("Component {} is in error state, and Flume will not"
                + "attempt to change its state", lifecycleAware);
            return; // return immediately
          }

          // latest state of the component; before start() has run it is LifecycleState.IDLE
          supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

          if (!lifecycleAware.getLifecycleState().equals(
              supervisoree.status.desiredState)) { // current state differs from the desired state

            logger.debug("Want to transition {} from {} to {} (failures:{})",
                new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                    supervisoree.status.desiredState,
                    supervisoree.status.failures });

            switch (supervisoree.status.desiredState) { // act according to the desired state
              case START:
                try {
                  lifecycleAware.start();   // start the component; its state then becomes LifecycleState.START
                } catch (Throwable e) {
                  logger.error("Unable to start " + lifecycleAware
                      + " - Exception follows.", e);
                  if (e instanceof Error) {
                    // This component can never recover, shut it down.
                    supervisoree.status.desiredState = LifecycleState.STOP;
                    try {
                      lifecycleAware.stop();
                      logger.warn("Component {} stopped, since it could not be"
                          + "successfully started due to missing dependencies",
                          lifecycleAware);
                    } catch (Throwable e1) {
                      logger.error("Unsuccessful attempt to "
                          + "shutdown component: {} due to missing dependencies."
                          + " Please shutdown the agent"
                          + "or disable this component, or the agent will be"
                          + "in an undefined state.", e1);
                      supervisoree.status.error = true;
                      if (e1 instanceof Error) {
                        throw (Error) e1;
                      }
                      // Set the state to stop, so that the conf poller can
                      // proceed.
                    }
                  }
                  supervisoree.status.failures++; // start failed: increment the failure count
                }
                break;
              case STOP:
                try {
                  lifecycleAware.stop();    // stop the component
                } catch (Throwable e) {
                  logger.error("Unable to stop " + lifecycleAware
                      + " - Exception follows.", e);
                  if (e instanceof Error) {
                    throw (Error) e;
                  }
                  supervisoree.status.failures++;  // stop failed: increment the failure count
                }
                break;
              default:
                logger.warn("I refuse to acknowledge {} as a desired state",
                    supervisoree.status.desiredState);
            }
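            // Two SupervisorPolicy implementations exist: AlwaysRestartPolicy marks components that may be
            // restarted, OnceOnlyPolicy marks components that may run only once. The latter is not used yet.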
            if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
              logger.error(
                  "Policy {} of {} has been violated - supervisor should exit!",
                  supervisoree.policy, lifecycleAware);
            }
          }
        }
      } catch(Throwable t) {
        logger.error("Unexpected error", t);
      }
      logger.debug("Status check complete");
    }

   The lifecycleAware.stop() and lifecycleAware.start() calls above invoke the corresponding methods of the sinks, sources, channels and so on.

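  Note how start behaves for each component type. For a channel, the start method is executed directly. For a sink or a PollableSource implementation, start() launches a thread that calls process() in a loop, taking data from the channel (sink) or delivering data to the channel (source); in practice this thread is started by the SinkRunner or PollableSourceRunner that wraps the component. An EventDrivenSource implementation has no process() method; its start() is what sets up the delivery of data to the channel (threads can be added there to implement the required logic).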
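
  The "start() launches a polling thread" pattern can be sketched with the JDK alone. PollingRunnerSketch and its Pollable interface are hypothetical names for this example and only mimic the shape of the idea; they are not the Flume runner classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingRunnerSketch {
    /** Hypothetical stand-in for a sink or pollable source. */
    interface Pollable {
        void process();
    }

    private final Pollable target;
    private final AtomicBoolean shouldStop = new AtomicBoolean(false);
    private Thread runnerThread;

    PollingRunnerSketch(Pollable target) {
        this.target = target;
    }

    /** Launches the thread that keeps calling process() until stop() is requested. */
    void start() {
        runnerThread = new Thread(() -> {
            while (!shouldStop.get()) {
                target.process();
            }
        }, "polling-runner");
        runnerThread.start();
    }

    /** Asks the loop to end and waits up to two seconds for the thread to finish. */
    void stop() {
        shouldStop.set(true);
        try {
            runnerThread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return runnerThread != null && runnerThread.isAlive();
    }

    /** Returns true if process() was polled at least three times and the thread then stopped. */
    static boolean demo() {
        CountDownLatch threeCalls = new CountDownLatch(3);
        PollingRunnerSketch runner = new PollingRunnerSketch(threeCalls::countDown);
        runner.start();
        boolean polled = false;
        try {
            polled = threeCalls.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        runner.stop();
        return polled && !runner.isRunning();
    }
}
```

  An event-driven component would skip the loop entirely: its start() would register a callback or open a listener, and events would arrive whenever the outside world produces them.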

  2. The stopAllComponents() method. As the name suggests, it stops all components. The code is as follows:

private void stopAllComponents() {
    if (this.materializedConfiguration != null) {
      logger.info("Shutting down configuration: {}", this.materializedConfiguration);
      for (Entry<String, SourceRunner> entry : this.materializedConfiguration
          .getSourceRunners().entrySet()) {
        try{
          logger.info("Stopping Source " + entry.getKey());
          supervisor.unsupervise(entry.getValue());
        } catch (Exception e){
          logger.error("Error while stopping {}", entry.getValue(), e);
        }
      }

      for (Entry<String, SinkRunner> entry :
        this.materializedConfiguration.getSinkRunners().entrySet()) {
        try{
          logger.info("Stopping Sink " + entry.getKey());
          supervisor.unsupervise(entry.getValue());
        } catch (Exception e){
          logger.error("Error while stopping {}", entry.getValue(), e);
        }
      }

      for (Entry<String, Channel> entry :
        this.materializedConfiguration.getChannels().entrySet()) {
        try{
          logger.info("Stopping Channel " + entry.getKey());
          supervisor.unsupervise(entry.getValue());
        } catch (Exception e){
          logger.error("Error while stopping {}", entry.getValue(), e);
        }
      }
    }
    if(monitorServer != null) {
      monitorServer.stop();
    }
  }

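  First, note why stopAllComponents() is called before startAllComponents(MaterializedConfiguration materializedConfiguration): because the configuration file can be reloaded dynamically, the old components must be stopped before each reload, and only then can the configuration from the latest file be loaded;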

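  Second, the first time stopAllComponents() runs, materializedConfiguration has not been assigned yet, so no components are stopped, and monitorServer is not stopped either. On later reloads, supervision of the sources, sinks and channels is stopped in that order through supervisor.unsupervise(entry.getValue()), and then monitorServer is stopped. The supervisor.unsupervise method is as follows: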

public synchronized void unsupervise(LifecycleAware lifecycleAware) {

    Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware),
        "Unaware of " + lifecycleAware + " - can not unsupervise");

    logger.debug("Unsupervising service:{}", lifecycleAware);

    synchronized (lifecycleAware) {
      Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
      supervisoree.status.discard = true;
      this.setDesiredState(lifecycleAware, LifecycleState.STOP);
      logger.info("Stopping component: {}", lifecycleAware);
      lifecycleAware.stop();
    }
    supervisedProcesses.remove(lifecycleAware);
    //We need to do this because a reconfiguration simply unsupervises old
    //components and supervises new ones.
    monitorFutures.get(lifecycleAware).cancel(false);
    //purges are expensive, so it is done only once every 2 hours.
    needToPurge = true;
    monitorFutures.remove(lifecycleAware);
  }

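  The method first checks that the component is currently supervised, via supervisedProcesses.containsKey(lifecycleAware); if it is not, Preconditions.checkState throws. Otherwise it marks the component as no longer supervised (supervisoree.status.discard = true), sets the desired state to STOP, and stops the component with lifecycleAware.stop(). It then deletes the component's supervision records from supervisedProcesses, which tracks the components under supervision, and from monitorFutures, which maps each component to its scheduled monitoring task, cancelling that task first. Finally, needToPurge = true flags the thread pool for the purge operation, which runs only once every two hours.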
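
  The bookkeeping side of unsupervise (check, cancel the scheduled task, forget the component) can be sketched as follows. UnsuperviseSketch is a hypothetical, heavily reduced supervisor written for this example: it keeps only the monitorFutures map, uses a plain IllegalStateException in place of Guava's Preconditions, and does not stop the component itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class UnsuperviseSketch {
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    // Maps each supervised component to its periodic monitoring task.
    private final Map<Object, ScheduledFuture<?>> monitorFutures = new HashMap<>();

    synchronized ScheduledFuture<?> supervise(Object component, Runnable monitor) {
        if (monitorFutures.containsKey(component)) {
            throw new IllegalStateException(
                "Refusing to supervise " + component + " more than once");
        }
        ScheduledFuture<?> future =
            pool.scheduleWithFixedDelay(monitor, 0, 50, TimeUnit.MILLISECONDS);
        monitorFutures.put(component, future);
        return future;
    }

    synchronized void unsupervise(Object component) {
        if (!monitorFutures.containsKey(component)) {
            throw new IllegalStateException(
                "Unaware of " + component + " - can not unsupervise");
        }
        // cancel(false): do not interrupt a run in progress, just stop rescheduling.
        monitorFutures.remove(component).cancel(false);
    }

    synchronized int supervisedCount() {
        return monitorFutures.size();
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

  Because the entry is removed, the same component (or a replacement built from a new configuration) can be supervised again later, which is what a reconfiguration relies on.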

  One remaining question is how sinks and sources find their channels. This was already covered in the earlier parts. AbstractConfigurationProvider.loadSources configures the channels for a source through a ChannelSelector; the source obtains its channels via getChannelProcessor() and sends events to them with channelProcessor.processEventBatch(eventList). AbstractConfigurationProvider.loadSinks calls sink.setChannel(channelComponent.channel) to set the sink's channel; the sink implementation then obtains it with getChannel() and uses channel.take() to fetch events from the channel for processing.
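
  The wiring described above can be pictured with a toy model. All classes below are hypothetical stand-ins written for this sketch and only borrow the method names mentioned in the text (processEventBatch, setChannel, getChannel, take); the toy channel is a plain FIFO queue with no transactions, and the toy processor simply replicates every event to every channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class WiringSketch {
    /** Toy channel: a FIFO queue of events (no transactions, unlike a real channel). */
    static class Channel {
        private final Queue<String> events = new ArrayDeque<>();
        void put(String event) { events.add(event); }
        String take() { return events.poll(); } // null when the channel is empty
    }

    /** Toy channel processor: copies each event of a batch to every configured channel. */
    static class ChannelProcessor {
        private final List<Channel> channels;
        ChannelProcessor(List<Channel> channels) { this.channels = channels; }
        void processEventBatch(List<String> batch) {
            for (Channel channel : channels) {
                for (String event : batch) {
                    channel.put(event);
                }
            }
        }
    }

    /** Toy sink: is handed its channel once and drains it on demand. */
    static class Sink {
        private Channel channel;
        void setChannel(Channel channel) { this.channel = channel; }
        Channel getChannel() { return channel; }
        List<String> drain() {
            List<String> out = new ArrayList<>();
            for (String e = getChannel().take(); e != null; e = getChannel().take()) {
                out.add(e);
            }
            return out;
        }
    }

    static List<String> demo() {
        Channel channel = new Channel();
        // Source side: the processor is given the source's channels (the loadSources step).
        ChannelProcessor processor = new ChannelProcessor(Arrays.asList(channel));
        // Sink side: the sink is given its channel (the loadSinks step).
        Sink sink = new Sink();
        sink.setChannel(channel);

        processor.processEventBatch(Arrays.asList("e1", "e2")); // source delivers
        return sink.drain();                                    // sink takes
    }
}
```

  Neither side holds a reference to the other; the shared channel object is the only link, and it is set up entirely while the configuration is being loaded.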

  

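  These three parts cover the whole flow: Flume-NG startup, loading of the configuration file, dynamic reloading of the configuration file, and execution of the components. Please point out any omissions; I will keep improving this material.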

 

  More chapters to follow...

