一、 簡介
Kafka是用scala語言編寫,由Linkedin公司於2010年貢獻給Apache成為一個開源的消息系統,它主要用於處理活躍的流式數據。遵從一般的MQ結構。Kafka對消息保存時根據Topic進行歸類,此外kafka集群有多個kafka實例組成,每個實例(server)稱為broker。Kafka是依賴於zookeeper集群保存一些meta信息,來保證系統可用性。
注意:官方在Kafka 3.0版本中宣布下一次的升級將棄用java8,選擇 Java 11 作為最低支持版本。
二、 應用場景
三、 消息傳輸流程
kafka集群的負載均衡:
1.一個partition只能被一個消費者消費,一個消費者可以消費多個partition,所以如果設置的partition數量小於consumer的數量,就會導致空閑的consumer沒有消費,所以partition的數量一定要大於或等於consumer的數量。
( 注意:topic有多個分區,才能實現多個consumer消費一個topic。)
2.多副本冗余的高可用機制,比如Partition1有一個副本是Leader,另外一個副本是Follower,Leader和Follower兩個副本是分布在不同機器上的。
四、 API
- 基礎API:
- Producer :消息生產者,決定消息發送到指定Topic的哪個分區上。
- Consumer :消息消費者,向 Kafka broker 取消息的客戶端;
- Consumer Group(CG):消費者組,由多個 consumer 組成。消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。
- Broker :經紀人,一台Kafka服務器就是一個broker。一個集群由多個broker 組成。一個broker可以容納多個topic。
- Topic :主題,可以理解為一個隊列,生產者和消費者面向的都是一個 topic。通俗來講的話,就是放置“消息”的地方,是消息投遞的一個容器。假如把消息看作是信封的話,那么 Topic 就是一個郵箱 。
- Partition:為了實現水平擴展和負載均衡,一個非常大的 topic 可以分布到多個 broker(即服務器)上,每台服務器上又可以分為多個 partition,每個 partition 是一個有序的隊列;
- Replica:副本(Replication),為保證集群中的某個節點發生故障時,該節點上的 partition 數據不丟失,且 Kafka仍然能夠繼續工作,Kafka 提供了副本機制,一個 topic 的每個分區都有若干個副本,一個 leader 和若干個 follower。
- Leader:每個分區多個副本的“主”,生產者發送數據的對象,以及消費者消費數據的對象都是 leader。
- Follower:每個分區多個副本中的“從”,實時從 leader 中同步數據,保持和 leader 數據的同步。 leader 發生故障時,某個 Follower 會成為新的 leader。
擴展API:
- Consumer Group:消費者組是 Kafka 提供的可擴展且具有容錯性的消費者機 制。組內可以有多個消費者,共享一個公共ID,這個ID 被稱為 Group ID。組內的所有消費者協調在一起來消費訂閱主題(Topics)的所有分區(Partition)。當然,每個分區只能由同一個消費者組內的一個 Consumer 實例來消費。(新版本的Consumer Group將位移保存在Broker端的內部主題中)
三大特性:
1. 組內可以有一個或多個 Consumer 實例。這里的實例可以是一個單獨的進程,也可以是同一進程下的線程。在實際場景中,使用進程更為常見一些。
2. Group ID 是一個字符串,在一個 Kafka 集群中,它標識唯一的一個 Consumer Group。
3. 組內所有主題的單個分區,只能分配給組內的某個 Consumer 實例消費。這個分區當然也可以被其他的 Group 消費。
兩大模型:
Kafka僅僅使用Consumer Group這一種機制,卻同時實現了傳統消息引擎系統的兩大模型:
-
- 如果所有消費者實例都屬於同一個Group,那么它實現的是消息隊列模型; (隊列的處理方式是一組消費者從服務器讀取消息,一條消息只由其中的一個消費者來處理。)
2. 如果所有消費者實例分別屬於不同的Group,那么它實現的就是發布/訂閱模型; (發布-訂閱模型中,消息被廣播給所有的消費者,接收到消息的消費者都可以處理此消息。)
2. KRaft模式: kafka之前一直使用Zookeeper來進行所有Broker的管理,每個Broker服務器啟動時都會到連接到Zookeeper注冊,創建brokers節點,寫入IP,端口等信息。當Broker發生狀態變化,比 如下線,對應Broker節點也就被刪除。2.8版本以后使用 Raft 模式,可以不再依賴Zookeeper,將之前存放在Zookeeper的元數據(元數據將被視為日志)、配置信息都會保存在 @metadata 這個 Topic 中,自動在kafka集群中復制。這樣 Kafka 就會簡單輕巧很多。
五、 代碼實例
1.配置kafka.properties:
- 封裝的生產者:
/**
* kafka封裝生產者發送消息
* @param reqDTO 入參:1. topic(主題名)
* 2. mesgKey(消息key)
* 3. mesgData(消息體)
*/
public KafkaResDTO ProviderService(KafkaReqDTO reqDTO) throws Exception {
KafkaResDTO respDTO = new KafkaResDTO();
if (logger.isDebugEnabled()) {
logger.debug(">>>>kafka發送消息入參->{}", reqDTO);
}
if (reqDTO == null) {
logger.error("請求參數為空,kafka消息發送失敗。");
respDTO.setRespMesg("請求參數為空,消息發送失敗。");
respDTO.setRespCode("500");
return respDTO;
} else {
if (StringUtils.isBlank(reqDTO.getTopic())) {
logger.error("主題為空,kafka消息發送失敗。");
respDTO.setRespMesg("主題為空,kafka消息發送失敗。");
respDTO.setRespCode("500");
return respDTO;
} else {
try {
kafkaTemplate.send(reqDTO.getTopic(),JSONUtil.toJsonStr(reqDTO.getMesgKey()),JSONUtil.toJsonStr(reqDTO.getMesgData()));
logger.info("kafka消息發送成功:{}", reqDTO);
respDTO.setRespCode("200");
respDTO.setRespMesg("kafka消息發送成功");
} catch (Exception e) {
//e.printStackTrace();
logger.error("kafka發送消息失敗:{}", e.getMessage());
respDTO.setRespMesg("kafka發送消息失敗:{}" + e);
respDTO.setRespCode("500");
}
}
}
return respDTO;
}
- 消費者(需要引入該項目依賴:opp-code-kafka-project)
@KafkaListener(topics = {"t3"},groupId = "group1",containerFactory="kafkaListenerContainerFactory")
public void m1(ConsumerRecord<String, String> record){
try {
System.out.println("消費key和value "+record.key()+" : "+record.value());
//ack.acknowledge();
logger.info("kafka消息消費成功:{}",record);
} catch (Exception e) {
//e.printStackTrace();
logger.error("kafka消息消費失敗:{}",record,e.getMessage());
}
}
六、 消息確認機制
1.生產者:ack確認機制
2.消費者:
1.自動提交確認
2.手動提交確認
#禁止自動提交
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
#手動確認消息
spring.kafka.listener.ack-mode=manual_immediate
# 批量一次最大拉取數據量
max-poll-records: 3
# 自動提交時間間隔,這種直接拉到數據就提交 容易丟數據
auto-commit-interval: 2000
# 批量拉取間隔,要大於批量拉取數據的處理時間,時間間隔太小會有重復消費
max.poll.interval.ms: 5000
七、 重復消費問題
- 重復消費的原因:
1:消費者端宕機、重啟或者被強行kill進程,導致消費者消費的offset沒有提交。
2:設置enable.auto.commit為true,如果在關閉消費者進程之前,取消了消費者的訂閱,則有可能部分offset沒提交,下次重啟會重復消費。
3:客戶端消費超時被判定掛掉而消費者重新分配分區, 導致重復消費。
- 解決方法:
- 提高消費者的處理速度。例如:對消息處理中比較耗時的步驟可通過異步的方式進行處理、利用多線程處理等。在縮短單條消息消費的同時,根據實際場景可將max.poll.interval.ms值設置大一點,避免不必要的Rebalance。可根據實際消息速率適當調小max.poll.records的值。
- 引入消息去重機制。例如:生成消息時,在消息中加入唯一標識符如消息id等。在消費端,可以保存最近的max.poll.records條消息id到redis或mysql表中,這樣在消費消息時先通過查詢去重后,再進行消息的處理。
- 保證消費者邏輯冪等性。
八、 可視化工具(Kafka Tool)
- 下載地址:http://www.kafkatool.com/download.html
- 簡單使用(查看主題消息):
九、 linux集群部署
1.服務器
18.5.204.164
18.5.204.165
2.JDK
安裝環境所需jdk
3.zookeeper
1.使用的是穩定版:zookeeper-3.5.6-bin
官網下載鏈接:https://zookeeper.apache.org/releases.html
2.將zookeeper包解壓在 /home/fwzt/kafka/local文件夾下
3.cd zookeeper-3.5.6/conf/ 然后 cp zoo_sample.cfg zoo.cfg復制一份配置文件並重命名zoo.cfg
4.cd zookeeper-3.5.6/ 然后 新建data和zlog 兩個同級文件夾
5.vi zoo.cfg 進行編輯:
修改dataDir路徑: dataDir=/home/fwzt/kafka/local/zookeeper-3.5.6/data
修改日志路徑:dataLogDir=/home/fwzt/kafka/local/zookeeper-3.5.6/zlog
端口默認:clientPort=2181
配置兩台服務器集群: server.0=18.5.204.164:2888:3888
server.1=18.5.204.165:2888:3888
6.cd zookeeper-3.5.6/data 然后 vi myid 輸入0 保存(此處是創建serverID,此處的對應zoo.cfg文件里配置集群時在server.后面的那個0)
7.vi /etc/profile 進行編輯:
#zookeeper
export ZOOKEEPER_HOME=/home/fwzt/kafka/local/zookeeper-3.5.6
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf
8.執行 source /etc/profile使文件生效
9.其他集群服務器使用相同配置,只有第6步需要改變:vi myid 輸入的節點數字和本服務器節點數字對應。
10.啟動:sh zkServer.sh start
(先啟動的服務都會報錯 屬於正常,等最后一台啟動后,就都不會再報錯)
查看是否啟動:zkServer.sh status
此處為啟動成功(Mode: follower 為集群副本的從, Mode: leader為集群副本的主)
停止: sh zkServer.sh stop
4.Kafka
1.使用版本:kafka_2.12-2.4.0
官網下載鏈接:https://kafka.apache.org/downloads
2.將kafka_2.12-2.4.0包解壓在 /home/fwzt/kafka/local文件夾下
3.cd kafka_2.12-2.4.0/ 然后 mkdir logs
4.cd kafka_2.12-2.4.0/config/ 然后 vi server.properties 進行編輯:
修改:broker.id=0 (每台此處設置和zookeeper的myid相對應)
修改:listeners=PLAINTEXT://18.5.204.164:9092 (每台服務器使用各自的ip)
修改topic創建分區數:num.partitions=5
修改日志保留時間:log.retention.hours=72
修改日志路徑:log.dirs=/home/fwzt/kafka/local/kafka_2.12-2.4.0/logs
修改對應zookeeper的集群ip:zookeeper.connect=18.5.204.164:2181,18.5.204.165:2182
5.vi /etc/profile 進行編輯(覆蓋之前zookeeper配置):
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/home/fwzt/kafka/local/zookeeper-3.5.6
#kafka
export KAFKA_HOME=/home/fwzt/kafka/local/kafka_2.12-2.4.0
export PATH=$PATH:${KAFKA_HOME}/bin:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf
6.執行 source /etc/profile使文件生效
7.其他集群服務器使用相同配置,只需改動 broker.id 和 listeners
8.常規啟動:nohup bin/kafka-server-start.sh config/server.properties &
進程守護模式啟動:
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
查看是否啟動:ps -ef | grep kafka
關閉:bin/kafka-server-stop.sh
創建主題:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test
查看所有topic:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2182
查看具體topic:
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2182 --topic Test
創建生產者:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
創建消費者:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
集群:
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 3 --topic my-test
bin\windows\kafka-topics.bat --zookeeper localhost:2181,localhost:2182,localhost:2183 --describe --topic my-test
bin\windows\kafka-console-producer.bat --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test1
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning
查詢是否有未消費的消息:
kafka的bin目錄下,執行命令:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group1
可以看到當前的消費進度(CURRENT-OFFSET)、消息進度(LOG-END-OFFSET)、落后量(LAG):
application.properties配置文件版本:
#====================================== kafka 生產者 ==============================================
# 指定kafka server的地址,集群配多個,中間,逗號隔開
spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# 寫入失敗時,重試次數
spring.kafka.producer.retries=3
#指定創建信息nio-buffer緩沖區大小約1M
spring.kafka.producer.buffer-memory=1024000
#累計約1M條就發發送,必須小於緩沖區大小,否則報錯無法分配內存(減少IO次數,過大則延時高,瞬間IO大)
spring.kafka.producer.batch-size=1024000
#默認0ms立即發送,不修改則上兩條規則相當於無效(這個屬性時個map列表,producer的其它配置也配置在這里,詳細↑官網,這些配置會注入給KafkaProperties這個配置bean中,供#spring自動配置kafkaTemplate這個對象時使用)
#spring.kafka.producer.properties.linger.ms=1000
#acks=0 把消息發送到kafka就認為發送成功
#acks=1 把消息發送到kafka leader分區,並且寫入磁盤就認為發送成功
# acks=all 把消息發送到kafka leader分區,並且leader分區的副本follower對消息進行了同步就任務發送成功spring.kafka.producer.acks=1
#由於網絡傳輸過來的是byte[],生產者端key/value需要序列化(可轉為String和Byte數組兩種選擇)
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#====================================== kafka 消費者 ==============================================
#spring.kafka.bootstrap-servers=127.0.0.1:9092 本項目在生產者處已配置kafka的ip
#指定默認的消費組組名
spring.kafka.consumer.group-id: group-test
#自動偏移量: 1.earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# 2.latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# 3.none :topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
spring.kafka.consumer.auto-offset-reset: earliest
# 設置自動提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果設置自動提交offset,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。
spring.kafka.consumer.auto-commit-interval=100
#由於網絡傳輸過來的是byte[],消費者端key/value需要反序列化(可轉為String和Byte數組兩種選擇)
#spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
消費者手動確認機制:
#禁止自動提交
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
#手動確認消息
spring.kafka.listener.ack-mode=manual_immediate
# 批量一次最大拉取數據量
max-poll-records: 3
# 自動提交時間間隔,這種直接拉到數據就提交 容易丟數據
auto-commit-interval: 2000
# 批量拉取間隔,要大於批量拉取數據的處理時間,時間間隔太小會有重復消費
max.poll.interval.ms: 5000