kafka+storm結合存在的一些問題與解決方法


  在配置kafka和storm的時候, 經常的會出現一些問題, 主要在以下幾個:

  1.  打jar包上去storm集群的時候會出現jar包沖突,類似於log4j或者sf4j的報錯信息.

  2. kafka本地Java生產者和消費者無法消費數據

  3. kafkaSpout的declareFields到底是什么

  下面我們結合kafka_2.11-0.10.1.0 + apache-storm-1.1.0來詳細的說明這三個問題.

  1.  打jar包上去storm集群的時候會出現jar包沖突,類似於log4j或者sf4j的報錯信息.

SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. 
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
5370 [Thread-14-newKafka] ERROR backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.Log4jLoggerFactory
    at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
    at kafka.utils.Logging$class.logger(Logging.scala:24) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.consumer.SimpleConsumer.logger$lzycompute(SimpleConsumer.scala:30) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.consumer.SimpleConsumer.logger(SimpleConsumer.scala:30) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.utils.Logging$class.info(Logging.scala:67) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.consumer.SimpleConsumer.info(SimpleConsumer.scala:30) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:74) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.3.jar:0.9.3]
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.3.jar:0.9.3]
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]
    at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]

  原因:KafkaSpout 代碼里(storm.kafka.KafkaSpout)使用了 slf4j 的包,而 Kafka 系統本身(kafka.consumer.SimpleConsumer)卻使用了 apache 的包.

  解決辦法:在依賴定義中去除問題依賴包

 

 <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.10.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

  還有類似於zk和kafkaClient包沖突的情況:

7630 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.PartitionManager - Read partition information from: /test-topic/04680174-656f-41ad-ad6f-2976d28b2d24/partition_0  --> null
7663 [Thread-16-spout-executor[3 3]] INFO  k.c.SimpleConsumer - Reconnect due to error:
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [kafka_2.11-0.10.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [kafka_2.11-0.10.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:103) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]
7672 [Thread-16-spout-executor[3 3]] ERROR o.a.s.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.10.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:103) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]
7673 [Thread-16-spout-executor[3 3]] ERROR o.a.s.d.executor - 
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.10.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:103) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]
7694 [Thread-16-spout-executor[3 3]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.worker$fn__8659$fn__8660.invoke(worker.clj:761) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]

  原因:org.apache.kafka.common.network.NetworkSend 是一個Kafka客戶端庫,kafka 0.9以前,首先初始化這個類,pom.xml中未顯示的聲明Kafka-clients,故導致錯誤。

    解決辦法:加入Kafka-clients依賴.請參照以上的解決方法, 可以用eclipse去找沖突的包.

2. kafka本地Java生產者和消費者無法消費數據

  這個問題一定要強調一下, 因為之前踩坑的時候的確很惱火, 明明在虛擬機里面是可以生產和消費的, 但是本地的JavaApi卻始終無法訪問.后來不經意間發現說要修改hosts文件.

  本地的JavaApi如果hosts文件沒有相關的ip地址是不會調通的.

  

  另外, 需要在虛擬機的host文件里面加上172.16.11.224 kafka01.

  將server.config里面的配置改成advertised.listeners=PLAINTEXT://kafka01:9092

 

3. kafkaSpout的declareFields到底是什么

  這個最開始是在一個kafka+storm熱力圖項目看到的, 老師根據查看kafkaSpout的源碼發現它發送到下一層bolt的時候fileds的名稱是bytes.

  

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.spout;

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;


import static org.apache.storm.utils.Utils.tuple;
import static java.util.Arrays.asList;

public class RawMultiScheme implements MultiScheme {
  @Override
  public Iterable<List<Object>> deserialize(ByteBuffer ser) {
    return asList(tuple(Utils.toByteArray(ser)));
  }

  @Override
  public Fields getOutputFields() {
    return new Fields("bytes");
  }
}

 

  而且分組的方法的也是shuffleGrouping, 這就為難了, 假如我想要在spout開始就按照fields分組呢? 或者在接收的時候不需要bytes字節而是自定義的格式呢?

  這個時候就要更改kafkaSpout的源碼和PartitionManager的相關代碼了.

   

   在這里也補充一個問題, 就是kafkaSpout有很多配置需要定.

  通過SpoutConfig對象的startOffsetTime字段設置消費進度,默認值是kafka.api.OffsetRequest.EarliestTime(),也就是從最早的消息開始消費,如果想從最新的消息開始消費需要手動設置成kafka.api.OffsetRequest.LatestTime()。另外還有一個問題是,這個字段只會在第一次消費消息時起作用,之后消費的offset是從zookeeper中記錄的offset開始的(存放消費記錄的地方是SpoutConfig對象的zkroot字段,未驗證)

  如果想要當前的topology的消費進度接着上一個topology的消費進度繼續消費,那么不要修改SpoutConfig對象的id。換言之,如果你第一次已經從最早的消息開始消費了,那么如果不換id的話,它就要從最早的消息一直消費到最新的消息,這個時候如果想要跳過中間的消息直接從最新的消息開始消費,那么修改SpoutConfig對象的id就可以了

  下面是SpoutConfig對象的一些字段的含義,其實是繼承的KafkaConfig的字段,可看源碼

 

public int fetchSizeBytes = 1024 * 1024; //發給Kafka的每個FetchRequest中,用此指定想要的response中總的消息的大小
public int socketTimeoutMs = 10000;//與Kafka broker的連接的socket超時時間
public int fetchMaxWait = 10000; //當服務器沒有新消息時,消費者會等待這些時間 public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的讀緩沖區大小 public MultiScheme scheme = new RawMultiScheme();//從Kafka中取出的byte[],該如何反序列化 public boolean forceFromStart = false;//是否強制從Kafka中offset最小的開始讀起 public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//從何時的offset時間開始讀,默認為最舊的offset public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout讀取的進度與目標進度相差多少,相差太多,Spout會丟棄中間的消息 public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所請求的offset對應的消息在Kafka中不存在,是否使用startOffsetTime
public int metricsTimeBucketSizeInSecs = 60;//多長時間統計一次metrics

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM