Docker部署kafka集群


上一篇,我們搭建了 kafka 單節點 回顧,現在我們要搭建集群。

在開始之前,先把單節點那套用 docker-compose -f docker-compose-single-broker.yml down 清理掉

1.定義 docker-compose.yml

KAFKA_ADVERTISED_HOST_NAME 不建議使用了,因為它對應 server.properties 中的 advertised.host.name,而這個屬性已經是 DEPRECATED

參考自 http://kafka.apache.org/0100/documentation.html#brokerconfigs

作為替代可以使用 KAFKA_ADVERTISED_LISTENERS,該環境變量對應 server.properties 中的 advertised.listeners.

相信你們和我有一樣的疑惑, 戳-> kafka listeners 和 advertised.listeners 的區別及應用

我的宿主機的IP地址是 10.24.99.195,我的 docker-compose.yml 文件內容如下:

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    restart: always
  kafka1:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    container_name: kafka1
    ports:
      - "9091:9091"
    environment:
      HOSTNAME: kafka1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9091
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9091
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    extra_hosts:
      kafka1: 10.24.99.195
  kafka2:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    container_name: kafka2
    ports:
      - "9092:9092"
    environment:
      HOSTNAME: kafka2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    extra_hosts:
      kafka2: 10.24.99.195
  kafka3:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    container_name: kafka3
    ports:
      - "9093:9093"
    environment:
      HOSTNAME: kafka3
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    extra_hosts:
      kafka3: 10.24.99.195

環境變量 KAFKA_LISTENERS 的 INSIDE 和 OUTSIDE 的端口必須不同。

接着,cd進入docker-compose.yml所在的工作目錄,運行命令 docker-compose up -d,此時默認就是使用該文件。

  • container_name 屬性:使用 docker ps 命令看到 NAMES 一列,將以你的命名相同。
  • HOSTNAME 和 extra_hosts 的組合使用:在容器的 /etc/hosts 中增加一條記錄

例如,docker exec -it kafka1 cat /etc/hosts 命令查看容器 kafka1 的 /etc/hosts 文件,多出一條映射:

10.24.99.195    kafka1

參考自 自定義Docker容器的 hostname

2.容器內驗證

進入容器的方法,就不啰嗦了,不了解的可以百度。(docker psdocker exec -it CONTAINER_ID bash

1、創建一個主題 mytopic

kafka-topics.sh --create --topic mytopic --partitions 2 --zookeeper kafka_zookeeper_1:2181 --replication-factor 2

2、打開一個窗口,進入容器作生產者:

kafka-console-producer.sh --topic=mytopic --broker-list kafka1:9091,kafka2:9092,kafka3:9093

3、再打開一個窗口,進入容器作消費者:

kafka-console-consumer.sh --bootstrap-server kafka1:9091,kafka2:9092,kafka3:9093 --from-beginning --topic mytopic

在kafka集群內部,我們使用的集群字符串都是 kafka1:9091,kafka2:9092,kafka3:9093

3. SpringBoot應用

在網站 start.springboot.io 初始化項目:

  • Dependencies:選擇 Spring Web,以及 Spring For Apache Kafka
  • 分別生成一個 kafka-producer 項目和一個 kafka-consumer 項目

3.1 消費端:

ReceiverService.java:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.Optional;

@Service
public class ReceiverService {

    @KafkaListener(topics = {"news"}, groupId = "agent")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println("receiver record = " + record);
            System.out.println("receiver message = " + message.get());
        }
    }
}

application.properties:

spring.kafka.bootstrap-servers=kafka2:9090,kafka1:9091,kafka3:9093

3.2 生產端:

SendController.java

import com.alibaba.fastjson.JSON;
import com.example.kafka.producer.dto.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.Date;

@Controller
@RequestMapping
public class SendController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    @ResponseBody
    public String send(@RequestParam String msg) {
        Message message = new Message();

        message.setId(System.currentTimeMillis());
        message.setMessage(msg);
        message.setSendAt(new Date());

        kafkaTemplate.send("news", JSON.toJSONString(message));
        return JSON.toJSONString(message);
    }
}

application.properties:

spring.kafka.bootstrap-servers=kafka2:9090,kafka1:9091,kafka3:9093

另外,pom.xml 需要多一個 fastjson 的依賴:

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.76</version>
</dependency>

3.3 測試中的問題

先啟動生產端,然后訪問 http://localhost:8080/send?msg=hello,你會遇到這個異常:

org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

在你的宿主機上修改 hosts 文件,在末尾追加

10.24.99.195 kafka1
10.24.99.195 kafka2
10.24.99.195 kafka3

10.24.99.195 是我當前的主機IP,你需要改成你的主機IP

參考文檔

  • Kafka Document -- Broker Config官方文檔

  • docker快速搭建kafka集群 閱讀

  • docker啟動容器出現問題 進行日志查看 閱讀
    docker logs 命令可以幫助尋找容器Exited的異常原因

  • 如果你不會用 kafka-docker,看這里 閱讀
    這篇文章結尾的答疑,畫的kafka網絡拓撲還不錯。

  • kafka listeners 和 advertised.listeners 的區別及應用 閱讀

  • 用 Docker 快速搭建 Kafka 集群 閱讀
    這篇中,也和我的想法一樣,提到了使用 SpringBoot 應用

  • 解決Docker容器連接 Kafka 連接失敗問題 閱讀


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM