使用spring+springMVC+mybatis+kafka做了兩個web項目,一個是生產者,一個是消費者。
通過JMeter測試工具模擬100個用戶並發訪問生產者項目,發送json數據給生產者的接口,生產者將json數據發送到kafka集群,
消費者監聽到kafka集群中的消息就開始消費,並將json解析成對象存到MySQL數據庫。
下面是使用JMeter測試工具模擬100個並發的線程設置截圖:
請求所發送的數據:
下面是100個用戶10000個請求的聚合報告:
下面是生產者截圖生產完10000條消息的時間截圖:
下面是消費者項目消費入庫的結束時間截圖:
可見,10000條消息從生產完成到入庫(消費完10000條消息的時間只是比生產完成的時間落后了幾十秒,但是消費端真正入庫完成所需要的時間很長)完成時間相差了10幾分鍾。
下面是MySQL數據庫截圖,數據全部入庫成功:
下面是消息對應的POJO:
1 package com.xuebusi.pojo; 2 3 public class TbPerson { 4 private Long id; 5 6 private String name; 7 8 private Integer age; 9 10 public Long getId() { 11 return id; 12 } 13 14 public void setId(Long id) { 15 this.id = id; 16 } 17 18 public String getName() { 19 return name; 20 } 21 22 public void setName(String name) { 23 this.name = name == null ? null : name.trim(); 24 } 25 26 public Integer getAge() { 27 return age; 28 } 29 30 public void setAge(Integer age) { 31 this.age = age; 32 } 33 34 @Override 35 public String toString() { 36 return "TbPerson [id=" + id + ", name=" + name + ", age=" + age + "]"; 37 } 38 }
下面是生產端的邏輯:
1 package com.xuebusi.controller; 2 3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.KafkaService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.stereotype.Controller; 9 import org.springframework.web.bind.annotation.RequestBody; 10 import org.springframework.web.bind.annotation.RequestMapping; 11 import org.springframework.web.bind.annotation.RequestMethod; 12 import org.springframework.web.bind.annotation.ResponseBody; 13 14 import javax.annotation.Resource; 15 16 @Controller 17 @RequestMapping("/producer") 18 public class KafkaController { 19 20 private static final Logger logger = LoggerFactory.getLogger(KafkaController.class); 21 22 @Resource 23 private KafkaService kafkaService; 24 25 /** 26 * 發消息到ssmk這個topic 27 * @param person 28 * @return 29 */ 30 @RequestMapping(value = "/person", method = RequestMethod.POST) 31 @ResponseBody 32 public String createPerson(@RequestBody TbPerson person) { 33 if (person == null){ 34 return "fail, data can not be null."; 35 } 36 String json = JSON.toJSONString(person); 37 boolean result = kafkaService.sendInfo("ssmk", json); 38 logger.info("生產者發送消息[" + result + "]:" + json); 39 return "success"; 40 } 41 }
下面是消費端的邏輯:
1 package com.xuebusi.consumer; 2 3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.PersonService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.stereotype.Service; 10 11 import java.util.List; 12 import java.util.Map; 13 14 @Service 15 public class KafkaConsumerService { 16 private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class); 17 18 @Autowired 19 private PersonService personService; 20 21 public void processMessage(Map<String, Map<Integer, String>> msgs) { 22 /*for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) { 23 String topic = entry.getKey(); 24 Map<Integer, String> value = entry.getValue(); 25 for (Map.Entry<Integer, String> entrySet : value.entrySet()) { 26 Integer partiton = entrySet.getKey(); 27 String msg = entrySet.getValue(); 28 logger.info("消費的主題:" + topic + ",消費的分區:" + partiton + ",消費的消息:" + msg); 29 logger.info("=======使用JSON解析對象========="); 30 TbPerson person = JSON.parseObject(msg, TbPerson.class); 31 logger.info("=======對象開始入庫========="); 32 personService.insert(person); 33 logger.info("=======對象入庫成功========="); 34 } 35 }*/ 36 37 for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) { 38 String topic = entry.getKey(); 39 Map<Integer, String> value = entry.getValue(); 40 for (Map.Entry<Integer, String> entrySet : value.entrySet()) { 41 Integer partiton = entrySet.getKey(); 42 String msg = entrySet.getValue(); 43 logger.info("消費的主題:" + topic + ",消費的分區:" + partiton + ",消費的消息:" + msg); 44 msg = "[" + msg + "]";//注意這里要在前后都加上中括號,否則下面在解析json成對象的時候會報json格式不對的異常(spring會對多條json數據用逗號分隔) 45 logger.info("=======使用JSON解析對象========="); 46 List<TbPerson> personList = JSON.parseArray(msg, TbPerson.class); 47 //TbPerson person = JSON.parseObject(msg, TbPerson.class); 48 if (personList != null && personList.size() > 0) { 49 logger.info("消息中包含[" + personList.size() + "]個對象"); 50 for (TbPerson person : personList) { 51 logger.info("=======對象開始入庫========="); 52 personService.insert(person); 53 logger.info("=======對象入庫成功========="); 54 } 55 } 56 57 } 58 } 59 } 60 }
如果覺得本文對您有幫助,不妨掃描下方微信二維碼打賞點,您的鼓勵是我前進最大的動力: