本文探討在web開發中如何解決並發訪問帶來的數據同步問題。
1、需求:
通過REST接口請求並發訪問redis,例如:將key=fusor:${order_id} 中的值+1;
2、場景:
設想,多線程對key=fusor:${order_id}並發訪問觸發了競態條件,例如兩個線程同時發現key=fusor:${order_id}的值為5,然后並且+1回寫6,這個時候就出現了問題,最終的值為6而不是7。
3、粗粒度鎖:
這時候,普遍的做法是加鎖,但是如果對整個訪問redis的動作加鎖,那么等於多個線程串行訪問了!
4、細粒度加鎖:
我們這里的做法是對key進行細粒度加鎖,每個key擁有一把鎖,只對key進行並發控制,key與key之間允許並發。
直接上代碼
package com.xiaoju.dqa.fusor.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Random; import java.util.UUID; @Component public class SexyLockUtil<K> { private static Logger logger = LoggerFactory.getLogger(SexyLockUtil.class); @Autowired private SexyLocker<K> sexyLocker; public KeyLocker<K> getKeyLocker(K key) { return new KeyLocker<K>(sexyLocker, key); } public static class KeyLocker<K> { private UUID uuid = UUID.randomUUID(); private SexyLocker sexyLocker; private K key; public KeyLocker(SexyLocker sexyLocker, K key) { this.sexyLocker = sexyLocker; this.key = key; } public boolean lockWithRetry(int expireTime, int retryTimes) { Random random = new Random(); for (int i = 0; i < retryTimes; i++) { if (this.lock()) { return true; } else { try { Thread.sleep(random.nextInt(50)); } catch (InterruptedException e) { } } } return false; } public boolean lock() { try { this.sexyLocker.lock(key); return true; } catch (Exception e) { return false; } } public boolean unlock() { try { this.sexyLocker.unlock(key); return true; } catch (Exception e) { return false; } } } }
package com.xiaoju.dqa.fusor.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; @Component public class SexyLocker<K> { private static Logger logger = LoggerFactory.getLogger(SexyLockUtil.class); private final ConcurrentMap<K, Semaphore> map = new ConcurrentHashMap<K, Semaphore>(); private final ThreadLocal<Map<K, LockInfo>> local = new ThreadLocal<Map<K, LockInfo>>() { @Override protected Map<K, LockInfo> initialValue() { return new HashMap<K, LockInfo>(); } }; public void lock(K key) { if (key == null) return; LockInfo info = local.get().get(key); if (info == null) { Semaphore current = new Semaphore(1); current.acquireUninterruptibly(); Semaphore previous = map.put(key, current); if (previous != null) previous.acquireUninterruptibly(); local.get().put(key, new LockInfo(current)); } else { info.lockCount++; } } public void unlock(K key) { if (key == null) return; LockInfo info = local.get().get(key); if (info != null && --info.lockCount == 0) { info.current.release(); map.remove(key, info.current); local.get().remove(key); } } public void lock(K[] keys) { if (keys == null) return; for (K key : keys) { lock(key); } } public void unlock(K[] keys) { if (keys == null) return; for (K key : keys) { unlock(key); } } private static class LockInfo { private final Semaphore current; private int lockCount; private LockInfo(Semaphore current) { this.current = current; this.lockCount = 1; } } }