kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最無語的配置


注意: 本文不談廢話,低級問題請自行檢查。

 

 

我使用Java版本的Kafka Producer生產數據,但是拋出了這個異常。百思不得其解,明明防火牆配置,ZooKeeper,Kafka配置都是沒問題的啊。

困擾了我一天,最終發現這樣一個問題:  kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

Kafka的server.properties文件中IP不能寫主機名,必須寫IP地址而不能寫映射后的主機名.

如果你在這寫的是hostname,例如bigdata:

跑一個Producer程序,你就會喜提Exception:

但是如果改成IP地址:

程序就能正常運行:

 

我用的版本是Kafka 0.8 ,果然版本低還是BUG太多。浪費了不少時間。

 

我的生產者代碼:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class TestProducer {
    public static void main(String[] args) {
        long events = 10;
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.29.132:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "SimplePartitioner");
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = "192.168.2." + rnd.nextInt(255);
               String msg = runtime + ",www.example.com," + ip;
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("advClickStreamTopic", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

 

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {
 
    }
 
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }
 
}

我在JIRA上並沒找到相關BUG。....  只能說有點坑

 

版本信息:

     <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.9.2-RC3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
            </exclusions>
        </dependency>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM