Spring Boot Kafka


1. Create a cluster

http://kafka.apache.org/documentation/#quickstart

One sentence in the quickstart strikes me as particularly important: For Kafka, a single broker is just a cluster of size one.

1.1. Command-line operations

# Extract the archive
tar -zxf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

# Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka (the first broker)
bin/kafka-server-start.sh config/server.properties &

# Copy the broker config once for each additional broker
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

# Edit the copies so that each broker has its own id, port and log directory
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

# Start the two additional brokers
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

# Create a topic replicated across the three brokers
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic myTopic

# Describe the topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
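
The `--replication-factor 3` in the create command only succeeds because three brokers are running at that point; Kafka rejects a topic whose replication factor exceeds the number of available brokers. The following is a minimal sketch of that rule, not Kafka code: the class `ReplicationCheck` and its method `canCreate` are hypothetical names introduced purely for illustration.

```java
// Hypothetical helper, not part of Kafka: mirrors the rule that a topic's
// replication factor must be at least 1 and at most the number of live brokers.
final class ReplicationCheck {

    /** Returns true when replicationFactor replicas can be placed on brokerCount brokers. */
    static boolean canCreate(int replicationFactor, int brokerCount) {
        return replicationFactor >= 1 && replicationFactor <= brokerCount;
    }

    public static void main(String[] args) {
        // Three brokers, as started in this section: replication factor 3 fits.
        System.out.println(canCreate(3, 3));
        // A single broker ("a cluster of size one") cannot hold three replicas.
        System.out.println(canCreate(3, 1));
    }
}
```

On a single-broker setup the same create command would therefore need `--replication-factor 1`.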

1.2. Graphical interface

Besides the command line, you can also inspect the cluster with kafka-manager.

2. Integrating Kafka with Spring Boot

2.1. Add the Maven dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2. Configuration

spring:
  kafka:
    bootstrap-servers: 10.123.52.76:9092,10.123.52.76:9093,10.123.52.76:9094
    consumer:
      group-id: myGroup
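
The `bootstrap-servers` value is a comma-separated list of `host:port` pairs, here one entry per broker on ports 9092, 9093 and 9094. To make the format concrete, here is a small sketch that splits such a value into addresses. It uses only the JDK; the class `BootstrapServers` and its `parse` method are hypothetical and are not how the Kafka client itself reads the property.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the "host:port,host:port" format; the real
// Kafka client performs its own parsing and validation.
final class BootstrapServers {

    /** Splits a comma-separated host:port list into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String value) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; we only want to inspect the pair.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        String value = "10.123.52.76:9092,10.123.52.76:9093,10.123.52.76:9094";
        for (InetSocketAddress address : parse(value)) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```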

2.3. Sending and receiving messages

package com.cjs.boot.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyListener {

    // Invoked for each record that arrives on "myTopic"
    @KafkaListener(topics = "myTopic")
    public void processMessage2(String content) {
        log.info("【Received Message From 'myTopic'】: {}", content);
    }
}

package com.cjs.boot.controller;

import com.cjs.boot.response.RespResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Controller;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.ModelAndView;

@Controller
@RequestMapping("/message")
public class MessageController extends BaseController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Renders the form used to type a message
    @GetMapping("/add.html")
    public ModelAndView add() {
        return new ModelAndView("message/add");
    }

    // Publishes the submitted text to "myTopic", keyed by the current timestamp
    @PostMapping("/send.json")
    @ResponseBody
    public RespResult send(String text) {
        // The send is asynchronous; the returned future is not inspected here
        ListenableFuture<SendResult<String, String>> result =
                kafkaTemplate.send("myTopic", String.valueOf(System.currentTimeMillis()), text);
        return RespResult.success();
    }
}

2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-05-04 12:36:59.736  INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-05-04 12:36:59.830  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From 'myTopic'】: 大家好啊
2018-05-04 12:37:24.107  INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener          : 【Received Message From 'myTopic'】: 吃饭啦
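
The controller's `send` method stores the future returned by `kafkaTemplate.send(...)` but never looks at it, so a failed send would go unnoticed. The sketch below shows the general callback pattern for reacting to success or failure. It is an assumption-laden stand-in rather than Spring code: it uses the JDK's `CompletableFuture` in place of Spring's `ListenableFuture` (which takes callbacks through `addCallback`), and `fakeSend` and `sendAndReport` are hypothetical names that merely simulate a send.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in sketch: no Kafka or Spring involved, only the callback pattern.
final class SendCallbackSketch {

    // Hypothetical replacement for kafkaTemplate.send(...): completes normally with a
    // description of the record, or exceptionally when `fail` is true.
    static CompletableFuture<String> fakeSend(String topic, String key, String value, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete(topic + ":" + key + "=" + value);
        }
        return future;
    }

    // Attaches a callback to the future and returns a one-line outcome.
    // fakeSend returns an already-completed future, so the callback runs immediately.
    static String sendAndReport(String topic, String key, String value, boolean fail) {
        AtomicReference<String> outcome = new AtomicReference<>();
        fakeSend(topic, key, value, fail).whenComplete((result, error) -> {
            if (error != null) {
                outcome.set("FAILED: " + error.getMessage());
            } else {
                outcome.set("SENT: " + result);
            }
        });
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(sendAndReport("myTopic", "k1", "hello", false));
        System.out.println(sendAndReport("myTopic", "k1", "hello", true));
    }
}
```

In a real controller the callback would typically log the failure or surface it to the caller instead of returning a string.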

2.4. Screenshots

 

