Java操作Kafka執行不成功的解決方法,Kafka Broker Advertised.Listeners屬性的設置


創建Spring Boot項目繼承Kafka,向Kafka發送消息始終不成功。具體項目配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lodestone</groupId>
    <artifactId>lodestone-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>lodestone-kafka</name>
    <description>Lodestone kafka</description>
    <parent>
         <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
         <version>1.5.12.RELEASE</version>
         <relativePath />
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.8</java.version>
    </properties>
    <dependencies>
         <dependency>
            <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter</artifactId>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
         </dependency>
         
         <!--  https://mvnrepository.com/artifact/com.alibaba/fastjson -->
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.47</version>
         </dependency>
         <!--  https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <!--  https://mvnrepository.com/artifact/org.slf4j/log4j-over-slf4j -->
         <dependency>
             <groupId>org.slf4j</groupId>
              <artifactId>log4j-over-slf4j</artifactId>
         </dependency>
         
    </dependencies>
    <build>
         <plugins>
             <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
             </plugin>
         </plugins>
    </build>
</project>
 
 
application.yml配置:
spring:
  kafka:
    bootstrap-servers:
      - 192.168.52.131:9092
    consumer:
      auto-offset-reset: earliest
      group-id: console-consumer-53989
      key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer:org.apache.kafka.common.serialization.StringSerializer
      value-serializer:org.apache.kafka.common.serialization.StringSerializer
logging:
  level:
    root: DEBUG
    org:
      springframework: DEBUG
      mybatis: DEBUG
 
 
生產者代碼:
package com.lodestone.kafka.producer;
 
import java.util.Date;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
 
import com.alibaba.fastjson.JSON;
import com.lodestone.kafka.message.LodestoneMessage;
 
@Component
public class Sender {
    
    @Autowired
    private KafkaTemplate kafkaTemplate;
 
    public void sendMessage() {
        
        LodestoneMessage message = new LodestoneMessage();
        message.setId(UUID.randomUUID().toString().replaceAll("-", ""));
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}
 
 
 
消息代碼定義:
package com.lodestone.kafka.message;
 
import java.io.Serializable;
import java.util.Date;
 
public class LodestoneMessage implements Serializable {
 
    private static final long serialVersionUID = -6847574917429814430L;
 
    private String id;
    private String msg;
    private Date sendTime;
 
    public String getId() {
        return id;
    }
 
    public void setId(String id) {
        this.id = id;
    }
 
    public String getMsg() {
        return msg;
    }
 
    public void setMsg(String msg) {
        this.msg = msg;
    }
 
    public Date getSendTime() {
        return sendTime;
    }
 
    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
 
}
 
 
 
Spring Boot應用啟動類:
package com.lodestone.kafka;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
 
import com.lodestone.kafka.producer.Sender;
 
@SpringBootApplication
public class LodestoneKafkaApplication {    
 
    public static void main(String[] args) throws InterruptedException {
        
        ApplicationContext app = SpringApplication.run(LodestoneKafkaApplication.class, args);
        //測試
        for(int i=0; i<5; i++) {
            Sender sender = app.getBean(Sender.class);
            sender.sendMessage();
            Thread.sleep(500);
        }
    }
}
 
 
 
將應用的日志調整位Debug級別,啟動應用時看到如下報錯:
2018-04-21 16:05:10.755 DEBUG 10272 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Connection with localhost/127.0.0.1 disconnected
 
java.net.ConnectException : Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_111]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_111]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148) [kafka-clients-0.10.1.1.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]
 
 
可以看到報錯第一句顯示:Connection with localhost/127.0.0.1 disconnected
但是我明明在application.yml中配置了我的Kafka Server的地址是: 192.168.52.131:9092,而在實際連接kafka服務器時卻使用的localhost/127.0.0.1這個地址,所以導致無法連接kafka Server。
 
經過百度,得知,在設置Kafka的時候,需要設置advertised.listeners這個屬性。
 
該屬性在config/server.properties中的描述如下:
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# advertised.listeners=PLAINTEXT://:your.host.name:9092
 
翻譯過來就是hostname和端口是用來建議給生產者和消費者使用的,如果沒有設置,將會使用 listeners的配置,如果listeners也沒有配置,將使用java.net.InetAddress.getCanonicalHostName()來獲取這個hostname和port,對於ipv4,基本就是localhost了。
 
"PLAINTEXT"表示協議,可選的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用"0.0.0.0"表示對所有的網絡接口有效,如果hostname為空表示只對默認的網絡接口有效
也就是說如果你沒有配置advertised.listeners,就使用listeners的配置通告給消息的生產者和消費者,這個過程是在生產者和消費者獲取源數據(metadata)。
 
因此重新設置 advertised.listeners為如下:
advertised.listeners=PLAINTEXT://192.168.52.131:9092
 
 
需要注意的是,如果Kafka有多個節點,那么需要每個節點都按照這個節點的實際hostname和port情況進行設置。
 
每個節點都設置之后,再重新啟動Spring Boot應用,則能夠正常連接Kafka Server,並能夠正常發送消息了。
 
 
 
部分援引和參考:
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM