解決阿里雲Flink連接Kafka報UnknownHostException的問題


解決阿里雲Flink連接Kafka報UnknownHostException的問題

問題描述

最近遇到一個較為麻煩的問題,寫篇文章記錄下解決問題的思路。

在使用阿里雲Flink時,想連接公司之前自建的Kafka服務,但報UnknownHostException。這是由於Kafka通過advertised.listeners配置了域名,一般的解決辦法是修改本機的/etc/hosts文件,添加域名和對應的IP,這樣就能解析成功。但棘手的地方在於阿里雲的Flink是雲端服務,無法修改/etc/hosts,所以導致該Flink無法正常訪問Kafka。后面我們也與公司運維及阿里技術支持進行了交流,阿里給出的解決方案是使用PrivateZone服務,提供一個私有DNS解析服務並綁定對應的VPC。然而阿里的PrivateZone服務也存在一個問題,它的域名只支持FQDN,也就是全路徑的域名格式,但我們自建的Kakfa使用了簡單Hostname形式,如:cx-kafka1cx-kafka2等,這樣導致阿里的方案也被pass了。該集群由於有不少地方在使用,因此配置不能修改,這個問題也被擱置了。最近花了些時間,順着之前的思路解決了該問題。

原理及解決思路

  • 我們知道bootstrap.servers配置的地址信息,只是Kafka用來獲取連接引導信息的地址,是用於發現Kafka完整集群信息的,而連接真正Kafka服務的地址是advertised.listeners廣播到Zookeeper的。所以為了正常連接Kafka,客戶端必須要能解析advertised.listeners的地址。既然阿里的PrivateZone無法配置我們這樣的域名格式,我們能否搭建自己的DNS Server解析該地址,並讓阿里的Flink服務使用我們的DNS Server呢?

DNS Server很好辦,由於沒有大量的解析需求,我采用了短小精悍的dnsmasq,配置起來也很簡單。

#dnsmasq config, for a complete example, see:
#  http://oss.segetech.com/intra/srv/dnsmasq.conf
log-queries
address=/cx-kafka1/xxx.xxx.xxx.xxx
address=/cx-kafka2/xxx.xxx.xxx.xxx
.........
.........

測試一下:

然后,向Flink集群開放UDP 53端口,第一步就算OK了。

  • 接下來,就是Flink服務如何去使用該DNS Server,通過調研我們了解到,JVM提供了兩個參數sun.net.spi.nameservice.provider.<n>sun.net.spi.nameservice.nameservers,可以指定JVM解析域名的方式。通過在系統管理 > 作業模板 > Flink配置中增加下面的配置,將上面自建的DNS Server地址指定給JVM。
env.java.opts: >-
  -Dsun.net.spi.nameservice.provider.1=default
  -Dsun.net.spi.nameservice.provider.2=dns,sun
  -Dsun.net.spi.nameservice.nameservers=xxx.xxx.xxx.xxx

由於JDK7之后是鏈式解析,因此默認的解析方式放在前面,如果默認的DNS解析失敗就會使用后面我們自定義的DNS Server。經過測試,阿里的Flink正常連接上了我們自己Kafka集群。這里還有一點需要注意,如果上述方法沒有生效,Per-Job集群需要新建作業,而Session集群也需要按上面描述重新配置下,這樣新的JVM配置才會生效。

上述修改JVM自定義DNS解析的方式,其實也可以延伸到其他不方便配置hosts的環境,如Docker容器中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM