1. Start ZooKeeper and Kafka
bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties
2. Create the project
Create a new Spring Boot project and add the required dependencies.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>
These are the main dependencies. Watch out for version mismatches: at first I had the wrong spring-kafka version and kept getting errors. A tip is to omit the explicit version for spring-kafka, as above, so that Spring Boot's dependency management pulls in a compatible one; see here for details.
The configuration goes in application.properties; putting it in bootstrap.yml does not work.
#============== kafka ===================
# Kafka broker address(es); multiple comma-separated addresses are allowed
spring.kafka.bootstrap-servers=192.168.75.132:9092
#=============== producer =======================
spring.kafka.producer.retries=0
# Maximum size, in bytes, of a batch of records sent in a single request
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
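The examples below send to a topic named ship. Depending on the broker's auto.create.topics.enable setting, that topic may need to exist before the producer runs. One option with spring-kafka 2.x / Spring Boot 2.x (assumed here) is to declare a NewTopic bean so the auto-configured KafkaAdmin creates it at startup; the class and package names below are hypothetical. A minimal sketch:

package cn.sp.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Optional: have Spring Boot's auto-configured KafkaAdmin create the "ship" topic at startup.
// Assumes spring-kafka 2.x; unnecessary if the broker auto-creates topics.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic shipTopic() {
        // topic name, number of partitions, replication factor
        return new NewTopic("ship", 1, (short) 1);
    }
}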
The code:
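The application entry point is not shown in the original post; it is just the standard Spring Boot main class. A minimal sketch, assuming a base package of cn.sp and a hypothetical class name:

package cn.sp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Standard Spring Boot entry point (assumed; not shown in the original post)
@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}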
The producer, KafkaSender:

package cn.sp.component;

import cn.sp.entity.Message;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

/**
 * @Author: 2YSP
 * @Description:
 * @Date: Created in 2018/5/2
 */
@Component
// @Slf4j saves writing private final Logger logger = LoggerFactory.getLogger(XXX.class); in every class
@Slf4j
public class KafkaSender implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("++++++++++++++message:{}", gson.toJson(message));
        kafkaTemplate.send("ship", gson.toJson(message));
    }

    @Override
    public void run(String... strings) throws Exception {
        for (int i = 0; i < 3; i++) {
            send();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
As soon as the project starts, it sends three messages.
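The Message entity referenced by KafkaSender is not shown in the original post. A minimal sketch, assuming Lombok's @Data and exactly the three fields the sender populates:

package cn.sp.entity;

import lombok.Data;

import java.util.Date;

// Minimal message entity matching the fields set in KafkaSender (assumed; not from the original post)
@Data
public class Message {

    private Long id;        // set from System.currentTimeMillis()

    private String msg;     // set from UUID.randomUUID().toString()

    private Date sendTime;  // set from new Date()
}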
The consumer, KafkaReceiver:

package cn.sp.component;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @Author: 2YSP
 * @Description:
 * @Date: Created in 2018/5/2
 */
@Component
@Slf4j
public class KafkaReceiver {

    @KafkaListener(topics = {"ship"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("===========record:{}", record);
            log.info("===========message:{}", message);
        }
    }
}
Multiple topics can be listened to here, e.g. topics = {"ship", "test"}.
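Since the producer serializes Message to JSON with Gson, the listener could also parse the payload back into a Message object instead of logging the raw string. The variant below is not part of the original code; it is a sketch that would replace KafkaReceiver (running both in the same consumer group would split the partitions between them):

package cn.sp.component;

import cn.sp.entity.Message;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative variant (not in the original post): deserialize the JSON payload back into Message
@Component
@Slf4j
public class JsonKafkaReceiver {

    private final Gson gson = new Gson();

    @KafkaListener(topics = {"ship"})
    public void listen(ConsumerRecord<String, String> record) {
        // The value is the JSON string produced by KafkaSender
        Message message = gson.fromJson(record.value(), Message.class);
        log.info("===========parsed message: id={}, msg={}", message.getId(), message.getMsg());
    }
}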
3. Start and test
Start the project and watch the console log: three messages are produced and consumed three times.
Source code: click here