並發場景中大部分處理的是先更新DB,再(刪緩、更新)緩存的處理方式,但是在實際場景中有可能DB更新成功了,但是緩存設置失敗了,就造成了緩存與DB數據不一致的問題,下面就以實際情況說下怎么解決此類問題。
名詞 Cache:本文內指redis,ReadRequest:請求從Cache、Db中拿去數據,WriteRequest:數據寫入DB並刪除緩存
若要保證數據庫與緩存一直,我們需要采用先刪緩存,在更新DB的情況,這時候有的同學可能會問,如果緩存刪除成功了,而DB更新失敗了怎么辦,其實仔細考慮一下,DB雖然失敗了,那真正是不會產生數據影響的,而當下次一次請求進來的時候,我們重新把DB中未更新的數據重新塞入緩存,從結果上來看是沒有影響的。我們把請求分為ReadRequest 、WriteRequest,大部分同學都知道我們在使用Cache時 首先都會去Cache內查一下,如果Cache中沒有拿到數據我們在從數據庫中去獲取數據,這個時候在高並發的場景的踩過坑的同學都知道恰巧在這時候有更新請求把緩存刪除了,這時候大量請求進來,Cache內沒有此項數據,請求就會直接落在DB上,就很容易造成緩存雪崩,數據庫很可能瞬時就掛掉了,所以處理方案就是我們需要對查詢寫入的緩存進行排隊處理,而正確從cache內獲取的姿勢:
1、每次查詢數據的時候我們吧請求數據放入隊列,由隊列消費者去檢查一下cache是否存在,不存在則進行插入,存在就跳過
2、當前readRequest就自循環,我們不斷嘗試從cache內去獲取數據,拿到數據或超時當前線程立即退出
3、如果拿到數據了就返回結果,沒有拿到數據我們就從DB去查
而WriteRequest 的處理相對就簡單多了我們直接刪除緩存后,更新DB即可,下面上代碼說明:
消息隊列這里我們基於jdk並發包內的BlockingQueue進行實現,使用MQ(Rabbit,Kafka等)的話思想差不多,只是需要交互一次mq的服務端。首先項目啟動時我們在程序后台開辟監聽線程,從數據共享緩沖區(ArrayBlockingQueue)內監聽消息
public class BlockQueueThreadPool { /** * 核心線程數 */
private Integer corePoolSize = 10; /** * 線程池最大線程數 */
private Integer maximumPoolSize = 20; /** * 線程最大存活時間 */
private Long keepAliveTime = 60L; private ExecutorService threadPool = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(this.corePoolSize)); public BlockQueueThreadPool() { RequestQueue requestQueue = RequestQueue.getInstance(); BlockingQueue<RequestAction> queue = new ArrayBlockingQueue<>(this.corePoolSize); requestQueue.add(queue); this.threadPool.submit(new JobThread(queue)); } }
PS:ArrayBlockingQueue中很好的利用了Condition中的等待和通知功能,這里我們就能實現對共享通道隊列的事件監聽了。
public class JobThread implements Callable<Boolean> { private BlockingQueue<RequestAction> queue; public JobThread(BlockingQueue<RequestAction> queue) { this.queue = queue; } @Override public Boolean call() throws Exception { try { while (true) { // ArrayBlockingQueue take方法 獲取隊列排在首位的對象,如果隊列為空或者隊列滿了,則會被阻塞住
RequestAction request = this.queue.take(); RequestQueue requestQueue = RequestQueue.getInstance(); Map<String, Boolean> tagMap = requestQueue.getTagMap(); if (request instanceof ReadRequest) { Boolean tag = tagMap.get(request.getIdentity()); if (null == tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && !tag) { return Boolean.TRUE; } } else if (request instanceof WriteRequest) { // 如果是更新數據庫的操作
tagMap.put(request.getIdentity(), Boolean.TRUE); } // 執行請求處理
log.info("緩存隊列執行+++++++++++++++++,{}", request.getIdentity()); request.process(); } } catch (Exception e) { e.printStackTrace(); } return Boolean.TRUE; } }
接下來就要定義我們的WriteRequest、ReadRequest了
@Slf4j public class ReadRequest<TResult> extends BaseRequest { public ReadRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { TResult result = (TResult) action.exec(); if (Objects.isNull(result)) { //防止緩存擊穿
redis.set(cacheKey, "", 10000); } else { redis.set(cacheKey, result, 10000); } } }
public class WriteRequest<TResult> extends BaseRequest { public WriteRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { redis.del(cacheKey); action.exec(); } }
這里我們需要坐下判斷,在數據庫內查詢數據為空后把“”寫入了緩存,這樣子是避免有人惡意請求不存在的數據時造成緩存擊穿。接下來就是我們針對各項業務場景中需要獲取與更新緩存的路由端了
@UtilityClass public class RouteUtils { public static void route(RequestAction requestAction) { try { BlockingQueue<RequestAction> queue = RequestQueue.getInstance().getQueue(0); queue.put(requestAction); } catch (Exception e) { e.printStackTrace(); } } }
public class RequestQueue { private RequestQueue() { } private List<BlockingQueue<RequestAction>> queues = new ArrayList<>(); private Map<String, Boolean> tagMap = new ConcurrentHashMap<>(1); private static class Singleton { private static RequestQueue queue; static { queue = new RequestQueue(); } private static RequestQueue getInstance() { return queue; } } public static RequestQueue getInstance() { return Singleton.getInstance(); } public void add(BlockingQueue<RequestAction> queue) { this.queues.add(queue); } public BlockingQueue<RequestAction> getQueue(int index) { return this.queues.get(index); } public int size() { return this.queues.size(); } public Map<String, Boolean> getTagMap() { return this.tagMap; } }
這里有一個小的知識點,很多時候我們在保證線程安全的時候多數會使用DSL雙鎖模型,但是我始終覺得這類代碼不夠美觀,所以我們可以利用JVM的類加載原則,使用靜態類包裹初始化類,這樣子也一定能保證單例模型,並且代碼也更美觀了。接下來就可以看下Service的代碼
@Service public class StudentService { public Student getStudent(String name) { ReadRequest<Student> readRequest = new ReadRequest<>(name, () -> Student.builder().name(name).age(3).build()); return CacheProcessor.builder().build().getData(readRequest); } public void update(Student student) { WriteRequest<Student> writeRequest = new WriteRequest<>(student.getName(), () -> student); CacheProcessor.builder().build().setData(writeRequest); } }
Service內直接調用了Cachce的處理者,我們通過處理者來獲取緩存與更新緩存
@Builder public class CacheProcessor { public <TResult> TResult getData(ReadRequest readRequest) { try { RouteUtils.route(readRequest); long startTime = System.currentTimeMillis(); long waitTime = 0L; while (true) { if (waitTime > 3000) { break; } TResult result = (TResult) readRequest.redis.get(readRequest.getIdentity()); if (!Objects.isNull(result)) { return result; } else { Thread.sleep(20); waitTime = System.currentTimeMillis() - startTime; } } return (TResult) readRequest.get(); } catch (Exception e) { return null; } } public void setData(WriteRequest writeRequest){ RouteUtils.route(writeRequest); } }
這里我們就先把請求數據發送到數據共享渠道,消費者端與當前的ReadRequest線程同步執行,拿到數據后ReadRequest就立馬退出,超時后我們就從數據庫中獲取數據。這里面我使用了java8 @FunctionalInterface 標記接口,對各個業務中需要用到緩存的地方統一進行封裝方便調用,以上的代碼就已經基本說明並發中Db和Cache雙休一致性的解決思路,聰明的小伙伴肯定能看出其實還有很多優化的地方,比如說我們栗子中是單線程吞吐量不高,采用多線程與多消費者端的時候我們還需要保證商品的更新和讀取請求需要落在同一個消費者端等等問題。或者在使用外部MQ時,我們除了要考慮以上同一商品的讀寫保證落在一個消費節點上,還需要考慮隊列內有插入緩存請求的時候需要跳過的處理等等,更多情況還需要根據實際情況大家自己去發現咯
參考:中華石杉的教程