在使用flume的kafka插件為0.9版本,由於channel隊列滿后,導致心跳問題,進而導致flume kafka不接收消息,
所以打算升級到0.10版本,fluem首先需要做如下:
將flume中的lib包中kafka 0.9相關升級到0.10,(前提,服務端已經升級至0.10)即:
a)kafka_2.10-0.9.0.1.jar -> kafka_2.10-0.10.0.1.jar
b)kafka-clients-0.9.0.1.jar -> kafka-clients-0.10.0.1.jar
替換了jar之后,啟動flume,報如下異常
09 Jan 2018 14:29:36,149 ERROR [lifecycleSupervisor-1-3] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:251) - Unable to start PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.apache.flume.source.kafka.KafkaSource$TopicListSubscriber.subscribe(KafkaSource.java:152)
at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:516)
at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
對KafkaSource源碼分析發現,kafka在0.10的時將Consumer接口的subscription方法進行了改變:
老的0.9版本(使用反編譯工具看的)
public abstract interface Consumer<K, V> extends Closeable { public abstract Set<TopicPartition> assignment(); public abstract Set<String> subscription(); public abstract void subscribe(List<String> paramList); public abstract void subscribe(List<String> paramList, ConsumerRebalanceListener paramConsumerRebalanceListener); public abstract void assign(List<TopicPartition> paramList); public abstract void subscribe(Pattern paramPattern, ConsumerRebalanceListener paramConsumerRebalanceListener); public abstract void unsubscribe();
...
而新0.10 Consumer接口如下:
public interface Consumer<K, V> extends Closeable { /** * @see KafkaConsumer#assignment() */ public Set<TopicPartition> assignment(); /** * @see KafkaConsumer#subscription() */ public Set<String> subscription(); /** * @see KafkaConsumer#subscribe(Collection) */ public void subscribe(Collection<String> topics); /** * @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener) */ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
...
可見接口中使用了更通用的Collection作為參數。
但按理講List集成了Collection,所以原有List應該不用修改可直接訪問。(public interface List<E> extends Collection<E> ),但實際卻是報錯。具體原有見下:
現在的問題轉換為如果依賴的第三方jar更改了,那么你的項目需不需要重新編譯的問題?
通過求證得到如下結果:
1)如果依賴的接口發生變化,那么需要重新編譯
2)如果依賴的常量發生變化,也需要重新編譯
可見Consumer接口發生變化,所以需要重新編譯。
將對應的flume源碼得到,將flume-parent中的pom.xml kakfa依賴修改如下
<!-- <kafka.version>0.9.0.1</kafka.version> -->
<kafka.version>0.10.0.1</kafka.version>
重新編譯對flume-kafka-source進行打包,更新原flume lib下flume-kafka-source-1.7.0.jar,重啟,一切正常。
總結:
1.flume kafka從0.9升級到0.10時,首先將依賴的kafka jar包升級
2.需要依賴0.10對flume-kafka-source項目重新編譯
