如何將Flume kafka source 插件從0.9升級到0.10.1


在使用flume的kafka插件為0.9版本,由於channel隊列滿后,導致心跳問題,進而導致flume kafka不接收消息,

所以打算升級到0.10版本,fluem首先需要做如下:

將flume中的lib包中kafka 0.9相關升級到0.10,(前提,服務端已經升級至0.10)即:

a)kafka_2.10-0.9.0.1.jar -> kafka_2.10-0.10.0.1.jar

b)kafka-clients-0.9.0.1.jar -> kafka-clients-0.10.0.1.jar

 

替換了jar之后,啟動flume,報如下異常

09 Jan 2018 14:29:36,149 ERROR [lifecycleSupervisor-1-3] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:251)  - Unable to start PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
        at org.apache.flume.source.kafka.KafkaSource$TopicListSubscriber.subscribe(KafkaSource.java:152)
        at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:516)
        at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
        at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

 

對KafkaSource源碼分析發現,kafka在0.10的時將Consumer接口的subscription方法進行了改變:

老的0.9版本(使用反編譯工具看的)

public abstract interface Consumer<K, V> extends Closeable
{
  public abstract Set<TopicPartition> assignment();

  public abstract Set<String> subscription();

  public abstract void subscribe(List<String> paramList);

  public abstract void subscribe(List<String> paramList, ConsumerRebalanceListener paramConsumerRebalanceListener);

  public abstract void assign(List<TopicPartition> paramList);

  public abstract void subscribe(Pattern paramPattern, ConsumerRebalanceListener paramConsumerRebalanceListener);

  public abstract void unsubscribe();
...

 

而新0.10 Consumer接口如下:

public interface Consumer<K, V> extends Closeable {

    /**
     * @see KafkaConsumer#assignment()
     */
    public Set<TopicPartition> assignment();

    /**
     * @see KafkaConsumer#subscription()
     */
    public Set<String> subscription();

    /**
     * @see KafkaConsumer#subscribe(Collection)
     */
    public void subscribe(Collection<String> topics);

    /**
     * @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
     */
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
  ...

可見接口中使用了更通用的Collection作為參數。

但按理講List集成了Collection,所以原有List應該不用修改可直接訪問。(public interface List<E> extends Collection<E> ),但實際卻是報錯。具體原有見下:

 

現在的問題轉換為如果依賴的第三方jar更改了,那么你的項目需不需要重新編譯的問題?

通過求證得到如下結果:

1)如果依賴的接口發生變化,那么需要重新編譯

2)如果依賴的常量發生變化,也需要重新編譯

(參考:https://stackoverflow.com/questions/536971/do-i-have-to-recompile-my-application-when-i-upgrade-a-third-party-jar

可見Consumer接口發生變化,所以需要重新編譯。

將對應的flume源碼得到,將flume-parent中的pom.xml kakfa依賴修改如下

    <!-- <kafka.version>0.9.0.1</kafka.version> -->
    <kafka.version>0.10.0.1</kafka.version>

重新編譯對flume-kafka-source進行打包,更新原flume lib下flume-kafka-source-1.7.0.jar,重啟,一切正常。

 

總結:

1.flume kafka從0.9升級到0.10時,首先將依賴的kafka jar包升級

2.需要依賴0.10對flume-kafka-source項目重新編譯

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM