Kubernetes 部署 Kafka & Zookeeper & Kafka Manager


系統環境:

一、簡介

Kafka 簡介

       Kafka 是由 Apache 軟件基金會開發的一個開源流處理平台,由 Scala 和 Java 編寫。它是一個分布式、支持分區的的、多副本,基於 zookeeper 協調的分布式消息系統。它最大特性是可以實時的處理大量數據以滿足各種需求場景,比如基於 Hadoop 的批處理系統、低延時的實時系統、storm/Spark 流式處理引擎,web/nginx 日志,訪問日志,消息服務等等。

Zookeeper 簡介

       ZooKeeper 是一種分布式協調服務,用於管理大型主機。在分布式環境中協調和管理服務是一個復雜的過程。ZooKeeper 通過其簡單的架構和 API 解決了這個問題。 ZooKeeper 允許開發人員專注於核心應用程序邏輯,而不必擔心應用程序的分布式特性。

       Kafka 中主要利用 zookeeper 解決分布式一致性問題。Kafka 使用 Zookeeper 的分布式協調服務將生產者,消費者,消息儲存結合在一起。同時借助 Zookeeper,Kafka 能夠將生產者、消費者和 Broker 在內的所有組件在無狀態的條件下建立起生產者和消費者的訂閱關系,實現生產者的負載均衡。

Kafka Manager 簡介

       Kafka Manager 是目前最受歡迎的 Kafka 集群管理工具,最早由雅虎開源,用戶可以在 Web 界面執行一些簡單的集群管理操作。

支持以下功能:

  • 管理 Kafka 集群
  • 方便集群狀態監控 (包括topics, consumers, offsets, brokers, replica distribution, partition distribution)
  • 方便選擇分區副本
  • 配置分區任務,包括選擇使用哪些 Brokers
  • 可以對分區任務重分配
  • 提供不同的選項來創建及刪除 Topic
  • Topic List 會指明哪些topic被刪除
  • 批量產生分區任務並且和多個topic和brokers關聯
  • 批量運行多個主題對應的多個分區
  • 向已經存在的主題中添加分區
  • 對已經存在的 Topic 修改配置
  • 可以在 Broker Level 和 Topic Level 的度量中啟用 JMX Polling 功能
  • 可以過濾在 ZooKeeper 上沒有 ids/owners/offsets/directories 的 consumer

二、部署過程

這個流程需要部署三個組件,分別為 Zookeeper、Kafka、Kafka Manager:

  • (1)、Zookeeper: 首先部署 Zookeeper,方便后續部署 Kafka 節點注冊到 Zookeeper,用 StatefulSet 方式部署三個節點。
  • (2)、Kafka: 第二個部署的是 Kafka,設置環境變量來指定 Zookeeper 地址,用 StatefulSet 方式部署。
  • (3)、Kafka Manager: 最后部署的是 Kafka Manager,用 Deployment 方式部署,然后打開 Web UI 界面來管理、監控 Kafka。

三、Kubernetes 部署 Zookeeper & Kafka & Kafka Manager

1、創建 StorageClass

由於都是使用 StatefulSet 方式部署的有狀態服務,所以 Kubernetes 集群需要提前設置一個 StorageClass 方便后續部署時指定存儲分配(如果想指定為已經存在的 StorageClass 創建 PV 則跳過此步驟)。

此處用的是 NFS 存儲驅動,如果是其它存儲需要提前設置好相關配置

創建 StorageClass 部署文件

nfs-storage.yaml

apiVersion: storage.k8s.io/v1 kind: StorageClass metadata:  name: nfs-storage provisioner: nfs-client #動態卷分配服務指定的名稱 parameters:  archiveOnDelete: "true" #設置為"false"時刪除PVC不會保留數據,"true"則保留數據 mountOptions:  - hard #指定為硬掛載方式  - nfsvers=4 #指定NFS版本 

部署 StorageClass

$ kubectl apply -f nfs-storage.yaml

2、Kubernetes 部署 Zookeeper

創建 Zookeeper 部署文件

zookeeper.yaml

#部署 Service Headless,用於Zookeeper間相互通信 apiVersion: v1 kind: Service metadata:  name: zookeeper-headless  labels:  app: zookeeper spec:  type: ClusterIP  clusterIP: None  publishNotReadyAddresses: true  ports:  - name: client  port: 2181  targetPort: client  - name: follower  port: 2888  targetPort: follower  - name: election  port: 3888  targetPort: election  selector:  app: zookeeper --- #部署 Service,用於外部訪問 Zookeeper apiVersion: v1 kind: Service metadata:  name: zookeeper  labels:  app: zookeeper spec:  type: ClusterIP  ports:  - name: client  port: 2181  targetPort: client  - name: follower  port: 2888  targetPort: follower  - name: election  port: 3888  targetPort: election  selector:  app: zookeeper --- apiVersion: apps/v1 kind: StatefulSet metadata:  name: zookeeper  labels:  app: zookeeper spec:  serviceName: zookeeper-headless  replicas: 3  podManagementPolicy: Parallel  updateStrategy:  type: RollingUpdate  selector:  matchLabels:  app: zookeeper  template:  metadata:  name: zookeeper  labels:  app: zookeeper  spec:  securityContext:  fsGroup: 1001  containers:  - name: zookeeper  image: docker.io/bitnami/zookeeper:3.4.14-debian-9-r25  imagePullPolicy: IfNotPresent  securityContext:  runAsUser: 1001  command:  - bash  - -ec  - | # Execute entrypoint as usual after obtaining ZOO_SERVER_ID based on POD hostname HOSTNAME=`hostname -s` if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then ORD=${BASH_REMATCH[2]} export ZOO_SERVER_ID=$((ORD+1)) else echo "Failed to get index from hostname $HOST" exit 1 fi . /opt/bitnami/base/functions . /opt/bitnami/base/helpers print_welcome_page . /init.sh nami_initialize zookeeper exec tini -- /run.sh  resources:  limits:  cpu: 500m  memory: 512Mi  requests:  cpu: 250m  memory: 256Mi  env:  - name: ZOO_PORT_NUMBER  value: "2181"  - name: ZOO_TICK_TIME  value: "2000"  - name: ZOO_INIT_LIMIT  value: "10"  - name: ZOO_SYNC_LIMIT  value: "5"  - name: ZOO_MAX_CLIENT_CNXNS  value: "60"  - name: ZOO_SERVERS  value: " zookeeper-0.zookeeper-headless:2888:3888, zookeeper-1.zookeeper-headless:2888:3888, zookeeper-2.zookeeper-headless:2888:3888 "  - name: ZOO_ENABLE_AUTH  value: "no"  - name: ZOO_HEAP_SIZE  value: "1024"  - name: ZOO_LOG_LEVEL  value: "ERROR"  - name: ALLOW_ANONYMOUS_LOGIN  value: "yes"  ports:  - name: client  containerPort: 2181  - name: follower  containerPort: 2888  - name: election  containerPort: 3888  livenessProbe:  tcpSocket:  port: client  initialDelaySeconds: 30  periodSeconds: 10  timeoutSeconds: 5  successThreshold: 1  failureThreshold: 6  readinessProbe:  tcpSocket:  port: client  initialDelaySeconds: 5  periodSeconds: 10  timeoutSeconds: 5  successThreshold: 1  failureThreshold: 6  volumeMounts:  - name: data  mountPath: /bitnami/zookeeper  volumeClaimTemplates:  - metadata:  name: data  annotations:  spec:  storageClassName: nfs-storage #指定為上面創建的 storageclass  accessModes:  - ReadWriteOnce  resources:  requests:  storage: 5Gi 

部署 Zookeeper

  • -n:指定應用啟動的 Namespace,替換自己集群的 Namespace
$ kubectl apply -f zookeeper.yaml -n mydlqcloud

3、Kubernetes 部署 Kafka

創建 Kafka 部署文件

kafka.yaml

#部署 Service Headless,用於Kafka間相互通信 apiVersion: v1 kind: Service metadata:  name: kafka-headless  labels:  app: kafka spec:  type: ClusterIP  clusterIP: None  ports:  - name: kafka  port: 9092  targetPort: kafka  selector:  app: kafka --- #部署 Service,用於外部訪問 Kafka apiVersion: v1 kind: Service metadata:  name: kafka  labels:  app: kafka spec:  type: ClusterIP  ports:  - name: kafka  port: 9092  targetPort: kafka  selector:  app: kafka --- apiVersion: apps/v1 kind: StatefulSet metadata:  name: "kafka"  labels:  app: kafka spec:  selector:  matchLabels:  app: kafka  serviceName: kafka-headless  podManagementPolicy: "Parallel"  replicas: 3  updateStrategy:  type: "RollingUpdate"  template:  metadata:  name: "kafka"  labels:  app: kafka  spec:  securityContext:  fsGroup: 1001  runAsUser: 1001  containers:  - name: kafka  image: "docker.io/bitnami/kafka:2.3.0-debian-9-r4"  imagePullPolicy: "IfNotPresent"  resources:  limits:  cpu: 500m  memory: 512Mi  requests:  cpu: 250m  memory: 256Mi  env:  - name: MY_POD_IP  valueFrom:  fieldRef:  fieldPath: status.podIP  - name: MY_POD_NAME  valueFrom:  fieldRef:  fieldPath: metadata.name  - name: KAFKA_CFG_ZOOKEEPER_CONNECT  value: "zookeeper" #Zookeeper Service 名稱  - name: KAFKA_PORT_NUMBER  value: "9092"  - name: KAFKA_CFG_LISTENERS  value: "PLAINTEXT://:$(KAFKA_PORT_NUMBER)"  - name: KAFKA_CFG_ADVERTISED_LISTENERS  value: 'PLAINTEXT://$(MY_POD_NAME).kafka-headless:$(KAFKA_PORT_NUMBER)'  - name: ALLOW_PLAINTEXT_LISTENER  value: "yes"  - name: KAFKA_HEAP_OPTS  value: "-Xmx512m -Xms512m"  - name: KAFKA_CFG_LOGS_DIRS  value: /opt/bitnami/kafka/data  - name: JMX_PORT  value: "9988"  ports:  - name: kafka  containerPort: 9092  livenessProbe:  tcpSocket:  port: kafka  initialDelaySeconds: 10  periodSeconds: 10  timeoutSeconds: 5  successThreshold: 1  failureThreshold: 2  readinessProbe:  tcpSocket:  port: kafka  initialDelaySeconds: 5  periodSeconds: 10  timeoutSeconds: 5  successThreshold: 1  failureThreshold: 6  volumeMounts:  - name: data  mountPath: /bitnami/kafka  volumeClaimTemplates:  - metadata:  name: data  spec:  storageClassName: nfs-storage #指定為上面創建的 storageclass  accessModes:  - "ReadWriteOnce"  resources:  requests:  storage: 5Gi 

部署 Kafka

  • -n:指定應用啟動的 Namespace,替換自己集群的 Namespace
$ kubectl apply -f kafka.yaml -n mydlqcloud

4、Kubernetes 部署 Kafka Manager

創建 Kafka Manager 部署文件

kafka-manager.yaml

apiVersion: v1 kind: Service metadata:  name: kafka-manager  labels:  app: kafka-manager spec:  type: NodePort  ports:  - name: kafka  port: 9000  targetPort: 9000  nodePort: 30900  selector:  app: kafka-manager --- apiVersion: apps/v1 kind: Deployment metadata:  name: kafka-manager  labels:  app: kafka-manager spec:  replicas: 1  selector:  matchLabels:  app: kafka-manager  template:  metadata:  labels:  app: kafka-manager  spec:  containers:  - name: kafka-manager  image: zenko/kafka-manager:1.3.3.22  imagePullPolicy: IfNotPresent  ports:  - name: kafka-manager  containerPort: 9000  protocol: TCP  env:  - name: ZK_HOSTS  value: "zookeeper:2181"  livenessProbe:  httpGet:  path: /api/health  port: kafka-manager  readinessProbe:  httpGet:  path: /api/health  port: kafka-manager  resources:  limits:  cpu: 500m  memory: 512Mi  requests:  cpu: 250m  memory: 256Mi 

部署 Kafka Manager

  • -n:指定應用啟動的 Namespace,替換自己集群的 Namespace
$ kubectl apply -f kafka-manager.yaml -n mydlqcloud

四、進入 Kafka Manager 管理 Kafka 集群

這里的 Kubernetes 集群地址為:192.168.2.11,並且在上面設置 Kafka-Manager 網絡策略為 NodePort 方式,且設置端口為 30900,這里輸入地址:http://192.168.2.11:30900 訪問 Kafka Manager。

進入后先配置 Kafka Manager,增加一個 Zookeeper 地址。

配置三個必填參數:

  • Cluster Name:自定義一個名稱,任意輸入即可。
  • Zookeeper Hosts:輸入 Zookeeper 地址,這里設置為 Zookeeper 服務名+端口。
  • Kafka Version:選擇 kafka 版本。

配置完成后就可以看到新增了一條記錄,點進去就可以查看相關集群信息。

五、附錄:鏡像參數配置

Zookeeper 鏡像可配置變量參數

參數名稱 描述
ZOO_PORT_NUMBER Zookeeper客戶端端口。默認值:2181
ZOO_SERVER_ID 集合中服務器的ID。默認值:1
ZOO_TICK_TIME ZooKeeper用於心跳的基本時間單位(以毫秒為單位)。默認值:2000
ZOO_INIT_LIMIT ZooKeeper用於限制仲裁中ZooKeeper服務器連接到領導者的時間長度。默認值:10
ZOO_SYNC_LIMIT 服務器與領導者的過時距離。默認值:5
ZOO_MAX_CLIENT_CNXNS 限制單個客戶端可能對ZooKeeper集合的單個成員進行的並發連接數。默認60
ZOO_SERVERS 逗號,空格或冒號分隔的服務器列表。示例:zoo1:2888:3888,zoo2:2888:3888。沒有默認值。
ZOO_CLIENT_USER 將使用Zookeeper客戶端進行身份驗證的用戶。默認值:無默認值。
ZOO_CLIENT_PASSWORD 將使用Zookeeper客戶端進行身份驗證的密碼。沒有默認值。
ZOO_SERVER_USERS 逗號,分號或空格分隔的要創建的用戶列表。示例:user1,user2,admin。沒有默認值
ZOO_SERVER_PASSWORDS 逗號,半精或空格分隔的密碼列表,在創建時分配給用戶。示例:pass4user1,pass4user2,pass4admin。沒有默認值
ZOO_ENABLE_AUTH 啟用Zookeeper身份驗證。它使用SASL / Digest-MD5。默認值:否
ZOO_RECONFIG_ENABLED 啟用ZooKeeper動態重配置。默認值:否
ZOO_HEAP_SIZE Java堆選項(Xmx和XM)的大小(MB)。如果通過Xmx配置Xm,則忽略此env var JVMFLAGS。默認值:1024
ZOO_LOG_LEVEL Zookeeper日志級別。可用級別為:ALL,DEBUG,INFO,WARN,ERROR,FATAL,OFF,TRACE。默認值:INFO
ALLOW_ANONYMOUS_LOGIN 如果設置為true,則允許接受來自未經身份驗證的用戶的連接。默認值:否
JVMFLAGS ZooKeeper進程的默認JVMFLAGS。沒有默認值

Kafka 鏡像可配置變量參數

參數名稱 描述
KAFKA_CFG_ZOOKEEPER_CONNECT Zookeeper集群地址,例如”zookeeper:2181”
KAFKA_PORT_NUMBER Kafka端口,例如”9092”
ALLOW_PLAINTEXT_LISTENER 是否啟用Plaintext偵聽器,默認”false”
KAFKA_CFG_LISTENERS Kafka 監聽列表,broker對外提供服務時綁定的IP和端口
KAFKA_CFG_ADVERTISED_LISTENERS 給客戶端用的發布至zookeeper的監聽,broker 會上送此地址到zookeeper,zookeeper會將此地址提供給消費者,消費者根據此地址獲取消息。
KAFKA_HEAP_OPTS Java JVM堆內存大小配置,例如”-Xmx512m -Xms512m”
KAFKA_CFG_LOGS_DIRS Kafka 日志存儲目錄
JMX_PORT JMX端口配置,設置此參數才能開啟JMX,例如設置為”9988”
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM 是否啟用基於主機名的認證認證,例如想開啟可以設置為”https”
KAFKA_CFG_BROKER_ID Broker ID值,每個節點值都唯一。默認為”-1”,自動生成BrokerId
KAFKA_CFG_DELETE_TOPIC_ENABLE 是否允許刪除Topic,默認”false”
KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES 此項配置指定時間間隔,強制進行fsync日志,默認”10000”
KAFKA_CFG_LOG_FLUSH_INTERVAL_MS 此項配置用來置頂強制進行fsync日志到磁盤的時間間隔”1000”
KAFKA_CFG_LOG_RETENTION_BYTES 每個Topic下每個Partition保存數據的總量,超過限制都會刪除一個段文件,默認”1073741824”
KAFKA_CFG_LOG_RETENTION_CHECK_INTERVALS_MS 檢查日志分段文件的間隔時間,以確定是否文件屬性是否到達刪除要求,默認”300000”
KAFKA_CFG_LOG_RETENTION_HOURS 每個日志文件刪除之前保存的時間,默認數據保存時間對所有topic都一樣,默認”168”
KAFKA_CFG_LOG_MESSAGE_FORMAT_VERSION 指定broker將用於將消息添加到日志文件的消息格式版本
KAFKA_CFG_MAX_MESSAGE_BYTES Kafka允許的最大記錄批大小。默認”1000000”
KAFKA_CFG_SEGMENT_BYTES 日志段文件的最大大小。當達到這個大小時,將創建一個新的日志段。默認”1073741824”
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR 對replica的數目進行配置,默認值為”1”,表示不對topic進行備份。如果配置為2,表示除了leader節點,對於topic里的每一個partition,都會有一個額外的備份。
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR Topic的offset的備份份數。建議設置更高的數字保證更高的可用性
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR 事務Topic的復制因子(設置得更高以確保可用性)
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR 事務Topic的副本數
KAFKA_CFG_NUM_IO_THREADS 用來處理請求的I/O線程的數目
KAFKA_CFG_NUM_NETWORK_THREADS 用來處理網絡請求的網絡線程數目
KAFKA_CFG_NUM_PARTITIONS 每個主題的日志分區的默認數量
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR 每個數據目錄中的線程數,用於在啟動時日志恢復,並在關閉時刷新。默認”1”
KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES SO_RCVBUFF緩存大小,server進行socket連接時所用
KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES 允許的最大請求尺寸,這將避免server溢出,它應該小於Java heap size,默認”104857600”
KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES Kafka追加消息的最大尺寸
KAFKA_CFG_ZOOKEEPER_CONNECT_TIMEOUT_MS 連接到zookeeper的超時時間(ms)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM