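The previous article analyzed how Flume loads its configuration file; dynamic reloading simply runs getConfiguration() again.
This article looks at how each component runs once the configuration file has been loaded.
After the configuration file is loaded, the subscriber, the Application class, receives the event and executes: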
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  stopAllComponents();
  startAllComponents(conf);
}
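MaterializedConfiguration conf is the configuration returned by getConfiguration(), an instance of SimpleMaterializedConfiguration.
handleConfigurationEvent was briefly covered in part (1). It consists of stopAllComponents() and startAllComponents(conf). The materializedConfiguration field in Application holds the current MaterializedConfiguration. When stopAllComponents() runs, that field still holds the old configuration, so the old components are stopped first. startAllComponents(conf) then assigns the new configuration to materializedConfiguration and starts each component in turn.
1. First, the startAllComponents(conf) method. The code is as follows: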
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
  // Start all components, i.e. the three basic component types
  logger.info("Starting new configuration:{}", materializedConfiguration);

  this.materializedConfiguration = materializedConfiguration;

  for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
    try {
      logger.info("Starting Channel " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  /*
   * Wait for all channels to start.
   */
  for (Channel ch : materializedConfiguration.getChannels().values()) {
    while (ch.getLifecycleState() != LifecycleState.START
        && !supervisor.isComponentInErrorState(ch)) {
      try {
        logger.info("Waiting for channel: " + ch.getName() +
            " to start. Sleeping for 500 ms");
        Thread.sleep(500);
      } catch (InterruptedException e) {
        logger.error("Interrupted while waiting for channel to start.", e);
        Throwables.propagate(e);
      }
    }
  }

  for (Entry<String, SinkRunner> entry :
      materializedConfiguration.getSinkRunners().entrySet()) { // start all sinks
    try {
      logger.info("Starting Sink " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  for (Entry<String, SourceRunner> entry :
      materializedConfiguration.getSourceRunners().entrySet()) { // start all sources
    try {
      logger.info("Starting Source " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  this.loadMonitoring();
}
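All three component types are started through supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START). After the channels have been handed to the supervisor, the method waits until every channel has actually started (or is in an error state). Only then does it start the sinks and sources. Starting those two before the channels are up would cause errors, because a sink or source starts talking to its channel as soon as it is running. The startup of sinks and sources is examined separately below.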
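The ordering rule can be shown in a small, self-contained sketch. Everything in it is hypothetical: Component, startAll and the 50 ms simulated startup delay are not Flume APIs. Channels are started asynchronously, and the caller polls until each one reports START. Only after that are sinks and then sources started. Flume's own loop also stops waiting when supervisor.isComponentInErrorState(ch) is true; that check is left out here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StartOrderDemo {

    enum State { IDLE, START }

    // Hypothetical minimal component (Flume's real type is LifecycleAware).
    static final class Component {
        final String name;
        volatile State state = State.IDLE;

        Component(String name) {
            this.name = name;
        }
    }

    static List<String> startAll(List<Component> channels, List<Component> sinks,
                                 List<Component> sources) throws InterruptedException {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // 1. Start channels asynchronously, as the supervisor's threads would.
            for (Component ch : channels) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(50);                 // simulated startup work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    order.add("channel " + ch.name);      // record before publishing START
                    ch.state = State.START;
                });
            }
            // 2. Block until every channel reports START.
            for (Component ch : channels) {
                while (ch.state != State.START) {
                    Thread.sleep(10);
                }
            }
            // 3. Only now start sinks, then sources.
            for (Component sink : sinks) {
                sink.state = State.START;
                order.add("sink " + sink.name);
            }
            for (Component source : sources) {
                source.state = State.START;
                order.add("source " + source.name);
            }
        } finally {
            pool.shutdownNow();
        }
        return order;
    }

    static List<String> runDemo() {
        try {
            return startAll(List.of(new Component("c1"), new Component("c2")),
                            List.of(new Component("k1")),
                            List.of(new Component("r1")));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The two channel entries can appear in either order, but they always come before "sink k1", which always comes before "source r1".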
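supervisor is an instance of LifecycleSupervisor. Its constructor creates a thread pool for the components, with 10 core threads and a maximum pool size of 20. Note, however, what the JDK Javadoc says about ScheduledThreadPoolExecutor: it acts as a fixed-size pool of corePoolSize threads, so setting the maximum to 20 has no practical effect. The constructor is: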
public LifecycleSupervisor() {
  lifecycleState = LifecycleState.IDLE;
  // Components under supervision and their monitoring information
  supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
  monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
  monitorService = new ScheduledThreadPoolExecutor(10,
      new ThreadFactoryBuilder().setNameFormat(
          "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
          .build());
  monitorService.setMaximumPoolSize(20);
  monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
  purger = new Purger();
  needToPurge = false;
}
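The pool settings can be checked in isolation using only the JDK. In this sketch a lambda ThreadFactory stands in for Guava's ThreadFactoryBuilder. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SupervisorPoolDemo {

    // JDK-only stand-in for Guava's ThreadFactoryBuilder().setNameFormat(prefix + "%d").
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> new Thread(r, prefix + counter.getAndIncrement());
    }

    // Same settings as the LifecycleSupervisor constructor above.
    static ScheduledThreadPoolExecutor newMonitorService() {
        String prefix = "lifecycleSupervisor-" + Thread.currentThread().getId() + "-";
        ScheduledThreadPoolExecutor pool =
                new ScheduledThreadPoolExecutor(10, namedFactory(prefix));
        // Stored, but a ScheduledThreadPoolExecutor never grows past its core size,
        // so this pool runs at most 10 threads.
        pool.setMaximumPoolSize(20);
        pool.setKeepAliveTime(30, TimeUnit.SECONDS);
        return pool;
    }

    // Name of the thread that runs the first submitted task.
    static String firstThreadName() {
        ScheduledThreadPoolExecutor pool = newMonitorService();
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstThreadName());
    }
}
```

The first worker thread is named like "lifecycleSupervisor-1-0". The middle number is the id of the thread that created the pool.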
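The supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) method is what actually starts each component. All Flume components implement the LifecycleAware interface, as shown in the figure. The interface has just three methods:
- getLifecycleState, which returns the component's lifecycle state
- start, which starts the component
- stop, which stops the component

The supervise method is as follows: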
public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {
  // Check the state of the thread pool
  if (this.monitorService.isShutdown()
      || this.monitorService.isTerminated()
      || this.monitorService.isTerminating()) {
    throw new FlumeException("Supervise called on " + lifecycleAware + " " +
        "after shutdown has been initiated. " + lifecycleAware + " will not" +
        " be started");
  }

  // Refuse to supervise a component that is already being supervised
  Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
      "Refusing to supervise " + lifecycleAware + " more than once");

  if (logger.isDebugEnabled()) {
    logger.debug("Supervising service:{} policy:{} desiredState:{}",
        new Object[] { lifecycleAware, policy, desiredState });
  }

  // Bookkeeping for the new component
  Supervisoree process = new Supervisoree();
  process.status = new Status();

  process.policy = policy;
  process.status.desiredState = desiredState;
  process.status.error = false;

  MonitorRunnable monitorRunnable = new MonitorRunnable();
  monitorRunnable.lifecycleAware = lifecycleAware; // the component
  monitorRunnable.supervisoree = process;
  monitorRunnable.monitorService = monitorService;

  supervisedProcesses.put(lifecycleAware, process);

  // Creates and executes a periodic action that first runs after the given initial
  // delay, and then with the given delay between the end of one execution and the
  // start of the next. If any execution throws an exception, subsequent executions
  // are suppressed.
  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  // MonitorRunnable runs immediately, then again 3 seconds after each run
  // finishes, which is what makes retries possible
  monitorFutures.put(lifecycleAware, future);
}
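The method first checks that monitorService is still running and that the component is not already supervised. It then builds a Supervisoree (process = new Supervisoree()) and fills in its fields. Finally it creates a monitoring task, MonitorRunnable, and hands it to the thread pool. The pool runs the task immediately and then again 3 seconds after each run finishes.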
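The supervise/MonitorRunnable pattern records a desired state and lets a fixed-delay task keep nudging the component toward it. It can be reduced to the sketch below. This is an illustration under simplifying assumptions, not Flume's code:
- MiniSupervisor, Lifecycle and FlakyComponent are hypothetical.
- The period is 20 ms instead of 3 seconds.
- There is no policy or error-state handling.

One detail it does keep: reconcile() catches exceptions itself. A task scheduled with scheduleWithFixedDelay stops repeating as soon as one execution throws.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class MiniSupervisor {

    enum State { IDLE, START, STOP }

    interface Lifecycle {
        void start();
        void stop();
        State getState();
    }

    // Per-component bookkeeping, playing the role of Supervisoree/Status.
    static final class Status {
        volatile State desired;
        volatile int failures;
    }

    private final ScheduledExecutorService monitorService = Executors.newScheduledThreadPool(2);
    private final Map<Lifecycle, Status> supervised = new HashMap<>();
    private final Map<Lifecycle, ScheduledFuture<?>> futures = new HashMap<>(); // kept for a later cancel

    synchronized Status supervise(Lifecycle component, State desired, long periodMillis) {
        if (supervised.containsKey(component)) {
            throw new IllegalStateException("Refusing to supervise " + component + " more than once");
        }
        Status status = new Status();
        status.desired = desired;
        supervised.put(component, status);
        // First pass runs immediately; each later pass starts periodMillis after the previous one ends.
        futures.put(component, monitorService.scheduleWithFixedDelay(
                () -> reconcile(component, status), 0, periodMillis, TimeUnit.MILLISECONDS));
        return status;
    }

    // One monitoring pass: move the component toward its desired state.
    private static void reconcile(Lifecycle component, Status status) {
        synchronized (component) {
            if (component.getState() == status.desired) {
                return;                               // already where it should be
            }
            try {
                if (status.desired == State.START) {
                    component.start();
                } else if (status.desired == State.STOP) {
                    component.stop();
                }
            } catch (RuntimeException e) {
                // Swallowed on purpose: an exception escaping this task would cancel
                // all future runs. The failure is counted and retried on the next pass.
                status.failures++;
            }
        }
    }

    void shutdown() {
        monitorService.shutdownNow();
    }

    // Hypothetical component whose first start() attempt fails.
    static final class FlakyComponent implements Lifecycle {
        private volatile State state = State.IDLE;
        private int attempts;

        @Override public void start() {
            if (attempts++ == 0) {
                throw new IllegalStateException("dependency not ready");
            }
            state = State.START;
        }

        @Override public void stop() { state = State.STOP; }

        @Override public State getState() { return state; }
    }

    // Supervises a FlakyComponent and reports "<final state>/<failures>".
    static String demo() {
        MiniSupervisor supervisor = new MiniSupervisor();
        FlakyComponent component = new FlakyComponent();
        Status status = supervisor.supervise(component, State.START, 20);
        try {
            long deadline = System.currentTimeMillis() + 2000;
            while (component.getState() != State.START && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            supervisor.shutdown();
        }
        return component.getState() + "/" + status.failures;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // START/1
    }
}
```

The first start() fails and is counted as a failure. The next scheduled pass then succeeds, which is roughly what AlwaysRestartPolicy permits in Flume.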
The MonitorRunnable.run() method:
public void run() {
  logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
      supervisoree);

  long now = System.currentTimeMillis(); // current timestamp

  try {
    if (supervisoree.status.firstSeen == null) {
      // First time this component is being monitored
      logger.debug("first time seeing {}", lifecycleAware);

      supervisoree.status.firstSeen = now;
    }

    // Updated on every check
    supervisoree.status.lastSeen = now;
    synchronized (lifecycleAware) { // lock the component
      if (supervisoree.status.discard) { // supervision of this component has been stopped
        // Unsupervise has already been called on this.
        logger.info("Component has already been stopped {}", lifecycleAware);
        return; // return immediately
      } else if (supervisoree.status.error) { // the component is in an error state
        logger.info("Component {} is in error state, and Flume will not"
            + "attempt to change its state", lifecycleAware);
        return; // return immediately
      }

      // Latest state of the component; it is LifecycleState.IDLE before start() has run
      supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

      if (!lifecycleAware.getLifecycleState().equals(
          supervisoree.status.desiredState)) { // current state differs from the desired state

        logger.debug("Want to transition {} from {} to {} (failures:{})",
            new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                supervisoree.status.desiredState,
                supervisoree.status.failures });

        switch (supervisoree.status.desiredState) { // act on the desired state
          case START:
            try {
              lifecycleAware.start(); // start the component; its state becomes LifecycleState.START
            } catch (Throwable e) {
              logger.error("Unable to start " + lifecycleAware
                  + " - Exception follows.", e);
              if (e instanceof Error) {
                // This component can never recover, shut it down.
                supervisoree.status.desiredState = LifecycleState.STOP;
                try {
                  lifecycleAware.stop();
                  logger.warn("Component {} stopped, since it could not be"
                      + "successfully started due to missing dependencies",
                      lifecycleAware);
                } catch (Throwable e1) {
                  logger.error("Unsuccessful attempt to "
                      + "shutdown component: {} due to missing dependencies."
                      + " Please shutdown the agent"
                      + "or disable this component, or the agent will be"
                      + "in an undefined state.", e1);
                  supervisoree.status.error = true;
                  if (e1 instanceof Error) {
                    throw (Error) e1;
                  }
                  // Set the state to stop, so that the conf poller can
                  // proceed.
                }
              }
              supervisoree.status.failures++; // one more failed start
            }
            break;
          case STOP:
            try {
              lifecycleAware.stop(); // stop the component
            } catch (Throwable e) {
              logger.error("Unable to stop " + lifecycleAware
                  + " - Exception follows.", e);
              if (e instanceof Error) {
                throw (Error) e;
              }
              supervisoree.status.failures++; // one more failed stop
            }
            break;
          default:
            logger.warn("I refuse to acknowledge {} as a desired state",
                supervisoree.status.desiredState);
        }

        // There are two SupervisorPolicy implementations: AlwaysRestartPolicy
        // (components that may be restarted) and OnceOnlyPolicy (components
        // that may run only once); the latter has not been used so far
        if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
          logger.error(
              "Policy {} of {} has been violated - supervisor should exit!",
              supervisoree.policy, lifecycleAware);
        }
      }
    }
  } catch (Throwable t) {
    logger.error("Unexpected error", t);
  }
  logger.debug("Status check complete");
}
The lifecycleAware.stop() and lifecycleAware.start() calls above invoke the corresponding methods of the sink, source, channel and so on.
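Those calls go through the LifecycleAware contract. The sketch below is a trimmed-down replica of it, so that it compiles without Flume on the classpath. It is limited to the three methods listed earlier and the IDLE/START/STOP states used in this article's code; the real types live in org.apache.flume.lifecycle. DemoChannel is a hypothetical component whose start() does its work directly, which is the channel case described next.

```java
final class LifecycleDemo {

    // Trimmed-down replica of Flume's LifecycleState (only the states used in this article).
    enum LifecycleState { IDLE, START, STOP }

    // Trimmed-down replica of Flume's LifecycleAware contract.
    interface LifecycleAware {
        void start();
        void stop();
        LifecycleState getLifecycleState();
    }

    // Hypothetical component: real sources, sinks and channels do their work in start()/stop().
    static final class DemoChannel implements LifecycleAware {
        private volatile LifecycleState state = LifecycleState.IDLE;

        @Override
        public void start() {
            // A real channel would allocate its buffer or open its files here.
            state = LifecycleState.START;
        }

        @Override
        public void stop() {
            state = LifecycleState.STOP;
        }

        @Override
        public LifecycleState getLifecycleState() {
            return state;
        }
    }

    public static void main(String[] args) {
        LifecycleAware channel = new DemoChannel();
        System.out.println(channel.getLifecycleState()); // IDLE
        channel.start();
        System.out.println(channel.getLifecycleState()); // START
        channel.stop();
        System.out.println(channel.getLifecycleState()); // STOP
    }
}
```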
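Note what start does in each case:
- **Channel:** its start method is simply executed.
- **Sink or PollableSource implementation:** the supervised object is the SinkRunner or SourceRunner. Its start() launches a thread that calls process() in a loop, either to take data from the channel (sink) or to deliver data to the channel (source).
- **EventDrivenSource implementation:** there is no process() method. Data is delivered to the channel by the logic that start() sets up; you can start a thread there to implement that logic.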
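Here is a minimal sketch of the polling pattern used for sinks and pollable sources. It assumes a process() method that reports READY or BACKOFF; Flume's Sink and PollableSource use a Status enum with these two values for the same purpose. PollingRunner and Processor are hypothetical names, and the 10 ms backoff is arbitrary.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class PollingRunnerDemo {

    // Same two values that Flume's Sink.Status and PollableSource.Status use.
    enum Status { READY, BACKOFF }

    // Hypothetical stand-in for a sink or PollableSource: one unit of work per call.
    interface Processor {
        Status process() throws InterruptedException;
    }

    // start() launches a thread that keeps calling process() until stop() is called.
    static final class PollingRunner {
        private final Processor processor;
        private final AtomicBoolean shouldStop = new AtomicBoolean(false);
        private Thread runnerThread;

        PollingRunner(Processor processor) {
            this.processor = processor;
        }

        void start() {
            shouldStop.set(false);
            runnerThread = new Thread(() -> {
                while (!shouldStop.get()) {
                    try {
                        if (processor.process() == Status.BACKOFF) {
                            Thread.sleep(10);         // nothing to do right now: back off
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;                       // stop() interrupts a sleeping runner
                    }
                }
            }, "polling-runner");
            runnerThread.start();
        }

        void stop() throws InterruptedException {
            shouldStop.set(true);
            runnerThread.interrupt();
            runnerThread.join();
        }
    }

    // Runs until process() has been called at least `wanted` times, then stops; returns the call count.
    static int runFor(int wanted) {
        AtomicInteger calls = new AtomicInteger();
        PollingRunner runner = new PollingRunner(
                () -> calls.incrementAndGet() <= wanted ? Status.READY : Status.BACKOFF);
        runner.start();
        try {
            long deadline = System.currentTimeMillis() + 2000;
            while (calls.get() < wanted && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            runner.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(runFor(5) >= 5); // true
    }
}
```

The BACKOFF branch keeps an idle runner from spinning on an empty channel. An event-driven source would skip this loop entirely and push events from whatever callback its start() registers.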
2. The stopAllComponents() method. As the name suggests, it stops all components. Its code is:
private void stopAllComponents() {
  if (this.materializedConfiguration != null) {
    logger.info("Shutting down configuration: {}", this.materializedConfiguration);
    for (Entry<String, SourceRunner> entry :
        this.materializedConfiguration.getSourceRunners().entrySet()) {
      try {
        logger.info("Stopping Source " + entry.getKey());
        supervisor.unsupervise(entry.getValue());
      } catch (Exception e) {
        logger.error("Error while stopping {}", entry.getValue(), e);
      }
    }

    for (Entry<String, SinkRunner> entry :
        this.materializedConfiguration.getSinkRunners().entrySet()) {
      try {
        logger.info("Stopping Sink " + entry.getKey());
        supervisor.unsupervise(entry.getValue());
      } catch (Exception e) {
        logger.error("Error while stopping {}", entry.getValue(), e);
      }
    }

    for (Entry<String, Channel> entry :
        this.materializedConfiguration.getChannels().entrySet()) {
      try {
        logger.info("Stopping Channel " + entry.getKey());
        supervisor.unsupervise(entry.getValue());
      } catch (Exception e) {
        logger.error("Error while stopping {}", entry.getValue(), e);
      }
    }
  }
  if (monitorServer != null) {
    monitorServer.stop();
  }
}
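First, note why stopAllComponents() is called before startAllComponents(MaterializedConfiguration materializedConfiguration). Because the configuration file can be reloaded dynamically, the old components must be stopped before every load. Only then can the settings from the newest configuration file be applied.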
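The two orders can be sketched as follows. ReloadOrderDemo and Config are hypothetical, and components are reduced to names. On a reload, the old components are stopped in the reverse of their startup order: sources, then sinks, then channels. A plausible reading of this order is that new events stop arriving first, and the channels, which both sides depend on, go last. The null check mirrors the first-load case.

```java
import java.util.ArrayList;
import java.util.List;

final class ReloadOrderDemo {

    // Hypothetical snapshot of one configuration: component names by type.
    static final class Config {
        final List<String> sources;
        final List<String> sinks;
        final List<String> channels;

        Config(List<String> sources, List<String> sinks, List<String> channels) {
            this.sources = sources;
            this.sinks = sinks;
            this.channels = channels;
        }
    }

    private Config current;                          // null until the first configuration arrives
    private final List<String> events = new ArrayList<>();

    synchronized void handleConfigurationEvent(Config next) {
        stopAll();
        startAll(next);
    }

    synchronized List<String> events() {
        return new ArrayList<>(events);
    }

    private void stopAll() {
        if (current == null) {
            return;                                  // first load: nothing to stop yet
        }
        // Reverse of startup: sources, then sinks, then channels.
        current.sources.forEach(s -> events.add("stop source " + s));
        current.sinks.forEach(s -> events.add("stop sink " + s));
        current.channels.forEach(c -> events.add("stop channel " + c));
    }

    private void startAll(Config next) {
        current = next;
        next.channels.forEach(c -> events.add("start channel " + c));
        next.sinks.forEach(s -> events.add("start sink " + s));
        next.sources.forEach(s -> events.add("start source " + s));
    }

    public static void main(String[] args) {
        ReloadOrderDemo agent = new ReloadOrderDemo();
        agent.handleConfigurationEvent(new Config(List.of("r1"), List.of("k1"), List.of("c1")));
        agent.handleConfigurationEvent(new Config(List.of("r2"), List.of("k2"), List.of("c2")));
        agent.events().forEach(System.out::println);
    }
}
```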
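Second, the first time stopAllComponents() runs, materializedConfiguration has not been assigned yet (it is null), so no components are stopped. monitorServer is not stopped either: it is still null at that point, because it is only created by loadMonitoring() at the end of startAllComponents. On later reloads, the method stops supervising the sources, then the sinks, then the channels, via supervisor.unsupervise(entry.getValue()). Finally it stops monitorServer. The supervisor.unsupervise method is: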
public synchronized void unsupervise(LifecycleAware lifecycleAware) {

  Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware),
      "Unaware of " + lifecycleAware + " - can not unsupervise");

  logger.debug("Unsupervising service:{}", lifecycleAware);

  synchronized (lifecycleAware) {
    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
    supervisoree.status.discard = true;
    this.setDesiredState(lifecycleAware, LifecycleState.STOP);
    logger.info("Stopping component: {}", lifecycleAware);
    lifecycleAware.stop();
  }
  supervisedProcesses.remove(lifecycleAware);
  //We need to do this because a reconfiguration simply unsupervises old
  //components and supervises new ones.
  monitorFutures.get(lifecycleAware).cancel(false);
  //purges are expensive, so it is done only once every 2 hours.
  needToPurge = true;
  monitorFutures.remove(lifecycleAware);
}
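The method works in three steps:
1. It checks that the component is actually being supervised (supervisedProcesses.containsKey(lifecycleAware)).
2. It marks the component as no longer supervised (supervisoree.status.discard = true), sets its desired state to STOP, and stops it with lifecycleAware.stop().
3. It removes the component's monitoring records. One is its entry in supervisedProcesses, which tracks the components currently under supervision. The other is its entry in monitorFutures, which maps each component to its scheduled monitoring task; that task is cancelled with cancel(false) first.

Setting needToPurge = true means the next thread-pool purge, which runs only once every two hours, will clean up the cancelled tasks.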
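The effect of cancel(false) followed by a deferred purge can be observed directly with a ScheduledThreadPoolExecutor. Under the default cancellation policy, a cancelled periodic task stays in the work queue until purge() removes it. The Purger class below is a hypothetical flag-driven purger modeled on the needToPurge description above, not Flume's own class.

```java
import java.util.Arrays;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PurgeDemo {

    // Hypothetical flag-driven purger modeled on the needToPurge description.
    static final class Purger implements Runnable {
        private final ScheduledThreadPoolExecutor pool;
        volatile boolean needToPurge;

        Purger(ScheduledThreadPoolExecutor pool) {
            this.pool = pool;
        }

        @Override
        public void run() {
            if (needToPurge) {
                pool.purge();                         // drop cancelled tasks from the queue
                needToPurge = false;
            }
        }
    }

    // Queue size {after scheduling, after cancel(false), after the purger runs}.
    static int[] queueSizes() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        Purger purger = new Purger(pool);
        try {
            // Long initial delay so the monitor task is still waiting in the queue.
            ScheduledFuture<?> monitor =
                    pool.scheduleWithFixedDelay(() -> { }, 1, 3, TimeUnit.HOURS);
            int scheduled = pool.getQueue().size();

            monitor.cancel(false);                    // what unsupervise() does
            purger.needToPurge = true;
            int cancelled = pool.getQueue().size();   // default policy: still queued

            purger.run();                             // Flume defers this to a 2-hour schedule
            int purged = pool.getQueue().size();
            return new int[] {scheduled, cancelled, purged};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(queueSizes())); // [1, 1, 0]
    }
}
```

Since Java 7, setRemoveOnCancelPolicy(true) makes the executor drop cancelled tasks immediately. Setting a flag and purging on a timer instead avoids scanning the queue on every reconfiguration, which matches the source comment that purges are expensive.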
One question remains: how do sinks and sources find their channels? This was covered in an earlier part.
- **Sources:** in AbstractConfigurationProvider.loadSources, the channels for each source are configured through a ChannelSelector. The source obtains them with getChannelProcessor() and sends events to the channels with channelProcessor.processEventBatch(eventList).
- **Sinks:** in AbstractConfigurationProvider.loadSinks, sink.setChannel(channelComponent.channel) assigns each sink its channel. The sink implementation then retrieves it with getChannel() and calls channel.take() to fetch events for processing.
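The wiring can be reduced to a toy model. ToyChannel, ToyChannelProcessor and ToySink are hypothetical. The selector is modeled as a function from an event to its target channels. Here it behaves like a replicating selector, sending every event to every channel. Real Flume channels are accessed inside a Transaction, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

final class WiringDemo {

    // Toy channel; real Flume channels wrap put/take in a Transaction (omitted here).
    static final class ToyChannel {
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        void put(String event) {
            queue.add(event);
        }

        String take() {
            return queue.poll();                      // null when the channel is empty
        }
    }

    // Plays the role of ChannelProcessor: the selector decides which channels get each event.
    static final class ToyChannelProcessor {
        private final Function<String, List<ToyChannel>> selector;

        ToyChannelProcessor(Function<String, List<ToyChannel>> selector) {
            this.selector = selector;
        }

        void processEventBatch(List<String> events) {
            for (String event : events) {
                for (ToyChannel channel : selector.apply(event)) {
                    channel.put(event);
                }
            }
        }
    }

    // Plays the role of a sink: its channel is injected once, then read with take().
    static final class ToySink {
        private ToyChannel channel;
        final List<String> delivered = new ArrayList<>();

        void setChannel(ToyChannel channel) {
            this.channel = channel;
        }

        ToyChannel getChannel() {
            return channel;
        }

        void process() {
            String event;
            while ((event = getChannel().take()) != null) {
                delivered.add(event);
            }
        }
    }

    static List<List<String>> demo() {
        ToyChannel c1 = new ToyChannel();
        ToyChannel c2 = new ToyChannel();
        // Replicating behavior: every event goes to every configured channel.
        ToyChannelProcessor processor = new ToyChannelProcessor(event -> List.of(c1, c2));
        ToySink k1 = new ToySink();
        ToySink k2 = new ToySink();
        k1.setChannel(c1);                            // like sink.setChannel(...) in loadSinks
        k2.setChannel(c2);
        processor.processEventBatch(List.of("e1", "e2"));
        k1.process();
        k2.process();
        return List.of(k1.delivered, k2.delivered);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[e1, e2], [e1, e2]]
    }
}
```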
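Together, these three parts cover the whole Flume-NG flow: startup, loading the configuration file, reloading it dynamically, and running the components. Please point out anything I have missed; I will keep refining this material.
More chapters to come...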