記錄下和kafka相關的生產者和消費者,文中很多理解參考文末博文、書籍還有某前輩。
生產者
包含原理和代碼兩部分。
原理相關
如果要寫生產者代碼,需要先了解其發送的大致過程,即消息經歷了什么才存儲到對應topic的對應分區,大致如下。
(1)執行生產者代碼后,會在JVM中啟動一個producer,它會將數據發送到指定的topic。
(2)message不會直接就發送出去,會首先封裝成ProducerRecord,構造ProducerRecord實例對象時,可以傳入topic、key、value等。當需要指定消息發送到哪個分區,就需要傳入key。value里是消息內容,一般是json格式。
(3)消息還需要序列化,因為涉及到數據的磁盤落地,然后又重新從磁盤讀取數據,因此需要使用序列化(生產者)和反序列化(消費者)。
(4)序列化后的數據,還會經過分區器,這里可以指定自定義分區器,如果不指定就是默認分區器。分區器決定數據將存在topic哪個分區,那如何知道這個topic有幾個分區?知道了又如何確定哪個分區就是leader分區,就算知道leader分區,又如何判斷屬於哪個broker呢?這一切都需要通過獲取broker上的元數據來得到答案。
在0.8版本,這些元數據是存在zookeeper中的,這樣設計是有弊端的,zookeeper本來不是為高並發設計的,如果大量訪問涌入zookeeper獲取元數據,可能會出問題。在0.10.x之后,這些原數據通過存在某個broker的controller,將從zookeeper獲取的元數據都分發到各個broker一份,因此從其中一個broker獲取到的數據就是元數據,這樣各個broker分攤了zookeeper的壓力,將以前從zookeeper獲取元數據,分到多個broker去提供了。
(5)接下來數據還不會直接發送出去,會先存入到一個默認是32M大小的內存緩沖區。
(6)緩沖區的數據,會先填入一個又一個的batch,默認一個batch是16K,這個也是可以設置batch.size修改的,需要根據實際情況來配置。batch大小達到指定大小就會發送出去,如果大小沒達到16K,還有一個時間限定,可以通過linger.ms來設置,當達到指定的時間不管batch有沒有達到指定大小都會發送出去。
producer會有一個專門的sender線程,將滿足條件的batch一起發送過去,這樣可以將多條消息批量的發送,比一條條的發送更加的節省資源,不用頻繁的創建和銷毀連接,在0.8版本,是沒有batch這個東西的,來一條就發送一條(有改進的空間,仿造批量發送可以提高性能,來自某前輩的經驗)。
(7)消息通過sender發送給leader分區,需要經過三層網絡架構,然后先寫入到broker的os cache里,然后再落地到本地磁盤,落地到磁盤是采用順序寫的方式,一般不會直接寫入到磁盤,這樣會影響性能(datanode寫入數據是直接寫入到磁盤的,如果也先寫入到os cache,會提高整體性能)。

代碼相關
有了上面的原理,生產者的代碼部分相對就好理解了,涉及到性能的優化,也會在代碼中實現,具體參考代碼注釋。
package com.boe.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 定義一個生產者,將消息發送出去
*/
public class MyProducer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//step1 配置參數,這些跟優化kafka性能有關系
Properties props=new Properties();
//1 連接broker
props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");
//2 key和value序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3 acks
// -1 代表所有處於isr列表中的follower partition都會同步寫入消息成功
// 0 代表消息只要發送出去就行,其他不管
// 1 代表發送消息到leader partition寫入成功就可以
props.put("acks","-1");
//4 重試次數
props.put("retries",3);//大部分問題,設置這個就可以解決,生產環境可以設置多些 5-10次
// 5 隔多久重試一次
props.put("retry.backoff.ms",2000);
//6 如果要提升kafka的吞吐量,可以指定壓縮類型,如lz4
props.put("compression.type","none");
//7 緩沖區大小,默認是32M
props.put("buffer.size",33554432);
//8 一個批次batch的大小,默認是16k,需要根據一條消息的大小去調整
props.put("batch.size",323840);//設置為32k
//9 如果一個batch沒滿,達到如下的時間也會發送出去
props.put("linger.ms",200);
//10 一條消息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯
props.put("max.request.size",1048576);
//11 一條消息發送出去后,多久還沒收到響應,就認為是超時
props.put("request.timeout.ms",5000);
//step2 創建生產者對象
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
//step3 使用消息的封裝形式,注意value一般是json格式
ProducerRecord<String,String> record=new ProducerRecord<String,String>("topicA","{'name':'clyang','age':'34','salary':'8848'}");
//ProducerRecord<String,String> record=new ProducerRecord<String,String>("topicA","I am sorry");
//step4 調用生產者對象的send方法發送消息,有異步和同步兩種選擇
//1 異步發送,一般使用異步,發送后會執行一個回調函數
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//判斷是否有異常
if(exception==null){
System.out.println("消息發送到分區"+metadata.partition()+"成功");
}else{
System.out.println("消息發送失敗");
//TODO 可以寫入到redis,或mysql
}
}
});
Thread.sleep(10*1000);
//2 同步發送,需要等待一條消息發送完成,才能發送下一條消息
//RecordMetadata recordMetadata = producer.send(record).get();
//System.out.println("發送到的分區是:"+recordMetadata.partition());
//step5 關閉連接
producer.close();
}
}
執行后,控制台顯示發送消息成功,並打印出發送到了哪個分區。

消費者
包含原理和代碼兩部分。
原理相關
消費者消費數據,需要反序列化數據,且采用了零拷貝的技術,由於消費者和broker都在同一個操作系統下,一般都是linux,不涉及到linux到windows這種跨平台的數據讀取,因此數據反序列化后讀取到了os cache,然后發送到網關就直接被消費者消費,如下圖。如果數據反序列化到os cache(理解為數據的內核態),再拷貝一次到用戶態(這個狀態的數據可以跨系統平台)再消費,在同一平台下這會是一次多余的拷貝,kafka中省略了這個動作,這大大提高了消費者讀取數據的速度。
消費者消費某個leader分區的數據,會從消費者offset的下一個位置開始消費,如圖所示上一次消費到了offset 7的位置,下一次消費就從offset 8的位置開始消費。在zookeeper 0.8版本前,消費者的offset都保存在zookeeper中的,后面考慮到多個消費者要和zookeeper通信獲取offset會增加zookeeper的壓力,從1.0.x開始,這些消費者的offset改保存到了__consumer_offset這個主題里,而它分布在多個broker,將壓力就分攤了。
注意消費者能消費到的數據offset,需要小於這個分區的HW(高水印值),比如下圖這個分區的HW是9,則offset 10開始的數據就不可以消費,后面將整理HW和LEO相關的知識。

代碼相關
有了上面的原理,消費者的代碼部分相對就好理解了,涉及到性能的優化,也會在代碼中實現,具體參考代碼注釋。但是一般消費者是storm、spark streaming或者flink,又是另外的寫法了。
package com.boe.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.*;
/**
* 自定義一個消費者,從指定的topic消費數據
*/
public class MyConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//step1 配置消費者參數,也跟kafka性能有關
Properties props=new Properties();
//1 連接broker
props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");
//2 指定key和value的反序列化
//還需要指定消費組id,否則報錯
props.put("group.id","clyang");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//3 消費者給coordinator發送心跳的時間間隔
props.put("heartbeat.interval.ms",1000);
//4 coordinator認為多久沒接受到心跳,就認為超時
props.put("session.timout.ms",10*1000);
//5 隔多久執行一次poll
props.put("max.poll.interval.ms",10*1000);
//6 一次poll返回多少條record,默認是500條
props.put("max.poll.records",1000);
//7 不要回收socket連接
//consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,
//但是下次消費就要重新建立socket連接,這個建議設置為-1,不要去回收連接
props.put("connection.max.idle.ms",-1);
//8 設置自動提交offset
props.put("enable.auto.commit","true");//注意kafka版本,1.0.x是這么寫
//9 多久自動提交offset
props.put("auto.commit.interval.ms",1000);
//10 設置consumer重啟后,從分區最新的offset讀取
//latest:如果分區下有提交的offset,從這個offset開始讀取,否則從最新的數據開始讀取
//earliest:如果分區下有提交的offset,從這個offset開始讀取,否則從頭開始讀取
//none:如果分區下有提交的offset,從這個offset開始讀取,只要有一個分區沒有提交的offset,就報錯
props.put("auto.offset.reset","latest");
//step2 創建一個消費者對象
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
//step3 訂閱主題
consumer.subscribe(Arrays.asList("topicA"));
//創建線程池,小池子大隊列,只有核心線程,沒有臨時線程,工作隊列是個阻塞式隊列
ExecutorService threadPool= Executors.newFixedThreadPool(5);
//step4 不斷消費數據,並對數據進行處理
try {
while(true){
//超時時間是3s
//新版本的kafka,這個poll方法將干很多事情
//如監聽這個消費者跟多個topic的分區所在broker的通信,如有新的數據就會拉取過來,緩存數據、內存里更新offset
ConsumerRecords<String, String> consumerRecords = consumer.poll(3000);
for(ConsumerRecord<String, String> record:consumerRecords){
//1 寫法1
//如果value是json格式,將其轉換成JSON對象
//JSONObject json=JSONObject.parseObject(record.value());
//System.out.println("消費的消息是"+json.toJSONString()+",name為:"+json.getString("name"));
//2 寫法2 可以放到線程池去消費
//實現Runnable接口
threadPool.submit(new ConsumerTask(record));
}
}
}catch (Exception e) {
e.printStackTrace();
System.out.println("消費消息失敗");
consumer.close();
}
}
}
/**
* 如果實現Runnable接口,出現異常,需要在run方法進行捕獲
*/
class ConsumerTask implements Runnable{
private ConsumerRecord<String, String> record;
public ConsumerTask(ConsumerRecord<String, String> record) {
this.record = record;
}
@Override
public void run() {
JSONObject json=JSONObject.parseObject(record.value());
System.out.println("消費的消息是"+json.toJSONString()+",消息的分區為:"+record.partition()+",消息的offse為:"+record.offset());
}
}
執行后,控制台顯示消費成功,並且從消息的offset變化可以看出,每生產一條數據,同一個分區的消息,其offset都會加1。

分區器
kafka中也可以自定義分區器,根據key的不同,實現數據寫入到指定分區的效果,下面簡單的實現一個,實現以下效果。
-
key為"china",發給0號分區
-
key為"usa",發給1號分區
-
key為"korea",發給2號分區
以下是代碼部分,類似MapReduce的自定義分區器,它需要實現一個kafka提供的接口Partitioner,實現里面的partition方法 。
package com.boe.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
/**
* 自定義分區器
*/
public class MyPartitioner implements Partitioner {
//初始化值
int partitionNum;
/**
* 主要重寫這個方法,假設有topic country三個分區,producer將key為china、usa和korea的消息分開存儲到不同的分區,否則都放到0號分區
* @param topic 要使用自定義分區的topic
* @param key 消息key
* @param keyBytes 消息key序列化字節數組
* @param value 消息value
* @param valueBytes 消息value序列化字節數組
* @param cluster 集群元信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String keyStr=(String) key;
//獲取分區信息
List<PartitionInfo> partitionInfoList=cluster.availablePartitionsForTopic("country");
int partitionInfoListSize=partitionInfoList.size();
//判斷是否有三個分區
if(partitionInfoListSize==3){
switch (keyStr){
case "china":
partitionNum=0;
break;
case "usa":
partitionNum=1;
break;
case "korea":
partitionNum=2;
break;
default:
partitionNum=0;
break;
}
}
//返回分區序號
return partitionNum;
}
@Override
public void close() {
//資源的清理工作在這里執行
System.out.println("-----分區結束-----");
}
@Override
public void configure(Map<String, ?> configs) {
//資源的初始化工作在這里執行
partitionNum=0;
}
}
實現了自定義分區器,需要在上面生產者producer的代碼中,添加分區器到props文件中,才能生效!
package com.boe.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 定義一個生產者,將消息發送出去
*/
public class MyProducer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//step1 配置參數,這些跟優化kafka性能有關系
Properties props=new Properties();
//1 連接broker
props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");
//2 key和value序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3 acks
// -1 代表所有處於isr列表中的follower partition都會同步寫入消息成功
// 0 代表消息只要發送出去就行,其他不管
// 1 代表發送消息到leader partition寫入成功就可以
props.put("acks","-1");
//4 重試次數
props.put("retries",3);//大部分問題,設置這個就可以解決,生產環境可以設置多些 5-10次
// 5 隔多久重試一次
props.put("retry.backoff.ms",2000);
//6 如果要提升kafka的吞吐量,可以指定壓縮類型
props.put("compression.type","none");
//7 緩沖區大小,默認是32M
props.put("buffer.size",33554432);
//8 一個批次batch的大小,默認是16k,需要根據一條消息的大小去調整
props.put("batch.size",323840);//設置為32k
//9 如果一個batch沒滿,達到如下的時間也會發送出去
props.put("linger.ms",200);
//10 一條消息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯
props.put("max.request.size",1048576);
//11 一條消息發送出去后,多久還沒收到響應,就認為是超時
props.put("request.timeout.ms",5000);
//12 使用自定義分區器
props.put("partitioner.class","com.boe.partitioner.MyPartitioner");
//step2 創建生產者對象
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
//step3 使用消息的封裝形式
//自定義分區測試用的,可以看到自定了key,以下每條消息發送兩次
//ProducerRecord<String,String> record=new ProducerRecord<String,String>("country","china","{'name':'china','population','14'}");
//ProducerRecord<String,String> record=new ProducerRecord<String,String>("country","usa","{'name':'usa','population','3'}");
//ProducerRecord<String,String> record=new ProducerRecord<String,String>("country","korea","{'name':'korea','population','1'}");
//step4 調用生產者對象的send方法發送消息,有異步和同步兩種選擇
//1 異步發送,一般使用異步
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception==null){
System.out.println("消息發送到分區"+metadata.partition()+"成功");
}else{
System.out.println("消息發送失敗");
//TODO 寫入到redis
}
}
});
Thread.sleep(10*1000);
//2 同步發送,需要等待一條消息發送完成,才能發送下一條消息
//RecordMetadata recordMetadata = producer.send(record).get();
//System.out.println("發送到的分區是:"+recordMetadata.partition());
//step5 關閉連接
producer.close();
}
}
為了驗證分區器的效果,先創建一個測試的topic。
# 三個分區,三個replica,topic名為country
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 3 --partitions 3 --topic country
Created topic "country".
然后上面生產者代碼執行發送消息,每發送一條使用kafka shell查看一次結果,發現數據都發送到了指定的分區。最后每個分區,都是2條消息,實現分區的效果。
# key="china"->分區0 key="usa"->分區1 key="korea"->分區2
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092 --topic country
country:2:2
country:1:2
country:0:2
以上,理解不一定正確,但學習就是一個不斷了解和糾錯的過程。
參考博文:
(1)《Apache Kafka實戰》
(2)http://kafka.apache.org/documentation.html#producerconfigs 生產者配置說明
(3)http://kafka.apache.org/documentation.html#consumerconfigs 消費者配置說明
(4)https://www.cnblogs.com/youngchaolin/p/12535704.html controller獲取元數據
