上一篇,我們搭建了 kafka 單節點 回顧,現在我們要搭建集群。
在開始之前,先把單節點那套用
docker-compose -f docker-compose-single-broker.yml down清理掉
1.定義 docker-compose.yml
KAFKA_ADVERTISED_HOST_NAME 不建議使用了,因為它對應 server.properties 中的 advertised.host.name,而這個屬性已經是 DEPRECATED
參考自 http://kafka.apache.org/0100/documentation.html#brokerconfigs
作為替代可以使用 KAFKA_ADVERTISED_LISTENERS,該環境變量對應 server.properties 中的 advertised.listeners.
相信你們和我有一樣的疑惑, 戳-> kafka listeners 和 advertised.listeners 的區別及應用
我的宿主機的IP地址是 10.24.99.195,我的 docker-compose.yml 文件內容如下:
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
restart: always
kafka1:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka1
ports:
- "9091:9091"
environment:
HOSTNAME: kafka1
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9091
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9091
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
extra_hosts:
kafka1: 10.24.99.195
kafka2:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka2
ports:
- "9092:9092"
environment:
HOSTNAME: kafka2
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
extra_hosts:
kafka2: 10.24.99.195
kafka3:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka3
ports:
- "9093:9093"
environment:
HOSTNAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
extra_hosts:
kafka3: 10.24.99.195
環境變量 KAFKA_LISTENERS 的 INSIDE 和 OUTSIDE 的端口必須不同。
接着,cd進入docker-compose.yml所在的工作目錄,運行命令 docker-compose up -d,此時默認就是使用該文件。
- container_name 屬性:使用
docker ps命令看到 NAMES 一列,將以你的命名相同。 - HOSTNAME 和 extra_hosts 的組合使用:在容器的 /etc/hosts 中增加一條記錄
例如,docker exec -it kafka1 cat /etc/hosts 命令查看容器 kafka1 的 /etc/hosts 文件,多出一條映射:
10.24.99.195 kafka1
2.容器內驗證
進入容器的方法,就不啰嗦了,不了解的可以百度。(docker ps 和 docker exec -it CONTAINER_ID bash)
1、創建一個主題 mytopic:
kafka-topics.sh --create --topic mytopic --partitions 2 --zookeeper kafka_zookeeper_1:2181 --replication-factor 2
2、打開一個窗口,進入容器作生產者:
kafka-console-producer.sh --topic=mytopic --broker-list kafka1:9091,kafka2:9092,kafka3:9093
3、再打開一個窗口,進入容器作消費者:
kafka-console-consumer.sh --bootstrap-server kafka1:9091,kafka2:9092,kafka3:9093 --from-beginning --topic mytopic
在kafka集群內部,我們使用的集群字符串都是 kafka1:9091,kafka2:9092,kafka3:9093
3. SpringBoot應用
在網站 start.springboot.io 初始化項目:
- Dependencies:選擇 Spring Web,以及 Spring For Apache Kafka
- 分別生成一個 kafka-producer 項目和一個 kafka-consumer 項目

3.1 消費端:
ReceiverService.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class ReceiverService {
@KafkaListener(topics = {"news"}, groupId = "agent")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
System.out.println("receiver record = " + record);
System.out.println("receiver message = " + message.get());
}
}
}
application.properties:
spring.kafka.bootstrap-servers=kafka2:9090,kafka1:9091,kafka3:9093
3.2 生產端:
SendController.java
import com.alibaba.fastjson.JSON;
import com.example.kafka.producer.dto.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Date;
@Controller
@RequestMapping
public class SendController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String msg) {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMessage(msg);
message.setSendAt(new Date());
kafkaTemplate.send("news", JSON.toJSONString(message));
return JSON.toJSONString(message);
}
}
application.properties:
spring.kafka.bootstrap-servers=kafka2:9090,kafka1:9091,kafka3:9093
另外,pom.xml 需要多一個 fastjson 的依賴:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
3.3 測試中的問題
先啟動生產端,然后訪問 http://localhost:8080/send?msg=hello,你會遇到這個異常:
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
在你的宿主機上修改 hosts 文件,在末尾追加
10.24.99.195 kafka1
10.24.99.195 kafka2
10.24.99.195 kafka3
10.24.99.195 是我當前的主機IP,你需要改成你的主機IP
參考文檔
-
Kafka Document -- Broker Config官方文檔
-
docker快速搭建kafka集群 閱讀
-
docker啟動容器出現問題 進行日志查看 閱讀
docker logs 命令可以幫助尋找容器Exited的異常原因 -
如果你不會用 kafka-docker,看這里 閱讀
這篇文章結尾的答疑,畫的kafka網絡拓撲還不錯。 -
kafka listeners 和 advertised.listeners 的區別及應用 閱讀
-
用 Docker 快速搭建 Kafka 集群 閱讀
這篇中,也和我的想法一樣,提到了使用 SpringBoot 應用 -
解決Docker容器連接 Kafka 連接失敗問題 閱讀
