kafka的生產者
1. 生產者客戶端開發
熟悉kafka的朋友都應該知道kafka客戶端有新舊版本,老版本采用scala編寫,新版本采用java編寫。隨着kafka版本的升級,舊版本客戶端已經快被完全替代了。因此,我們以新客戶端為例進行介紹。
客戶端開發的步驟如下:
配置生產者客戶端參數及創建相應的生產者實例
構建待發送的信息
發送信息
關閉生產者實例
代碼如下:
public class ProducerFastStart {
public static final String brokerList="node112:9092,node113:9092,node114:9092";
public static final String topic = "topic-demo";
public static void main(String[] args) {
//配置生產者客戶端參數
Properties prop = new Properties();
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("bootstrap.servers",brokerList);
//創建生產者客戶端
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//構建所要發送到的生產者消息
ProducerRecord record = new ProducerRecord(topic, "hello,Kafka");
try {
producer.send(record);//發送消息
} catch (Exception e) {
e.printStackTrace();
}finally {
producer.close();//關閉生產者客戶端
}
}
}
需要maven依賴如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
這里有必要對構建的消息對象ProduceRecord進行說明,ProduceRecord對象包括以下幾個屬性:
topic和partititon用來指定消息發送到主題分區。header是指消息頭部,從0.11.x這個版本引進的。Key是指消息的鍵,可通過分區號讓消息發往特定的分區【可以使key相同的消息發送到同一分區】,有key的消息還可以支持日志壓縮的功能。value為消息體,一般不為空,如果為空則表示特定的消息——墓碑消息。timestamp指消息的時間戳,有兩種類型CreateTime和LogAppendTime,前者表示消息創建時間,后者表示消息追加到日志文件的時間。
public class ProducerRecord<K, V> {
private final String topic;//主題
private final Integer partition;//分區號
private final Headers headers;//消息頭部,其實就是一個Iterable<Header>
private final K key;//消息的key
private final V value;//消息的value
private final Long timestamp;//消息的時間戳
}
通過以下這種方式創建ProduceRecord對象,只是指定了最基本的兩個屬性,topic和value。ProducerRecord包括多個構造函數,可靈活使用。
1.1 必要的參數配置
bootstrap.servers:指定客戶端連接的broker地址清單。
Key.serializer和value.serializer用於指定消息的key和value的序列化器。
client.id指定KafkaProducer的id,默認系統會自動生成。
由於參數的名稱特別多,而且是字符串容易寫錯,因此客戶端提供了一個類ProducerConfig,包括所有的參數名稱。同樣需要注意,由於key和value的序列化器需要類的全限定名,可通過一下方式改進。
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
Producer是thread safe,通常情況下,多個線程共享一個Producer實例要比使用多個Producer實例效率要高。生產者包括一個緩沖區空間池,其中保存尚未傳輸到服務器的記錄,以及一個后台I/O線程,該線程負責將這些記錄轉換為請求並將它們傳輸到集群。使用后不關閉生產商將泄漏這些資源。
1.2 消息的發送
發送消息主要有三種方式:發后即忘(fire-and-forget)、同步sync和一部async。
(1)fire-and-forget
只管往kafka中發送消息,不管消息是否到達。大多數情況下,這種方式不會出現問題,但當發生不可重試異常時,會造成數據丟失。性能最高,可靠性最差。以下這種方式就是fire-and-forget:
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
(2)sync
send()方法是有返回值的,是一個Future對象。
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
實際上send方法本身是異步,可以通過調用Future對象的get方法阻塞等待Kafka的響應,直到消息發送成功或拋出異常【對異常可以做響應的處理】,實現同步。可以從RecordMetadata獲取發送成功的ProduceRecord的相關元數據,包括topic、partition、offset、timestamp等。當然也可以通過Future的get(long timeout, TimeUnit unit)實現超時阻塞。
try {
//producer.send(record).get();
Future<RecordMetadata> future = producer.send(record);
RecordMetadata rm = future.get();
System.out.println(rm.topic()+"-"+rm.partition()+"-"+rm.offset()+"-"+rm.timestamp());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
另外,KafkaProducer一般會產生兩類異常:可重試異常和不可重試異常。可充實異常有NetworkException、LeaderNotAvaliableException、UnKnownTopicOrPartitonException、NotEnoughRepliasException、NotCoordinatorException。對於可重試異常,可以通過配置retires屬性,進行特定次數的重試,重試成功不會拋出異常,重試失敗拋出異常。
prop.put(ProducerConfig.RETRIES_CONFIG, 10);
對於不可重復異常,如RecordTooLargeException,發生后支持拋出異常。
同步方式可靠性高,要么消息發送成功,要么發生異常,可捕獲進行處理。不過同步方式的性能要差一些,需要阻塞等待消息發送完之后才能發送下一條消息。
(3)async
send()方法也是重載的,可以傳入一個CallBack回調函數,kafka在響應時調用該函數來實現異步發送的確認。onCompletion這兩個方法是互斥的,要么exception為null,要么metadata為null。另外需要注意的是回調函數能夠保證分區有序。即如果record1先於record2先發送 ,則對應的callback1先於callback2被調用。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
exception.printStackTrace();
}else {
System.out.println(metadata.topic() + "-" + metadata.partition());
}
}
});
對於producer的close方法也是重載,可以實現超時強行關閉,但是一般不這樣使用。
public void close()
public void close(long timeout, TimeUnit timeUnit)
1.3 序列化
生產者需要用序列化器把對象轉換成字節數組才能通過網絡發送給Kafka集群,同樣消費者必須通過與之對應的反序列化器進行解析。kafka-client提供了多種數據類型對象的序列化器,父接口為org.apache.kafka.common.serialization.Serializer接口。
public interface Serializer<T> extends Closeable {
void configure(Map<String, ?> configs, boolean isKey);//通常重寫為空方法
byte[] serialize(String topic, T data);//序列化方法
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
@Override
void close();//通常重寫為空方法
}
自定義序列化器:
這里為了方面使用了lombok框架,maven依賴如下,注意在idea中還要安裝響應的插件,否則注解不生效。
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
<scope>provided</scope>
</dependency>
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {
private String name;
private String address;
}
創建Company對象的序列化器:
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CompanySerializer implements Serializer<Company> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
//空實現
}
@Override
public byte[] serialize(String topic, Company data) {
if(data == null) {
return null;
}
byte[] name, address;
try {
if(data.getName() == null) {
name = new byte[0];
}else {
name = data.getName().getBytes("UTF-8");
}
if(data.getAddress() == null) {
address = new byte[0];
}else {
address = data.getAddress().getBytes("UTF-8");
}
//分別用4個字節用來存儲name的長度和address的長度,然后是name和address,定義一種規則方便反序列化的保證正確
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public void close() {
//空實現
}
}
使用的話只需要設置prop的key.serializer等設置為CompanySerializer即可。
1.4 分區器
消息通過send()方法發送到broker的過程中,有可能經過攔截器(Interceptor)、序列化器(Serializer)和分區器(Partitioner)的一系列作用之后才會發往broker。攔截器不是必須的,序列化器是必須的。消息進過序列化之后就要確定發往那個分區。如果ProducerRecord中指定了partition字段,則不需要分區器的作用,如果沒有,則需需要依賴於分區器,根據Producerrecord的key進行分區。
1.4.1 默認分區器
分區器的父接口為org.apache.kafka.clients.producer.Partitioner接口。
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}
默認分區器為:org.apache.kafka.clients.producer.internals.DefalutPartitoner,,源碼如下:
public class DefaultPartitioner implements Partitioner {
//線程安全的HashMap,用於存放每個topic關聯的一個原子對象
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//獲取kafka集群中對應topic的所有分區。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//如果Record的key為null
if (keyBytes == null) {
//取出當前Topic的原子變量值並加1,實際就是輪訓
int nextValue = nextValue(topic);
//獲取當前可用的分區
List<PartitionInfo> availablePa
rtitions = cluster.availablePartitionsForTopic(topic);
//如果存在可用的分區,則在可用分區中進行輪訓
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 如果沒有可用分區就在所有分區中進行輪訓
return Utils.toPositive(nextValue) % numPartitions;
}
} else {//如果Record的key不為null
// 根據key進行hash值的計算,放入到對應分區,保證相同的key永遠放入到同一分區。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
//nextValue()方法就是為每個topic的產生一個隨機值,便於輪訓
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
默認分區器的邏輯就是:
如果key不為空,則進行對key進行hash計算分區
如果為空,且存在可用分區,則在可用分區中輪訓,不存在可用分區,則在所有分區中輪訓。
1.4.2 自定義分區器
可通過實現partitioner接口,自定義分區器。
分區邏輯就是,有key進行hash分區,無key在所有分區中輪訓
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitioners = cluster.availablePartitionsForTopic(topic);
int numPartitions = partitioners.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
分區器的配置:
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
1.5 生產者攔截器
攔截器是在Kafka0.10.0.0版本出現的,有生產者攔截器和消費者攔截器兩種。
生產者攔截器將消息進行序列化和計算分區之前進行"攔截",這里所謂攔截主要體現在兩個方面:
(1)為消息提供定制化的操作
(2)可以用來在發送回掉邏輯前做一些定制化的需求。
攔截器通過自定義實現org.apache.kafka.clients.producer.ProducerInterceptor。onSend()方法對消息進行相應的定制化操作;onAcknowledgement()方法會在消息在應答之前或消息發送失敗時被調用,因此此方法優先於callback方法執行。
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
public class ProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String,String> {
private volatile long sendSuccess = 0;
private volatile long sendFaliure = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String modifiedValue = "prefix-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue);
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if(metadata == null) {
sendFaliure++;
}else {
sendSuccess++;
}
}
@Override
public void close() {
System.out.println("發送成功率為:" + (double) sendSuccess / (sendSuccess + sendFaliure));
}
@Override
public void configure(Map<String, ?> configs) {
}
}
攔截器的使用:
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class.getName());
多個攔截器之間用“,”隔開,注意多個攔截器是有順序的。
2. 原理分析
KafkaProudcer在真正把消息發往Kafka集群時,會依次經歷攔截器、序列化器、和分區器,然后緩存到消息累加器RecordAccumulator中,Sender線程負責從RecordAccumulator中獲取消息並發送到Kafka集群。
2.1 消息發送到RecordAccumulator
KafkaProducer調用send方法發送ProducerRecord,首先會通過攔截器鏈進行定制化操作,然后調用了doSend方法。發放如下:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
//1. 阻塞式獲取metaData
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
//2.對key進行序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
//3.value進行序列化
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
//根據消息和集群metaData計算發往的分區
int partition = partition(record, serializedKey, serializedValue, cluster);
//創建主題分區
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
//估算消息記錄的總長度
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
//消息長度的有效性檢驗
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
//創建CallBack對象
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
//消息記錄提交到Accumulator。
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
(1)waitOnMetadata阻塞式獲取metaData,超過${max.block.ms}時間依舊未獲取到,則拋TimeoutException,消息發送失敗。
(2)對key和value進行序列化
(3)根據消息和集群metaData計算發往的分區,,並創建主題分區對象。
(4)估算消息記錄的總長度的上限,並對消息記錄的總長度進行檢驗。如果上限大於{max.request.size},拋出RecordTooLargeException異常。如果上限大於{buffer.memory},也會拋出RecordTooLargeException異常。
private void ensureValidRecordSize(int size) {
if (size > this.maxRequestSize)
throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the maximum request size you have configured with the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
if (size > this.totalMemorySize)
throw new RecordTooLargeException("The message is " + size +" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
(5)將消息記錄append到RecordAccumulator。
RecordAccumulator對象就是為了緩存消息便於Sender線程可以批量發送,減少網絡傳輸的資源消耗,提升性能。既然是緩存,就肯定有大小。RecordAccumulator可由參數{buffer.memory}指定,默認是32M。那它內部數據的組織形式是怎樣的呢。它內部有個private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches屬性,是一個Map對象,key為主題分區,value是ProducerBatch的雙端隊列,正如圖上所示,它對消息記錄按照分區進行緩存,每個分區對應一個ProducerBatch的雙端隊列。那ProducerBatch又是什么呢。ProducerBatch就是ProducerRecord的批次,可以包括一個或多個ProduerRecord,ProducerBatch的大小可以通過batch.size這個參數設置,不過當一個ProducerRecord的大小超過batch.size的大小時,就會生產一個新的ProducerBatch,這個ProducerBatch的大小就是該ProducerRecord的大小。也許你會產生一個疑問,既然ProducerBatch的大小不一定等於batch.size,那么為什么還要使用這個參數,其實是為了更好的管理內存,在kafka中通過java.io.ByteBuffer實現消息內存的創建和釋放,不過為了減少頻繁的創建和釋放內存空間,RecordAccumulator內部使用了BufferPool實現對特定大小的ByteBuffer進行管理,實現復用,特定大小就是通過batch.size這個參數進行設置,同樣如果當前ProducerBatch的大小超過batch.size,那個這個ByteBuffer不能實現復用。
RecordAccumulator通過append方法將ProducerRecord追加到具體的ProducerBatch中,過程如下:
(1)記錄當前正在進行append消息的線程數,方便當客戶端調用 KafkaProducer.close()強制關閉發送消息操作時放棄未處理完的請求,釋放資源
(2)getOrCreateDeque,獲得或創建主題分區對應的ProducerBatch的雙端隊列。
(3)tryAppend(timestamp, key, value, headers, callback, dq),嘗試將消息append到雙端隊列。
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,Callback callback, Deque<ProducerBatch> deque) {
//從雙端隊列中獲得最后一個ProducerBatch
ProducerBatch last = deque.peekLast();
if (last != null) {//如果ProducerBatch存在,則嘗試將消息追加到這個ProducerBatch中。
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
//如果追加不成功,關閉這個batch的記錄追加
if (future == null)
last.closeForRecordAppends();
else//追加陳宮返回一個RecordAppendResult對象
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
//如果這個隊列為空,即不存在任何一個ProducerBatch,返回null
return null;
}
再來看一下ProducerBatch如何嘗試append消息。
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
//檢查是否有足夠的空間用來緩存該消息,如果沒有返回null
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {//如果有則進行緩存
//追加消息
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
//計算最大消息記錄
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
//計算最后一個添加消息的時間
this.lastAppendTime = now;
//構建一個FutureRecordMetadata對象
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
//添加thunck對象
thunks.add(new Thunk(callback, future));
//記錄加1
this.recordCount++;
return future;
}
}
(4)若上述嘗試append消息失敗,即返回null,此時需要向BufferPool申請空間用於創建新的ProducerBatch對象,並將消息append到新創建的ProducerBatch中,最后返回處理結果。
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
//記錄當前正在進行append消息的線程數,方便當客戶端調用 KafkaProducer.close()強制關閉發送消息操作時放棄未處理完的請求,釋放資源
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 獲得或創建主題分區對應的ProducerBatch的雙端隊列
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
//嘗試將消息append到雙端隊列
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 4.向BufferPool申請空間用於創建新的ProducerBatch對象
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
//將消息append到新創建的ProducerBatch中
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
2.2 Sender發送消息至Kafka集群
Sender從RecordAccumulator中獲取獲取緩存的消息后,會進一步將<TopicPartition, Deque<ProducerBatch>>封裝成<Node, List<ProducerBatch>>,還會進一步封裝為<Node,Request>的形式。Sender發送到kafka之前還會保存到保存到InFlightRequest中,InFlightRequest保存對象的具體格式為Map<NodeId, Deque<Request>>,主要作用是緩存了已經發出去的Request。其中可通過一個參數max.in.flight.requests.per.connection(默認為5)設置客戶端與每個Node之間緩存的Request的最大值。超過這份最大值,就不能再向這個連接發送請求了。因此可以通過 Deque<Request>的size來判斷對用的Node中是否堆積了很多未處理的消息,如果真是如此,說明Node節點的網絡負載較大或者連接有問題。
2.3 元數據的更新
所謂的元數據是指的Kafka集群的元數據,包括集群中的主題、分區、Leader、Follower等,當客戶端不存在需要使用的元數據信息或者超過metadata.max.age.ms[默認5分鍾],會引起元數據的更新。當元數據需要更新時,會首先挑選出負載最小的node,向他發送MetaDataRequest請求,這個更新操作由send線程發起,同樣會存入InFlightRequest中。由於主線程也需要元數據,因此需要通過synchronize和final關鍵字保證。
2.4 生產者客戶端的重要參數
acks
取值有0,1,-1,用於指定分區中至少有多少副本收到這個現象,之后生產者才會認為該消息被寫入。
max.request.size
限制生產者客戶端發送消息的最大值
reties 生產者發送出現異常時的重試次數
retry.backoff.ms 每次重試的時間間隔
compression.type 生產者端消息的壓縮方式
connections.max.idles.ms 連接限制關閉時間
linger.ms 用於配置ProducerBatch等待加入ProducerRecord的時間
receive.buffer.bytes Socket接收消息緩沖區
send.buffer.bytes Socket發送器的緩沖區
request.timeout.ms 生產者等待請求響應的最長時間,請求超時可以進行重試。這個參數大於broker端的replia.lag.time.max.ms
buffer.memory 生產者客戶端用於緩存消息的緩沖區大小
batch.size 指定ProducerBatch可以復用緩沖區的大小
max.block.ms 生產者send方法和paritionFor方法的阻塞時間
max.in.flight.requests.per.connection 限制每個鏈接最多緩存的請求數量
metadata.max.age.ms 更新元數據的時間