1. Basic Information
A Kafka application based on Spring is very easy to set up, provided a Kafka broker cluster is available. I already introduced and set up such a broker environment in an earlier post; see Kafka Study [1]: Bringing Up the Environment.
Also note that building a Kafka application on the Spring framework has version requirements. The official requirements are:
Apache Kafka Clients 1.0.0
Spring Framework 5.0.x
Minimum Java version: 8
The example application introduced here is built on Spring Boot, so the versions do not strictly follow the requirements above (as the logs later show, this demo runs Spring Boot 1.5.4 with spring-kafka 1.x and the 0.10.1.1 Kafka client), but the combination itself is mutually compatible.
2. Building a Kafka Application on Spring Boot
2.1 Create a Maven project in IDEA
Configure pom.xml. The full pom.xml for the project is as follows:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.roomdis</groupId> <artifactId>kafka</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka</name> <description>kafka project with Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> <exclusion> <artifactId>log4j-over-slf4j</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 添加log4j的依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Next come the concrete message producer and consumer. The topic here is fixed and uses the default single partition; the point is to show how to build a Kafka application under the Spring framework. Dynamic topic creation will be covered in depth in the next post. This post walks through a basic send-and-receive flow: sending is done asynchronously (async), and the receiving side uses application-level acknowledgment of consumption. Generally speaking, production-grade Kafka applications control the acknowledgment logic at the application level to guarantee safe message handling, so that messages are neither lost nor consumed twice.
2.2 Project configuration
I use a YAML configuration file here, which is straightforward; compared with a properties file it is actually simpler and clearer. The configuration is as follows:
server:
  port: 8899
  contextPath: /kafka
spring:
  application:
    name: kafka
  kafka:
    bootstrapServers: 10.90.7.2:9092,10.90.2.101:9092,10.90.2.102:9092
    consumer:
      groupId: kefu-logger
      enable-auto-commit: false
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      retries: 3
      buffer-memory: 20480
      keySerializer: org.apache.kafka.common.serialization.StringSerializer
      valueSerializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: MANUAL_IMMEDIATE
A few points worth highlighting:
A. The application port is 8899 and the project's context path is kafka, i.e. URLs start with /kafka.
B. Both the producer's serializers and the consumer's deserializers are the String implementations.
C. The consumers are in the group with groupId kefu-logger. Note that group.id is a consumer-side setting (producers do not belong to consumer groups, so a groupId under producer, as in my original config, is simply ignored). Grouping exists to scale out consumption: consumers sharing the same groupId load-balance the partitions of a topic among themselves.
D. One more very important point is the listener's ackMode setting. Here it is MANUAL_IMMEDIATE, i.e. manual, immediate acknowledgment. This requires the consumer's enable-auto-commit to be false, and the consuming logic must explicitly acknowledge each message it processes; otherwise, the next time the consumer starts it will consume the records at those offsets again, causing duplicate consumption. (An equivalent programmatic configuration is sketched right below.)
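If you prefer Java config over YAML, the same ack mode can be set on the listener container factory. The following is a minimal sketch of my own (the class name KafkaConsumerConfig and the bean wiring are illustrative, not code from this project), assuming the spring-kafka 1.x API where AckMode is nested in AbstractMessageListenerContainer; with Spring Boot the YAML above is normally all you need:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // The programmatic equivalent of "listener.ack-mode: MANUAL_IMMEDIATE" in YAML.
        factory.getContainerProperties().setAckMode(
                AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "10.90.7.2:9092,10.90.2.101:9092,10.90.2.102:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kefu-logger");
        // Auto-commit must be off for manual acknowledgment to take effect.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}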
2.3 Message class definition
Since logs are later to be collected and handled centrally, the DTO is defined around a log message. It contains the following fields:
public class LogMessage {
    /** Service type, e.g. IMS, BI, etc. */
    private String serviceType;

    /** Server address, IP:PORT, e.g. 10.130.207.221:8080 */
    private String serverAddr;

    /** Full class path of the program that produced the log */
    private String fullClassPath;

    /** Time the message was produced */
    private String messageTime;

    /** The message body. This matters: it is a JSON string, which keeps the format compatible across different services. */
    private String content;

    /** Log level: INFO, WARN, ERROR, DEBUG, etc. */
    private String level;

    public String getServiceType() {
        return serviceType;
    }

    public void setServiceType(String serviceType) {
        this.serviceType = serviceType;
    }

    public String getServerAddr() {
        return serverAddr;
    }

    public void setServerAddr(String serverAddr) {
        this.serverAddr = serverAddr;
    }

    public String getFullClassPath() {
        return fullClassPath;
    }

    public void setFullClassPath(String fullClassPath) {
        this.fullClassPath = fullClassPath;
    }

    public String getMessageTime() {
        return messageTime;
    }

    public void setMessageTime(String messageTime) {
        this.messageTime = messageTime;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getLevel() {
        return level;
    }

    public void setLevel(String level) {
        this.level = level;
    }
}
Of course, the setters, getters, toString and the rest of the boilerplate in this DTO could be generated with annotations; to keep the example explicit I am not using the Lombok annotation package here (a sketch of the annotated variant follows).
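For reference only, this is what the same DTO would look like with Lombok, which the pom.xml above already declares; @Data generates the getters, setters, equals/hashCode and toString at compile time:

import lombok.Data;

/** Equivalent of the hand-written LogMessage above, with Lombok generating the boilerplate. */
@Data
public class LogMessage {
    private String serviceType;   // service type, e.g. IMS, BI
    private String serverAddr;    // server address, IP:PORT
    private String fullClassPath; // full class path of the producing program
    private String messageTime;   // time the message was produced
    private String content;       // message body, a JSON string
    private String level;         // log level: INFO, WARN, ERROR, DEBUG
}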
2.4 Message producer
The focus here is the asynchronous production of messages: delivery to the broker is asynchronous, which is very valuable for concurrency. (The Config.TOPIC constant referenced below is sketched after the code.)
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class MessageProducer {
    private Logger logger = Logger.getLogger(MessageProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send(LogMessage logMessage) {
        String msg = gson.toJson(logMessage);
        // The send below is asynchronous; success and failure are each handled in a
        // callback, which makes follow-up logic very convenient.
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(Config.TOPIC, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                long offset = sendResult.getRecordMetadata().offset();
                String cont = sendResult.getProducerRecord().toString();
                logger.info("cont: " + cont + ", offset: " + offset);
            }

            @Override
            public void onFailure(Throwable throwable) {
                logger.error(throwable.getMessage());
            }
        });
    }
}
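The Config class referenced above is not shown in this post. Judging from the runtime logs below, the topic name is kefuLogger, so a minimal sketch could look like this (the class layout is my assumption):

public class Config {
    // Topic name; matches the kefuLogger topic visible in the runtime logs below.
    public static final String TOPIC = "kefuLogger";
}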
2.5 Message consumer
In the consumer logic below, the onMessage signature must include an Acknowledgment parameter; without it there is no way to perform the MANUAL, application-level acknowledgment of consumption.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {
    private Logger logger = Logger.getLogger(MessageConsumer.class);

    // Note: each @KafkaListener below gets its own consumer in the kefu-logger group;
    // on a single-partition topic only one of the two is assigned the partition and
    // actually receives messages.
    @KafkaListener(topics = Config.TOPIC)
    public void onMessage(@Payload String msg, Acknowledgment ack) {
        logger.info(msg);
        ack.acknowledge();
    }

    @KafkaListener(topics = Config.TOPIC)
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        logger.info(record);
        long offset = record.offset();
        long partition = record.partition();
        String content = record.value();
        logger.info("offset: " + offset + ", partition: " + partition + ", payload: " + content);
        // Manually confirm that the message has been consumed. This is the crucial step:
        // it gives the application flexible control over consumption acknowledgment.
        ack.acknowledge();
    }
}
3. Verifying the Program
The goal is to verify the difference between executing ack.acknowledge() after consuming a message and not executing it, and to understand concretely how the missing acknowledgment causes duplicate consumption.
3.1 With acknowledge
The effect is that after a restart the offset keeps increasing from where it left off, and the corresponding message payloads are all different. No log excerpt is shown for this case.
3.2 Without acknowledge
For comparison, here is a log excerpt taken before stopping the application:

. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v1.5.4.RELEASE) 2018-08-01 19:45:06.181 INFO 14264 --- [ main] c.roomdis.micros.kafka.KafkaApplication : Starting KafkaApplication on 60-361-0008 with PID 14264 (D:\Knowledge\SOURCE\springboot-kafka\target\classes started by chengsh05 in D:\Knowledge\SOURCE\springboot-kafka) 2018-08-01 19:45:06.184 INFO 14264 --- [ main] c.roomdis.micros.kafka.KafkaApplication : No active profile set, falling back to default profiles: default 2018-08-01 19:45:06.236 INFO 14264 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy 2018-08-01 19:45:07.194 INFO 14264 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$2d472f92] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2018-08-01 19:45:07.655 INFO 14264 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat initialized with port(s): 8899 (http) 2018-08-01 19:45:07.672 INFO 14264 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2018-08-01 19:45:07.673 INFO 14264 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/8.5.15 2018-08-01 19:45:07.786 INFO 14264 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/kafka] : Initializing Spring embedded WebApplicationContext 2018-08-01 19:45:07.786 INFO 14264 --- [ost-startStop-1] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1552 ms 2018-08-01 19:45:07.942 INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.ServletRegistrationBean : Mapping servlet: 'dispatcherServlet' to [/] 2018-08-01 19:45:07.947 INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'characterEncodingFilter' to: [/*] 2018-08-01 19:45:07.947 INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'hiddenHttpMethodFilter' to: [/*] 2018-08-01 19:45:07.947 INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'httpPutFormContentFilter' to: [/*] 2018-08-01 19:45:07.947 INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'requestContextFilter' to: [/*] 2018-08-01 19:45:08.354 INFO 14264 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy 2018-08-01 19:45:08.419 INFO 14264 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error]}" onto public org.springframework.http.ResponseEntity<java.util.Map<java.lang.String, java.lang.Object>> org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest) 2018-08-01 19:45:08.420 INFO 14264 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error],produces=[text/html]}" onto public org.springframework.web.servlet.ModelAndView 
org.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) 2018-08-01 19:45:08.448 INFO 14264 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-08-01 19:45:08.448 INFO 14264 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-08-01 19:45:08.478 INFO 14264 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-08-01 19:45:08.623 INFO 14264 --- [ main] o.s.ui.freemarker.SpringTemplateLoader : SpringTemplateLoader for FreeMarker: using resource loader [org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy] and template loader path [classpath:/templates/] 2018-08-01 19:45:08.624 INFO 14264 --- [ main] o.s.w.s.v.f.FreeMarkerConfigurer : ClassTemplateLoader for Spring macros added to FreeMarker configuration 2018-08-01 19:45:08.644 WARN 14264 --- [ main] o.s.b.a.f.FreeMarkerAutoConfiguration : Cannot find template location(s): [classpath:/templates/] (please add some templates, check your FreeMarker configuration, or set spring.freemarker.checkTemplateLocation=false) 2018-08-01 19:45:08.717 INFO 14264 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup 2018-08-01 19:45:08.734 INFO 14264 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 2018-08-01 19:45:08.748 INFO 14264 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092] check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = kefu-logger heartbeat.interval.ms = 3000 interceptor.classes = null key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null 
ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2018-08-01 19:45:08.751 INFO 14264 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092] check.crcs = true client.id = consumer-1 connections.max.idle.ms = 540000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = kefu-logger heartbeat.interval.ms = 3000 interceptor.classes = null key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2018-08-01 19:45:08.796 INFO 14264 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1 2018-08-01 19:45:08.797 INFO 14264 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247 2018-08-01 19:45:08.841 INFO 14264 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8899 (http) 2018-08-01 19:45:08.848 INFO 14264 --- [ main] c.roomdis.micros.kafka.KafkaApplication : Started KafkaApplication in 3.079 seconds (JVM running for 3.484) 2018-08-01 19:45:08.859 INFO 14264 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 16384 block.on.buffer.full = false bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092] buffer.memory = 20480 client.id = compression.type = none connections.max.idle.ms = 540000 interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.fetch.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 3 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS timeout.ms = 30000 value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2018-08-01 19:45:08.859 INFO 14264 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 16384 block.on.buffer.full = false bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092] buffer.memory = 20480 client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.fetch.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 3 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS timeout.ms = 30000 value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2018-08-01 19:45:08.873 INFO 14264 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1 2018-08-01 19:45:08.873 INFO 14264 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247 2018-08-01 19:45:08.932 INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator 10.90.2.102:9092 (id: 2147483644 rack: null) for group kefu-logger. 
2018-08-01 19:45:08.936 INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group kefu-logger 2018-08-01 19:45:08.936 INFO 14264 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 2018-08-01 19:45:08.936 INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group kefu-logger 2018-08-01 19:45:08.947 INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group kefu-logger with generation 7 2018-08-01 19:45:08.948 INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [kefuLogger-0] for group kefu-logger 2018-08-01 19:45:08.958 INFO 14264 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[kefuLogger-0] 2018-08-01 19:45:09.085 INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, timestamp=null), offset: 112 2018-08-01 19:45:09.092 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 112, CreateTime = 1533123909071, checksum = 2908956415, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}) 2018-08-01 19:45:09.093 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 112, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"} 2018-08-01 19:45:11.080 INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"}, timestamp=null), offset: 113 2018-08-01 19:45:11.081 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 113, CreateTime = 1533123911078, checksum = 843723551, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"}) 2018-08-01 19:45:11.081 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 113, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"} 2018-08-01 19:45:13.082 INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer : cont: 
ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"}, timestamp=null), offset: 114 2018-08-01 19:45:13.083 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 114, CreateTime = 1533123913080, checksum = 2420940286, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"}) 2018-08-01 19:45:13.083 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 114, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"} 2018-08-01 19:45:15.084 INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"}, timestamp=null), offset: 115 2018-08-01 19:45:15.084 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 115, CreateTime = 1533123915082, checksum = 2206983395, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"}) 2018-08-01 19:45:15.084 INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 115, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"} Process finished with exit code 1
After stopping the application, here are the logs from the restart:
2018-08-01 19:45:57.562 INFO 8632 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[kefuLogger-0] 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 112, CreateTime = 1533123909071, checksum = 2908956415, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}) 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 112, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"} 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 113, CreateTime = 1533123911078, checksum = 843723551, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"}) 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 113, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"} 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 114, CreateTime = 1533123913080, checksum = 2420940286, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"}) 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 114, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"} 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 115, CreateTime = 1533123915082, checksum = 2206983395, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"}) 2018-08-01 19:45:57.580 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 115, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 
19:45:15 CST 2018"} 2018-08-01 19:45:57.738 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 116, CreateTime = 1533123957726, checksum = 2375523911, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:57 CST 2018","content":"b22aa35d-2bff-4e9e-9832-56145415b075:Wed Aug 01 19:45:57 CST 2018"}) 2018-08-01 19:45:57.738 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 116, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:57 CST 2018","content":"b22aa35d-2bff-4e9e-9832-56145415b075:Wed Aug 01 19:45:57 CST 2018"} 2018-08-01 19:45:57.738 INFO 8632 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:57 CST 2018","content":"b22aa35d-2bff-4e9e-9832-56145415b075:Wed Aug 01 19:45:57 CST 2018"}, timestamp=null), offset: 116 2018-08-01 19:45:59.735 INFO 8632 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:59 CST 2018","content":"b9a9148b-d0d6-49c5-ac2a-8cfac03dad90:Wed Aug 01 19:45:59 CST 2018"}, timestamp=null), offset: 117 2018-08-01 19:45:59.736 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 117, CreateTime = 1533123959733, checksum = 2508549365, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:59 CST 2018","content":"b9a9148b-d0d6-49c5-ac2a-8cfac03dad90:Wed Aug 01 19:45:59 CST 2018"}) 2018-08-01 19:45:59.736 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 117, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:59 CST 2018","content":"b9a9148b-d0d6-49c5-ac2a-8cfac03dad90:Wed Aug 01 19:45:59 CST 2018"} 2018-08-01 19:46:01.736 INFO 8632 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:46:01 CST 2018","content":"3cbd6443-9617-4ac3-8985-f0b494187f0a:Wed Aug 01 19:46:01 CST 2018"}, timestamp=null), offset: 118 2018-08-01 19:46:01.736 INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 118, CreateTime = 1533123961734, checksum = 3825449208, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:46:01 CST 2018","content":"3cbd6443-9617-4ac3-8985-f0b494187f0a:Wed Aug 01 19:46:01 CST 2018"}) 2018-08-01 19:46:01.736 INFO 8632 --- 
[ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer : offset: 118, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:46:01 CST 2018","content":"3cbd6443-9617-4ac3-8985-f0b494187f0a:Wed Aug 01 19:46:01 CST 2018"} Process finished with exit code 1
In the logs above (originally highlighted in red on the blog), offsets 112 through 115 are clearly messages that had already been consumed before the restart. In other words, when enable-auto-commit is false, the application must perform the acknowledgment itself; otherwise duplicate consumption occurs.
4. Problems Encountered
These problems came up while implementing application-level consumption acknowledgment. One thing to note: enable-auto-commit defaults to true. To control acknowledgment at the application level, enable-auto-commit must be set to false, and at the same time ack-mode must be set to MANUAL or MANUAL_IMMEDIATE. If the two are not configured together, the consumer side throws errors. For example, I initially set only enable-auto-commit to false and left ack-mode unconfigured, which produced the error below (the corrected minimal pairing is shown after the log):
2018-08-01 19:49:49.469 INFO 19828 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247 2018-08-01 19:49:49.541 INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator 10.90.2.102:9092 (id: 2147483644 rack: null) for group kefu-logger. 2018-08-01 19:49:49.543 INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group kefu-logger 2018-08-01 19:49:49.544 INFO 19828 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 2018-08-01 19:49:49.544 INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group kefu-logger 2018-08-01 19:49:49.557 INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group kefu-logger with generation 11 2018-08-01 19:49:49.558 INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [kefuLogger-0] for group kefu-logger 2018-08-01 19:49:49.566 INFO 19828 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[kefuLogger-0] 2018-08-01 19:49:49.587 ERROR 19828 --- [ntainer#0-0-L-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = kefuLogger, partition = 0, offset = 112, CreateTime = 1533123909071, checksum = 2908956415, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}) org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.roomdis.micros.kafka.consumer.MessageConsumer.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)] Bean [com.roomdis.micros.kafka.consumer.MessageConsumer@27068a50]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}], failedMessage=GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:178) ~[spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.6.RELEASE.jar:na] at 
org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_77] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}], failedMessage=GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}] ... 
10 common frames omitted Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.6.RELEASE.jar:na] ... 9 common frames omitted 2018-08-01 19:49:49.592 ERROR 19828 --- [ntainer#0-0-L-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = kefuLogger, partition = 0, offset = 113, CreateTime = 1533123911078, checksum = 843723551, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"})
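To avoid the error above, the two settings must always travel as a pair. A minimal fragment of the corrected configuration, matching the full YAML in section 2.2:

spring:
  kafka:
    consumer:
      enable-auto-commit: false   # defaults to true; must be false for manual acks
    listener:
      ack-mode: MANUAL_IMMEDIATE  # or MANUAL; makes the container inject the Acknowledgment the listener expects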
Some supplementary information on the ack-mode configuration:
Per the official documentation, when enable-auto-commit is false, the ackMode values are interpreted as follows:
RECORD - commit the offset when the listener returns after processing the record.
BATCH - commit the offset when all the records returned by the poll() have been processed.
TIME - commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
COUNT - commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
Below are the concrete configuration properties that accompany the ackMode settings:
spring.kafka.listener.ack-count=    # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode=     # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time=     # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.concurrency=  # Number of threads to run in the listener containers.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single   # Listener type.
Finally: before any new technology is put into real production, every key detail must be understood thoroughly; otherwise risk, or outright disaster, is only a matter of time.