一、kafka 介紹
1.1、kafka 介紹
Kafka 是一個分布式消息引擎與流處理平台,經常用做企業的消息總線、實時數據管道,有的還把它當做存儲系統來使用。
早期 Kafka 的定位是一個高吞吐的分布式消息系統,目前則演變成了一個成熟的分布式消息引擎,以及流處理平台。
Kafka 主要起到削峰填谷(緩沖)、系統解構以及冗余的作用,主要特點有:
- 高吞吐、低延時:這是 Kafka 顯著的特點,Kafka 能夠達到百萬級的消息吞吐量,延遲可達毫秒級;
- 持久化存儲:Kafka 的消息最終持久化保存在磁盤之上,提供了順序讀寫以保證性能,並且通過 Kafka 的副本機制提高了數據可靠性。
- 分布式可擴展:Kafka 的數據是分布式存儲在不同 broker 節點的,以 topic 組織數據並且按 partition 進行分布式存儲,整體的擴展性都非常好。
- 高容錯性:集群中任意一個 broker 節點宕機,Kafka 仍能對外提供服務。
使用消息隊列的好處:
解耦、冗余(每個分區都有副本)、提高擴展性、靈活性 & 峰值處理能力、可恢復性(有副本)、順序保證、緩沖、異步通信
1.2、kafka術語
-
生產者(Producer):
- 向 broker 發布消息的應用程序。
- 生產者也負責選擇發布到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。
-
消費者(Consumer):
-
從消息隊列中獲取消息的客戶端應用程序。
-
一個 topic 可以讓若干個消費者進行消費,若干個消費者組成一個 Consumer Group 即消費組,一條消息只能被消費組中一個 Consumer 消費。
-
假如所有的消費者都在一個組中,那么這就變成了 queue 模型。 假如所有的消費者都在不同的組中,那么就完全變成了發布-訂閱模型。
-
更通用的,我們可以創建一些消費者組作為邏輯上的訂閱者。每個組包含數目不等的消費者,一個組內多個消費者可以用來擴展性能和容錯。如下圖所示:
-
-
2個 kafka 集群托管4個分區(P0-P3),2個消費者組,消費組 A 有2個消費者實例,消費組 B 有4個。
-
kafka 中消費者組有兩個概念:隊列:消費者組(consumer group)允許消費者組成員瓜分處理。發布訂閱:允許你廣播消息給多個消費者組(不同名)。
-
傳統的消息有兩種模式:隊列和發布訂閱。
-
在隊列模式中,消費者池從服務器讀取消息(每個消息只被其中一個讀取),優點是允許多個消費者瓜分處理數據,這樣可以擴展處理。;
-
發布訂閱模式:消息廣播給所有的消費者。允許你廣播數據到多個消費者,由於每個訂閱者都訂閱了消息,所以沒辦法縮放處理。
-
-
broker:
- Kafka 實例,多個 broker 組成一個 Kafka 集群,Kafka 以集群方式運行,集群中每個服務器稱為 broker。
- 通常一台機器部署一個 Kafka 實例,一個實例掛了不影響其他實例
-
主題(Topic):
- 一組消息的歸納(代表不同的業務,如超市辦會員,付款)。
- 服務端消息的邏輯存儲單元。一個 topic 通常包含若干個 Partition 分區。
-
分區(Partition):
- 一個 Topic 中的消息數據按照多個分區組織,分區是 Kafka 消息隊列組織的最小單位,一個分區可以看作是一個隊列。
- 分布式存儲在各個 broker 中, 實現發布與訂閱的負載均衡。
- 若干個分區可以被若干個 Consumer 同時消費,達到消費者高吞吐量。
- 單個 partition 有序,整體無序,整體有序就將數據都放到一個 partition 中,但是效率極低。
- 每個分區有一個 leader,零或多個 follower。Leader 處理此分區的所有的讀寫請求,而follower被動的復制數據。如果leader宕機,其它的一個follower會被推舉為新的leader。
- 一台服務器可能同時是一個分區的 leader,另一個分區的 follower。 這樣可以平衡負載,避免所有的請求都只讓一台或者某幾台服務器處理。
-
message:
- 消息,或稱日志消息,是 Kafka 服務端實際存儲的數據,每一條消息都由一個 key、一個 value 以及消息時間戳 timestamp 組成。
-
offset:
- 偏移量,分區中的消息位置,由 Kafka 自身維護,Consumer 消費時也要保存一份 offset 以維護消費過的消息位置。
1.3、四個核心 API
- Producer API 發布消息到1個或多個 topic(主題)中。
- Consumer API 來訂閱一個或多個topic,並處理產生的消息。
- Streams API 充當一個流處理器,從1個或多個 topic 消費輸入流,並生產一個輸出流到1個或多個輸出 topic,有效地將輸入流轉換到輸出流。
- Connector API 可構建或運行可重用的生產者或消費者,將 topic 連接到現有的應用程序或數據系統。例如,連接到關系數據庫的連接器可以捕獲表的每個變更。

二、kafka 客戶端
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
2.1、 KafkaProduce
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @author xiandongxie 2020-06-04
*/
public class KafkaProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092");
// 判別請求是否為完整的條件(判斷是不是成功發送了)。指定了“all”將會阻塞消息,這種設置性能最低,但是是最可靠的
props.put("acks", "all");
// 如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重復消息的可能性
props.put("retries", 0);
// 生產者緩存每個分區未發送的消息。緩存的大小是通過 batch.size 配置指定的。值較大的話將會產生更大的批。並需要更多的內存(每個“活躍”的分區都有1個緩沖區)
props.put("batch.size", 16384);
// 默認緩沖可立即發送,即便緩沖空間還沒有滿,但是,如果想減少請求的數量,可以設置 linger.ms 大於0。
// 這將指示生產者發送請求之前等待一段時間,希望更多的消息填補到未滿的批中。這類似於TCP的算法,例如,可能100條消息在一個請求發送,因為我們設置了linger(逗留)時間為1毫秒,然后,如果我們沒有填滿緩沖區,這個設置將增加1毫秒的延遲請求以等待更多的消息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處於高負載的情況下,如果設置比0大,以少量的延遲代價換取更少的,更有效的請求。
props.put("linger.ms", 1);
// 控制生產者可用的緩存總量,如果消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發送調用將被阻塞,阻塞時間的閾值通過 max.block.ms 設定,之后它將拋出一個TimeoutException。
props.put("buffer.memory", 33554432);
// key.serializer 和 value.serializer,將用戶提供的 key 和 value 對象 ProducerRecord 轉換成字節,可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 100; i < 500; i++)
// send()方法是異步的,添加消息到緩沖區等待發送,並立即返回。生產者將單個的消息批量在一起發送來提高效率
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
2.2、KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* 【自動提交偏移量】的簡單的kafka消費者API
*
* @author xiandongxie
*/
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092");
// 消費者組名稱
props.put("group.id", "test");
// 設置 enable.auto.commit,偏移量由 auto.commit.interval.ms 控制自動提交的頻率。
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 指定訂閱 topic 名稱
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}