最開始在本機搭建了kafka偽集群,本地 producer 客戶端成功發布消息至 broker。隨后在服務器(虛擬機)上搭建了 kafka 集群,在本機連接該集群,producer 卻無法發布消息到 broker(奇怪也沒有拋錯)。最開始懷疑是 iptables 沒開放,於是開放端口,結果還不行(又開始懷疑是代碼問題、版本問題等等,倒騰了很久)。后來打印了返回信息,發現報了異常org.apache.kafka.common.errors.TimeoutException: Batch Expired
參考了網上的一些解決方案,最終鎖定修改如下配置信息。
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = security_protocol://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092
以上說的就是 advertised.listeners 是 broker 給 producer 和 consumer 連接使用的,如果沒有設置,就使用 listeners,而如果 host_name 沒有設置的話,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主機名。
修改方法:
修改server.properties 如下配置
修改前:
listeners=PLAINTEXT://:9092
修改后
listeners=PLAINTEXT://192.168.x.100:9092
advertised.listeners=PLAINTEXT://10.10.xx.252:9092
其中192.168.x.100是虛擬機內網地址
10.10.xx.252是安裝虛擬機的服務器映射的外網地址
如果是虛擬機還需要配置虛擬機與主機的端口映射以及主機的入棧、出棧協議。
修改后重啟kafka服務。