一、部署kafka集群
啟動zookeeper服務:
zkServer.sh start
修改配置文件config/server.properties
#broker 的全局唯一編號,不能重復
broker.id=0
#刪除 topic 功能使能
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤 IO 的現成數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka 運行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#topic 在當前 broker 上的分區個數
num.partitions=1
#用來恢復和清理 data 下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接 Zookeeper 集群地址
zookeeper.connect=localhost:2181
配置環境變量
vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
啟動kafka服務:
cd /usr/local/kafka/
nohup bin/kafka-server-start.sh config/server.properties &
創建topic
bin/kafka-topics.sh --create --zookeeper 192.168.1.12:2181,192.168.1.12:2181,192.168.1.14:2181 --replication-factor 1 --partitions 1 --topic mmc
查看topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
查看topic詳情
bin/kafka-topics.sh --describe --topic mmc --zookeeper localhost:2181
產生消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
接收消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
二、Kafka架構
kafka沒有實現JMS協議,但其消費組可以像點對點模型一樣讓消息被一組進程處理,同時也可以像發布/訂閱模式一樣,讓你發送廣播消息到多個消費組。
簡單來說:一個消費組就是點對點,多個消費組就能實現發布、訂閱。
一個Topic可以有多個分區,每個分區是一個有序的,不可變的消息序列。新的消息不斷追加,同時分區會給每個消息記錄分配一個順序ID號 – 偏移量。盡管記錄被消費了,也不會馬上刪除,只是移動偏移量,Kafka會有可配置的保留策略刪除(默認7天)。
Kafka只保證一個分區內的消息有序,不能保證一個主題的不同分區之間的消息有序。但是,如果你想要保證所有的消息都絕對有序可以只為一個主題分配一個分區,雖然這將意味着每個消費群同時只能有一個消費進程在消費。
分區策略
生產者發送消息后會進入哪個分區?
- 用戶可以指定消息的分區
- 也可以指定key,系統根據key的hash值取模得到分區
- 如果用戶不指定分區,也不聲明key,那么系統會自動生成key,並根據自動生成的key進行hash之后取模然后算出分區
2.2 存儲架構
- 將一個topic的多個parition大文件分為多個小文件段(segment)存儲。segment文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示索引文件和數據文件
- 通過索引文件可以快速定位到message和確定response的最大大小。
00000000000000170410.log這個文件記錄了第170411到~(下一個log文件編號)的消息。圖中第三條消息對應的是348,也就是說在log文件中,第三條消息的偏移量是348.
三、保證數據可靠性
3.1 副本同步策略
一般有兩種方案,Kafka選擇了第二種。
方案 | 優點 | 缺點 |
---|---|---|
半數以上完成同步,發送ack | 延遲低 | 選取新的leader時,容忍n個節點故障時,必須要有2n+1個副本 |
全數以上完成同步,發送ack | 選取新的leader時,容忍n個節點故障時,需要n+1個副本 | 延遲高 |
3.2 ISR
ISR(In-sync replica set)意為與leader保持同步的follower集合。當ISR中的follower完成與leader的數據同步時,向生產者發送ask。如果在規定的時間內(replica.lag.time.max.ms 此參數設定)follower未同步數據,則踢出ISR。leader出現故障后,就在ISR隊列里選舉。
3.3 ack應答機制
通過設置request.required.acks應答來保證。有如下三種設置方式
- 1(默認):代表producer在ISR中的leader成功接收到數據並確認時,繼續發送下一條數據。如果leader宕機,則丟失數據
- 0:無需確認則直接發送下一條,可靠性最低
- -1:等待producer在ISR中的所有follower確認再發送下一條。此時消息副本數越多則可靠性越高。
3.4 故障處理細節
LEO:每個副本最大的offset
HW:消費者能見到的最大的offet,ISR中最小的LEO
(1)follower故障時
follower發生故障后會被臨時踢出ISR,待該follower重啟后,follower會讀取本地磁盤記錄的上次的HW,然后將他log文件中高於HW的部分截掉,然后從leader開始同步,直到該follower的LEO大於或等於該Partition的HW,就可以重新加入ISR。
(2)leader故障時
leader發生故障后,會重新選取一個leader,為了保證多個副本的數據一致性,其余的follower會將高於HW的部分截掉,然后從新leader那里同步
Exactly Once 語義
當ack設置為-1時,可以保證producer到Server之間不丟數據,即至少一次
而ack設置為0,則以保證消息至多一次。
而對於某些非常重要的消息,要保證既不丟失又不重復,即Exactly Once語義。在 0.11 版本以前的 Kafka是做不到的。0.11 版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指Producer不論向 Server發送多少次重復數據,Server 端都只會持久化一條。冪等性結合 At Least Once 語
義,就構成了 Kafka 的 Exactly Once 語義。即:
At Least Once + 冪等性 = Exactly Once
要啟用冪等性,將enable.idompotence 設置為 true 即可
實現方式;開啟冪等性的Producer在初始化的時候會分配一個PID,發往同一個Partition的消息會附帶Sequence Number。而Broker端會對<PID,Partition,SeqNumber>做緩存,當相同主鍵的消息提交時,只會持久化一條。但是PID重啟就會發生變化,不同的Partition也具有不同的主鍵,所以他的冪等性無法保證跨分區跨會話。
四、消費者
4.1 消費方式
消息是采用的pull的方式,pull方式的不足之處是如果沒有數據,會造成空輪詢,針對這一點,Kafka的消費者在消費數據時會傳入一個時長參數 timeout,如果當前沒有數據可供消費,consumer會等待一段時間之后再返回,這段時長即為 timeout。
4.2 分區分配策略
Kafka消息消費的時候,有兩種策略,RoundRobin和Range
4.3 消費者Offset位置保存
Offset是以消費者組+Topic+Partatition為key來保存的。
0.9版本前保存在Zookeeper里
0.9版本之后保存在Kafka內置的一個Topic中,該topic為__consumer_offsets
5.1 Kafka高效讀寫數據
- 順序寫磁盤
順序寫可達到600M/s,而隨機寫只有100K/s
2. 零拷貝技術
5.2 Kafka事務
Kafka在0.11版本后引入了事務支持。事務可以保證消息正好一次語義的基礎上,生產和消費可以跨分區和會話。
為了實現跨分區跨會話的事務,需要引入一個全局唯一的 Transaction ID,並將 Producer獲得的PID和Transaction ID 綁定。這樣當Producer 重啟后就可以通過正在進行的Transaction ID 獲得原來的 PID。
六、Kafka API
6.1 Producer API
消息發送過程
代碼示例
package com.mmc.springbootstudy.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @description:
* @author: mmc
* @create: 2021-04-22 21:22
**/
public class ProductDemo {
public final static String TOPIC = "mmc";
/**
* 不帶回調的發送API
*/
public void send() {
Properties props = new Properties();
props.put("bootstrap.servers", "49.234.77.60:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
String key="test";
String value="我是一個小紅花222";
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>(TOPIC,key,value));
producer.close();
}
/**
* 帶回調的發送
*/
public void callSend() {
Properties props = new Properties();
props.put("bootstrap.servers", "49.234.77.60:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
String key="test";
String value="我是一個小紅花222";
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>(TOPIC, key, value), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
System.out.println("發送成功");
}else {
e.printStackTrace();
}
}
});
producer.close();
}
/**
* 同步發送API
* 一條消息發送之后,會阻塞當前線程,直至返回 ack。
*/
public void syncSend() throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "49.234.77.60:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
String key="test";
String value="我是一個小紅花222";
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(TOPIC, key, value)).get();
System.out.println("----------recordMetadata:"+recordMetadata);
producer.close();
}
public static void main( String[] args ) throws ExecutionException, InterruptedException {
// new ProductDemo().send();
new ProductDemo().syncSend();
}
}
KafkaProducer 對象是比較重的,並且他是線程安全的,所以可以全局都用同一個對象去發消息。
6.2 Consumer API
消費者消費的時候有區分自動提交、手動同步提交和手動異步提交。手動同步提交會阻塞當前線程直到成功提交,並有失敗重試。而異步手動提交沒有失敗重試。
不管是同步提交還是異步提交,都會可能造成數據漏消費和重復消費。如果先提交offset后消費,有可能導致數據漏消費,如果先消費后提交offset就有可能導致數據重復消費。
package com.mmc.springbootstudy.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
/**
* @description:
* @author: mmc
* @create: 2021-04-22 21:32
**/
public class ConsumerDemo {
public final static String TOPIC = "mmc";
/**
* 自動提交offset
* @throws InterruptedException
*/
void receive() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "49.234.77.60:9092");
props.put("group.id", "group_id");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", 1000);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true){
ConsumerRecords<String, String> msgList=consumer.poll(1000);
for (ConsumerRecord<String,String> record:msgList){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
/**
* 手動同步提交
* @throws InterruptedException
*/
void commitSyncReceive() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "49.234.77.60:9092");
props.put("group.id", "group_id");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", 1000);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true){
ConsumerRecords<String, String> msgList=consumer.poll(1000);
for (ConsumerRecord<String,String> record:msgList){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
//同步提交,當前線程會阻塞直到 offset 提交成功
consumer.commitSync();
}
}
/**
* 手動異步提交
* @throws InterruptedException
*/
void commitAsyncReceive() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "49.234.77.60:9092");
props.put("group.id", "group_id");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", 1000);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true){
ConsumerRecords<String, String> msgList=consumer.poll(1000);
for (ConsumerRecord<String,String> record:msgList){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e!=null){
System.err.println("commit failed for "+map);
}
}
});
}
}
public static void main(String[] args) throws InterruptedException {
// new ConsumerDemo().receive();
new ConsumerDemo().commitSyncReceive();
}
}
6.3 自定義分區
實現Partitioner接口,並在配置中加入
props.put("partitioner.class", "com.mmc.springbootstudy.kafka.MyPartition");
自定義分區實現類:
package com.mmc.springbootstudy.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @description:
* @author: mmc
* @create: 2021-04-27 20:41
**/
public class MyPartition implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
自定義存儲offset
consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
}
});
6.4 自定義攔截器
package com.mmc.springbootstudy.kafka;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* @description:
* @author: mmc
* @create: 2021-04-30 20:47
**/
public class CountIntercepter implements ProducerInterceptor<String,String> {
private int successCount=0;
private int failCount=0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
System.out.println("攔截到消息的分區:"+producerRecord.topic());
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if(e==null){
successCount++;
}else {
failCount++;
}
}
@Override
public void close() {
System.out.println("success count:"+successCount);
}
@Override
public void configure(Map<String, ?> map) {
}
}
在生產者中需要加入
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.mmc.springbootstudy.kafka.CountIntercepter");
七、第三方擴展
7.1 Kafka Eagle 監控
八、面試題
- Kafka 中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?
答: kafka中與leader副本保持一定同步程度的副本(包括leader)組成ISR。與leader滯后太多的副本組成OSR。分區中所有的副本通稱為AR。
- Kafka 中的HW、LEO等分別代表什么?
答:HW:高水位,指消費者只能拉取到這個offset之前的數據
LEO:標識當前日志文件中下一條待寫入的消息的offset,大小等於當前日志文件最后一條消息的offset+1.
- Kafka 中是怎么體現消息順序性的?
生產者:向leader副本負責消息的順序寫入
消費者:同一個分區只能被同一個消費者組中的一個消費者消費。
kafka只保證同一個分區的順序性,所以如果是想保證全局順序,可以自定義分區策略,將關聯的消息發到同一個分區。如同一個訂單的各個狀態。
4. Kafka生產者客戶端的結構
答:整個生產者客戶端主要有兩個線程,主線程以及Sender線程。Producer在主線程中產生消息,然后通過攔截器,序列化器,分區器之后緩存到消息累加器RecordAccumulator中。Sender線程從RecordAccumulator中獲取消息並發送到kafka中。RecordAccumulator主要用來緩存消息,這樣發送的時候進行批量發送以便減少相應的網絡傳輸。RecordAccumulator緩存的大小可以通過配置參數buffer.memory配置,默認是32M。如果創建消息的速度過快,超過sender發送給kafka服務器的速度,會導致緩存空間不足,這個時候sender線程可能會阻塞或者拋出異常,max.block.ms配置決定阻塞的最大時間。
RecordAccumulator中為每個分區維護了一個雙端隊列,隊列中的內容是ProducerBatch,即Deque
- 分區策略有哪些?
答:有兩種,一種是 RangeAssignor 分配策略(范圍分區),另一種是RoundRobinAssignor分配策略(輪詢分區)。默認采用 Range 范圍分區。
Range策略:
如有10個分區,3個消費者,那么通過10/3=3算出一個消費者消費3個分區。多出的分區由排在前面的消費者消費。那么消費者1消費0,1,2,3分區。消費者2消費4,5,6分區。消費者3消費7,8,9分區。
缺點就是前面的消費者就會多消費到一個分區,如果是多個topic,那么這個消費者就會多消費到多個分區。
RandRobin策略:同樣的例子,分區0被消費者1消費,分區1被消費者2消費,分區2被消費者3消費
注意:這種策略需要一個組內的消費者訂閱的主題相同。這樣輪詢的時候才是均勻的。
當出現以下幾種情況時,Kafka 會進行一次分區分配操作,即 Kafka 消費者端的 Rebalance 操作
- 同一個 consumer 消費者組 group.id 中,新增了消費者進來,會執行 Rebalance 操作
- 消費者離開當期所屬的 consumer group組。比如宕機
- 分區數量發生變化時(即 topic 的分區數量發生變化時)
- 消費者主動取消訂閱
- kafka中哪些地方會選舉?
答:BrokerController:
在broker啟動的時候,都會創建BrokerController,第一個在zookeeper中創建指定臨時節點成功的那個節點就是BrokerController。他負責管理集群 broker的上下線,所有topic的分區副本分配和 leader 選舉等工作。
Partition Leader:
- 從Zookeeper中讀取當前分區的所有ISR(in-sync replicas)集合
- 調用配置的分區選擇算法選擇分區的leader
- 分區數能新增或減少嗎?
答:能新增,不能減少。因為減少的話,分區內已有的數據不好處理。