物聯網架構成長之路(49)-SpringBoot集成KafKa中間件


0.前言

  今天(2020-02-24)是開工的第一天,來到公司后,服務器出現問題,網管正在處理。沒有服務器的后端,就像沒有武器的劍客。沒辦法進行開發,就看看資料學習一點技術。
  疫情期間,雖然沒有上班,但是自己的物聯網平台還是在慢慢的優化中。下面這個圖是規划后的V2版本架構圖。
  架構圖里面用到Kafka中間件,是作為數據流來處理。由於MQTT(EMQ)無法進行數據的持久化,所以需要引入Kafka來實現處理。EMQ用來保證通信的實時性和高效性。Kafka利用消息隊列特性用來進行非實時與離線處理。
  比如架構圖所示,可以利用EMQ的Kafka插件或者訂閱MQTT根Topic的方式,把通信內容按照規則發往Kafka,作為生產者。而后面的支付服務,離線大數據處理服務,數據存儲服務等,作為消費者。

1. 利用Docker-Compose搭建kafka

  docker-compose.yml

 1 version: '3'
 2 services:
 3     zookeeper:
 4         image: wurstmeister/zookeeper
 5         ports:
 6             - "2181:2181"
 7     kafka:
 8         image: "wurstmeister/kafka:2.12-2.4.0"
 9         ports:
10             - "9092:9092"
11         environment:
12             KAFKA_ADVERTISED_HOST_NAME: 192.168.0.106
13             KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
14         volumes:
15             - /var/run/docker.sock:/var/run/docker.sock
16             - /root/workspace/kafka/data:/kafka
17             - /etc/localtime:/etc/localtime
18     kafka-manager:
19         image: "sheepkiller/kafka-manager"
20         restart: always
21         container_name: kafka-manger
22         ports:
23             - "9091:9000"
24         links:
25             - zookeeper
26             - kafka
27         environment:
28             ZK_HOSTS: zookeeper:2181
29             KAFKA_BROKERS: kafka:9092
30             KM_ARGS: -Djava.net.preferIPv4Stack=true
31             KM_USERNAME: admin
32             KM_PASSWORD: admin

  下面是一些基礎操作

 1 #進入容器
 2 docker exec -it ${CONTAINER ID} /bin/bash 
 3 #進入目錄
 4 cd opt/kafka_2.11-0.10.1.1
 5 #創建Topic
 6 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test
 7 #查詢Topic
 8 bin/kafka-topics.sh --list --bootstrap-server localhost:9092
 9 
10 #運行一個生產者
11 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
12 >This is Message
13 
14 #運行一個消費者
15 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

 

2. Kafka Manager界面
  訪問192.168.0.106:9091,這個界面就是Kafka Manager管理界面,我們增加一個Cluster。然后可以查看對應的kafka信息

 

3. SpringBoot集成Kafka

3.1 pom.xml

1         <dependency>
2             <groupId>org.springframework.kafka</groupId>
3             <artifactId>spring-kafka</artifactId>
4         </dependency>
5         <dependency>
6             <groupId>com.google.code.gson</groupId>
7             <artifactId>gson</artifactId>
8         </dependency>

3.2 KafkaController.java

 1 package com.wunaozai.demo.kafka;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.web.bind.annotation.RequestMapping;
 5 import org.springframework.web.bind.annotation.RestController;
 6 
 7 @RestController
 8 @RequestMapping(value="/kafka")
 9 public class KafkaController {
10 
11     @Autowired
12     private KafkaSender kafkaSender;
13     
14     @RequestMapping(value="/send")
15     public String send(String msg) {
16         boolean flag = kafkaSender.send(msg);
17         return flag + "";
18     }
19 }

3.3 KafkaMessage.java

 1 package com.wunaozai.demo.kafka;
 2 
 3 import java.sql.Timestamp;
 4 
 5 public class KafkaMessage {
 6     
 7     private Long id;
 8     private String msg;
 9     private Timestamp ts;
10     public Long getId() {
11         return id;
12     }
13     
14     public void setId(Long id) {
15         this.id = id;
16     }
17     
18     public String getMsg() {
19         return msg;
20     }
21     
22     public void setMsg(String msg) {
23         this.msg = msg;
24     }
25     
26     public Timestamp getTs() {
27         return ts;
28     }
29     
30     public void setTs(Timestamp ts) {
31         this.ts = ts;
32     }
33     
34 }

3.4 KafkaReceiver.java

 1 package com.wunaozai.demo.kafka;
 2 
 3 import java.util.Optional;
 4 
 5 import org.apache.kafka.clients.consumer.ConsumerRecord;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 import org.springframework.kafka.annotation.KafkaListener;
 9 import org.springframework.stereotype.Component;
10 
11 @Component
12 public class KafkaReceiver {
13 
14     private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);
15 
16     @KafkaListener(topics= {"iot"})
17     public void listen(ConsumerRecord<?, ?> record) {
18         Optional<?> message = Optional.ofNullable(record.value());
19         if(message.isPresent()) {
20             Object msg = message.get();
21             log.info("record : " + record);
22             log.info("message : " + msg);
23         }
24     }
25 }

3.5 KafkaSender.java

 1 package com.wunaozai.demo.kafka;
 2 
 3 import java.sql.Timestamp;
 4 
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.kafka.core.KafkaTemplate;
 7 import org.springframework.stereotype.Component;
 8 
 9 import com.google.gson.Gson;
10 import com.google.gson.GsonBuilder;
11 
12 @Component
13 public class KafkaSender {
14 
15     @Autowired
16     private KafkaTemplate<String, String> kafkaTemplate;
17     
18     private Gson gson = new GsonBuilder().create();
19     
20     public boolean send(String msg) {
21         KafkaMessage message = new KafkaMessage();
22         message.setId(System.currentTimeMillis());
23         message.setTs(new Timestamp(System.currentTimeMillis()));
24         message.setMsg(msg); 
25         kafkaTemplate.send("iot", gson.toJson(message));
26         return true;
27     }
28 }

3.6 application.properties

 1 #指定kafka 代理地址,可多個
 2 spring.kafka.bootstrap-servers=192.168.0.106:9092
 3 
 4 #provider
 5 spring.kafka.producer.retries=0
 6 #每次批量發送消息的數量
 7 spring.kafka.producer.batch-size=16384
 8 spring.kafka.producer.buffer-memory=33554432
 9 #指定消息key和消息體body的編解碼方式
10 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
11 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
12 
13 #consumer
14 # 指定默認消費者group id
15 spring.kafka.consumer.group-id=0
16 spring.kafka.consumer.auto-offset-reset=earliest
17 spring.kafka.consumer.enable-auto-commit=true
18 spring.kafka.consumer.auto-commit-interval=100
19 # 指定消息key和消息體的編解碼方式
20 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 運行效果圖

  經過測試,通過java方式與console方式都是可以互相通信。

  假設在KafkaReceiver.java 第23行增加 Thread.sleep(10000); 用來模擬實際業務延時。類似搶票或者秒殺的應用場景。突然高峰,然后把所有訂單數據都放到Kafka,然后在慢慢消費。生成實際業務訂單,再通知用戶付款。

  按照架構設計,平台部分會訂閱EMQ的Topic,過濾部分數據或者完整數據報都發往Kafka,然后讓Kafka后面的消費者根據需要自己進行消費。比如對所有 iot/product-uuid/device-uuid/device/+/property 所有屬性日志相關的消息都通過Kafka的property這個Topic發送。消費者訂閱property后,就可以消費。至於消費后可以存入influxdb進行持久化,也可以預處理后顯示。MQTT設計中還有一類是 iot/product-uuid/device-uuid/device/+/event 事件類。這些相關的消息,會發往Kafka的event主題。然后由后面的消費者來消費event。進而進行報警等處理。


參考資料:
  http://kafka.apachecn.org/documentation.html#operations
  https://hub.docker.com/r/wurstmeister/kafka
  https://mp.weixin.qq.com/s?__biz=MzU2NDg0OTgyMA==&mid=2247484570&idx=1&sn=1ad1c96bc7d47b88e976cbd045baf7d7

本文地址:https://www.cnblogs.com/wunaozai/p/12358247.html
本系列目錄: https://www.cnblogs.com/wunaozai/p/8067577.html
個人主頁:https://www.wunaozai.com/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM