注意: 本文不談廢話,低級問題請自行檢查。
我使用Java版本的Kafka Producer生產數據,但是拋出了這個異常。百思不得其解,明明防火牆配置,ZooKeeper,Kafka配置都是沒問題的啊。
困擾了我一天,最終發現這樣一個問題: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
Kafka的server.properties文件中IP不能寫主機名,必須寫IP地址而不能寫映射后的主機名.
如果你在這寫的是hostname,例如bigdata:
跑一個Producer程序,你就會喜提Exception:
但是如果改成IP地址:
程序就能正常運行:
我用的版本是Kafka 0.8 ,果然版本低還是BUG太多。浪費了不少時間。
我的生產者代碼:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Date; import java.util.Properties; import java.util.Random; public class TestProducer { public static void main(String[] args) { long events = 10; Random rnd = new Random(); Properties props = new Properties(); props.put("metadata.broker.list", "192.168.29.132:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = "192.168.2." + rnd.nextInt(255); String msg = runtime + ",www.example.com," + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("advClickStreamTopic", ip, msg); producer.send(data); } producer.close(); } }
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; } }
我在JIRA上並沒找到相關BUG。.... 只能說有點坑
版本信息:
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.10.0</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.9.2-RC3</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> <scope>compile</scope> <exclusions> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> </exclusions> </dependency>