無論是http接口,還是rpc接口,防重復提交(接口防重)都是繞不過的話題。
重復提交與冪等,既有區別,又有聯系。冪等的意思是,對資源的一次請求與多次請求,作用是相同的。例如,HTTP的POST方法是非冪等的。如果程序處理不好,重復提交會導致非冪等,引起系統數據故障。防重復提交,當屬於冪等的范疇,首先通過技術手段來實現,其次,又要有對業務數據的唯一性驗證。
常見的B/S場景的重復提交,用戶手抖或因為網絡問題,服務端在極短時間內兩次甚至更多次收到同樣的http請求。
rpc接口的重復提交,一種是不恰當的程序調用,即程序漏洞導致重復提交。在一種,比如拿dubbo來說,因為網絡傳輸問題,會觸發重試調用。
防重提交的方案,常見的是加鎖。分布式系統,一般是借助redis或zk等分布式鎖。對於java單體應用,有網友說可以用語言本身的synchronized鎖機制,嚴格來說,這樣是不恰當的,因為synchronized是多線程下的同步鎖,只會阻塞線程執行,而不會阻斷線程的執行。
【說明幾點】
- lockKey的設置 加鎖是為了攔截重復請求。key一定要與業務操作請求的請求數據有關,具有系統全局唯一性。通常的命名規范是業務操作前綴+業務數據,比如key="user.add."+userVo.toString(); key="withdraw."+userId。
-
反例:key=“withdraw.”;。這是一棒子打死的節奏。鎖的顆粒度太大了,一個用戶提現操作在執行的過程中,其他用戶都無法進行提現操作。對於通常的系統提現場景來說,很顯然說不過去。
-
反例:key="withdraw."+userId+DateUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss");。因為加了時間戳,那么,非這個時刻的相同用戶的提現請求就能重新上鎖,從而達不到分布式鎖的效果。同樣的還有加UUID或雪花算法生成的唯一Id,都達不到分布式鎖的控制效果。
- 鎖的有效期 下文代碼會提到,釋放鎖的代碼在finally中執行,以保證當程序因異常中斷的時候仍然能夠釋放鎖。不過,jvm也會導致finally不被執行。加鎖的時間需要按業務邏輯執行時間來評估一個保守的值。太短自然是不行的,會導致重入故障;如果太久,在沒有及時釋放鎖時候,其他請求無法進入,就會產生死鎖。
- 上鎖的原子性 有的上鎖方案是利用jedis的setnx(lockKey, requestId)和expire(lockKey, expireTime),因為是兩條命令,所以不具備原子性。假如程序執行了setnx后突然崩潰,會導致鎖一直存在,最終導致死鎖。
- 關於釋放鎖 釋放鎖原則:解鈴還須系鈴人。即,clientA(線程A)上鎖,只能由clientA(線程A)來解鎖。釋放鎖的操作同樣要具備原子性。另,如上文所述,釋放鎖的代碼在finally代碼中執行。
redis分布式鎖的實現
類圖:
RedisDistributedLock
package com.emax.zhenghe.rpcapi.provider.config.distributeRedisLock; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; @Component @Slf4j public class RedisDistributedLock extends AbstractDistributedLock { @Autowired @Resource private RedisTemplate<Object, Object> redisTemplate; private ThreadLocal<String> lockFlag = new ThreadLocal<String>(); public static final String UNLOCK_LUA; static { StringBuilder sb = new StringBuilder(); sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] "); sb.append("then "); sb.append(" return redis.call(\"del\",KEYS[1]) "); sb.append("else "); sb.append(" return 0 "); sb.append("end "); UNLOCK_LUA = sb.toString(); } public RedisDistributedLock() { super(); } @Override public boolean lock(String key, long expire, int retryTimes, long sleepMillis) { boolean result = setRedis(key, expire); // 如果獲取鎖失敗,按照傳入的重試次數進行重試 while ((!result) && retryTimes-- > 0) { try { log.debug("lock failed, retrying..." + retryTimes); Thread.sleep(sleepMillis); } catch (InterruptedException e) { return false; } result = setRedis(key, expire); } return result; } /** * * @param key * @param expire MILLISECONDS * @return */ private boolean setRedis(final String key, final long expire) { try { String uuid = UUID.randomUUID().toString(); lockFlag.set(uuid); return redisTemplate.opsForValue().setIfAbsent(key,uuid,expire,TimeUnit.MILLISECONDS); } catch (Exception e) { log.info("redis lock error.", e); } return false; } @Override public boolean releaseLock(String key) { // 釋放鎖的時候,有可能因為持鎖之后方法執行時間大於鎖的有效期,此時有可能已經被另外一個線程持有鎖,所以不能直接刪除 try { DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<Boolean>(UNLOCK_LUA,Boolean.class); return redisTemplate.execute(defaultRedisScript,Arrays.asList(key),lockFlag.get()); } catch (Exception e) { log.error("release lock occured an exception", e); } finally { // 清除掉ThreadLocal中的數據,避免內存溢出 lockFlag.remove(); } return false; } }
AbstractDistributedLock
package com.emax.zhenghe.rpcapi.provider.config.distributeRedisLock; public abstract class AbstractDistributedLock implements DistributedLock { @Override public boolean lock(String key) { return lock(key , TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS); } @Override public boolean lock(String key, int retryTimes, long sleepMillis) { return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis); } @Override public boolean lock(String key, long expire) { return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS); } @Override public boolean lock(String key, long expire, int retryTimes) { return lock(key, expire, retryTimes, SLEEP_MILLIS); } }
DistributedLock
package com.emax.zhenghe.rpcapi.provider.config.distributeRedisLock; public interface DistributedLock { long TIMEOUT_MILLIS = 30000; int RETRY_TIMES = 2; long SLEEP_MILLIS = 500; boolean lock(String key);boolean lock(String key, int retryTimes, long sleepMillis); boolean lock(String key, long expire); boolean lock(String key, long expire, int retryTimes); boolean lock(String key, long expire, int retryTimes, long sleepMillis); boolean releaseLock(String key); }
調用:
@Autowired private RedisDistributedLock distributedLock; @Test public void lock11() throws Exception { String key = "examplekey" + System.currentTimeMillis(); try { boolean lock = distributedLock.lock(key, 2000L, 1, 100L); log.info("===================" + lock); } finally { distributedLock.releaseLock(key); } }
進一步封裝,實現代碼解耦
上面的加鎖和釋放鎖都暴露在了業務調用方,增加了業務調用方的職責,同時,如果使用不當,還會產生bug。
接下來,我們稍作重構。看看下面的RedisLockTemplate
package com.emax.zhenghe.rpcapi.provider.config.distributeRedisLock; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * redis分布式鎖並發控制模板類 * * @author zhangguozhan */ @Slf4j @Component public class RedisLockTemplate { @Autowired private RedisDistributedLock redisDistributedLock; /** * redis分布式鎖控制 * * @param key 鎖名 * @param expireMS 鎖的生命周期,單位:毫秒 * @param redisLockCallback callback方法 * @return */ public Object execute(String key, long expireMS, RedisLockCallback redisLockCallback) { return execute(key, expireMS, redisLockCallback, true, 2); } /** * redis分布式鎖控制 * * @param key * @param expireMS * @param redisLockCallback * @param isAutoReleaseLock callback方法執行完成后自動釋放鎖 * @return */ public Result execute(String key, long expireMS, RedisLockCallback redisLockCallback, boolean isAutoReleaseLock, int retryTimes) { log.info("redis分布式鎖控制 key={}", key); if (StringUtils.isBlank(key)) { log.info("try lock failure:key is null"); return null; } boolean lock = redisDistributedLock.lock(key, expireMS, retryTimes); if (lock) { try { Result o = redisLockCallback.doInRedisLock(); return o; } finally { if (isAutoReleaseLock) { redisDistributedLock.releaseLock(key); } } } else { log.info("###key已存在,終止 key={}", key); return Result.error(ResultCodeEnum.GET_LOCK_FAIL, "請勿重復發起"); } } }
RedisLockCallback是一個函數式接口
package com.emax.zhenghe.rpcapi.provider.config.distributeRedisLock; @FunctionalInterface public interface RedisLockCallback { Result doInRedisLock(); }
這樣,業務的調用就變得很easy了。例如:
Result result = redisLockTemplate.execute(key, 5000L, () -> { List<BillVo> billVos = batchInsert(list); return Result.ok(billVos); });
關於ajax異步請求
現在的web項目一般都是采用前后端分離的開發模式了,前端的程序框架也百花齊放,常見的有vue、nodejs等等。
對於用戶手抖導致的重復提交,服務端的做法就是利用上面的分布式控制,非首次的請求因為上鎖失敗而中斷處理,前端收到的是“請勿重復提交”這樣的提示。我原以為這樣可能會影響用戶體驗。后來咨詢前端同事,原來事實並非如此。
自己寫了一個demo,模擬重復提交。頁面異步重復發起相同的請求,服務端重復處理。第一次是加鎖,正常處理請求,第二次是發現鎖已存在,上鎖失敗,直接返回“請勿重復提交”的提示。頁面會收到兩次的響應結果。不過,因為第二次的請求上鎖失敗直接返回錯誤提示,所以響應早於第一次的響應。ajax判斷響應的邏輯是如果是成功(正常響應,視為成功),就觸發相應的后續處理,如果是失敗(“請勿重復提交”視為失敗),就toast提示。 因此,雖然toast了一下,只是一瞬間,第一次請求的響應來了之后,就會正常處理頁面邏輯。
所以,上面的防重機制,也是比較合適的方案。
當然,應該校驗的業務邏輯還是要有的,尤其是數據校驗。這屬於業務范疇了。
本文代碼已放到github:https://github.com/buguge/api-idempotent.git
前端頁面異步請求API,可參閱其中的代碼:https://github.com/buguge/api-idempotent/blob/master/mideng/src/main/webapp/user.jsp