Apache Flume是一個分布式的、可靠的、可用的系統,用於有效地收集、 聚合和將大量日志數據從許多不同的源移動到一個集中的數據存儲,但是其本身是以本地properties作為配置的,配置無法做到動態監聽和更新。
一、Flume和ETCD的結合,使用ETCD作為flume 數據采集的配置中心。
那么如何做出一個flume的動態配置中心呢,etcd 可以是一個很好的選擇。etcd的API版本有v2和v3兩個,這里選擇v3版本。在flume啟動的時候,可以啟動etcd的監聽。
... @Override public void start() { //初始化監聽 EtcdUtil.initListen(etcdConfig); sinkCounter.start(); sinkCounter.incrementConnectionCreatedCount(); super.start(); } ...
/** * etcd的監聽,監聽指定的key,當key 發生變化后,監聽自動感知到變化。 key發生變化后,會更新本地緩存數據 * * @param key 指定需要監聽的key */ public static void initListen(String key) { try { //加載配置 loadProperties(getConfig(EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs())); new Thread(() -> { Watch.Watcher watcher = EtcdUtil.getEtclClient().getWatchClient().watch(ByteSequence.fromString(key)); try { while (true) { watcher.listen().getEvents().stream().forEach(watchEvent -> { KeyValue kv = watchEvent.getKeyValue(); log.info("etcd event:{} ,change key is:{},afterChangeValue:{}", watchEvent.getEventType(), kv.getKey().toStringUtf8(), kv.getValue().toStringUtf8()); loadProperties(kv.getValue().toStringUtf8()); }); } } catch (InterruptedException e) { log.error("etcd listen start cause Exception:{}", e); } }).start(); } catch (Exception e) { log.error("etcd listen start cause Exception:{}", e); } }
備注:完整的代碼可以參考筆者博客:https://www.cnblogs.com/laoqing/p/8967549.html
監聽完配置后,就可以在etcd 的配置中心中管理配置了
然后就可以通過如下代碼獲取配置了
.... EtcdUtil.getLocalPropertie("xxxxx") ....
二、Flume 日志采集中的流水線架構設計
flume 中數據采集是通過source->Sink的方式進行數據采集入庫的,但是有一個缺點就是數據中如果需要做一些ETL的業務處理,比如簡單的數據加工,或者增加一些業務邏輯處理等然后再入庫,無法滿足。而是我們就可以對flume原有的架構進行拓展。
拓展后的架構圖如下所示。
- 1、用戶可以自定義process,繼承統一的process接口,用戶的process自己打成jar包。放到flume的lib目錄中。
-
public interface Processor<T> { T process(T log); }
-
- 2、etcd動態配置中,配置需要使用哪些process,在多個process的時候,在etcd動態配置中配置順序。
- processors=[{"processor":"com.xxx.flume.tax.processor.TaxCrawlerDataCommonProcessor","logType":"5"}] # logType代表日志類型
-
public class ProcessorBean { private String processor; private String logType; private Processor processorInstance; public Processor getProcessorInstance() { return processorInstance; } public void setProcessorInstance(Processor processorInstance) { this.processorInstance = processorInstance; } public String getProcessor() { return processor; } public void setProcessor(String processor) { this.processor = processor; } public String getLogType() { return logType; } public void setLogType(String logType) { this.logType = logType; } @Override public String toString() { return "ProcessorBean{" + "processor='" + processor + '\'' + ", logType='" + logType + '\'' + ", processorInstance=" + processorInstance + '}'; } }
...
processorBeanList = GsonUtil.gson.fromJson(EtcdUtil.getLocalPropertie("processors"), new TypeToken<List<ProcessorBean>>() { }.getType()); processorBeanList.forEach(processorBean -> { try { Processor<?> processor = (Processor<?>) Class.forName(processorBean.getProcessor()).newInstance(); processorBean.setProcessorInstance(processor); } catch (Throwable e) { e.printStackTrace(); } });
...
- 3、process 為動態裝載形式,可以隨時開啟和關閉。Process中業務自己處理自己的業務邏輯。
- 4、source負責數據采集
- 5、sink負責數據入庫到目標端,並且負責通知(可以在動態配置中配置是否開啟通知功能)
- isNotice=1#1代表打開通知
-
public interface Notice { void noticePostLog(String logType); void noticePostLog(List<Map<String,Object>> noticeMsg); }
public void noticePostLog(String logType) { if (null != EtcdUtil.getLocalPropertie("isNotice") && "1".equals(EtcdUtil.getLocalPropertie("isNotice"))) { List<Map<String, Object>> callList = new ArrayList<>(); ................ if (null != callList && callList.size() > 0) { noticePostLog(callList); } } }
if (null != processorBeanList && processorBeanList.size() > 0) { for (ProcessorBean processorBean : processorBeanList) { try { if (logType.equals(processorBean.getLogType())) { if ("2".equals(logType)) { log = (BusinessLog) processorBean.getProcessorInstance().process(log); } else if ("5".equals(logType)) { log = (CrawlerLog) processorBean.getProcessorInstance().process(log); } } } catch (Throwable e) { logger.error("exec process cause Exception", e); } } }
- 6、通知為一個通用的json字段。
- 7、后續所有的應用服務器都在裝機時,統一預先把flume包放入進去。用戶在使用flume時,只需要做配置以及上傳自己的process包。
- 8、除了process不能復用外,其他的部分都通用組件復用。
- 9、process就類似流水線作業的一樣。
本文作者:張永清 連接:https://www.cnblogs.com/laoqing/p/12620747.html
三、Flume 日志采集中的流水線架構設計在爬蟲中的架構實踐
這里以稅務數據爬蟲為例,仔細看如下的架構設計
- 1、稅務的爬蟲數據采用flume進行采集入庫
- 2、由於各個省的稅務網站欠差萬別,數據在爬蟲下來后,需要按照不同的省份進行進行(html頁面數據解析,由於每個省的稅務網站不同,html不一樣)。解析時,就采用了process處理。
- 3、每個省份有一套解析的代碼,每個省份實現同一個底層的解析接口,解析時,通過http接口從業務系統中獲取配置的解析規則。
public interface TaxCrawlerAnalysis { TaxTable analysisTaxTable(TaxHtmlTable taxHtmlTable,String taxTableType); }
- 4、每個省份的解析類同樣采用動態加載的方式,在解析處理時通過省份編碼的形式進行匹配。
taxCrawlerAnalysises=[{"taxCrawlerAnalysis":"com.xxx.bigdata.crawler.tax.common.analysis.TaxCrawlerPdfTableAnalysis","provinceCity":"320000"}]
四、總結
作者的原創文章,轉載須注明出處。原創文章歸作者所有,歡迎轉載,但是保留版權。對於轉載了博主的原創文章,不標注出處的,作者將依法追究版權,請尊重作者的成果。本文作者:張永清 連接:https://www.cnblogs.com/laoqing/p/12620747.html
1、流水線的處理,讓flume可以動態的擴展,可以支持自定義的業務處理。業務處理的代碼可以作為單獨的項目即插即用的集成到flume中。
2、etcd作為動態配置中心后,配置可以做到動態的更新,而不需要配置變更后,對jvm進程進行重啟。
3、對flume進行改造和擴展的代碼,后續都會放入個人github中。
五、github 源碼:供參考