springboot整合kafka


TOC

springboot整合kafka

參考:

配置

依賴

需要web和kafka

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

注意,springboot版本對kafka版本影響不小,1.x可以使用1.x的kafka(比如1.1.1.RELEASE),2.0.x使用2.1.7.RELEASE,2.1.x使用 2.2.x.RELEASE;
版本不對都會導致項目無法啟動

yml配置

#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring:
  kafka:
  #指定kafka server的地址,集群配多個,中間,逗號隔開,或者使用  列表格式  
  # - 服務1
  # - 服務2   ....
    bootstrap-servers: 192.168.88.128:9092
    #=============== provider  =======================
    producer:
      retries: 0
      # 每次批量發送消息的數量
      batch-size: 16384
      acks: 1
      #這個值只能大不能小了,否則會影響sleuth。可以使用的最大內存來緩存等待發送到server端的消息
      buffer-memory: 1048576  # 這是最小的?
      retries: 0
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # 單個請求的最大大小(以字節為單位)
        max.request.size: 2097152
        # 從發送請求到收到ACK確認等待的最長時間(超時時間)
        request.timeout.ms: 40000
        # 這項設置設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比這項設置要小的多,
        # 我們需要“linger”特定的時間以獲取更多的消息。 這個設置默認為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。
        linger.ms: 1
        # 消息發送失敗的情況下,重試發送的次數 存在消息發送是成功的,只是由於網絡導致ACK沒收到的重試,會出現消息被重復發送的情況
        message.send.max.retries: 0
    consumer:
      # 指定默認消費者group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

測試

消息實體

@Data
@Accessors(chain = true)
@NoArgsConstructor
public class Message {
    /**
     * id
     */
    private Long id;
    /**
     * 消息
     */
    private String msg;
    /**
     * 時間戳
     */
    private Date sendTime;

}

發送消息

/** 消息發送方
 * @author jingshiyu
 * @date 2019/7/31 14:04:21
 * @desc
 */
@RestController
@Slf4j
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        Message message=new Message();
        message.setId(123L).setMsg(msg).setSendTime(new Date());
        kafkaTemplate.send("kafka_one", JSON.toJSONString(message));
    }

}

就這樣,發送消息代碼就實現了。

這里關鍵的代碼為 kafkaTemplate.send() 方法,kafka_one 是 Kafka 里的 topic ,這個 topic 在 Java 程序中是不需要提前在 Kafka 中設置的,因為它會在發送的時候自動創建你設置的 topic, JSON.toJSONString(message) 是消息內容

接收消息

/**
 * 監聽服務器上的kafka是否有相關的消息發過來
 */
@Component
@Slf4j
public class KafkaReceiver {
    /**
     * 定義此消費者接收topics = {"kafka_one"}的消息,與controller中的topic對應上即可
     * @param record 變量代表消息本身,可以通過ConsumerRecord<?,?>類型的record變量來打印接收的消息的各種信息
     */
    @KafkaListener(topics = {"kafka_one"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}

客戶端 consumer 接收消息特別簡單,直接用@KafkaListener 注解即可,並在監聽中設置監聽的 topic ,topics 是一個數組所以是可以綁定多個主題的,上面的代碼中修改為 @KafkaListener(topics = {"zhisheng","tian"}) 就可以同時監聽兩個 topic 的消息了。需要注意的是:這里的 topic 需要和消息發送類 KafkaSender.java 中設置的 topic 一致。

發送消息

啟動項目之后,調用接口發送消息

http://192.168.0.173:8083/send?msg=測試消息

將會接收到消息

record =ConsumerRecord(topic = kafka_one, partition = 0, offset = 0, CreateTime = 1564556254952, serialized key size = -1, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":123,"msg":"測試消息","sendTime":1564556254808})

message ={"id":123,"msg":"測試消息","sendTime":1564556254808}

kafka查看

./kafka-topics.sh --list --zookeeper localhost:2181  在kafka上查看topic列表


就會發現剛才我們程序中的 kafka_one 已經自己創建了






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM