轉:https://yq.aliyun.com/articles/657849
技術原理
Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka為處理實時數據提供一個統一、高吞吐、低延遲的平台。其持久化層本質上是一個“按照分布式事務日志架構的大規模發布/訂閱消息隊列”,這使它作為企業級基礎設施來處理流式數據非常有價值。此外,Kafka可以通過Kafka Connect連接到外部系統(用於數據輸入/輸出),並提供了Kafka Streams——一個Java流式處理庫 (計算機)。
Kafka是一個分布式的、高吞吐量、高可擴展性的消息系統。Kafka 基於發布/訂閱模式,通過消息解耦,使生產者和消費者異步交互,無需彼此等待。Ckafka 具有數據壓縮、同時支持離線和實時數據處理等優點,適用於日志壓縮收集、監控數據聚合等場景。
關鍵名詞:
- broker:kafka集群包含一個或者多個服務器,服務器就稱作broker
- producer:負責發布消息到broker
- consumer:消費者,從broker獲取消息
- topic:發布到kafka集群的消息類別。
- partition:每個topic划分為多個partition。
- group:每個partition分為多個group
架構示意圖
一個典型的Kafka集群中包含若干Producer(可以是web前端FET,或者是服務器日志等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干ConsumerGroup,以及一個Zookeeper集群。
Kafka通過Zookeeper管理Kafka集群配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,因為consumer消費kafka topic的partition的offsite信息是存在Zookeeper的。
Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。
一個典型的Cloud Kafka集群如上所示。其中的生產者Producer可能是網頁活動產生的消息、或是服務日志等信息。生產者通過push模式將消息發布到Cloud Kafka的Broker集群,消費者通過pull模式從broker中消費消息。消費者Consumer被划分為若干個Consumer Group,此外,集群通過Zookeeper管理集群配置,進行leader選舉,故障容錯等。
kafka特點:
- 它是一個處理流式數據的”發布-訂閱“消息系統。
- 實時高效處理流式數據:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
- 將數據安全存儲在分布式集群。
- 它是運行在集群上的。
- 它將流式記錄存儲在topics中。
- 每個record由key, value和timestamp組成。
Docker搭建
參考:https://github.com/wurstmeister/kafka-docker
docker-compose.yml如下:
version: '2' services: zookeeper: image: wurstmeister/zookeeper volumes: - ./data:/data ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: xxx.xxx.xxx.xxx KAFKA_MESSAGE_MAX_BYTES: 2000000 KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1:compact" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - ./kafka-logs:/kafka - /var/run/docker.sock:/var/run/docker.sock kafka-manager: image: sheepkiller/kafka-manager ports: - 9020:9000 environment: ZK_HOSTS: zookeeper:2181
參數說明:
- KAFKA_ADVERTISED_HOST_NAME:Docker宿主機IP(如果你要配置多個brokers,就不能設置為 localhost 或 127.0.0.1)
- KAFKA_MESSAGE_MAX_BYTES:kafka(message.max.bytes) 會接收單個消息size的最大限制,默認值為1000000 , ≈1M
- KAFKA_CREATE_TOPICS:初始創建的topics,可以不設置
- 環境變量./kafka-logs為防止容器銷毀時消息數據丟失。
- 容器kafka-manager為yahoo出可視化kafka WEB管理平台。
操作命令:
# 啟動: $ docker-compose up -d # 增加更多Broker: $ docker-compose scale kafka=3 # 合並: $ docker-compose up --scale kafka=3
Kakfa使用
1,Kafka管理節點
2,主題
environment: KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1:compact"
Topic1有1個Partition和3個replicas, Topic2有2個Partition,1個replica和cleanup.policy為compact。
Topic 1 will have 1 partition and 3 replicas, Topic 2 will have 1 partition, 1 replica and a cleanup.policy set to compact.
3,讀寫驗證
讀寫驗證的方法有很多,這里我們用kafka容器自帶的工具來驗證,首先進入到kafka容器的交互模式:
docker exec -it kafka_kafka_1 /bin/bash
創建一個主題:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.31.84:2181 --replication-factor 1 --partitions 1 --topic my-test
查看剛創建的主題:
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.31.84:2181
發送消息:
/opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.31.84:9092 --topic my-test This is a message This is another message
讀取消息:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.84:9092 --topic my-test --from-beginning
使用場景
- 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm