Integrating Kafka with Spring Boot in Practice


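I. Preparation

1. Start ZooKeeper

2. Start Kafka

3. Create the Kafka topic. The topic name is couponTopic. (The --zookeeper flag below applies to older Kafka releases; from Kafka 3.0 on, kafka-topics.sh takes --bootstrap-server instead.)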

./kafka-topics.sh  --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic couponTopic

 

 

II. Producer project

1. Add the dependency

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>

  

2. Add the configuration

spring:
  kafka:
    bootstrap-servers: 47.xx.xx.120:9092
    consumer:
      group-id: mygroup
    listener:
      concurrency: 4
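
For orientation, the spring.kafka.* keys above correspond to standard Kafka client properties: bootstrap-servers maps to bootstrap.servers and consumer.group-id maps to group.id, while listener.concurrency is a Spring listener-container setting rather than a Kafka client property. The sketch below builds the equivalent client properties by hand using only the JDK. KafkaClientProps and fromSpringSettings are hypothetical names used for illustration; Spring Boot performs this wiring itself.

```java
import java.util.Properties;

// Hypothetical helper, not part of Spring or Kafka: shows which Kafka client
// properties the spring.kafka.* settings correspond to.
final class KafkaClientProps {

    static Properties fromSpringSettings(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers -> bootstrap.servers
        props.setProperty("bootstrap.servers", bootstrapServers);
        // spring.kafka.consumer.group-id -> group.id
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // The host is a placeholder; the address in the article is masked.
        Properties props = fromSpringSettings("localhost:9092", "mygroup");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```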

  

3. Call Kafka from the service layer

Inject the KafkaTemplate:

    @Autowired
    public MerchantsServImpl(MerchantsDao merchantsDao,
                             KafkaTemplate<String, String> kafkaTemplate) {
        this.merchantsDao = merchantsDao;
        this.kafkaTemplate = kafkaTemplate;
    }

  

Send the message through kafkaTemplate. Constants.TEMPLATE_TOPIC is couponTopic; it is used here both as the topic and as the record key:

            String passTemplate = JSON.toJSONString(template);
            kafkaTemplate.send(
                    Constants.TEMPLATE_TOPIC,
                    Constants.TEMPLATE_TOPIC,
                    passTemplate
            );
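
The three-argument send takes the topic, the record key, and the value, in that order. Because the key here is always the same constant, every message carries the same key, and Kafka's default partitioner routes records with the same key to the same partition. The sketch below illustrates only that idea: it uses String.hashCode as a stand-in, whereas Kafka hashes the serialized key bytes with murmur2, so real partition numbers would differ. KeyPartitionSketch and partitionFor are hypothetical names.

```java
// Hypothetical illustration of key-based partition selection.
// NOT Kafka's algorithm: Kafka applies murmur2 to the serialized key bytes.
final class KeyPartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = "couponTopic";
        // The same key always lands on the same partition.
        System.out.println(partitionFor(key, 3) == partitionFor(key, 3));
        // With a single partition, the only possible index is 0.
        System.out.println(partitionFor(key, 1));
    }
}
```

With the single-partition topic created earlier this makes no practical difference, but if the topic were given more partitions, a constant key would still send every message to one of them. Using a per-entity key would be one way to spread the load.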

  

 

 

 

 

III. Consumer project

1. Add the Kafka dependency

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>1.1.1.RELEASE</version>
		</dependency>

  

2. Configure the Kafka connection

spring:
  application:
    name: kafkaConsume
  kafka:
    bootstrap-servers: 4x.xx.xx.xx:9092
    consumer:
      group-id: mygroup
    listener:
      concurrency: 4

server:
  port: 9527
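
One consequence of concurrency: 4 is worth spelling out. Within a consumer group, each partition is consumed by at most one consumer, and couponTopic was created with a single partition, so only one of the four listener threads receives records while the others stay idle. The sketch below models a range-style split of one topic's partitions across the consumers in a group. It is a simplified illustration with hypothetical names (PartitionSplitSketch, partitionsPerConsumer, activeConsumers), not Kafka's assignor code.

```java
import java.util.Arrays;

// Hypothetical illustration, not Kafka's assignor code.
final class PartitionSplitSketch {

    // Returns how many partitions each consumer in the group would own:
    // every consumer gets the base share, and the first few get one extra.
    static int[] partitionsPerConsumer(int numPartitions, int numConsumers) {
        if (numPartitions < 0 || numConsumers <= 0) {
            throw new IllegalArgumentException("partitions must be >= 0 and consumers > 0");
        }
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int[] counts = new int[numConsumers];
        for (int i = 0; i < numConsumers; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    // Counts the consumers that own at least one partition.
    static long activeConsumers(int numPartitions, int numConsumers) {
        return Arrays.stream(partitionsPerConsumer(numPartitions, numConsumers))
                .filter(c -> c > 0)
                .count();
    }

    public static void main(String[] args) {
        // One partition, four consumers, as configured in this article.
        System.out.println(Arrays.toString(partitionsPerConsumer(1, 4))); // [1, 0, 0, 0]
        System.out.println(activeConsumers(1, 4)); // 1
    }
}
```

Under this model, creating the topic with at least four partitions, or lowering concurrency to 1, would bring the two settings in line.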

  

3. Receive Kafka messages

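Topic name, defined in the Constants class:

public static final String TEMPLATE_TOPIC = "couponTopic";

Listener class (JSON is assumed here to be fastjson's com.alibaba.fastjson.JSON):

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConsumeTemplate {
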
    @KafkaListener(topics = {Constants.TEMPLATE_TOPIC})
    public void receive(@Payload String passTemplate,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        log.info("Consumer Receive PassTemplate: {}", passTemplate);

        PassTemplate pt;

        try {
            pt = JSON.parseObject(passTemplate, PassTemplate.class);
        } catch (Exception ex) {
            log.error("Parse PassTemplate Error: {}", ex.getMessage());
            return;
        }

        ...
    }
}
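
The listener above logs and skips a message it cannot parse instead of letting the exception escape. That parse-or-skip step can be sketched with the JDK alone. In this sketch Integer::parseInt stands in for JSON.parseObject, and SafeParse.tryParse is a hypothetical helper that is not part of the original project.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper illustrating the parse-or-skip pattern.
final class SafeParse {

    // Applies the parser and returns an empty Optional when parsing fails,
    // so the caller can skip the message instead of propagating the error.
    static <T> Optional<T> tryParse(String payload, Function<String, T> parser) {
        try {
            return Optional.ofNullable(parser.apply(payload));
        } catch (RuntimeException ex) {
            System.err.println("Parse error: " + ex.getMessage());
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Optional<Integer> ok = tryParse("42", Integer::parseInt);
        Optional<Integer> bad = tryParse("not-a-number", Integer::parseInt);
        System.out.println(ok.isPresent());  // true
        System.out.println(bad.isPresent()); // false
    }
}
```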

  

 

