kafka-clients介紹


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

這里是基於2.4.0進行說明的。

一、kafka生產者

org.apache.kafka.clients.producer.KafkaProducer -- 生產者
org.apache.kafka.clients.producer.ProducerConfig -- 生產者配置
org.apache.kafka.clients.producer.ProducerRecord -- 消息

不同的應用場景對消息有不同的需求,即是否允許消息丟失、重復、延遲以及吞吐量的要求。不同場景對Kafka生產者的API使用和配置會有直接的影響。

kakfa發送消息的主要步驟:

每個消息是一個ProducerRecord對象,必須指定消息所屬的Topic和消息值Value,此外還可以指定消息所屬的Partition以及消息的Key。
① 序列化ProducerRecord有多個構造器
② 如果ProducerRecord中指定了Partition,則Partitioner不做任何事情;否則,Partitioner根據消息的key得到一個Partition。這是生產者就知道向哪個Topic下的哪個Partition發送這條消息。
③ 消息被添加到相應的batch中,獨立的線程將這些batch發送到Broker上。
④ broker收到消息會返回一個響應。如果消息成功寫入Kafka,則返回RecordMetaData對象,該對象包含了Topic信息、Patition信息、消息在Partition中的Offset信息;若失敗,返回一個錯誤。

 Kafka的順序保證。Kafka保證同一個partition中的消息是有序的,即如果生產者按照一定的順序發送消息,broker就會按照這個順序把他們寫入partition,消費者也會按照相同的順序讀取他們。

1. 生產者配置

bootstrap.servers 
    list。
    用於建立與kafka集群的連接,這個list僅僅影響用於初始化的hosts,來發現全部的servers。
    格式:host1:port1,host2:port2,…,數量盡量不止一個,以防其中一個down了。

acks 
    字符串, 默認值是1。
    Server完成 producer request 前需要確認的數量。
    acks=0時,producer不會等待確認,直接添加到socket等待發送;
    acks=1時,等待leader寫到local log就行;
    acks=all或acks=-1時,等待isr中所有副本確認
    (注意:確認都是 broker 接收到消息放入內存就直接返回確認,不是需要等待數據寫入磁盤后才返回確認,這也是kafka快的原因)

buffer.memory
    long, 默認值33554432。
    Producer可以用來緩存數據的內存大小。該值實際為RecordAccumulator類中的BufferPool,即Producer所管理的最大內存。
    如果數據產生速度大於向broker發送的速度,producer會阻塞max.block.ms,超時則拋出異常。

compression.type
    字符串,默認值none。
    Producer用於壓縮數據的壓縮類型,取值:none, gzip, snappy, or lz4。
    
batch.size
    int,默認值16384。
    Producer可以將發往同一個Partition的數據做成一個Produce Request發送請求,即Batch批處理,以減少請求次數,該值即為每次批處理的大小。
    另外每個Request請求包含多個Batch,每個Batch對應一個Partition,且一個Request發送的目的Broker均為這些partition的leader副本。
    若將該值設為0,則不會進行批處理。

linger.ms
    long, 默認值0。
    Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然后再發送,以此提高吞吐量,而linger.ms則更進一步,這個參數為每次發送增加一些delay,以此來聚合更多的Message。
    官網解釋翻譯:producer會將request傳輸之間到達的所有records聚合到一個批請求。通常這個值發生在欠負載情況下,record到達速度快於發送。但是在某些場景下,
    client即使在正常負載下也期望減少請求數量。這個設置就是如此,通過人工添加少量時延,而不是立馬發送一個record,producer會等待所給的時延,以讓其他records發送出去,
    這樣就會被聚合在一起。這個類似於TCP的Nagle算法。該設置給了batch的時延上限:當我們獲得一個partition的batch.size大小的records,就會立即發送出去,而不管該設置;
    但是如果對於這個partition沒有累積到足夠的record,會linger指定的時間等待更多的records出現。該設置的默認值為0(無時延)。例如,設置linger.ms=5,會減少request發送的數量,
    但是在無負載下會增加5ms的發送時延。

max.request.size
    int, 默認值1048576。
    請求的最大字節數。這也是對最大消息大小的有效限制。注意:server具有自己對消息大小的限制,這些大小和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。

receive.buffer.bytes
    int, 默認值32768。
    TCP的接收緩存 SO_RCVBUF 空間大小,用於讀取數據。

request.timeout.ms
    int,默認值30000。
    client等待請求響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求,超過重試次數發送失敗。


send.buffer.bytes
    int,默認值131072。
    TCP的發送緩存 SO_SNDBUF 空間大小,用於發送數據。
    
timeout.ms
    int,默認值30000。
    指定server等待來自followers的確認的最大時間,根據acks的設置,超時則返回error。


max.in.flight.requests.per.connection
    int,默認值5。
    在block前一個connection上允許最大未確認的requests數量。當設為1時,即是消息保證有序模式,
    注意:這里的消息保證有序是指對於單個Partition的消息有順序,因此若要保證全局消息有序,可以只使用一個Partition,當然也會降低性能

metadata.fetch.timeout.ms
    long,默認值60000。
    在第一次將數據發送到某topic時,需先fetch該topic的metadata,得知哪些服務器持有該topic的partition,該值為最長獲取metadata時間。

reconnect.backoff.ms
    long,默認值50.
    連接失敗時,當我們重新連接時的等待時間。

retry.backoff.ms
    long, 默認值100。
    在重試發送失敗的request前的等待時間,防止若目的Broker完全掛掉的情況下Producer一直陷入死循環發送,折中的方法。

metrics.sample.window.ms
    long, 默認值3000
    metrics系統維護可配置的樣本數量,在一個可修正的window size

metrics.num.samples
    int, 默認值2
    用於維護metrics的樣本數

metric.reporters
    數組[]
    類的列表,用於衡量指標。實現MetricReporter接口

metrics.log.level
    metrics日志記錄級別,默認info

metadata.max.age.ms
    long, 默認值300000
    強制刷新metadata的周期,即使leader沒有變化

security.protocol
    默認值PLAINTEXT
    與broker會話協議,取值:LAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

partitioner.class
    默認值class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    分區類,實現Partitioner接口
    
max.block.ms
    long,默認值60000
    控制block的時長,當buffer空間不夠或者metadata丟失時產生block

connections.max.idle.ms
    long,默認值540000
    設置多久之后關閉空閑連接

client.id
    字符串,默認值""
    當向server發出請求時,這個字符串會發送給server,目的是能夠追蹤請求源

retries
    int,默認值0
    發生錯誤時,重傳次數。當開啟重傳時,需要將`max.in.flight.requests.per.connection`設置為1,否則可能導致失序

key.serializer
    key 序列化方式,類型為class,需實現Serializer interface, 如:org.apache.kafka.common.serialization.StringSerializer

value.serializer
    value 序列化方式,類型為class,需實現Serializer interface,如org.apache.kafka.common.serialization.StringSerializer

enable.idempotence
    true為開啟冪等性

transaction.timeout.ms 
    事務超時時間,默認60000,單位ms

transactional.id 
    設置事務id,必須唯一

2. 創建Kafka生產者

import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * kafka生產者
 */
public class KafkaProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
1       properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            //參數1:topic名, 參數2:消息文本; ProducerRecord多個重載的構造方法
            kafkaProducer.send(new ProducerRecord<String, String>("test20200519", "message"+i));
            System.out.println("message"+i);
        }
        kafkaProducer.close();
    }
}

Kafka的生產者有如下三個必選的屬性:
① bootstrap.servers,指定broker的地址清單
② key.serializer必須是一個實現org.apache.kafka.common.serialization.Serializer接口的類,將key序列化成字節數組。注意:key.serializer必須被設置,即使消息中沒有指定key。
③ value.serializer,將value序列化成字節數組。

3. 發送消息到kafka

(1) 同步發送消息

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//KafkaProducer類發送數據,kafka Producer是線程安全的,可以在多個線程之間共享生產者實例
org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
// -- 同步發送消息
for (int i = 1; i <= 600; i++) {
    //參數1:topic名, 參數2:消息文本; ProducerRecord多個重載的構造方法
    kafkaProducer.send(new ProducerRecord<String, String>("test20200519", "message"+i));
    System.out.println("message"+i);
}

//或者
ProducerRecord<String, String> syncRecord = new ProducerRecord<>("test20200519", "Kafka_Products", "測試"); //Topic Key Value
try{
    Future future = kafkaProducer.send(syncRecord);
    future.get();//不關心是否發送成功,則不需要這行。
} catch(Exception e) {
    e.printStackTrace();//連接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接拋出異常
}

kafkaProducer.close();

(2) 異步發送消息

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//KafkaProducer類發送數據,kafka Producer是線程安全的,可以在多個線程之間共享生產者實例
org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);


// -- 異步發送消息
ProducerRecord<String, String> asyncRecord = new ProducerRecord<String, String>("test20200519", "Kafka_Products","測試--1");//Topic Key Value
kafkaProducer.send(asyncRecord, new DemoProducerCallback());//發送消息時,傳遞一個回調對象,該回調對象必須實現org.apache.kafka.clients.producer.Callback接口

kafkaProducer.close();



class DemoProducerCallback implements Callback{
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {//如果Kafka返回一個錯誤,onCompletion方法拋出一個non null異常。
            e.printStackTrace();//對異常進行一些處理,這里只是簡單打印出來
        }
    }
}

4. 序列化器

建議使用JSON、Apache Avro、Thrift或者Protobuf這些成熟的序列化/反序列化方案。

5. 分區

我們創建消息的時候,必須要提供主題和消息的內容,而消息的key是可選的,當不指定key時默認為null。消息的key有兩個重要的作用:1)提供描述消息的額外信息;2)用來決定消息寫入到哪個分區,所有具有相同key的消息會分配到同一個分區中。

如果key為null,那么生產者會使用默認的分配器,該分配器使用輪詢(round-robin)算法來將消息均衡到所有分區。

如果key不為null而且使用的是默認的分配器,那么生產者會對key進行哈希並根據結果將消息分配到特定的分區。注意的是,在計算消息與分區的映射關系時,使用的是全部的分區數而不僅僅是可用的分區數。這也意味着,如果某個分區不可用(雖然使用復制方案的話這極少發生),而消息剛好被分配到該分區,那么將會寫入失敗。另外,如果需要增加額外的分區,那么消息與分區的映射關系將會發生改變,因此盡量避免這種情況。

6. 自定義分配器

下面將key為Banana的消息單獨放在一個分區,與其他的消息進行分區隔離:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;

public class BananaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {

    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("We expect all messages to have customer name as key");
        if (((String) key).equals("Banana"))
            return numPartitions; // Banana will always go to last partition

        // Other records will get hashed to the rest of the partitions
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1));
    }

    public void close() {
    }
}

二、kafka消費者

1. 消費者配置

group.id
    字符串
    消費者所屬消費組的唯一標識

max.poll.records 
    int,默認值500
    一次拉取請求的最大消息數

max.poll.interval.ms 
    int, 默認值300000
    指定拉取消息線程最長空閑時間

session.timeout.ms 
    int, 默認值10000
    檢測消費者是否失效的超時時間

heartbeat.interval.ms 
    int, 默認值10000
    消費者心跳時間,默認3000ms

bootstrap.servers 
    連接集群broker地址

enable.auto.commit 
     boolean, 默認值true
    是否開啟自動提交消費位移的功能

auto.commit.interval.ms 
    int,默認值5000
    自動提交消費位移的時間間隔

partition.assignment.strategy 
    range 或 roundrobin,默認的值是range。
    消費者的分區配置策略。

auto.offset.reset 
    字符串,默認值"latest"
    如果分區沒有初始偏移量,或者當前偏移量服務器上不存在時,將使用的偏移量設置,earliest從頭開始消費,latest從最近的開始消費,none拋出異常

fetch.min.bytes 
    int,默認值1
    消費者客戶端一次請求從Kafka拉取消息的最小數據量,如果Kafka返回的數據量小於該值,會一直等待,直到滿足這個配置大小,默認1b

fetch.max.bytes 
    int,默認值50*1024*1024,即50MB
    消費者客戶端一次請求從Kafka拉取消息的最大數據量

fetch.max.wait.ms 
    int,默認500。
    從Kafka拉取消息時,在不滿足fetch.min.bytes條件時,等待的最大時間

metadata.max.age.ms 
    強制刷新元數據時間,毫秒,默認300000,5分鍾

max.partition.fetch.bytes 
    int,默認1*1024*1024,即1MB
    設置從每個分區里返回給消費者的最大數據量,區別於fetch.max.bytes。

send.buffer.bytes
    int,默認128*1024
    Socket發送緩沖區大小,默認128kb,-1將使用操作系統的設置

receive.buffer.bytes 
    Socket發送緩沖區大小,默認64kb,-1將使用操作系統的設置

client.id 
    消費者客戶端的id

reconnect.backoff.ms 
    連接失敗后,嘗試連接Kafka的時間間隔,默認50ms

reconnect.backoff.max.ms 
    嘗試連接到Kafka,生產者客戶端等待的最大時間,默認1000ms

retry.backoff.ms 
    消息發送失敗重試時間間隔,默認100ms

metrics.sample.window.ms 
    樣本計算時間窗口,默認30000ms

metrics.num.samples 
    用於維護metrics的樣本數量,默認2

metrics.log.level 
    metrics日志記錄級別,默認info

metric.reporters 
    類的列表,用於衡量指標,默認空list

check.crcs 
    boolean,默認true
    自動檢查CRC32記錄的消耗

key.deserializer 
    key反序列化方式

value.deserializer 
    value反序列化方式

connections.max.idle.ms 
    設置多久之后關閉空閑連接,默認540000ms

request.timeout.ms 
    客戶端將等待請求的響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求,超過重試次數將拋異常,默認30000ms

default.api.timeout.ms 
    設置消費者api超時時間,默認60000ms

interceptor.classes 
    自定義攔截器

exclude.internal.topics 
    內部的主題:一consumer_offsets 和一transaction_state。該參數用來指定 Kafka 中的內部主題是否可以向消費者公開,默認值為 true。如果設置為 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式來訂閱內部主題,設置為 false 則沒有這個限制。

isolation.level 
    用來配置消費者的事務隔離級別。如果設置為“read committed”,那么消費者就會忽略事務未提交的消息,即只能消 費到 LSO (LastStableOffset)的位置,默認情況下為 “read_uncommitted”,即可以消 費到 HW (High Watermark)處的位置

2.  創建kafka消費者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * kafka消費者
 */
public class KafkaConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");
        //KafkaConsumer類不是線程安全的
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test20200519"));
        try{
            while (true) {
                //拉取消息
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }finally{
            consumer.close();
        }
    }
}

3. kafka多線程消費

kafka的KafkaConsumer不是線程安全的,必須保證只能被一個線程操作。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM