KafKa簡介和利用docker配置kafka集群及開發環境


KafKa的基本認識,寫的很好的一篇博客:https://www.cnblogs.com/sujing/p/10960832.html

問題:
1、kafka是什么?
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據,具有高性能、持久化、多副本備份、橫向擴展能力。

2、kafka的工作原理[去耦合]
Kafka采用的是訂閱-發布的模式,消費者主動的去kafka集群拉取消息,與producer相同的是,消費者在拉取消息的時候也是找leader去拉取。

3、kafka存在的意義:去耦合、異步、中間件的消息系統。

  • kafka節點之間如何復制備份的?
  • kafka消息是否會丟失?為什么?[ACK機制]
  • kafka最合理的配置是什么?
  • kafka的leader選舉機制是什么?
  • kafka對硬件的配置有什么要求?
  • kafka的消息保證有幾種方式?
  • kafka為什么會丟消息? 

1 Kafka簡介

Kafka:是一個高吞吐量、分布式的發布-訂閱消息系統。kafka是一款開源的、輕量級的、分布式、可分區和具有復制備份[Replicated]的、基於Zookeeper協調管理的分布式流平台的功能強大的消息系統。

1.1 Kafka特性

  • 能夠允許發布和訂閱流數據
  • 存儲流數據時提供相應的容錯機制
  • 當流數據到達時能夠被及時處理[近乎實時性的消息處理能力,可以高效地存儲消息和查詢消息]

1.2 Kafka消息系統最基本的體系結構[kafka的工作模式]

1.3 Kafka生態系統

Kafka Core API:

 Kafka Extended API:

 

1.4 Kafka基本概念[核心概念]

  • Topic& 分區 &Log:Topic 是用於存儲消息的邏輯概念,可以看作一個消息集合。每個 Topic 可以有多個生產者向其中推送(push)消息,也可以有任意多個消費者消費其中的消息。每個分區[對應一個磁盤的文件夾]由一系列有序、不可變的消息組成,是一個有序隊列。Log 由多個 Segment 組成,每個 Segment 對應一個日志文件和索引文件。每個分區在物理上對應為一個文件夾,分區的命名規則為主題名稱后接“—”連接符,之 后再接分區編號,分區編號從 0 開始,編號最大值為分區的總數減 1。日志段:一個日志又被划分為多個日志段(LogSegment)[邏輯概念],日志段是 Kafka 日志對象分片的最小單位。一個日志段對應磁盤上一個具體日志文件和 兩個索引文件。日志文件是以“.log”為文件名后綴的數據文件,用於保存消息實際數據。兩個索引文件分別以“.index”和“.timeindex”作為文件名后綴,分別表示消息偏移量索引文件和消息時間戳索引文件。

Topic與Partition之間的關系:

 

  • 消息:Kafka通信的基本單位,消息由一串字節構成,其中主要由 key 和 value 構成,key 和 value 也都是 byte 數組。key的主要作用是根據一定的策略,將此消息路由到指定的分區中,這樣就可以保證包含同一 key 的消息全部寫入同一分區中,key 可以是 null。消息的真正有效負載是 value 部分的數據。
  • 副本:每個 Partition 可以有多個副本,每個副本中包含的消息是一樣的。每個分區至少有一個副本,當分區中只有一個副本時,就只有 Leader 副本,沒有 Follower 副本。所有的讀寫請求都由選舉出 的 Leader 副本處理,其他都作為 Follower 副本,Follower 副本僅僅是從 Leader 副本處把數據拉取到本地之后,同步更新到自己的 Log 中。Kafka 提供兩種刪除老數據的策略, 一是基於消息已存儲的時間長度,二是基於分區的大小。

  • 偏移量:任何發布到分區的消息會被直接追加到日志文件(分區目錄下以“.log”為文件名后綴的 數據文件)的尾部,而每條消息在日志文件中的位置都會對應一個按序遞增的偏移量。
  • 代理:每一個Kafka實例稱為代理(Broker),也稱為kafka服務器。Kafka集群一般包含一台或多台服務器,可以在一台服務器上配置一個或多個代理。

 

代理和主題之間的關系:

  • 生產者:將消息發給代理,也就是向Kafka代理發送消息的客戶端。例如:生產者將數據發送到主題。[key-value的形式寫入] 

  • 消費者和消費組:消費者(Comsumer)以拉取 (pull)方式拉取數據,它是消費的客戶端。在 Kafka 中每一 個消費者都屬於一個特定消費組(ConsumerGroup),我們可以為每個消費者指定一個消費組, 以 groupId 代表消費組名稱,通過 group.id 配置設置。

Consumer group 和topic的聯動方式:比如這里的topic1有3個分區,Consumer Group 0中有2個Consumer,Consumer 1 拉取分區0和分區1的數據,Consumer 2拉取分區2的數據。

  • ISR:Kafka 在 ZooKeeper 中動態維護了一個 ISR(In-sync Replica),即保存同步的副本列表,該列表中保存的是與 Leader 副本保持消息同步的所有副本對應的代理節點id。ISR(In-Sync Replica)集合表示的是目前“可用”(alive)且消息量與 Leader 相差不多的副本集合,這是整個副本集合的一 個子集。
  • ZooKeeper[選舉算法]:Kafka 利用 ZooKeeper 保存相應元數據信息,Kafka元數據信息包括如代理節點信息、Kafka 集群信息、舊版消費者信息及其消費偏移量信息、主題信息、分區狀態信息、分區副本分配方 案信息、動態配置信息等。

1.5 Kafka集群架構

根據業務邏輯產生消息,在根據路由規則將消息發送到指定分區的Leader副本所在的Broker上。

1.6 Kafka設計概述

  • 動機:統一、實時處理大規模數據的平台。[類似數據庫日志系統]
    • (1)具有高吞吐量來支持諸如實時的日志集這樣的大規模事件流。
    • (2)能夠很好地處理大量積壓的數據,以便能夠周期性地加載離線數據進行處理。
    • (3)能夠低延遲地處理傳統消息應用場景。
    • (4)能夠支持分區、分布式,實時地處理消息,同時具有容錯保障機制。
  • 特性:消息持久化、高吞吐量、擴展性、多客戶端支持、Kafka Streams、安全機制、數據備份、輕量級、消息壓縮。
    • (1)消息持久化:Kafka 高度依賴於文件系統來存儲和緩存消息。
    • (2)高吞吐量:Kafka 將數據寫到磁盤,充分利用磁盤的順序讀寫。 同時,Kafka 在數據寫入及數據同步采用了零拷貝(zero-copy)技術,完全在內核中操作,從而避免了內核緩沖區與用戶緩沖區之間數據的拷貝,操作效率極高。Kafka 還支持數據壓縮及批量發送,同時 Kafka 將每個主題划分為多個分區。
    • (3)擴展性:集群能夠自動感知,重新進行負責均衡及數據復制。
  • 應用場景:
    • (1)消息系統。[在應用系統中可以將kafka作為傳統的消息中間件,實現消息隊列和消息的發布/訂閱]
    • (2)應用監控。
    • (3)網站用戶行為追蹤。[用作日志收集中心,多個系統產生的日志統一收集到Kafka中,然后由數據分析平台進行統一處理]
    • (4)流處理。
    • (5)持久性日志。[Kafka 可以為外部系統提供一種持久性日志的分布式系統。日志可以在多個節點間進行備份,Kafka為故障節點數據恢復提供了一種重新同步的機制。]
    • (6)kafka用作系統中的數據總線,將其接入多個子系統中,子系統會將產生的數據發送到kafka中保存,之后流轉到目的系統中。

 2 配置Kafka集群及開發環境

  1. 在linux中安裝並配置docker並啟動kafka開發環境
  2. 在kafka環境中對topic進行create、delete、list、descr
  3. 在kafka環境命令行中producer向topic中推送數據
  4. 在kafka環境命令行中consumer向topic中拉取數據

2.1 在linux中安裝並配置docker並啟動kafka開發環境

  • 1、ubuntu docker安裝[使用官方安裝腳本自動安裝]:https://www.runoob.com/docker/ubuntu-docker-install.html
  • 2、如何使用docker在本地建立一個kafka的開發環境:在github官網(https://github.com)中搜索lensesio/fast-data-dev
  • 3、在瀏覽器中打開網址:http://localhost:3030/,看到如下界面,表示kafka開發環境啟動成功。
#安裝curl,目的是使用官網安裝docker[linux安裝時提示沒有該命令,按需安裝]
apt install curl

#在linux[查看版本使用命令:uname -r]下安裝docker
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

#啟動kafka開發環境
docker run --rm -it \
    -p 2181:2181 -p 3030:3030 -p 8081:8081 \
    -p 8082:8082 -p 8083:8083 -p 9092:9092 \
    -e ADV_HOST=127.0.0.1 \
    lensesio/fast-data-dev 

前提是docker已安裝成功:

 

 

 

 

2.2 在kafka環境中對topic進行create、delete、list、descr

#登錄到kafka,進行kafca操作
docker run --rm -it --net=host lensesio/fast-data-dev bash

#topic對應的參數名稱顯示
kafka-topics

#創建first-topic
kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1

kafka-topics --zookeeper 127.0.0.1:2181 --create --topic second_topic --partitions 3 --replication-factor 1

#列表顯示對應的topic
kafka-topics --zookeeper 127.0.0.1:2181 --list

#刪除topic
kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic first_topic 

#查看存在的topic
kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic second_topic 

2.3 在kafka環境命令行中producer向topic中推送數據

#登錄到kafka,進行kafca操作
docker run --rm -it --net=host lensesio/fast-data-dev bash

#用console producer向topic中推送數據
#producer對應的參數名稱顯示
kafka-console-producer

#推送數據hi、hello、today、nice、kafka
[未指定key,value隨機指派給分區] kafka
-console-producer --broker-list 127.0.0.1:9092 --topic first_topic >hi >hello [指定key,value指派給對應的分區] kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --property "parse.key=true" \
--property "key.separator=:"

2.4 在kafka環境命令行中consumer向topic中拉取數據

#登錄到kafka,進行kafca操作
docker run --rm -it --net=host lensesio/fast-data-dev bash

#用console consumer向topic中拉取數據,下列命令可以列出所有需要使用的參數
kafka-console-consumer

#創建consumer,從當前位置開始讀取
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic 

#創建consumer,從起始位置開始讀取,my-group-1中的一員
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic \
--consumer-property group.id=my-group-1 --from-beginning #創建consumer,從起始位置開始讀取 kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM