Writing to Elasticsearch (ES) with Flume


Customizing the Flume Elasticsearch sink source

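I recently tried to write messages to Elasticsearch through Flume. Flume does not support every ES version, though; it only keeps support for ES 0.9. This is probably because ES versions change frequently and differ a lot from one another, so there is no practical way for every Flume release to do custom development for every ES version.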

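Version compatibility

Below is how I got Flume 1.7 to write to ES 6.8, hitting countless pitfalls along the way. One small episode: out of carelessness I downloaded the latest Flume source (1.9) from the official site. Because the ES sink code has changed very little, I assumed it would be fine to develop against the latest source and compile only the ES sink jar. Only after finishing did I discover that the resulting sink jar would not run on 1.7, so I had to download the 1.7 source and redo the adjustments. Sigh.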

 

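Downloading the Flume source

Flume is a top-level Apache open-source project, so download the source directly from the Apache website and open it in an IDE; I use IDEA. Flume has two release lines, 0.9.x and 1.x. Note that the source version you download must match the Flume version you actually run. The project has a very large number of dependencies, all resolved as official artifacts from the Maven Central repository, so the first import of the Flume project takes a long time. Keep the network connection stable; it took me about 3 hours to download all the dependencies.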

 

Code changes

In the Flume source, all the ES sink code lives in the flume/flume-ng-sinks/flume-ng-elasticsearch-sink submodule, and the implementation is simple.

For the full source, see: https://github.com/liwutao/flume-with-es6.85

 

apache-flume-1.7.0-src
|—flume-ng-elasticsearch-sink
|   |—client
|   |   |—ElasticSearchClient.java
|   |   |—ElasticSearchClientFactory.java
|   |   |—ElasticSearchRestClient.java
|   |   |—ElasticSearchTransportClient.java
|   |   |—NoSuchClientTypeException.java
|   |   |—RoundRobinList.java
|   |—AbstractElasticSearchIndexRequestBuilderFactory.java
|   |—ContentBuilderUtil.java
|   |—ElasticSearchDynamicSerializer.java
|   |—ElasticSearchIndexRequestBuilderFactory.java
|   |—ElasticSearchLogStashEventSerializer.java
|   |—ElasticSearchSink.java
|   |—ElasticSearchSinkConstants.java
|   |—EventSerializerIndexRequestBuilderFactory.java
|   |—IndexNameBuilder.java
|   |—SimpleIndexNameBuilder.java
|   |—TimeBasedIndexNameBuilder.java
|   |—TimestampedEvent.java
|   |—pom.xml
|—pom.xml

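 1. In pom.xml, change the version of the ES dependencies to 6.8.5

 2. Update all code in the ES sink that calls ES APIs so that it uses the 6.8.5 APIs

 3. In the pom.xml of the flume-ng-elasticsearch-sink subproject, add the transport dependency, which provides the 6.8.5 client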

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
</dependency>

 

 4. In the pom.xml of the flume-ng-elasticsearch-sink subproject, add the httpclient dependency, which the 6.8.5 client also needs

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
</dependency>

 

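Packaging and deployment

Once the changes are done, build the module and deploy the resulting flume-ng-elasticsearch-sink-1.7.0.jar to ${FLUME_HOME}/lib/.

Copy all the Elasticsearch jars from the ES environment to ${FLUME_HOME}/lib/.

Copy the jars that the Elasticsearch sink depends on from your local machine to ${FLUME_HOME}/lib/. There are quite a few of them, and I identified each one by working through the errors one at a time: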

elasticsearch-6.8.5.jar
elasticsearch-cli-6.8.5.jar
elasticsearch-core-6.8.5.jar
elasticsearch-rest-client-6.8.5.jar
elasticsearch-secure-sm-6.8.5.jar
elasticsearch-ssl-config-6.8.5.jar
elasticsearch-x-content-6.8.5.jar
httpasyncclient-4.1.2.jar
jackson-core-asl-1.9.3.jar.bak
lang-mustache-client-6.8.5.jar
netty-3.9.4.Final.jar
netty-buffer-4.1.32.Final.jar
netty-codec-4.1.32.Final.jar
netty-codec-http-4.1.32.Final.jar
netty-common-4.1.32.Final.jar
netty-handler-4.1.32.Final.jar
netty-resolver-4.1.32.Final.jar
netty-transport-4.1.32.Final.jar
parent-join-client-6.8.5.jar
percolator-client-6.8.5.jar
rank-eval-client-6.8.5.jar
reindex-client-6.8.5.jar
transport-6.8.5.jar
transport-netty4-client-6.8.5.jar
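
Copying this many jars into one lib directory makes it easy to end up with two versions of the same artifact side by side. The following is a minimal sketch, not part of the original project, of a check that lists such duplicates. The class name JarVersionConflicts and the file-name pattern are assumptions for illustration; the pattern is a heuristic that expects names of the form artifact-version.jar.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reports artifacts that appear in a lib directory with more than one version. */
final class JarVersionConflicts {

    // Heuristic: splits "netty-common-4.1.32.Final.jar" into "netty-common" and "4.1.32.Final".
    // Files that do not end in ".jar" (for example "*.jar.bak") do not match and are ignored.
    private static final Pattern JAR_NAME = Pattern.compile("^(.+?)-(\\d[\\w.\\-]*)\\.jar$");

    /** Maps each artifact that has two or more distinct versions to the sorted set of those versions. */
    static Map<String, Set<String>> findConflicts(List<String> fileNames) {
        Map<String, Set<String>> versionsByArtifact = new TreeMap<>();
        for (String name : fileNames) {
            Matcher m = JAR_NAME.matcher(name);
            if (m.matches()) {
                versionsByArtifact.computeIfAbsent(m.group(1), k -> new TreeSet<>()).add(m.group(2));
            }
        }
        // Keep only artifacts that are present in more than one version.
        versionsByArtifact.values().removeIf(versions -> versions.size() < 2);
        return versionsByArtifact;
    }

    public static void main(String[] args) {
        // Assumption: the lib directory is passed as the first argument, e.g. ${FLUME_HOME}/lib.
        String[] names = new File(args.length > 0 ? args[0] : ".").list();
        List<String> fileNames = names == null ? new ArrayList<>() : Arrays.asList(names);
        findConflicts(fileNames).forEach((artifact, versions) ->
                System.out.println(artifact + " has multiple versions: " + versions));
    }
}
```

Run against ${FLUME_HOME}/lib, it prints one line per artifact that exists in several versions. It cannot detect conflicts between differently named artifacts that ship the same classes.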

 

 

Customizing a Flume interceptor

Pitfalls

 Below are several errors caused by missing jars that I ran into:

FAIL_ON_SYMBOL_HASH_OVERFLOW

11 三月 2020 12:16:31,586 ERROR [lifecycleSupervisor-1-2] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:251) - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@29ce66c0 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
at org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:57)
at org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:56)
at org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:1318)
at org.elasticsearch.common.settings.Setting.access$800(Setting.java:87)
at org.elasticsearch.common.settings.Setting$ListSetting.lambda$new$0(Setting.java:1343)
at org.elasticsearch.common.settings.Setting$ListSetting.innerGetRaw(Setting.java:1353)
at org.elasticsearch.common.settings.Setting.getRaw(Setting.java:461)
at org.elasticsearch.common.settings.Setting.lambda$listSetting$35(Setting.java:1269)
at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:1286)
at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:1269)
at org.elasticsearch.transport.TransportSettings.<clinit>(TransportSettings.java:47)
at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:105)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:135)
at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:288)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.openClient(ElasticSearchTransportClient.java:206)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:79)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
11 三月 2020 12:16:31,590 INFO [lifecycleSupervisor-1-2] (org.apache.flume.sink.elasticsearch.ElasticSearchSink.stop:381) - ElasticSearch sink {} stopping

 

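Problem: the jackson jars that the code depends on are at inconsistent versions

Fix: replace the jackson jars in the Flume environment with all the jackson jars used for the local build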
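
A NoSuchFieldError like the one above usually means the class was loaded from an older jar than the one the code was compiled against. FAIL_ON_SYMBOL_HASH_OVERFLOW is a constant of jackson-core's JsonFactory.Feature and, as far as I know, only exists in newer jackson-core releases. The sketch below, with the hypothetical class name ClassOrigin, prints which jar a class is actually loaded from. It assumes it is run with the same class path as the Flume agent.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: shows which jar (or directory) a class is loaded from. */
final class ClassOrigin {

    static String locate(String className) {
        try {
            // initialize=false loads the class without running its static initializer,
            // so a broken <clinit> (as in the stack trace above) is not triggered here.
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            URL location = source == null ? null : source.getLocation();
            // Classes loaded by the bootstrap loader have no code source.
            return location == null ? "(JDK or bootstrap class path)" : location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Default class name chosen for the jackson case; pass another name as the first argument.
        String name = args.length > 0 ? args[0] : "com.fasterxml.jackson.core.JsonFactory";
        System.out.println(name + " -> " + locate(name));
    }
}
```

If the printed location is an old jackson-core jar under ${FLUME_HOME}/lib, that is the jar to replace.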

 

ClassNotFound:io.netty.util.NettyRuntime

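Problem: the netty-common jar is missing

Fix: copy all the dependency jars under the netty directory of the local repository straight into the Flume environment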

 

 

ClassNotFound:SslConfigurationLoader

Problem: the elasticsearch-ssl-config jar is missing

Fix: all the Elasticsearch jars need to be added to Flume

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-ssl-config</artifactId>
    <version>6.8.5</version>
</dependency>

 

ClassNotFound:SchemeIOSessionStrategy

 

unner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6d310488 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoClassDefFoundError: org/apache/http/nio/conn/SchemeIOSessionStrategy
at org.elasticsearch.index.reindex.ReindexPlugin.getSettings(ReindexPlugin.java:94)
at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:147)
at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:288)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.openClient(ElasticSearchTransportClient.java:206)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:79)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.http.nio.conn.SchemeIOSessionStrategy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 29 more

 

Fix: the httpasyncclient jar needs to be copied to Flume
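
Finding missing jars one restart at a time is slow. A rough alternative is to check up front whether the classes named in the errors above can be loaded. This is only a sketch under assumptions: ClasspathPreflight is a hypothetical name, the class list is taken from the stack traces in this post and is not exhaustive, and the program has to run with the same class path as the Flume agent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: reports which of the given classes cannot be loaded from the class path. */
final class ClasspathPreflight {

    static List<String> missing(List<String> classNames) {
        List<String> notFound = new ArrayList<>();
        ClassLoader loader = ClasspathPreflight.class.getClassLoader();
        for (String name : classNames) {
            try {
                // Load without initializing, so only availability is checked.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                notFound.add(name);
            }
        }
        return notFound;
    }

    public static void main(String[] args) {
        // Class names copied from the error messages and stack traces in this post.
        List<String> required = Arrays.asList(
                "io.netty.util.NettyRuntime",
                "org.apache.http.nio.conn.SchemeIOSessionStrategy",
                "org.elasticsearch.transport.client.PreBuiltTransportClient");
        List<String> notFound = missing(required);
        System.out.println(notFound.isEmpty() ? "all required classes found" : "missing: " + notFound);
    }
}
```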

 

java.lang.NoSuchMethodError: io.netty.util.internal.ObjectUtil.checkPositive(ILjava/lang/String;)

Cause: the netty-common jar is too old, or several versions conflict

Fix: use netty-common-4.1.32.Final.jar
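
In the error message, (ILjava/lang/String;) is the JVM descriptor for a parameter list of (int, String). A small reflection check can confirm whether the netty-common jar on the class path actually provides that method. The class name MethodCheck is hypothetical, and the sketch assumes it is run with the Flume agent's class path.

```java
/** Hypothetical helper: checks via reflection whether a class exposes a given public method. */
final class MethodCheck {

    static boolean hasPublicMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            // Load without initializing; only the method table is inspected.
            Class<?> type = Class.forName(className, false, MethodCheck.class.getClassLoader());
            type.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method names are taken from the NoSuchMethodError above.
        boolean present = hasPublicMethod(
                "io.netty.util.internal.ObjectUtil", "checkPositive", int.class, String.class);
        System.out.println("ObjectUtil.checkPositive(int, String) present: " + present);
    }
}
```

A result of false with the class itself present suggests that an older netty-common jar is being picked up first.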

 

 Two kinds of clients

The Flume Elasticsearch sink uses two kinds of clients to access ES:

PreBuiltTransportClient: the transport client, which connects on port 9300

HttpClient: the REST client, which connects on port 9200
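
Because the two clients use different ports, it can help to confirm that both are reachable from the Flume host before debugging the sink itself. The following is a minimal sketch using only the JDK. The class name PortProbe, the default host localhost, and the 2-second timeout are assumptions for illustration, and an open port only shows that something is listening, not that it is Elasticsearch.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP connection to host:port can be opened. */
final class PortProbe {

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the ES host is passed as the first argument; defaults to localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("transport port 9300 reachable: " + canConnect(host, 9300, 2000));
        System.out.println("REST port 9200 reachable: " + canConnect(host, 9200, 2000));
    }
}
```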



