0.前言
今天(2020-02-24)是開工的第一天,來到公司后,服務器出現問題,網管正在處理。沒有服務器的后端,就像沒有武器的劍客。沒辦法進行開發,就看看資料學習一點技術。
疫情期間,雖然沒有上班,但是自己的物聯網平台還是在慢慢的優化中。下面這個圖是規划后的V2版本架構圖。
架構圖里面用到Kafka中間件,是作為數據流來處理。由於MQTT(EMQ)無法進行數據的持久化,所以需要引入Kafka來實現處理。EMQ用來保證通信的實時性和高效性。Kafka利用消息隊列特性用來進行非實時與離線處理。
比如架構圖所示,可以利用EMQ的Kafka插件或者訂閱MQTT根Topic的方式,把通信內容按照規則發往Kafka,作為生產者。而后面的支付服務,離線大數據處理服務,數據存儲服務等,作為消費者。

1. 利用Docker-Compose搭建kafka
docker-compose.yml
1 version: '3' 2 services: 3 zookeeper: 4 image: wurstmeister/zookeeper 5 ports: 6 - "2181:2181" 7 kafka: 8 image: "wurstmeister/kafka:2.12-2.4.0" 9 ports: 10 - "9092:9092" 11 environment: 12 KAFKA_ADVERTISED_HOST_NAME: 192.168.0.106 13 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 14 volumes: 15 - /var/run/docker.sock:/var/run/docker.sock 16 - /root/workspace/kafka/data:/kafka 17 - /etc/localtime:/etc/localtime 18 kafka-manager: 19 image: "sheepkiller/kafka-manager" 20 restart: always 21 container_name: kafka-manger 22 ports: 23 - "9091:9000" 24 links: 25 - zookeeper 26 - kafka 27 environment: 28 ZK_HOSTS: zookeeper:2181 29 KAFKA_BROKERS: kafka:9092 30 KM_ARGS: -Djava.net.preferIPv4Stack=true 31 KM_USERNAME: admin 32 KM_PASSWORD: admin
下面是一些基礎操作
1 #進入容器 2 docker exec -it ${CONTAINER ID} /bin/bash 3 #進入目錄 4 cd opt/kafka_2.11-0.10.1.1 5 #創建Topic 6 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test 7 #查詢Topic 8 bin/kafka-topics.sh --list --bootstrap-server localhost:9092 9 10 #運行一個生產者 11 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 12 >This is Message 13 14 #運行一個消費者 15 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. Kafka Manager界面
訪問192.168.0.106:9091,這個界面就是Kafka Manager管理界面,我們增加一個Cluster。然后可以查看對應的kafka信息
3. SpringBoot集成Kafka
3.1 pom.xml
1 <dependency> 2 <groupId>org.springframework.kafka</groupId> 3 <artifactId>spring-kafka</artifactId> 4 </dependency> 5 <dependency> 6 <groupId>com.google.code.gson</groupId> 7 <artifactId>gson</artifactId> 8 </dependency>
3.2 KafkaController.java
1 package com.wunaozai.demo.kafka; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.web.bind.annotation.RequestMapping; 5 import org.springframework.web.bind.annotation.RestController; 6 7 @RestController 8 @RequestMapping(value="/kafka") 9 public class KafkaController { 10 11 @Autowired 12 private KafkaSender kafkaSender; 13 14 @RequestMapping(value="/send") 15 public String send(String msg) { 16 boolean flag = kafkaSender.send(msg); 17 return flag + ""; 18 } 19 }
3.3 KafkaMessage.java
1 package com.wunaozai.demo.kafka; 2 3 import java.sql.Timestamp; 4 5 public class KafkaMessage { 6 7 private Long id; 8 private String msg; 9 private Timestamp ts; 10 public Long getId() { 11 return id; 12 } 13 14 public void setId(Long id) { 15 this.id = id; 16 } 17 18 public String getMsg() { 19 return msg; 20 } 21 22 public void setMsg(String msg) { 23 this.msg = msg; 24 } 25 26 public Timestamp getTs() { 27 return ts; 28 } 29 30 public void setTs(Timestamp ts) { 31 this.ts = ts; 32 } 33 34 }
3.4 KafkaReceiver.java
1 package com.wunaozai.demo.kafka; 2 3 import java.util.Optional; 4 5 import org.apache.kafka.clients.consumer.ConsumerRecord; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.kafka.annotation.KafkaListener; 9 import org.springframework.stereotype.Component; 10 11 @Component 12 public class KafkaReceiver { 13 14 private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class); 15 16 @KafkaListener(topics= {"iot"}) 17 public void listen(ConsumerRecord<?, ?> record) { 18 Optional<?> message = Optional.ofNullable(record.value()); 19 if(message.isPresent()) { 20 Object msg = message.get(); 21 log.info("record : " + record); 22 log.info("message : " + msg); 23 } 24 } 25 }
3.5 KafkaSender.java
1 package com.wunaozai.demo.kafka; 2 3 import java.sql.Timestamp; 4 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.kafka.core.KafkaTemplate; 7 import org.springframework.stereotype.Component; 8 9 import com.google.gson.Gson; 10 import com.google.gson.GsonBuilder; 11 12 @Component 13 public class KafkaSender { 14 15 @Autowired 16 private KafkaTemplate<String, String> kafkaTemplate; 17 18 private Gson gson = new GsonBuilder().create(); 19 20 public boolean send(String msg) { 21 KafkaMessage message = new KafkaMessage(); 22 message.setId(System.currentTimeMillis()); 23 message.setTs(new Timestamp(System.currentTimeMillis())); 24 message.setMsg(msg); 25 kafkaTemplate.send("iot", gson.toJson(message)); 26 return true; 27 } 28 }
3.6 application.properties
1 #指定kafka 代理地址,可多個 2 spring.kafka.bootstrap-servers=192.168.0.106:9092 3 4 #provider 5 spring.kafka.producer.retries=0 6 #每次批量發送消息的數量 7 spring.kafka.producer.batch-size=16384 8 spring.kafka.producer.buffer-memory=33554432 9 #指定消息key和消息體body的編解碼方式 10 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 11 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 12 13 #consumer 14 # 指定默認消費者group id 15 spring.kafka.consumer.group-id=0 16 spring.kafka.consumer.auto-offset-reset=earliest 17 spring.kafka.consumer.enable-auto-commit=true 18 spring.kafka.consumer.auto-commit-interval=100 19 # 指定消息key和消息體的編解碼方式 20 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
運行效果圖

經過測試,通過java方式與console方式都是可以互相通信。
假設在KafkaReceiver.java 第23行增加 Thread.sleep(10000); 用來模擬實際業務延時。類似搶票或者秒殺的應用場景。突然高峰,然后把所有訂單數據都放到Kafka,然后在慢慢消費。生成實際業務訂單,再通知用戶付款。
按照架構設計,平台部分會訂閱EMQ的Topic,過濾部分數據或者完整數據報都發往Kafka,然后讓Kafka后面的消費者根據需要自己進行消費。比如對所有 iot/product-uuid/device-uuid/device/+/property 所有屬性日志相關的消息都通過Kafka的property這個Topic發送。消費者訂閱property后,就可以消費。至於消費后可以存入influxdb進行持久化,也可以預處理后顯示。MQTT設計中還有一類是 iot/product-uuid/device-uuid/device/+/event 事件類。這些相關的消息,會發往Kafka的event主題。然后由后面的消費者來消費event。進而進行報警等處理。
參考資料:
http://kafka.apachecn.org/documentation.html#operations
https://hub.docker.com/r/wurstmeister/kafka
https://mp.weixin.qq.com/s?__biz=MzU2NDg0OTgyMA==&mid=2247484570&idx=1&sn=1ad1c96bc7d47b88e976cbd045baf7d7
本文地址:https://www.cnblogs.com/wunaozai/p/12358247.html
本系列目錄: https://www.cnblogs.com/wunaozai/p/8067577.html
個人主頁:https://www.wunaozai.com/

