需求分析
在分享源碼之前,先將b2b2c系統中商品模塊需求整理、明確,方便源碼的理解。
業務需求
-
b2b2c電子商務系統中商品的庫存存放在redis和數據庫中,實現發貨退貨等操作庫存的扣減或增加
技術需求
-
redis事務問題,若扣減庫存完成后,發生異常,則redis沒有事務,無法實現數據回滾,導致數據異常
- 采用lua腳本扣減庫存方式,原子性提交操作,lua腳本中實現扣減失敗則回滾操作
-
數據庫中的庫存信息,非實時更新,而是采用緩沖池方式,緩沖池方式可以自主選擇是否開啟
架構思路
商品庫存領域模型架構
基於lua+redis的庫存扣減
GoodsQuantityVO
/** * 商品庫存vo * @author fk * @version v6.4 * @since v6.4 * 2017年9月7日 上午11:23:16 */ public class GoodsQuantityVO implements Cloneable{ private Integer goodsId; private Integer skuId; private Integer quantity; private QuantityType quantityType; public GoodsQuantityVO() {} public GoodsQuantityVO(Integer goodsId, Integer skuId, Integer quantity ) { super(); this.goodsId = goodsId; this.skuId = skuId; this.quantity = quantity; } setter and getter }
GoodsQuantityManager
/** * 商品庫存接口 * @author fk * @version v2.0 * @since v7.0.0 * 2018年3月23日 上午11:47:29 * * @version 3.0 * 統一為一個接口(更新接口)<br/> * 內部實現為redis +lua 保證原子性 -- by kingapex 2019-01-17 */ public interface GoodsQuantityManager { /** * 庫存更新接口 * @param goodsQuantityList 要更新的庫存vo List * @return 如果更新成功返回真,否則返回假 */ Boolean updateSkuQuantity(List<GoodsQuantityVO> goodsQuantityList ); /** * 同步數據庫數據 */ void syncDataBase();
/** * 為某個sku 填充庫存cache<br/> * 庫存數量由數據庫中獲取<br/> * 一般用於緩存被擊穿的情況 * @param skuId * @return 可用庫存和實際庫存 */ Map<String,Integer> fillCacheFromDB(int skuId); }
GoodsQuantityManagerImpl
庫存業務類基於lua+redis的實現:
/** * 商品庫存接口 * * @author fk * @author kingapex * @version v2.0 written by kingapex 2019年2月27日 * 采用lua腳本執行redis中的庫存扣減<br/> * 數據庫的更新采用非時時同步<br/> * 而是建立了一個緩沖池,當達到一定條件時再同步數據庫<br/> * 這樣條件有:緩沖區大小,緩沖次數,緩沖時間<br/> * 上述條件在配置中心可以配置,如果沒有配置采用 ${@link UpdatePool} 默認值<br/> * 在配置項說明:<br/> * <li>緩沖區大小:javashop.pool.stock.max-pool-size</li> * <li>緩沖次數:javashop.pool.stock.max-update-time</li> * <li>緩沖時間(秒數):javashop.pool.stock.max-lazy-second</li> * @see JavashopConfig */ @Service public class GoodsQuantityManagerImpl implements GoodsQuantityManager { private final Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private DaoSupport daoSupport; @Autowired private JavashopConfig javashopConfig; /** * sku庫存更新緩沖池 */ private static UpdatePool skuUpdatePool; /** * goods庫存更新緩沖池 */ private static UpdatePool goodsUpdatePool; /** * 單例獲取sku pool ,初始化時設置參數 * * @return */ private UpdatePool getSkuPool() { if (skuUpdatePool == null) { skuUpdatePool = new UpdatePool(javashopConfig.getMaxUpdateTime(), javashopConfig.getMaxPoolSize(), javashopConfig.getMaxLazySecond()); logger.debug("初始化sku pool:"); logger.debug(skuUpdatePool.toString()); } return skuUpdatePool; } /** * 單例獲取goods pool ,初始化時設置參數 * * @return */ private UpdatePool getGoodsPool() { if (goodsUpdatePool == null) { goodsUpdatePool = new UpdatePool(javashopConfig.getMaxUpdateTime(), javashopConfig.getMaxPoolSize(), javashopConfig.getMaxLazySecond()); } return goodsUpdatePool; } @Autowired public StringRedisTemplate stringRedisTemplate; private static RedisScript<Boolean> script = null; private static RedisScript<Boolean> getRedisScript() { if (script != null) { return script; } ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("sku_quantity.lua")); String str = null; try { str = scriptSource.getScriptAsString(); } catch (IOException e) { e.printStackTrace(); } script = RedisScript.of(str, Boolean.class); return script; } @Override public Boolean updateSkuQuantity(List<GoodsQuantityVO> goodsQuantityList) { List<Integer> skuIdList = new ArrayList(); List<Integer> goodsIdList = new ArrayList(); List keys = new ArrayList<>(); List values = new ArrayList<>(); for (GoodsQuantityVO quantity : goodsQuantityList) { Assert.notNull(quantity.getGoodsId(), "goods id must not be null"); Assert.notNull(quantity.getSkuId(), "sku id must not be null"); Assert.notNull(quantity.getQuantity(), "quantity id must not be null"); Assert.notNull(quantity.getQuantityType(), "Type must not be null"); //sku庫存 if (QuantityType.enable.equals(quantity.getQuantityType())) { keys.add(StockCacheKeyUtil.skuEnableKey(quantity.getSkuId())); } else if (QuantityType.actual.equals(quantity.getQuantityType())) { keys.add(StockCacheKeyUtil.skuActualKey(quantity.getSkuId())); } values.add("" + quantity.getQuantity()); //goods庫存key if (QuantityType.enable.equals(quantity.getQuantityType())) { keys.add(StockCacheKeyUtil.goodsEnableKey(quantity.getGoodsId())); } else if (QuantityType.actual.equals(quantity.getQuantityType())) { keys.add(StockCacheKeyUtil.goodsActualKey(quantity.getGoodsId())); } values.add("" + quantity.getQuantity()); skuIdList.add(quantity.getSkuId()); goodsIdList.add(quantity.getGoodsId()); } RedisScript<Boolean> redisScript = getRedisScript(); Boolean result = stringRedisTemplate.execute(redisScript, keys, values.toArray()); logger.debug("更新庫存:"); logger.debug(goodsQuantityList.toString()); logger.debug("更新結果:" + result); //如果lua腳本執行成功則記錄緩沖區 if (result) { //判斷配置文件中設置的商品庫存緩沖池是否開啟 if (javashopConfig.isStock()) { //是否需要同步數據庫 boolean needSync = getSkuPool().oneTime(skuIdList); getGoodsPool().oneTime(goodsIdList); logger.debug("是否需要同步數據庫:" + needSync); logger.debug(getSkuPool().toString()); //如果開啟了緩沖池,並且緩沖區已經飽和,則同步數據庫 if (needSync) { syncDataBase(); } } else { //如果未開啟緩沖池,則實時同步商品數據庫中的庫存數據 syncDataBase(skuIdList, goodsIdList); } } return result; } @Override public void syncDataBase() { //獲取同步的skuid 和goodsid List<Integer> skuIdList = getSkuPool().getTargetList(); List<Integer> goodsIdList = getGoodsPool().getTargetList(); logger.debug("goodsIdList is:"); logger.debug(goodsIdList.toString()); //判斷要同步的goods和sku集合是否有值 if (skuIdList.size() != 0 && goodsIdList.size() != 0) { //同步數據庫 syncDataBase(skuIdList, goodsIdList); } //重置緩沖池 getSkuPool().reset(); getGoodsPool().reset(); } @Override public Map<String, Integer> fillCacheFromDB(int skuId) { Map<String, Integer> map = daoSupport.queryForMap("select enable_quantity,quantity from es_goods_sku where sku_id=?", skuId); Integer enableNum = map.get("enable_quantity"); Integer actualNum = map.get("quantity"); stringRedisTemplate.opsForValue().set(StockCacheKeyUtil.skuActualKey(skuId), "" + actualNum); stringRedisTemplate.opsForValue().set(StockCacheKeyUtil.skuEnableKey(skuId), "" + enableNum); return map; } /** * 同步數據庫中的庫存 * * @param skuIdList 需要同步的skuid * @param goodsIdList 需要同步的goodsid */ private void syncDataBase(List<Integer> skuIdList, List<Integer> goodsIdList) { //要形成的指更新sql List<String> sqlList = new ArrayList<String>(); //批量獲取sku的庫存 List skuKeys = StockCacheKeyUtil.skuKeys(skuIdList); List<String> skuQuantityList = stringRedisTemplate.opsForValue().multiGet(skuKeys); int i = 0; //形成批量更新sku的list for (Integer skuId : skuIdList) { String sql = "update es_goods_sku set enable_quantity=" + skuQuantityList.get(i) + ", quantity=" + skuQuantityList.get(i + 1) + " where sku_id=" + skuId; daoSupport.execute(sql); i = i + 2; } //批量獲取商品的庫存 List goodsKeys = createGoodsKeys(goodsIdList); List<String> goodsQuantityList = stringRedisTemplate.opsForValue().multiGet(goodsKeys); i = 0; //形成批量更新goods的list for (Integer goodsId : goodsIdList) { String sql = "update es_goods set enable_quantity=" + goodsQuantityList.get(i) + ", quantity=" + goodsQuantityList.get(i + 1) + " where goods_id=" + goodsId; daoSupport.execute(sql); i = i + 2; } } /** * 生成批量獲取goods庫存的keys * * @param goodsIdList * @return */ private List createGoodsKeys(List<Integer> goodsIdList) { List keys = new ArrayList(); for (Integer goodsId : goodsIdList) { keys.add(StockCacheKeyUtil.goodsEnableKey(goodsId)); keys.add(StockCacheKeyUtil.goodsActualKey(goodsId)); } return keys; } }
sku_quantity.lua
庫存扣減lua腳本
-- 可能回滾的列表,一個記錄要回滾的skuid一個記錄庫存 local skuid_list= {} local stock_list= {} local arg_list = ARGV; local function cut ( key , num ) KEYS[1] = key; local value = redis.call("get",KEYS[1]) if not value then value = 0; end value=value+num if(value<0) then -- 發生超賣 return false; end redis.call("set",KEYS[1],value) return true end local function rollback ( ) for i,k in ipairs (skuid_list) do -- 還原庫存 KEYS[1] = k; redis.call("incrby",KEYS[1],0-stock_list[i]) end end local function doExec() for i, k in ipairs (arg_list) do local num = tonumber(k) local key= KEYS[i] local result = cut(key,num) -- 發生超賣,需要回滾 if (result == false) then rollback() return false else -- 記錄可能要回滾的數據 table.insert(skuid_list,key) table.insert(stock_list,num) end end return true; end return doExec()
JavashopConfig
緩沖池相關設置信息
/** * javashop配置 * * @author zh * @version v7.0 * @date 18/4/13 下午8:19 * @since v7.0 */ @Configuration @ConfigurationProperties(prefix = "javashop") @SuppressWarnings("ConfigurationProperties") public class JavashopConfig { /** * 緩沖次數 */ @Value("${javashop.pool.stock.max-update-timet:#{null}}") private Integer maxUpdateTime; /** * 緩沖區大小 */ @Value("${javashop.pool.stock.max-pool-size:#{null}}") private Integer maxPoolSize; /** * 緩沖時間(秒數) */ @Value("${javashop.pool.stock.max-lazy-second:#{null}}") private Integer maxLazySecond; /** * 商品庫存緩沖池開關 * false:關閉(如果配置文件中沒有配置此項,則默認為false) * true:開啟(優點:緩解程序壓力;缺點:有可能會導致商家中心商品庫存數量顯示延遲;) */ @Value("${javashop.pool.stock:#{false}}") private boolean stock; public JavashopConfig() { } setter and getter... }
以上是javashop中商品模塊扣減庫存的思路以及相關源碼。
易族智匯(javashop)原創文章