1. Basic producer implementation
1.1 Adding the dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- The version should match the installed Kafka package, e.g. kafka_2.12-2.0.0 -->
<version>2.0.0</version>
</dependency>
1.2 Code implementation
Sending messages synchronously
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
// 1. Set the producer configuration
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092");
// Serialize the message key from a String into a byte array
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Serialize the message value from a String into a byte array
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create the producer client used to send messages
Producer<String, String> producer = new KafkaProducer<String, String>(props);
User user = new User();
user.setErpId(500000L);
user.setErpName("張三三");
user.setRealName("張三三");
user.setCreateTime(new Date());
user.setUpdateTime(new Date());
// Build the record to send
// key: determines which partition the message is sent to; value: the actual message payload
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
user.getErpId().toString(), JSON.toJSONString(user));
// Send the message synchronously, obtain the metadata of the send and print it
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("Result of synchronous send: " + "topic-" +
metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
When sending synchronously, if the producer does not receive an ack, it blocks for up to 3 seconds; if there is still no ack after that, it retries. With the retry configuration shown in 1.3 below, the number of retries is 3.
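If you do not want a synchronous send to wait indefinitely, the Future returned by send() can also be waited on with a timeout. The following is a minimal sketch added for illustration (the 3-second timeout and the error handling are assumptions, not values from the original example); it needs java.util.concurrent.TimeUnit, TimeoutException and ExecutionException imported:
// Sketch: bound how long the synchronous send may block
try {
    RecordMetadata metadata = producer.send(producerRecord).get(3, TimeUnit.SECONDS);
    System.out.println("sync send ok: topic-" + metadata.topic() + "|partition-" + metadata.partition());
} catch (TimeoutException e) {
    // No ack within the timeout: log it and decide whether to retry or fail the request
    System.err.println("send timed out: " + e.getMessage());
} catch (InterruptedException | ExecutionException e) {
    System.err.println("send failed: " + e.getMessage());
}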
Sending messages asynchronously
// Send the message asynchronously with a callback
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e!=null){
System.out.println("Failed to send message: " + e.getMessage());
}
if(metadata!=null){
System.out.println("Result of asynchronous send: " + "topic-" +
metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
// Because the main thread may already have stopped after the asynchronous send, the onCompletion callback would never be observed; block the main thread for a while so the result can be seen
Thread.sleep(100000L);
1.3 Producer ack configuration
When sending synchronously, the producer blocks until it receives the ack returned by the cluster. There are three possible acks settings:
- acks = 0: the Kafka cluster returns an ack to the producer immediately, without any broker needing to have received the message. This is the setting most likely to lose messages, but also the most efficient.
- acks = 1 (default): the leader among the replicas has received the message and written it to its local log before the ack is returned to the producer. This is the most balanced choice between performance and safety.
- acks = -1 / all: works together with min.insync.replicas (a broker/topic-level setting whose default is 1; configuring it to 2 or more is recommended). With min.insync.replicas=2, the ack is only returned after the leader and at least one follower have synced the message (i.e. 2 brokers in the cluster have received the data). This is the safest setting, but has the worst performance.
The configuration corresponding to acks and retries (a retry starts when no ack is received):
// acks and retries
props.put(ProducerConfig.ACKS_CONFIG,"1");
/**
 * Failed sends are retried; the default retry interval is 100ms. Retries improve delivery
 * reliability, but they can also lead to duplicate messages (e.g. after a network hiccup),
 * so the consumer side needs to process messages idempotently.
 */
props.put(ProducerConfig.RETRIES_CONFIG,3);
// Retry interval
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
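As a complement to the acks=1 configuration above, a minimal sketch of the safest variant might look like this (an illustration added here, not part of the original example; note that min.insync.replicas is a broker/topic-level setting, not a producer property):
// acks=all (equivalent to -1): wait until the leader and the in-sync followers have the message
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// On the topic or broker side (not in these props): set min.insync.replicas >= 2,
// so that an ack requires the leader plus at least one follower to have persisted the message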
1.4 Producer send-buffer configuration
- By default Kafka creates a message buffer to hold messages waiting to be sent; the buffer size is 32 MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- A local sender thread pulls data from the buffer in 16 KB batches and sends it to the broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- If the thread cannot accumulate 16 KB, it still sends whatever it has pulled after a 10 ms interval
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
2. Consumer implementation
2.1 Basic consumer implementation
private static final String TOPIC_NAME = "my-replicated-topic";
private static final String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092");
// Consumer group name
props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
// 1. Create the consumer client
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
// 2. Subscribe to the list of topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
// 3. poll() is a long poll that fetches messages
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records){
// 4. Print the message
System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
2.2 Automatic and manual offset commits
Whether the commit is automatic or manual, the consumer submits the information "consumer group it belongs to + topic consumed + partition consumed + consumed offset" to the cluster's __consumer_offsets topic.
Automatic commit
Once the consumer has polled messages, the offset is committed automatically.
// Whether to auto-commit offsets; the default is true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// Interval between automatic offset commits
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
Automatic commits can lose messages, because the consumer commits the offset before processing: it may crash after the commit but before the messages have actually been processed.
Manual commit
The auto-commit configuration needs to be set to false:
// Whether to auto-commit offsets; the default is true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
Manual synchronous commit: after the messages have been processed, call the synchronous commit method; it blocks until the cluster returns an ack, at which point the commit is complete and the following logic runs.
while (true){
// 3. poll() is a long poll that fetches messages
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records){
// 4. Print the message
System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
// All polled messages have been processed
if(records.count()>0){
// Manually commit the offset synchronously; the current thread blocks until the commit succeeds
consumer.commitSync();
}
}
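commitSync() with no arguments commits the offsets of everything returned by the last poll. To make the "group + topic + partition + offset" bookkeeping from 2.2 explicit, an offset can also be committed per partition. The sketch below is an added illustration (here record stands for the last processed ConsumerRecord, and the "+ 1" is needed because the committed offset is the position of the next message to read); it requires org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata:
// Sketch: commit an explicit offset for a single partition
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
toCommit.put(new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(toCommit);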
Manual asynchronous commit: commit after the messages have been processed, without waiting for the cluster's ack, and go straight on to the following logic; a callback method can be registered for the cluster to invoke.
while (true){
// 3. poll() is a long poll that fetches messages
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records){
// 4. Print the message
System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
// All polled messages have been processed
if(records.count()>0){
// Manually commit the offset asynchronously; the current thread does not block and can continue with the rest of the logic
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " +
exception.getStackTrace());
}
}
});
}
}
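A common way to combine the two commit styles (added here as an illustration, not part of the original text) is to use commitAsync() on the normal path for throughput and a final blocking commitSync() before closing the consumer, so the last offsets are not lost if an asynchronous commit failed:
// Sketch: async commits while processing, one final sync commit on shutdown
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            // process the record ...
        }
        consumer.commitAsync(); // non-blocking
    }
} finally {
    try {
        consumer.commitSync(); // blocking; ensures the final offsets are committed
    } finally {
        consumer.close();
    }
}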
2.3 Long-polling messages with poll()
- By default, a consumer polls up to 500 messages at a time.
// Maximum number of records pulled in one poll; tune it according to how fast messages are consumed
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
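Related to the batch size (an added note, not from the original text): if processing one batch takes longer than max.poll.interval.ms, the broker treats the consumer as dead and triggers a rebalance, so the two settings are usually tuned together. A sketch:
// Maximum number of records returned by a single poll()
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Maximum allowed time between two poll() calls before the consumer is removed from the
// group and a rebalance is triggered (default 300000 ms = 5 minutes)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300 * 1000);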
2.4 Consumer health check
The consumer sends a heartbeat to the Kafka cluster every second. If the cluster finds a consumer that has not renewed its heartbeat for more than 10 seconds, that consumer is kicked out of the consumer group, which triggers the group's rebalance mechanism and hands its partitions over to the other consumers in the group.
// Interval at which the consumer sends heartbeats to the broker
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);
// If Kafka receives no heartbeat from a consumer for more than 10s, it kicks the consumer out of the group and rebalances, assigning its partitions to other consumers
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);
2.5 Consuming from a specified partition, offset or time
- Consuming from a specified partition
// Consume from a specified partition
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
- Rewinding and consuming from the beginning
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
Received message: partition = 3, offset = 0, key = 500000, value = {"createTime":1636702968492,"erpId":500000,"erpName":"張三三","realName":"張三三","updateTime":1636702968492}
Received message: partition = 3, offset = 1, key = 500005, value = {"createTime":1636957199355,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957199355}
Received message: partition = 3, offset = 2, key = 500005, value = {"createTime":1636957211271,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957211271}
Received message: partition = 3, offset = 3, key = 500005, value = {"createTime":1636958064780,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636958064780}
- Consuming from a specified offset
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
consumer.seek(new TopicPartition(TOPIC_NAME,0),10);
Received message: partition = 3, offset = 2, key = 500005, value = {"createTime":1636957211271,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957211271}
Received message: partition = 3, offset = 3, key = 500005, value = {"createTime":1636958064780,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636958064780}
- Consuming from a specified point in time
Given a timestamp, determine the corresponding offset in every partition, then start consuming from the messages after that offset in each partition.
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// Start consuming from one hour ago
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
// Assign all the partitions first, then seek each one to the offset that matches the timestamp
consumer.assign(parMap.keySet());
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
    TopicPartition key = entry.getKey();
    OffsetAndTimestamp value = entry.getValue();
    if (key == null || value == null) continue;
    long offset = value.offset();
    System.out.println("partition-" + key.partition() + "|offset-" + offset);
    // Seek to the offset determined from the timestamp of the message
    consumer.seek(key, offset);
}
2.6 Offset rules for a new consumer group
By default, a consumer in a new consumer group starts consuming from the offset of the last message in each partition + 1 (i.e. it only consumes new messages). The setting below makes a new consumer consume from the beginning the first time, and consume only new messages from then on (last consumed position's offset + 1):
- latest (default): consume only new messages
- earliest: consume from the beginning the first time; after that, consume new messages (last consumed position's offset + 1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Received message: partition = 4, offset = 0, key = 500004, value = {"createTime":1636948165301,"erpId":500004,"erpName":"張三三","realName":"張三三","updateTime":1636948165301}
Received message: partition = 1, offset = 0, key = 500001, value = {"createTime":1636703013717,"erpId":500001,"erpName":"李絲絲","realName":"李絲絲","updateTime":1636703013717}
Received message: partition = 3, offset = 0, key = 500000, value = {"createTime":1636702968492,"erpId":500000,"erpName":"張三三","realName":"張三三","updateTime":1636702968492}
Received message: partition = 3, offset = 1, key = 500005, value = {"createTime":1636957199355,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957199355}
Received message: partition = 3, offset = 2, key = 500005, value = {"createTime":1636957211271,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957211271}
Received message: partition = 3, offset = 3, key = 500005, value = {"createTime":1636958064780,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636958064780}
3. Integrating Kafka with Spring Boot
3.1 Adding the spring-kafka dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.2 Configuration in application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
    # Producer
    producer:
      # If set to a value greater than 0, the client re-sends records whose delivery failed
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # Serializers for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # Commit immediately after Acknowledgment.acknowledge() is called manually; this mode is generally used
      ack-mode: manual_immediate
3.3 Writing the message producer
private final static String TOPIC_NAME= "my-replicated-topic";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping(value ="/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME,"key","this is a test kafka message");
return "send message success";
}
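kafkaTemplate.send() is asynchronous and returns a ListenableFuture, so the controller above does not know whether the send actually succeeded. The sketch below registers a callback on the result (the /send/callback path and the log messages are assumptions added here; the API shown is the spring-kafka 2.x ListenableFuture style):
@RequestMapping(value = "/send/callback")
public String sendMessageWithCallback() {
    kafkaTemplate.send(TOPIC_NAME, "key", "this is a test kafka message")
            .addCallback(
                    // invoked once the broker has acknowledged the record
                    result -> System.out.println("send ok: topic-" + result.getRecordMetadata().topic()
                            + "|partition-" + result.getRecordMetadata().partition()
                            + "|offset-" + result.getRecordMetadata().offset()),
                    // invoked if the send failed
                    ex -> System.err.println("send failed: " + ex.getMessage()));
    return "send message with callback";
}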
3.4 Writing the message consumer
@KafkaListener(topics = "my-replicated-topic")
public void listenGroup(ConsumerRecord<String, String> record,
Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// Manually commit the offset
ack.acknowledge();
}
3.5 Configuring the topic, partitions and offsets in the consumer
@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",
initialOffset = "100"))
}, concurrency = "3") // concurrency is the number of consumers in the same group, i.e. the degree of concurrent consumption; it is recommended to keep it less than or equal to the total number of partitions
public void listenGroupPro(ConsumerRecord<String, String> record,
Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// Manually commit the offset
ack.acknowledge();
}