Docker實踐:部署Kafka


轉:https://yq.aliyun.com/articles/657849

技術原理

Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka為處理實時數據提供一個統一、高吞吐、低延遲的平台。其持久化層本質上是一個“按照分布式事務日志架構的大規模發布/訂閱消息隊列”,這使它作為企業級基礎設施來處理流式數據非常有價值。此外,Kafka可以通過Kafka Connect連接到外部系統(用於數據輸入/輸出),並提供了Kafka Streams——一個Java流式處理庫 (計算機)。

Kafka是一個分布式的、高吞吐量、高可擴展性的消息系統。Kafka 基於發布/訂閱模式,通過消息解耦,使生產者和消費者異步交互,無需彼此等待。Ckafka 具有數據壓縮、同時支持離線和實時數據處理等優點,適用於日志壓縮收集、監控數據聚合等場景。

關鍵名詞:

  • broker:kafka集群包含一個或者多個服務器,服務器就稱作broker
  • producer:負責發布消息到broker
  • consumer:消費者,從broker獲取消息
  • topic:發布到kafka集群的消息類別。
  • partition:每個topic划分為多個partition。
  • group:每個partition分為多個group

架構示意圖

一個典型的Kafka集群中包含若干Producer(可以是web前端FET,或者是服務器日志等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干ConsumerGroup,以及一個Zookeeper集群。

Kafka通過Zookeeper管理Kafka集群配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,因為consumer消費kafka topic的partition的offsite信息是存在Zookeeper的。

Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。

一個典型的Cloud Kafka集群如上所示。其中的生產者Producer可能是網頁活動產生的消息、或是服務日志等信息。生產者通過push模式將消息發布到Cloud Kafka的Broker集群,消費者通過pull模式從broker中消費消息。消費者Consumer被划分為若干個Consumer Group,此外,集群通過Zookeeper管理集群配置,進行leader選舉,故障容錯等。

kafka特點:

  • 它是一個處理流式數據的”發布-訂閱“消息系統。
  • 實時高效處理流式數據:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
  • 將數據安全存儲在分布式集群。
  • 它是運行在集群上的。
  • 它將流式記錄存儲在topics中。
  • 每個record由key, value和timestamp組成。

Docker搭建

參考:https://github.com/wurstmeister/kafka-docker

docker-compose.yml如下:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - "2181:2181"
       
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: xxx.xxx.xxx.xxx
      KAFKA_MESSAGE_MAX_BYTES: 2000000
      KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1:compact"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./kafka-logs:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
 
  kafka-manager:
    image: sheepkiller/kafka-manager
    ports:
      - 9020:9000
    environment:
      ZK_HOSTS: zookeeper:2181
 

參數說明:

  • KAFKA_ADVERTISED_HOST_NAME:Docker宿主機IP(如果你要配置多個brokers,就不能設置為 localhost 或 127.0.0.1)
  • KAFKA_MESSAGE_MAX_BYTES:kafka(message.max.bytes) 會接收單個消息size的最大限制,默認值為1000000 , ≈1M
  • KAFKA_CREATE_TOPICS:初始創建的topics,可以不設置
  • 環境變量./kafka-logs為防止容器銷毀時消息數據丟失。
  • 容器kafka-manager為yahoo出可視化kafka WEB管理平台。

操作命令:

# 啟動:
$ docker-compose up -d
 
# 增加更多Broker:
$ docker-compose scale kafka=3
 
# 合並:
$ docker-compose up --scale kafka=3

Kakfa使用

1,Kafka管理節點

2,主題

environment: KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1:compact"

Topic1有1個Partition和3個replicas, Topic2有2個Partition,1個replica和cleanup.policy為compact。

Topic 1 will have 1 partition and 3 replicas, Topic 2 will have 1 partition, 1 replica and a cleanup.policy set to compact.

3,讀寫驗證

讀寫驗證的方法有很多,這里我們用kafka容器自帶的工具來驗證,首先進入到kafka容器的交互模式:

docker exec -it kafka_kafka_1 /bin/bash

創建一個主題:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.31.84:2181 --replication-factor 1 --partitions 1 --topic my-test

查看剛創建的主題:

/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.31.84:2181

發送消息:

/opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.31.84:9092 --topic my-test This is a message This is another message

讀取消息:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.84:9092 --topic my-test --from-beginning

使用場景

  • 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。

  • 消息系統:解耦和生產者和消費者、緩存消息等。
  • 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
  • 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
  • 流式處理:比如spark streaming和storm
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM