在分布式系統中,設計思路很重要
先來講講大概思路,代碼都是可以改的但思路大同小異。
先緩存商品,加載到redis,秒殺場景下如果直接訪問關系型數據庫,會引起雪崩效應,系統癱瘓,所以就改為訪問redis,這里是減庫存的時候先減redis,然后異步去減DB。就可以防止系統崩潰。
正題 先看工程目錄
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.0.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <groupId>com.lac</groupId> <artifactId>miaosha</artifactId> <version>0.0.2-SNAPSHOT</version> <name>miaosha</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2.2.1.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <!--redis--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.22</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.3</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-deploy-plugin</artifactId> <configuration> <skip>true</skip> </configuration> </plugin> <plugin> <groupId>com.spotify</groupId> <artifactId>dockerfile-maven-plugin</artifactId> <version>1.4.13</version> <executions> <execution> <id>default</id> <goals> <goal>build</goal> <goal>push</goal> </goals> </execution> </executions> <configuration> <repository>danbing2226/dockerfilemavenplugins</repository> <tag>${project.version}</tag> <useMavenSettingsForAuth>true</useMavenSettingsForAuth> <buildArgs> <JAR_FILE>target/${project.build.finalName}.jar</JAR_FILE> </buildArgs> </configuration> </plugin> </plugins> </build> </project>
application.yml
用的都是我前面文章配置過的東西,直接可以去前面文章找

spring:
application:
name: miaosha
datasource:
# 數據源基本配置
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.142.129:3306/mysql?autoReconnect=true
type: com.alibaba.druid.pool.DruidDataSource
# 數據源其他配置
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 'x'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
filters: stat,wall
maxPoolPreparedStatementPerConnectionSize: 20
useGlobalDataSourceStat: true
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500
rabbitmq:
host: 192.168.142.129
port: 5672
username: root
password: root
cloud:
nacos:
discovery:
server-addr: 192.168.142.129:8848
redis:
database: 0
host: 192.168.142.129
port: 6379
password:
timeout: 500
pool:
max-active: 20
max-wait: -1
max-idle: 8
min-idle: 0
mybatis:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.lac.component.model
# cloud:
# zookeeper:
# connect-string: 192.168.99.100:2181
# discovery:
# enabled: true
server:
port: 8093
ComponentApplication.java

package com.lac.component; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @SpringBootApplication @EnableDiscoveryClient @MapperScan("com.lac.component.dao") public class ComponentApplication { public static void main(String[] args) { SpringApplication.run(ComponentApplication.class, args); } }
RedisConfig
配置redis的鍵值格式化方式

package com.lac.component.redis; import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory ) { //設置序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(om); RedisSerializer redisSerializer = new FastJsonRedisSerializer(Object.class); // 配置redisTemplate RedisTemplate redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(redisConnectionFactory); RedisSerializer stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer); // key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value序列化 redisTemplate.setHashKeySerializer(stringSerializer); // Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); // Hash value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
RabbitConfig配置,這里不細講,后面文章會將到

package com.lac.component.rabbit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String EXCHANGE_B = "my-mq-exchange_B"; public static final String EXCHANGE_C = "my-mq-exchange_C"; public static final String QUEUE_A = "QUEUE_A"; public static final String QUEUE_B = "QUEUE_B"; public static final String QUEUE_C = "QUEUE_C"; public static final String QUEUE_D = "QUEUE_D"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; public static final String ROUTINGKEY_B = "spring-boot-routingKey_B"; public static final String ROUTINGKEY_C = "spring-boot-routingKey_C"; public static final String FANOUT_EXCHANGE = "FANOUT_EXCHANGE"; public static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE"; /** * Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸, * Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。 * Queue:消息的載體,每個消息都會被投到一個或多個隊列。 * Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來. * Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。 * vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權限分離。 * Producer:消息生產者,就是投遞消息的程序. * Consumer:消息消費者,就是接受消息的程序. * Channel:消息通道,在客戶端的每個連接里,可建立多個channel. * 異步登陸日志,業務解耦,流量削峰,秒殺,異步發送注冊郵件,異步發送異常登陸信息。 */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); // connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } @Bean public DirectExchange defaultExchange1() { return new DirectExchange(EXCHANGE_B); } /* *獲取隊列A */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true);//隊列持久 } /* *獲取隊列B */ @Bean public Queue queueB() { return new Queue(QUEUE_B, true);//隊列持久 } /* *獲取隊列C */ @Bean public Queue queueC() { return new Queue(QUEUE_C, true);//隊列持久 } /* *獲取隊列D */ @Bean public Queue queueD() { return new Queue(QUEUE_D, true);//隊列持久 } @Bean public Queue queueMessage() { return new Queue("topic.message",true); } @Bean public Queue queueMessages() { return new Queue("topic.messages",true); } // @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } // // // 一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發到不同的隊列當中去 // @Bean // public Binding bindingB() { // return BindingBuilder.bind(queueB()).to(defaultExchange1()).with(RabbitConfig.ROUTINGKEY_B); // } //配置fanout_exchange @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE); } @Bean TopicExchange topicExchange(){ return new TopicExchange(this.TOPIC_EXCHANGE); } @Bean Binding bingingExchangeMessage(Queue queueMessage,TopicExchange topicExchange){ return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message"); } @Bean Binding bingingExchangeMessages(Queue queueMessages,TopicExchange topicExchange){ return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.#"); } // @Bean // Binding bingingExchangeFanout(FanoutExchange fanoutExchange){ // return BindingBuilder.bind(queueA()).to(fanoutExchange); // } }
最關鍵一句
把queueA綁到默認的交互機上
@Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); }
核心代碼來了,注意
MsgProducer
生產者,就是把要生產的重要數據傳輸過來,發送個消費者,消費者里面調用service去進行數據庫操作。

package com.lac.component.rabbit; import com.lac.component.rabbit.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.Map; // import java.util.UUID; public class MsgProducer implements RabbitTemplate.ConfirmCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //由於rabbitTemplate的scope屬性設置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動注入 private RabbitTemplate rabbitTemplate; /** * 構造方法注入rabbitTemplate */ @Autowired public MsgProducer(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; //rabbitTemplate如果為單例的話,那回調就是最后設置的內容 rabbitTemplate.setConfirmCallback(this); } public void sendMsg(String goodsId,String content){ // CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //Fanout 就是我們熟悉的廣播模式,給Fanout交換機發送消息,綁定了這個交換機的所有隊列都收到這個消息。 //rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,content); //把消息放入ROUTINGKEY_A對應的隊列當中去,對應的是隊列A //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId); //傳輸對象 Map mp = new HashMap(1024); mp.put("goodsId",goodsId); mp.put("reduce",Integer.valueOf(content)); rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,mp); //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,user,correlationId); } /* * 回調 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回調id:" + correlationData); if (ack) { logger.info("生產者0被消息成功消費"); } else { logger.info("生產者0被消息消費失敗:" + cause ); } } }
MsgReceiver
消費者,看代碼頭部是不是做了一個監聽

package com.lac.component.rabbit; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.lac.component.model.Goods; import com.lac.component.service.GoodsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.List; import java.util.Map; @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MsgReceiver { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private GoodsService goodsService; // @RabbitHandler // public void process(String content) { // logger.info("處理器one接收處理隊列A當中的消息:" +content); // } @RabbitHandler public void process(Map mp){ List<Goods> goodsList = this.goodsService.selectGoods(); ObjectMapper mapper = new ObjectMapper(); Map hashMap = new HashMap<String,Integer>(); //!!!解決linkedHashmap轉實體類的問題 List<Goods> goods1 = mapper.convertValue(goodsList, new TypeReference<List<Goods>>(){}); for(Goods a:goods1){ hashMap.put(a.getGoodsId(),a.getGoodsCount()); } Integer allCount = (Integer)hashMap.get("goods1"); String goodsId = (String) mp.get("goodsId"); Integer reduce = (Integer) mp.get("reduce"); System.out.println("更新成的件數"+String.valueOf(allCount-reduce)); int successFlag = this.goodsService.updateGoods("goods1",allCount-reduce); System.out.println(successFlag+"更新成功"); } }
初始化進來的時候先查數據庫緩存到redis中
initController

package com.lac.component.controller; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.lac.component.model.Goods; import com.lac.component.service.GoodsService; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Controller; import java.util.List; @Controller public class initController implements InitializingBean { @Autowired private RedisTemplate redisTemplate; @Autowired private GoodsService goodsService; @Override public void afterPropertiesSet() throws Exception { List<Goods> goodsList = this.goodsService.selectGoods(); ObjectMapper mapper = new ObjectMapper(); //!!!解決linkedHashmap轉實體類的問題 List<Goods> goods1 = mapper.convertValue(goodsList, new TypeReference<List<Goods>>(){}); for(Goods a:goods1){ redisTemplate.opsForValue().set(a.getGoodsId(),a.getGoodsCount()); System.out.println(redisTemplate.opsForValue().get(a.getGoodsId())); } } }
判斷邏輯我寫在了controller里面,實際項目寫在service
DemoController

package com.lac.component.controller; import com.lac.component.rabbit.MsgProducer; import com.lac.component.rabbit.RabbitConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import java.util.Random; @RestController public class DemoController { @Autowired private RedisTemplate redisTemplate; @Autowired private RabbitConfig rabbitConfig; @GetMapping("/") public String getHello() { return "hello"; } @GetMapping("/user/{string}") public String test(@PathVariable String string) { return "Hello Nacos :" + string; } @GetMapping("/danbing2226/{string}") public String test1(@PathVariable String string) { return "灰色天空 :" + string; } @GetMapping("/xiawanan/{str}") public String test2(@PathVariable String str) { return "夏婉安的歌曲:"+str; } @GetMapping("/huisetiankong/{str}") public String test3(@PathVariable String str) { return "聽了無數遍:"+str; } @GetMapping("/rabbit") public String send() throws Exception{ String goodsId = "goods1"; Random r = new Random(1); int i = r.nextInt(100); MsgProducer producer = new MsgProducer(rabbitConfig.rabbitTemplate()); System.out.println(redisTemplate.opsForValue().toString()); Integer count = (Integer) redisTemplate.opsForValue().get(goodsId); if(count == 0){ System.out.println("沒庫存了"); return "沒庫存了"; } long kucun = redisTemplate.opsForValue().decrement(goodsId,i); if(kucun <0 ){ count = (Integer) redisTemplate.opsForValue().get(goodsId); if(count != 0 && count < Integer.valueOf(i)){ redisTemplate.opsForValue().increment(goodsId,i); System.out.println("買多了再把庫存還原"); return "買多了再把庫存還原"; }else if(count == 0){ redisTemplate.opsForValue().set(goodsId,0); return "庫存賣完了"; } System.out.println("redis庫存:"+ redisTemplate.opsForValue().get(goodsId)); } producer.sendMsg("goods1",String.valueOf(i)); return "下單成功"; } }
邏輯代碼認真看下,不多,庫存夠就下單成功生產者就發送信息,賣多了就返回庫存,返回信息,這個是簡易版,至於你想返回什么完全根據項目或自己的需要
結合上的生產者和消費者,理解一下就是這么簡單。
為了減少學習成本,這里也用到了mybatis dao層和entity我也都貼出來
GoodsDao
有的項目叫mapper不影響理解。一個查詢一個更新

package com.lac.component.dao; import com.lac.component.model.Goods; import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.util.List; @Repository public interface GoodsDao { List<Goods> selectGoods(); int updateGoods(@Param("id")String id, @Param("count")Integer count); }
GoodsMapper.xml

<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.lac.component.dao.GoodsDao"> <sql id="GOODS"> goods </sql> <sql id="GOODS_COLUMN"> goodsId,goodsName,goodsPrice,goodsCount </sql> <select id="selectGoods" resultType="com.lac.component.model.Goods"> SELECT <include refid="GOODS_COLUMN"/> FROM <include refid="GOODS"/> </select> <update id="updateGoods" parameterType="com.lac.component.model.Goods"> UPDATE <include refid="GOODS"/> SET goodsCount=#{count} WHERE goodsId=#{id} </update> </mapper>
service層
GoodsService

package com.lac.component.service; import com.lac.component.model.Goods; import java.util.List; public interface GoodsService { List<Goods> selectGoods(); int updateGoods(String id, Integer count); }
GoodsServiceImpl

package com.lac.component.service.impl; import com.lac.component.dao.GoodsDao; import com.lac.component.model.Goods; import com.lac.component.service.GoodsService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.Serializable; import java.util.List; @Service(value = "GoodsService") public class GoodsServiceImpl implements GoodsService, Serializable { @Autowired private GoodsDao goodsDao; @Override public List<Goods> selectGoods() { return goodsDao.selectGoods(); } @Override public int updateGoods(String id, Integer count) { return goodsDao.updateGoods(id,count); } }
源碼放在最后了
看下效果
先把服務起起來
再次點擊,不夠賣了。
代碼:
https://gitee.com/danbing_2226/miaosha點擊鏈接