spring-kafka
使用spring-kafka的小伙伴,看過來。
說明
因為spring-kafka
封裝的比較厲害,可能跟你實際使用起來有很大的差別。
一個簡單的消費例子
在spring-boot
基礎上添加依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
注意要檢查下依賴的kakfa-clients
,是否與你服務端的匹配。
不要忘記了通過注解@EnableKafka
開啟自動配置。
采用默認的配置:
spring:
kafka:
consumer:
bootstrap-servers:
- 127.0.0.1:9092
# 消費組
group-id: myGroup
# 消費者是否自動提交偏移量,默認為true
enable-auto-commit: false
# 消費者在讀取一個沒有偏移量或者偏移量無效的情況下,從起始位置讀取partition的記錄,默認是latest
auto-offset-reset: earliest
# 單次調用poll方法能夠返回的消息數量
max-poll-records: 50
然后寫個測試用例試試:
package tk.fishfish.easyjava.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 消費者
*
* @author 奔波兒灞
* @since 1.0
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ConsumerTest {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerTest.class);
@KafkaListener(topics = "test")
public void onMessage(ConsumerRecord<String, String> record) {
LOG.info("record: {}", record);
String value = record.value();
if (value.length() % 2 == 0) {
throw new RuntimeException("模擬業務出錯");
}
}
@Test
public void run() {
try {
// 阻塞5分鍾,方便調試
Thread.sleep(5 * 60 * 1000);
} catch (InterruptedException e) {
LOG.warn("sleep error", e);
}
}
}
上面通過@KafkaListener
來監聽topic
,處理消息。
為了模擬業務會出現一些異常,我特意在判斷value長度為偶數的情況下拋出異常,看在默認配置的情況下,如果業務出錯,是否仍會提交offsets
。
結果發現,仍提交了offsets。
下面是日志:
INFO 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] tk.fishfish.easyjava.kafka.ConsumerTest - record: ConsumerRecord(topic = test, partition = 0, offset = 261, CreateTime = 1558059947687, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
ERROR 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = test, partition = 0, offset = 261, CreateTime = 1558059947687, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void tk.fishfish.easyjava.kafka.ConsumerTest.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException: 模擬業務出錯
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 模擬業務出錯
at tk.fishfish.easyjava.kafka.ConsumerTest.onMessage(ConsumerTest.java:29)
at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
... 13 common frames omitted
DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.a.RecordMessagingMessageListenerAdapter - Processing [GenericMessage [payload=d, headers={kafka_offset=262, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@f46d581, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test, kafka_receivedTimestamp=1558059947851}]]
INFO 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] tk.fishfish.easyjava.kafka.ConsumerTest - record: ConsumerRecord(topic = test, partition = 0, offset = 262, CreateTime = 1558059947851, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {test-0=OffsetAndMetadata{offset=263, metadata=''}}
DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {test-0=OffsetAndMetadata{offset=263, metadata=''}}
DEBUG 2019-05-17 10:25:48.560 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=myGroup] Committed offset 263 for partition test-0
從日志可以看到offset = 261
的記錄處理失敗了,但最后仍提交了Committed offset 263 for partition test-0
。
總結:如果你采取我這樣的配置,當處理record
出錯的時候,仍會提交偏移量。那么我們就需要業務處理失敗的情況了。比如try...catch
之后保存錯誤的record
,然后定時重試。
出錯的情況下不提交offsets
那么,能不能在出錯的情況下不提交咧?
通過查看文檔發現,發現可以使用Acknowledgment
去確認該條record
是否提交。
修改下配置,配置spring.kafka.listener.*
:
spring:
kafka:
consumer:
bootstrap-servers:
- 127.0.0.1:9092
group-id: myGroup
# 消費者是否自動提交偏移量,默認為true
enable-auto-commit: false
# 消費者在讀取一個沒有偏移量或者偏移量無效的情況下,從起始位置讀取partition的記錄,默認是latest
auto-offset-reset: earliest
# 單次調用poll方法能夠返回的消息數量
max-poll-records: 50
listener:
# Listener AckMode
ack-mode: MANUAL_IMMEDIATE
# 並發消費者
concurrency: 1
然后代碼有些調整(就不貼全了),使用Acknowledgment
:
@KafkaListener(topics = "test")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
LOG.info("record: {}", record);
String value = record.value();
if (value.length() % 2 == 0) {
throw new RuntimeException("模擬業務出錯");
}
// 業務處理成功確認
ack.acknowledge();
}
如果在業務出錯的情況下,不會提交offsets
,然而真的是這樣的嗎?
測試發現,在業務出錯的情況下,確實不會提交offsets
,但是只要后面的記錄處理成功,就會提交offsets
,這樣前面的失敗的數據還是需要自己去手動處理。要么重新獲取該offset
的數據,要么記錄錯誤record
,業務重試。
總結
在錯誤的情況下,建議自己業務記錄后,去重試;或者使用spring-kafka
的ErrorHandler
,處理錯誤的情況。