Java訪問kafka的時候java.nio.channels.ClosedChannelException解決辦法


import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaTest2
{
    public static void main(String[] args)
    {
        // 設置配置屬性
        Properties props = new Properties();
        props.put("metadata.broker.list", "130.51.23.95:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class默認為serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // 可選配置,如果不配置,則使用默認的partitioner
//        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // 觸發acknowledgement機制,否則是fire and forget,可能會引起數據丟失
        // 值為0,1,-1,可以參考
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        
        // 創建producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // 產生並發送消息
        long start = System.currentTimeMillis();
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "test123", "test123");
        producer.send(data);
        System.out.println("耗時:" + (System.currentTimeMillis() - start));
        // 關閉producer
        producer.close();
    }
}

運行之后,報一下錯誤:

 

 

 

解決辦法:

修改  config/server.properties文件(多節點的話,每個節點都修改一下)

 

上面的端口必須放開,並且寫你的真實IP地址


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM