1. Setting up Kafka
2. Versions
Spring Boot version:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
spring-kafka version:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
3. Basic configuration: application.yml
In fact, only bootstrap-servers is required; everything else can fall back to defaults.
spring:
  kafka:
    bootstrap-servers: 192.168.31.10:9092,192.168.31.10:9093,192.168.31.10:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      bootstrap-servers: 192.168.31.10:9092,192.168.31.10:9093,192.168.31.10:9094
    topic:
      Name: home.bus.log        # custom topic name
      numPartitions: 2          # custom number of partitions
      replicationFactor: 2      # custom replication factor
    consumer:
      group-id: home.bus.log.group.1
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 20000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
4. Creating a custom topic
If you don't configure the topic properties, the defaults are used; the topic name, of course, still has to be configured somewhere, or you can simply use a constant directly in the producer.
@Configuration
@EnableKafka
public class KafkaTopicConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;
    @Value("${spring.kafka.topic.Name}")
    private String topicName;
    @Value("${spring.kafka.topic.numPartitions}")
    private int numPartitions;
    @Value("${spring.kafka.topic.replicationFactor}")
    private int replicationFactor;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic myTopic() {
        // the third argument is the replication factor; make sure the cluster
        // has at least that many brokers
        return new NewTopic(topicName, numPartitions, (short) replicationFactor);
    }
}
With this configuration, if the topic does not yet exist on the Kafka server it is created with the custom properties. If a topic with that name already exists, NewTopic will not create a new one, regardless of whether its partition and replica settings match.
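Because an existing topic is left untouched, it can be worth verifying its actual settings on the broker rather than trusting the yml. A minimal sketch using Kafka's AdminClient; the broker address and topic name are assumptions matching the configuration above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // broker address assumed to match the yml above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.10:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // describeTopics reports what the broker really has, regardless of NewTopic
            TopicDescription desc = admin.describeTopics(Collections.singletonList("home.bus.log"))
                    .all().get().get("home.bus.log");
            System.out.println("partitions: " + desc.partitions().size()
                    + ", replicas: " + desc.partitions().get(0).replicas().size());
        }
    }
}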
5. Custom producer and consumer
For simple use there is no need to customize anything, since the yml file already provides the basic configuration. If you do need to customize, refer to the following configuration:
//@Configuration
//@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServer;

    /* -------------- producer configuration ----------------- */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /* -------------- kafka template configuration ----------------- */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
//@Configuration
//@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
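If you do switch these custom classes on (by uncommenting @Configuration and @EnableKafka), a listener can be bound to the custom factory explicitly through the containerFactory attribute of @KafkaListener. A minimal sketch under that assumption; the class name here is made up for illustration:

@Service
public class CustomFactoryListener {

    // explicitly binds this listener to the kafkaListenerContainerFactory bean above,
    // instead of the auto-configured one
    @KafkaListener(topics = "${spring.kafka.topic.Name}",
                   containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("received: " + record.value());
    }
}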
6. Creating the producer
Just call it directly wherever it is needed.
@Service
public class KafkaServiceImpl implements MQService {

    @Value("${spring.mqconfig.mq-enable}")
    private boolean mqEnable;
    @Value("${spring.kafka.topic.Name}")
    private String topicName;

    private Logger logger = LoggerFactory.getLogger(KafkaServiceImpl.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public boolean isMqEnable() {
        return mqEnable;
    }

    public void setMqEnable(boolean mqEnable) {
        this.mqEnable = mqEnable;
    }

    @Override
    @Async("logThread") // async; optional — unless the network is in trouble, sending to Kafka is very fast
    public void sendMessage(String msg) {
        if (!isMqEnable()) return;
        long start = System.currentTimeMillis();
        kafkaTemplate.send(topicName, msg);
        long end = System.currentTimeMillis();
        logger.info("wrote to kafka in " + (end - start) + " ms");
    }
}
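Note that kafkaTemplate.send() is itself asynchronous and returns a future, so the elapsed time logged above measures the hand-off, not delivery. If delivery confirmation matters, the ListenableFuture returned by send() in this spring-kafka version can take a callback. A minimal sketch reusing kafkaTemplate, topicName, and logger from the class above:

// confirm delivery via the ListenableFuture returned by send()
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, msg);
future.addCallback(
        result -> logger.info("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
        ex -> logger.error("send to kafka failed", ex));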
7. Creating the consumer
Since messages were wrapped as JSON when produced, they are deserialized correspondingly when consumed.
@Service
public class MQConsumerServiceImpl implements MQConsumerService {

    // initialized up front so getMessage() never returns null before the first record arrives
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final List<MQMessage> mqMessageList = new ArrayList<>();
    private long maxMessageCount = 100;

    @Override
    public List<MQMessage> getMessage() {
        return mqMessageList;
    }

    @KafkaListener(topics = "${spring.kafka.topic.Name}")
    private void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> mqMessage = Optional.ofNullable(record.value());
        if (mqMessage.isPresent()) {
            Object message = mqMessage.get();
            try {
                if (mqMessageList.size() > maxMessageCount) {
                    mqMessageList.remove(0); // keep at most maxMessageCount messages in memory
                }
                MQMessage mq = objectMapper.readValue((String) message, MQMessage.class); // deserialize the JSON
                mqMessageList.add(mq);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
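The MQMessage class itself is not shown here; for the readValue() call above to work it only needs to be a plain POJO whose fields match the JSON written by the producer. A hypothetical sketch — the field names are illustrative assumptions, not the real class:

// Hypothetical MQMessage POJO; the field names are assumptions for illustration.
// Jackson requires a no-arg constructor and getters/setters matching the JSON keys.
public class MQMessage {
    private String level;
    private String content;
    private long timestamp;

    public MQMessage() { }

    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}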
8. Displaying the messages on a page
In most scenarios there is no need to pull the messages out and display them; this is done here just to test the consumer along the way. You can also watch the topic directly on the Kafka server with:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.10:9092 --topic home.bus.log --from-beginning
@Controller
@RequestMapping(value = "/mq")
public class mqController {

    @Resource(name = "MQConsumerServiceImpl")
    private MQConsumerService mqConsumerService;

    @RequestMapping(value = "/list")
    public String list() {
        return "/mq/mqList";
    }

    @RequestMapping(value = "/getMsgList")
    @ResponseBody
    public Object getMsgList(HttpServletRequest request) {
        int pageSize = 50;
        try {
            pageSize = Integer.parseInt(request.getParameter("pageSize"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int pageNumber = 0;
        try {
            pageNumber = Integer.parseInt(request.getParameter("pageNumber")) - 1;
        } catch (Exception e) {
            e.printStackTrace();
        }

        Map<String, Object> map = new HashMap<>();
        String sortName = request.getParameter("sortName") == null ? "roleId" : request.getParameter("sortName");
        String sortOrder = request.getParameter("sortOrder") == null ? "asc" : request.getParameter("sortOrder");
        Sort sortLocal = new Sort(sortOrder.equalsIgnoreCase("asc") ? Sort.Direction.ASC : Sort.Direction.DESC, sortName);
        Pageable pageable = PageRequest.of(pageNumber, pageSize, sortLocal);

        // PageImpl does not slice the list itself, so cut out the requested page first
        List<MQMessage> all = mqConsumerService.getMessage();
        int from = Math.min(pageNumber * pageSize, all.size());
        int to = Math.min(from + pageSize, all.size());
        Page<MQMessage> mqMessagePage = new PageImpl<>(all.subList(from, to), pageable, all.size());

        map.put("total", mqMessagePage.getTotalElements());
        map.put("rows", mqMessagePage.getContent());
        return map;
    }
}