kafka通過控制台模擬消息發送和消息接收正常,但是通過javaAPI操作生產者發送消息不成功 消費者接收不到數據解決方案?
1.問題排查
(1)首先通過在服務器上使用命令行來模擬生產、消費數據,發現生產、消費數據正常。測試如下:
#生產者
bin/kafka-console-consumer.sh --bootstrap-server ***.***.***.***:9092 --topic test
其中 ***.***.***
代表的是 kafka broker的地址.
再開一個窗口啟動消費者:
#消費者
bin/kafka-console-consumer.sh --bootstrap-server ***.***.***.***:9092 --topic test
在生產端隨機輸入數據,在消費者端接收正常,如下圖所示:
(2)那么問題可能出現在client和服務端的連接上了,經過查資料分析,看到網上有如下解決方案:
將kafka/config/server.properties文件中advertised.listeners改為如下屬性。192.168.75.137是我虛擬機的IP。改完后重啟,OK了。Java端的代碼終於能通信了
advertised.listeners=PLAINTEXT://192.168.75.137:9092
advertised.listeners上的注釋是這樣的:
#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
意思就是說:hostname、port都會廣播給producer、consumer。如果你沒有配置了這個屬性的話,則使用listeners的值,如果listeners的值也沒有配置的話,則使用
java.net.InetAddress.getCanonicalHostName()返回值(這里也就是返回localhost了,也就是你的broker的hostname)。
這么說也沒毛病,但是我本身也做了hostname的映射,即使返回hostname,我這邊也能解析的,不用這樣配置也ok啊,問題肯定不是這里。然后我繼續排查。
(3) 通過getCanonicalHostName()方法排查
找到的測試語句如下:
import java.net.*;
public class TestDemo {
public static void outHostName(InetAddress address, String s)
{
System.out.println("通過" + s + "創建InetAddress對象");
System.out.println("主 機 名:" + address.getCanonicalHostName());
System.out.println("主機別名:" + address.getHostName());
System.out.println("");
}
public static void main(String[] args) throws Exception
{
// outHostName(InetAddress.getLocalHost(), "getLocalHost方法");
outHostName(InetAddress.getByName("***.***.***.***"),"***.***.***.***");
// outHostName(InetAddress.getByName("www.baidu.com"), "www.baidu.com");
}
}
2.問題解決
***.***.***.***
處為我的kafka broker的地址,結果輸出結果為映射的其他的hostname,果然是配的時候大意了,在客戶端查找對應的ip的映射,果然有問題,把對應的ip和hostname進行了更正,再次運行java代碼生產消息后,kafka端成功消費,問題解決。