背景
某系統使用 Kafka 存儲實時的行情數據,為了保證數據的實時性,需要在多地機房維護多個 Kafka 集群,並將行情數據同步到這些集群上。
一個常用的方案就是官方提供的 KafkaMirrorMaker 方案:

該方案的優點是能盡可能保證兩個 Kafka 集群的數據一致(為了避免網絡故障導致丟數據,要將其與 Kafka Cluster B 部署在同個機房),並且使用者無需進行開發工作,只需要進行響應的配置即可。
存在的問題
行情數據具有數據量大且時效性強的特點:
- 跨機房同步行情數據會消耗較多的專線帶寬
- 網絡故障恢復后繼續同步舊數據意義不大並且可能引起副作用(行情數據延遲較大意味着已經失效)
因此 KafkaMirrorMaker 的同步方式存在以下兩個不合理的地方:
- 無法實現多機房廣播,會造成專線帶寬浪費(多個機房同時拉取同一份數據)
- 單個 Producer 可能成為系統吞吐量的瓶頸(降低一致性以提高性能)
Producer 發送鏈路
主要的發送流程發送流程如下:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// 1. 阻塞獲取集群信息,超時后拋出異常
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
Cluster cluster = clusterAndWaitTime.cluster;
// 2. 序列化要發送的數據
byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// 3. 決定數據所屬的分區
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 4. 將數據追加到發送緩沖,等待發送線程異步發送
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// 5. 喚醒異步發送線程,將緩沖中的消息發送給 brokers
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
} catch (Exception e) {
// ...
}
}
決定分區
Producer 的功能是向某個 topic 的某個分區消息,所以它首先需要確認到底要向 topic 的哪個分區寫入消息:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 如果 key 為空,使用 round-robin 策略確認目標分區(保證數據均勻)
int nextValue = nextValue(topic);
return Utils.toPositive(nextValue) % numPartitions;
} else {
// 如果 key 不為空,使用 key 的 hash 值確認目標分區(保證數據有序)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
追加緩沖
為了保證防止過量消息積壓在內存中,每個 Producer 會設置一個內存緩沖,其大小由buffer.memory選項控制。
如果緩沖區的數據超過該值,會導致Producer.send方法阻塞,等待內存釋放(記錄被發送出去或超時后被清理):
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 如果緩沖中存在未滿的 ProducerBatch,則會嘗試將記錄追加到其中
// ...
// 估計記錄所需要的空間
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
// 分配內存空間給當前記錄
// 如果內存空間不足則會阻塞等待內存空間釋放,如果超過等待時間會拋出異常
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// 再次嘗試向現存的 ProducerBatch 中追加數據,如果成功則直接返回
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
return appendResult;
}
// 新建 ProducerBatch 並將當前記錄追加到其中
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()))
;
dq.addLast(batch);
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
}
}
異步發送
每個 Producer 都有一個發送線程KafkaProducer.ioThread,該線程會不停地調用Sender.sendProducerData方法將緩沖中的 RecordBatch 發送出去:
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// 獲取就緒的 broker 節點信息,准備發送
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (!result.unknownLeaderTopics.isEmpty()) {
// 如果部分 topic 沒有 leader 節點,則觸發強制刷新
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// 根據就緒 broker 節點信息,獲取緩沖中對應的 ProducerBatch,准備發送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// 排除已經檢查過的分區,避免重復檢查
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 清理已經過期的 ProducerBatch 數據,釋放被占用的緩沖內存
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
}
// 如果任意 broker 節點已經就緒,則將 pollTimeout 設置為 0
// 這是為了避免不必要的等待,讓內存中的數據能夠盡快被發送出去
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
pollTimeout = 0;
}
// 通過 NetworkClient -> NetworkChannel -> TransportLayer
// 最終將將消息寫入 NIO 的 Channel
sendProduceRequests(batches, now);
return pollTimeout;
}
優化方案
從前面的分析我們可以得知以下兩點信息:
- 每個 Producer 有一個內存緩沖區,當空間耗盡后會阻塞等待內存釋放
- 每個 Producer 有一個異步發送線程,且只維護一個 socket 連接(每個 broker 節點)
為了提高轉發效率、節省帶寬,使用 Java 復刻了一版 KafkaMirrorMaker 並進行了一些優化:

- 支持將一個集群的數據廣播到多個集群
- 使用多個 Producer 同時進行轉發提高效率
數據保序
如果同時使用多個 Producer,可能在轉發過程中發生數據亂序,折中的策略是根據 key 的 hash 值來選擇 Producer,保證 key 相同的數據會使用同個 Producer 進行發送:
void send(ConsumerRecord<byte[], byte[]> message) {
ProducerRecord record = new ProducerRecord<>(message.topic(), message.key(), message.value());
int hash = Math.abs(Arrays.hashCode(message.key()));
producers[hash % producers.length].send(record, onSend);
}
水位控制
多集群廣播雖然能夠一定程度上節省流量與機器資源,但是需要處理多個集群間發送速度不一致的問題。
極端情況下,如果其中某個機房的專線發生故障,Producer 會阻塞等待消息超時。當過量消息積壓在 Queue 中,會導致 JMV 頻繁的 FullGC,最終影響到對另一個機房的轉發。
為了處理這一情況,需要在發送隊列上加上水位線watermark限制:
interface Watermark {
default long high() { return Long.MAX_VALUE; }
default long low() { return 0; }
}
final BlockingQueue<byte[]> messageQueue = new LinkedBlockingQueue<>();
final AtomicLong messageBytes = new AtomicLong();
private void checkWatermark(Watermark bytesWatermark) {
long bytesInQueue = messageBytes.get();
if (bytesInQueue > bytesWatermark.high()) {
long discardBytes = bytesInQueue - bytesWatermark.low();
WatermarkKeeper keeper = new WatermarkKeeper(Integer.MAX_VALUE, discardBytes);
keeper.discardMessage(messageQueue);
long remainBytes = messageBytes.addAndGet(-discard.bytes());
}
}
為了實現高效的數據丟棄,使用BlockingQueue.drainTo減少鎖開銷:
public class WatermarkKeeper extends AbstractCollection<byte[]> {
private final int maxDiscardCount; // 丟棄消息數量上限
private final long maxDiscardBytes; // 丟棄消息字節上限
private int count; // 實際丟棄的消息數
private long bytes; // 實際丟棄消息字節數
public MessageBlackHole(int maxDiscardCount, long maxDiscardBytes) {
this.maxDiscardCount = maxDiscardCount;
this.maxDiscardBytes = maxDiscardBytes;
}
public void discardMessage(BlockingQueue<byte[]> queue) {
try {
queue.drainTo(this);
} catch (StopDiscardException ignore) {}
}
@Override
public boolean add(byte[] record) {
if (count >= maxDiscardCount || bytes >= maxDiscardBytes) {
throw new StopDiscardException();
}
count++;
bytes += record.length;
return true;
}
@Override
public int size() {
return count;
}
public long bytes() {
return bytes;
}
@Override
public Iterator<byte[]> iterator() {
throw new UnsupportedOperationException("iterator");
}
// 停止丟棄
private static class StopDiscardException extends RuntimeException {
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
}
監控優化
不使用 KafkairrorMaker 的另一個重要原因是其 JMX 監控不友好:
- RMI 機制本身存在安全隱患
- JMX 監控定制化比較繁瑣(使用 jolokia 也無法解決這一問題)
一個比較好的方式是使用 SpringBoot2 的 micrometer 框架實現監控:
// 監控注冊表(底層可以接入不同的監控平台)
@Autowired
private MeterRegistry meterRegistry;
// 接入 Kafka 的監控信息
new KafkaClientMetrics(consumer).bindTo(meterRegistry);
new KafkaClientMetrics(producer).bindTo(meterRegistry);
// 接入自定義監控信息
Gauge.builder("bytesInQueue", messageBytes, AtomicLong::get)
.description("Estimated message bytes backlog in BlockingQueue")
.register(meterRegistry);
通過這一方式能夠最大程度地利用現有可視化監控工具,減少不必要地開發工作。
