spring整合kafka項目生產和消費測試結果記錄(一)


使用spring+springMVC+mybatis+kafka做了兩個web項目,一個是生產者,一個是消費者。

通過JMeter測試工具模擬100個用戶並發訪問生產者項目,發送json數據給生產者的接口,生產者將json數據發送到kafka集群,

消費者監聽到kafka集群中的消息就開始消費,並將json解析成對象存到MySQL數據庫。

下面是使用JMeter測試工具模擬100個並發的線程設置截圖:

請求所發送的數據:

 

下面是100個用戶10000個請求的聚合報告:

 

下面是生產者截圖生產完10000條消息的時間截圖:

 

下面是消費者項目消費入庫的結束時間截圖:

 

可見,10000條消息從生產完成到入庫(消費完10000條消息的時間只是比生產完成的時間落后了幾十秒,但是消費端真正入庫完成所需要的時間很長)完成時間相差了10幾分鍾。

 

下面是MySQL數據庫截圖,數據全部入庫成功:

下面是消息對應的POJO:

 1 package com.xuebusi.pojo;
 2 
 3 public class TbPerson {
 4     private Long id;
 5 
 6     private String name;
 7 
 8     private Integer age;
 9 
10     public Long getId() {
11         return id;
12     }
13 
14     public void setId(Long id) {
15         this.id = id;
16     }
17 
18     public String getName() {
19         return name;
20     }
21 
22     public void setName(String name) {
23         this.name = name == null ? null : name.trim();
24     }
25 
26     public Integer getAge() {
27         return age;
28     }
29 
30     public void setAge(Integer age) {
31         this.age = age;
32     }
33 
34     @Override
35     public String toString() {
36         return "TbPerson [id=" + id + ", name=" + name + ", age=" + age + "]";
37     }
38 }

 

下面是生產端的邏輯:

 1 package com.xuebusi.controller;
 2 
 3 import com.alibaba.fastjson.JSON;
 4 import com.xuebusi.pojo.TbPerson;
 5 import com.xuebusi.service.KafkaService;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 import org.springframework.stereotype.Controller;
 9 import org.springframework.web.bind.annotation.RequestBody;
10 import org.springframework.web.bind.annotation.RequestMapping;
11 import org.springframework.web.bind.annotation.RequestMethod;
12 import org.springframework.web.bind.annotation.ResponseBody;
13 
14 import javax.annotation.Resource;
15 
16 @Controller
17 @RequestMapping("/producer")
18 public class KafkaController {
19 
20     private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);
21 
22     @Resource
23     private KafkaService kafkaService;
24 
25     /**
26      * 發消息到ssmk這個topic
27      * @param person
28      * @return
29      */
30     @RequestMapping(value = "/person", method = RequestMethod.POST)
31     @ResponseBody
32     public String createPerson(@RequestBody TbPerson person) {
33         if (person == null){
34             return "fail, data can not be null.";
35         }
36         String json = JSON.toJSONString(person);
37         boolean result = kafkaService.sendInfo("ssmk", json);
38         logger.info("生產者發送消息[" + result + "]:" + json);
39         return "success";
40     }
41 }

 

下面是消費端的邏輯:

 1 package com.xuebusi.consumer;
 2 
 3 import com.alibaba.fastjson.JSON;
 4 import com.xuebusi.pojo.TbPerson;
 5 import com.xuebusi.service.PersonService;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 import org.springframework.beans.factory.annotation.Autowired;
 9 import org.springframework.stereotype.Service;
10 
11 import java.util.List;
12 import java.util.Map;
13 
14 @Service
15 public class KafkaConsumerService {
16     private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
17 
18     @Autowired
19     private PersonService personService;
20 
21     public void processMessage(Map<String, Map<Integer, String>> msgs) {
22         /*for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
23             String topic = entry.getKey();
24             Map<Integer, String> value = entry.getValue();
25             for (Map.Entry<Integer, String> entrySet : value.entrySet()) {
26                 Integer partiton = entrySet.getKey();
27                 String msg = entrySet.getValue();
28                 logger.info("消費的主題:" + topic + ",消費的分區:" + partiton + ",消費的消息:" + msg);
29                 logger.info("=======使用JSON解析對象=========");
30                 TbPerson person = JSON.parseObject(msg, TbPerson.class);
31                 logger.info("=======對象開始入庫=========");
32                 personService.insert(person);
33                 logger.info("=======對象入庫成功=========");
34             }
35         }*/
36 
37         for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
38             String topic = entry.getKey();
39             Map<Integer, String> value = entry.getValue();
40             for (Map.Entry<Integer, String> entrySet : value.entrySet()) {
41                 Integer partiton = entrySet.getKey();
42                 String msg = entrySet.getValue();
43                 logger.info("消費的主題:" + topic + ",消費的分區:" + partiton + ",消費的消息:" + msg);
44                 msg = "[" + msg + "]";//注意這里要在前后都加上中括號,否則下面在解析json成對象的時候會報json格式不對的異常(spring會對多條json數據用逗號分隔) 45                 logger.info("=======使用JSON解析對象=========");
46                 List<TbPerson> personList = JSON.parseArray(msg, TbPerson.class);
47                 //TbPerson person = JSON.parseObject(msg, TbPerson.class);
48                 if (personList != null && personList.size() > 0) {
49                     logger.info("消息中包含[" + personList.size() + "]個對象");
50                     for (TbPerson person : personList) {
51                         logger.info("=======對象開始入庫=========");
52                         personService.insert(person);
53                         logger.info("=======對象入庫成功=========");
54                     }
55                 }
56 
57             }
58         }
59     }
60 }

 

如果覺得本文對您有幫助,不妨掃描下方微信二維碼打賞點,您的鼓勵是我前進最大的動力:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM