SpringBoot整合kafka配置多個kafka地址參考:https://www.cnblogs.com/pxblog/p/17662170.html
引入依賴 (可以不指定版本)
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.1</version> </dependency>
如果啟動報錯
Caused by: java.lang.NoClassDefFoundError: org/springframework/core/log/LogAccessor
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.<init>(KafkaListenerAnnotationBeanPostProcessor.java:148)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:172)
... 19 common frames omitted
或者
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception during bean creation; nested exception is java.lang.TypeNotPresentException: Type org.springframework.kafka.listener.CommonErrorHandler not present
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:555) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:953) ~[spring-beans-5.3.15.jar:5.3.15]
就把指定版本去掉
配置文件yml
修改kafka連接地址 其他按需修改
#kafka的topic名稱
kafkaTopic: topic-test
spring:
kafka:
bootstrap-servers: 192.168.1.12:9092 #kafka連接地址
producer:
acks: 1 #應答級別:多少個分區副本備份完成時向生產者發送ack確認(可選0、1、all/-1)
batch-size: 16384 #批量大小
properties:
linger.ms: 0 # 當生產端積累的消息達到batch-size或接收到消息linger.ms后,生產者就會將消息提交給kafka linger.ms為0表示每接收到一條消息就提交給kafka,這時候batch-size其實就沒用了
buffer-memory: 33554432 #生產端緩沖區大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: defaultConsumerGroup # 默認的消費組ID
enable-auto-commit: true # 是否自動提交offset
## 當kafka中沒有初始offset或offset超出范圍時將自動重置offset
## earliest:重置為分區中最小的offset;
## latest:重置為分區中最新的offset(消費分區中新產生的數據);
## none:只要有一個分區不存在已提交的offset,就拋出異常;
auto-commit-interval:
ms: 1000
auto-offset-reset: latest
properties:
session.timeout.ms: 120000 # 消費會話超時時間(超過這個時間consumer沒有發送心跳,就會觸發rebalance操作)
request.timeout.ms: 180000 # 消費請求超時時間
listener:
missing-topics-fatal: false # 消費監聽接口監聽的主題不存在時,自動創建,true時表示如果不存在啟動報錯
flyway:
connect-retries: 0 #重試次數
如果設置了賬號密碼,那就需要配置賬號密碼
sasl.mechanism: PLAIN security.protocol: SASL_PLAINTEXT sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用戶名" password="密碼"; #分號不能去
不加分號會報錯
java.lang.IllegalArgumentException: JAAS config entry not terminated by semi-colon
at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:121) ~[kafka-clients-3.0.0.jar:na]
消費者也有的話 ,消費者也配置
消費者:
KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @author yvioo */ @Component public class KafkaConsumer { /** * 消費監聽 * @param record */ @KafkaListener(topics = "${kafkaTopic}") public void onMessage(ConsumerRecord<?, ?> record){ System.out.println("收到消息:topic名稱:"+record.topic()+",分區:"+record.partition()+",值:"+record.value()); } }
生產者
KafkaProducer.java
import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @author 。 */ @RestController public class KafkaProducer { @Value("${kafkaTopic}") private String kafkaTopic; @Resource private KafkaTemplate<String, Object> kafkaTemplate; /** * 發送消息 * @param message */ @GetMapping("/send") public void sendMessage1(String message) { kafkaTemplate.send(kafkaTopic, message); } /** * 有發送結果回調 * @param message */ @GetMapping("/send/callback") public void sendMessage3(String message) { kafkaTemplate.send(kafkaTopic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { System.out.println("fail:"+ex.getMessage()); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("success:topic名稱:" + result.getRecordMetadata().topic() + ",分區:" + result.getRecordMetadata().partition() + ",消息在分區中的標識:" + result.getRecordMetadata().offset()); } }); } }
自定義發送的分區器
MyPartitioner.java
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 自定義分區器 * @author 。 */ public class MyPartitioner implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { //設置分區邏輯 return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
配置文件增加 后面跟類全路徑
partitioner.class: com.example.kafka.config.MyPartitioner #自定義分區器
很常見的場景就是我們希望下單、支付消息有順序,這樣以訂單ID作為key發送消息就達到了分區有序性的目的。
自定義生成者攔截器
KafkaProducerInterceptor.java
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * 生產者攔截器 * @author 。 */ public class KafkaProducerInterceptor implements ProducerInterceptor<String,String> { @Override public void configure(Map<String, ?> map) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { //這里可以改變發送的數據,比如加個時間戳 return new ProducerRecord<>(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+"_"+record.value()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (recordMetadata != null) { //發送成功 System.out.println("發送成功"); }else { System.out.println("發送失敗"); } } @Override public void close() { } }
Kerberos證書連接配置文件方式
properties:
java.security.auth.login.config: /kerberos/kafka_kafka_jaas.conf
java.security.krb5.conf: /kerberos/krb5.conf
sasl.mechanism: GSSAPI
security.protocol: SASL_PLAINTEXT
sasl.kerberos.service.name: kafka
sasl.jaas.config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/kerberos/kafka.keytab" principal="kafka/admin@ETAIN.COM"; #分號不能少
Kerberos證書連接方式代碼demo
public static void connectKafka() { //准備JAAS配置文件路徑 String kafkaClientJaasFile = "/kerberos/kafka_kafka_jaas.conf"; // Kerberos配置文件路徑 String krb5FilePath = "/kerberos/krb5.conf"; // .keytab證書文件路徑 String keyTabPath = "/kerberos/dsmm_kafka.keytab"; // Kerberos Principal String principal = "kafka/admin@DFS.COM"; System.setProperty("java.security.auth.login.config", kafkaClientJaasFile); System.setProperty("java.security.krb5.conf", krb5FilePath); Properties props = new Properties(); props.setProperty("bootstrap.servers", "102.123.23.11:9092"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //使用kerberos安全認證連接kafka props.setProperty("security.protocol", "SASL_PLAINTEXT"); props.setProperty("sasl.mechanism", "GSSAPI"); props.setProperty("sasl.kerberos.service.name", "kafka"); props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required\n" + " useKeyTab=true\n" + " storeKey=true\n" + " keyTab=\"" + keyTabPath + "\"\n" + " principal=\"" + principal + "\";"); /** * 從Kafka topic中消費消息 */ props.setProperty("topic", "test"); //設置消費的位置,earliest表示從頭開始消費,latest表示從最新的位置開始消費 props.setProperty("auto.offset.reset", "earliest"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); ArrayList<TopicPartition> topicPartitions = new ArrayList<>(); List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor("test"); for (PartitionInfo partitionInfo : partitionInfos) { topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); } kafkaConsumer.assign(topicPartitions); }
配置參數說明
enable.auto.commit
指定了消費者是否自動提交偏移量,默認值是true,為了盡量避免重復數據和數據丟失,可以把它設置為false,有自己控制合適提交偏移量,如果設置為true, 可以通過設置 auto.commit.interval.ms屬性來控制提交的頻率。
詳細地來說:
當一個consumer因某種原因退出Group時,進行重新分配partition后,同一group中的另一個consumer在讀取該partition時,怎么能夠知道上一個consumer該從哪個offset的message讀取呢?也是是如何保證同一個group內的consumer不重復消費消息呢?上面說了一次走網絡的fetch請求會拉取到一定量的數據,但是這些數據還沒有被消息完畢,Consumer就掛掉了,下一次進行數據fetch時,是否會從上次讀到的數據開始讀取,而導致Consumer消費的數據丟失嗎?
為了做到這一點,當使用完poll從本地緩存拉取到數據之后,需要client調用commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪一個offset的message。
而這個commit方法會通過走網絡的commit請求將offset在coordinator中保留,這樣就能夠保證下一次讀取(不論進行了rebalance)時,既不會重復消費消息,也不會遺漏消息。
對於offset的commit,Kafka Consumer Java Client支持兩種模式:由KafkaConsumer自動提交,或者是用戶通過調用commitSync、commitAsync方法的方式完成offset的提交。
auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量后者偏移量無效(消費者長時間失效當前的偏移量已經過時並且被刪除了)的分區的情況下,應該作何處理,默認值是latest,也就是從最新記錄讀取數據(消費者啟動之后生成的記錄),另一個值是earliest,意思是在偏移量無效的情況下,消費者從起始位置開始讀取數據。
session.timeout.ms
該屬性指定了當消費者被認為已經掛掉之前可以與服務器斷開連接的時間。默認是3s,消費者在3s之內沒有再次向服務器發送心跳,那么將會被認為已經死亡。此時,協調器將會出發再均衡,把它的分區分配給其他的消費者,該屬性與heartbeat.interval.ms緊密相關,該參數定義了消費者發送心跳的時間間隔,也就是心跳頻率,一般要同時修改這兩個參數,heartbeat.interval.ms參數值必須要小於session.timeout.ms,一般是session.timeout.ms的三分之一,比如,session.timeout.ms設置成3min,那么heartbeat.interval.ms一般設置成1min,這樣,可以更快的檢測以及恢復崩潰的節點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡(有一種情況就是網絡延遲,本身消費者是沒有掛掉的,但是網絡延遲造成了心跳超時,這樣本不該發生再均衡,但是因為網絡原因造成了非預期的再均衡),把該屬性的值設置得大一些,可以減少意外的再均衡,不過檢測節點崩憤-需要更長的時間。
max.partition.fetch.bytes
該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值是lMB , 也
就是說,kafkaConsumer.poll() 方法從每個分區里返回的記錄最多不超max.partitions.fetch.bytes 指定的字節。如果一個主題有20 個分區和5 個消費者,那么每個消費者需要至少4MB 的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組里有消費者發生奔潰,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes 的值必須比broker 能夠接收的最大消息的字節數(通過max.message.size 屬性配置)大, 否則消費者可能無法讀取這些消息,導致消費者一直掛起重試,例如,max.message.size設置為2MB,而該屬性設置為1MB,那么當一個生產者可能就會生產一條大小為2MB的消息,那么就會出現問題,消費者能從分區取回的最大消息大小就只有1MB,但是數據量是2MB,所以就會導致消費者一直掛起重試。
在設置該屬性時,另一個需要考慮的因素是消費者處理數據的時間。消費者需要頻繁調用poll()方法
來避免會話過期和發生分區再均衡,如果單次調用poll()返回的數據太多,消費者需要更多的時間來處理,可能無怯及時進行下一個輪詢來避免會話過期。如果出現這種情況, 可以把max.partitioin.fetch.bytes 值改小,或者延長會話過期時間。
fetch.min.bytes
消費者從服務器獲取記錄的最小字節數,broker收到消費者拉取數據的請求的時候,如果可用數據量小於設置的值,那么broker將會等待有足夠可用的數據的時候才返回給消費者,這樣可以降低消費者和broker的工作負載。
因為當主題不是很活躍的情況下,就不需要來來回回的處理消息,如果沒有很多可用數據,但消費者的CPU 使用率卻很高,那么就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值設置得大一點可以降低broker 的工作負載。
fetch.max.wait.ms
fetch.min.bytes設置了broker返回給消費者最小的數據量,而fetch.max.wait.ms設置的則是broker的等待時間,兩個屬性只要滿足了任何一條,broker都會將數據返回給消費者,也就是說舉個例子,fetch.min.bytes設置成1MB,fetch.max.wait.ms設置成1000ms,那么如果在1000ms時間內,如果數據量達到了1MB,broker將會把數據返回給消費者;如果已經過了1000ms,但是數據量還沒有達到1MB,那么broker仍然會把當前積累的所有數據返回給消費者。
max.poll.records
控制單次調用call方法能夠返回的記錄數量,幫助控制在輪詢里需要處理的數據量。
receive.buffer.bytes + send.buffer.bytes
socket 在讀寫數據時用到的TCP 緩沖區也可以設置大小。如果它們被設為-1 ,就使用操作系統的默認值。如果生產者或消費者與broker 處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
partition.assignment.strategy
分區分配策略,kafka有兩個默認策略:
- Range:該策略會把主題的若干個連續的分區分配給消費者
- Robin:該策略把主題的所有分區逐個分配給消費者
分區策略默認是:org.apache.kafka.clients.consumer.RangeAssignor=>Range策略
org.apache.kafka.clients.consumer.RoundRobinAssignor=>Robin策略