springboot整合kafka


一、启动zookeeper和kafka

bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties

 

二、新建项目

新建一个SpringBoot项目,引入所需jar包。

 1         <dependency>
 2             <groupId>org.springframework.kafka</groupId>
 3             <artifactId>spring-kafka</artifactId>
 4         </dependency>
 5 
 6         <dependency>
 7             <groupId>org.projectlombok</groupId>
 8             <artifactId>lombok</artifactId>
 9             <optional>true</optional>
10         </dependency>
11 
12         <dependency>
13             <groupId>com.google.code.gson</groupId>
14             <artifactId>gson</artifactId>
15             <version>2.8.2</version>
16         </dependency>

这是主要用到的,注意版本问题,开始我就是spring-kafka版本写错了一直报错(技巧是这里不写具体版本,它会自动引入),具体参考这里

配置文件application.properties,配置在bootstrap.yml上不行。

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.75.132:9092

#=============== provider  =======================

spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

代码部分:

 1 package cn.sp.component;  2 
 3 import cn.sp.entity.Message;  4 import com.google.gson.Gson;  5 import com.google.gson.GsonBuilder;  6 import lombok.extern.slf4j.Slf4j;  7 import org.springframework.beans.factory.annotation.Autowired;  8 import org.springframework.boot.CommandLineRunner;  9 import org.springframework.kafka.core.KafkaTemplate; 10 import org.springframework.stereotype.Component; 11 
12 import java.util.Date; 13 import java.util.UUID; 14 
15 /**
16  * @Author: 2YSP 17  * @Description: 18  * @Date: Created in 2018/5/2 19  */
20 @Component 21 //如果不想每次都写private final Logger logger = LoggerFactory.getLogger(XXX.class); 可以用注解@Slf4j
22 @Slf4j 23 public class KafkaSender implements CommandLineRunner{ 24 
25  @Autowired 26     private KafkaTemplate<String,String> kafkaTemplate; 27 
28     private Gson gson = new GsonBuilder().create(); 29 
30     public void send(){ 31         Message message = new Message(); 32  message.setId(System.currentTimeMillis()); 33  message.setMsg(UUID.randomUUID().toString()); 34         message.setSendTime(new Date()); 35         log.info("++++++++++++++message:{}",gson.toJson(message)); 36         kafkaTemplate.send("ship",gson.toJson(message)); 37  } 38 
39  @Override 40     public void run(String... strings) throws Exception { 41         for(int i=0;i<3;i++){ 42  send(); 43             try { 44                 Thread.sleep(1000); 45             }catch (InterruptedException e){ 46  e.printStackTrace(); 47  } 48  } 49  } 50 }

项目一启动就会发送3次消息。

 1 package cn.sp.component;  2 
 3 import lombok.extern.slf4j.Slf4j;  4 import org.apache.kafka.clients.consumer.ConsumerRecord;  5 import org.springframework.kafka.annotation.KafkaListener;  6 import org.springframework.stereotype.Component;  7 
 8 import java.util.Optional;  9 
10 /**
11  * @Author: 2YSP 12  * @Description: 13  * @Date: Created in 2018/5/2 14  */
15 @Component 16 @Slf4j 17 public class KafkaReceiver { 18 
19   @KafkaListener(topics = {"ship"}) 20   public void listen(ConsumerRecord<?,?> record){ 21       Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 22       if (kafkaMessage.isPresent()){ 23           Object message = kafkaMessage.get(); 24           log.info("===========record:{}",record); 25           log.info("===========message:{}",message); 26  } 27  } 28 }

这里可以指定多个主题,topics={"ship","test"}。

三、启动测试

启动项目可以看到控制台日志输出如下:

消费了3次,生产消息3个。

代码地址:点击这里

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM