Springboot整合kafka


首先在windows下啟動kafka

啟動方法如下:

首先下載kafka,zookeeper安裝包:

 

修改下

為你配置的文件路徑

修改如圖文件

zookeeper啟動:

復制下面那個示例配置文件(zoo_sample.cfg),重命名為zoo.cfg,然后啟動zookeeper就可以了

再啟動kafka。不知道為什么,我在本機上直接雙擊啟動文件一直無法啟動,后來改為在命令行窗口中用下面的命令啟動:

.\bin\windows\kafka-server-start.bat .\config\server.properties

 

在kafka的安裝根目錄下執行上面的命令即可,無需進入bin目錄

 然后整合springboot

 

 

,

 

可以看到應用初始化時就已經發送消息了。

 

看具體代碼:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + Kafka demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- The Spring Boot parent manages the versions of every dependency below that omits <version>. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.cxy</groupId>
    <artifactId>skafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>skafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Embedded web server; server.port in the application properties applies to it. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- KafkaTemplate and @KafkaListener support. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- JSON serialization of the message payload.
             Raised from 1.2.56: that release is affected by published fastjson
             deserialization vulnerabilities; 1.2.83 is the patched 1.x release. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Compile-time generation of getters/setters for the model class. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Repackages the build output as an executable Spring Boot jar. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

啟動類:

package com.cxy.skafka;

import com.cxy.skafka.component.UserLogProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.PostConstruct;

/**
 * Spring Boot entry point for the Kafka demo.
 *
 * <p>Once the injected {@code UserLogProducer} is available, {@link #init()}
 * sends ten sample messages through it.
 */
@SpringBootApplication
public class SkafkaApplication {

    @Autowired
    private UserLogProducer userLogProducer;

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SkafkaApplication.class, args);
    }

    /**
     * Sends one message for each user id from "0" to "9".
     *
     * <p>Invoked through {@code @PostConstruct}, i.e. as part of this bean's
     * initialization.
     */
    @PostConstruct
    public void init() {
        int userId = 0;
        while (userId < 10) {
            userLogProducer.sendlog(Integer.toString(userId));
            userId++;
        }
    }
}

model

package com.cxy.skafka.model;

import lombok.Data;
import lombok.experimental.Accessors;

/**
 * User-log data object with three string properties.
 *
 * <p>{@code @Data} makes Lombok generate the getters, setters,
 * {@code equals}, {@code hashCode} and {@code toString} at compile time.
 * {@code @Accessors} is declared without attributes, so no accessor option
 * is overridden here.
 *
 * <p>Created 2019/6/1 16:47.
 *
 * @author cxy
 * @version V1.0
 */
@Data
@Accessors
public class Userlog {
    // Name of the user.
    private  String username;
    // Identifier of the user.
    private String userid;
    // State value; its meaning is not defined in this class.
    private String state;
}

 

producer

package com.cxy.skafka.component;

import com.alibaba.fastjson.JSON;
import com.cxy.skafka.model.Userlog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Publishes {@link Userlog} records to Kafka as JSON strings.
 *
 * <p>Created 2019/6/1 16:48.
 *
 * @author cxy
 * @version V1.0
 */
@Component
public class UserLogProducer {

    /** Kafka topic the serialized user-log records are sent to. */
    private static final String TOPIC = "userLog";

    // Typed as <String, String> because only String payloads are sent; this
    // replaces the raw type and its unchecked send(...) call.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Builds a {@link Userlog} for the given user id (username "cxy",
     * state "1"), echoes it to standard error and sends its JSON form to the
     * {@code userLog} topic without a record key.
     *
     * @param userid the user id stored in the record
     */
    public void sendlog(String userid) {
        Userlog userlog = new Userlog();
        userlog.setUsername("cxy");
        userlog.setState("1");
        userlog.setUserid(userid);

        // Console trace of every record that is about to be sent.
        System.err.println(userlog + "1");

        kafkaTemplate.send(TOPIC, JSON.toJSONString(userlog));
    }
}

消費者:

package com.cxy.skafka.component;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Listens on the {@code userLog} Kafka topic and prints every non-null
 * message value to standard error.
 *
 * <p>Created 2019/6/1 16:54.
 *
 * @author cxy
 * @version V1.0
 */
@Component
public class UserLogConsumer {

    /**
     * Handles one record received from the {@code userLog} topic.
     *
     * <p>A record whose value is {@code null} is ignored.
     *
     * @param consumerRecord the received record; typed as String key/value
     *                       instead of the raw type
     */
    @KafkaListener(topics = {"userLog"})
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        Optional.ofNullable(consumerRecord.value())
                .ifPresent(System.err::println);
    }
}

 

配置文件:

server.port=8080
# Kafka broker address(es) used for the initial connection
spring.kafka.bootstrap-servers=localhost:9092
# Number of retries after a failed send (0 = do not retry)
spring.kafka.producer.retries=0
# Maximum size of one record batch, in bytes (not a message count)
spring.kafka.producer.batch-size=16384
# Total memory, in bytes, the producer may use to buffer unsent records (32 MiB)
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and the message value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# Default consumer group id
spring.kafka.consumer.group-id=user-log-group

# Start from the earliest offset when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Commit offsets automatically, at the interval below (milliseconds)
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# Deserializers for the message key and the message value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

啟動之后就是上面的效果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM