import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaTest2 { public static void main(String[] args) { // 設置配置屬性 Properties props = new Properties(); props.put("metadata.broker.list", "130.51.23.95:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); // key.serializer.class默認為serializer.class props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // 可選配置,如果不配置,則使用默認的partitioner // props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo"); // 觸發acknowledgement機制,否則是fire and forget,可能會引起數據丟失 // 值為0,1,-1,可以參考 // http://kafka.apache.org/08/configuration.html props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); // 創建producer Producer<String, String> producer = new Producer<String, String>(config); // 產生並發送消息 long start = System.currentTimeMillis(); KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "test123", "test123"); producer.send(data); System.out.println("耗時:" + (System.currentTimeMillis() - start)); // 關閉producer producer.close(); } }
運行之后,報一下錯誤:
解決辦法:
修改 config/server.properties文件(多節點的話,每個節點都修改一下)
上面的端口必須放開,並且寫你的真實IP地址