本文為博主原創,未經允許不得轉載:
目錄:
1. 自定義生產消息 kafkaTemplate 實例
2. 封裝 kafka 發送消息的service 方法
3. 測試 kafka 發送消息service 的方法
4. 自定義 kafka 消費消息的工廠 bean
5. kafka 監聽消費消息
1. 自定義 kafkaTemplate 實例
a : 使用 @ConditionalOnProperty 注解屬性控制是否加載 kafka 相關初始化配置,因為在項目開發過程中,如kafka 或redis 等工具容易封裝為
工具類,被各微服務引用並進行加載。使用 @ConditionalOnProperty 注解的 havingValue 屬性可以控制服務中是否進行加載對應的配置。
該屬性的值,可在 yaml 配置文件中指定: kafka.used = true 。如果為true 則加載,false則不加載
b. 使用工廠實例生成指定的 kafkaTemplate 實例
package com.example.demo.config; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @ConditionalOnProperty(prefix="kafka",name = "isClose",havingValue = "true") public class KafkaTemplateConfig { /** * Producer Template 配置 */ @Bean(name="kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** * Producer 工廠配置 */ @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** * Producer 參數配置 */ @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); // 指定多個kafka集群多個地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092"); // 重試次數,0為不啟用重試機制 props.put(ProducerConfig.RETRIES_CONFIG, 0); //同步到副本, 默認為1 // acks=0 把消息發送到kafka就認為發送成功 // acks=1 把消息發送到kafka leader分區,並且寫入磁盤就認為發送成功 // acks=all 把消息發送到kafka leader分區,並且leader分區的副本follower對消息進行了同步就任務發送成功 props.put(ProducerConfig.ACKS_CONFIG, 1); // 生產者空間不足時,send()被阻塞的時間,默認60s props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); // 控制批處理大小,單位為字節 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 批量發送,延遲為1毫秒,啟用該功能能有效減少生產者發送消息次數,從而提高並發量 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 生產者可以使用的總內存字節來緩沖等待發送到服務器的記錄 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 消息的最大大小限制,也就是說send的消息大小不能超過這個限制, 默認1048576(1MB) props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576); // 鍵的序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 值的序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 壓縮消息,支持四種類型,分別為:none、lz4、gzip、snappy,默認為none。 // 消費者默認支持解壓,所以壓縮設置在生產者,消費者無需設置。 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none"); return props; } }
2. 封裝 kafka 發送消息的service 方法:
package com.example.demo.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @Service public class KafkaProduceService { @Autowired private KafkaTemplate kafkaTemplate; /** * producer 同步方式發送數據 * * @param topic topic名稱 * @param message producer發送的數據 */ public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException { kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); } /** * producer 異步方式發送數據 * * @param topic topic名稱 * @param message producer發送的數據 */ public void sendMessageAsync(String topic, String message) { kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable throwable) { System.out.println("success"); } @Override public void onSuccess(Object o) { System.out.println("failure"); } }); } }
3. 測試 kafka 發送消息service 的方法:
package com.example.demo; import com.example.demo.service.KafkaProduceService; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class ProduceServiceTest { @Autowired private KafkaProduceService producerService; @Test public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException { producerService.sendMessageSync("test","同步發送消息測試"); } @Test public void sendMessageAsync() { producerService.sendMessageAsync("test","異步發送消息測試"); } }
4. 自定義 kafka 消費消息的工廠 bean :
package com.example.demo.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; public class KafkaConsumerConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 設置消費者工廠 factory.setConsumerFactory(consumerFactory()); // 消費者組中線程數量 factory.setConcurrency(3); // 拉取超時時間 factory.getContainerProperties().setPollTimeout(3000); // 當使用批量監聽器時需要設置為true factory.setBatchListener(true); return factory; } // @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } // @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // Kafka地址 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092"); //配置默認分組,這里沒有配置+在監聽的地方沒有設置groupId,多個服務會出現收到相同消息情況 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup"); // 是否自動提交offset偏移量(默認true) propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自動提交的頻率(ms) propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // Session超時設置 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // 鍵的反序列化方式 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 值的反序列化方式 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // offset偏移量規則設置: // (1)、earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 // (2)、latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 // (3)、none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } }
5. kafka 監聽消費消息:
package com.example.demo.service; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumerListener { @KafkaListener(topics = {"test"},groupId = "group1", containerFactory="kafkaListenerContainerFactory") public void kafkaListener(String message){ System.out.println(message); } }