kafka概念
Topic:消息根据Topic进行归类,可以理解为一个队里。
Producer:消息生产者,就是向kafka broker发消息的客户端。
Consumer:消息消费者,向kafka broker取消息的客户端。
broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
Zookeeper:依赖集群保存meta信息。
1.安装
启动kafka需要zookeeper,所以需要安装zookeeper和kafka,这里就不介绍安装过程了。
2.引入kafka的依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> </dependencies>
3.定义消息
public class Message { private Long id; //id private String msg; //消息 private Date sendTime; //时间戳 public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public Date getSendTime() { return sendTime; } public void setSendTime(Date sendTime) { this.sendTime = sendTime; } }
4.提供者,注意加注解
发送消息时会有一个topic
@Component public class KafkaSender { private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate<String, String> kafkaTemplate; private Gson gson = new GsonBuilder().create(); //发送消息方法 public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); kafkaTemplate.send("zhisheng", gson.toJson(message)); } }
5.消费者,注意加注解
需要加一个kafka监听的注解,里面需要指定topic,表示消费哪一种消息。
@Component public class KafkaReceiver { private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = {"zhisheng"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info("----------------- record =" + record); log.info("------------------ message =" + message); } } }
7.启动类编写
这里调用消息提供方,写入了三条消息
@SpringBootApplication public class KafkaDemoApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args); KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i < 3; i++) { //调用消息发送类中的消息发送方法 sender.send(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
8.配置文件application.properties
#============== kafka =================== # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=127.0.0.1:9092 #=============== provider ======================= spring.kafka.producer.retries=0 # 每次批量发送消息的数量 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= # 指定默认消费者group id spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer