解決方案總結:
由於數據庫層面的讀寫並發,引發的數據庫與緩存數據不一致的問題(本質是后發生的讀請求先返回了),可能通過兩個小的改動解決:
1)修改服務Service連接池,id取模選取服務連接,能夠保證同一個數據的讀寫都落在同一個后端服務上
“同一個數據的訪問一定落到同一個服務上”
獲取Service連接的CPool.GetServiceConnection()【返回任何一個可用Service連接】改為CPool.GetServiceConnection(longid)【返回id取模相關聯的Service連接】這樣的話,就能夠保證同一個數據例如uid的請求落到同一個服務Service上
2)修改數據庫DB連接池,id取模選取DB連接,能夠保證同一個數據的讀寫在數據庫層面是串行的
“讓同一數據請求落到同一個數據庫鏈接中“讓同一個數據的訪問能串行化”
“讓同一個數據的訪問通過同一條DB連接執行”
“在DB連接池層面稍微修改,按數據取連接即可”
獲取DB連接的CPool.GetDBConnection()【返回任何一個可用DB連接】改為CPool.GetDBConnection(longid)【返回id取模相關聯的DB連接】
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
redis系列之數據庫與緩存數據一致性解決方案
數據庫與緩存讀寫模式策略
數據庫與緩存雙寫情況下導致數據不一致問題
場景一
場景一解決方案
場景二
場景二解決方案
這里有一個優化點,如果發現隊列里有一個查詢請求了,那么就不要放新的查詢操作進去了,用一個while(true)循環去查詢緩存,循環個200MS左右,如果緩存里還沒有則直接取數據庫的舊數據,一般情況下是可以取到的。
(1)讀請求時長阻塞
由於讀請求進行了非常輕度的異步化,所以一定要注意讀超時的問題,每個讀請求必須在超時間內返回,該解決方案最大的風險在於可能數據更新很頻繁,導致隊列中擠壓了大量的更新操作在里面,然后讀請求會發生大量的超時,最后導致大量的請求直接走數據庫,像遇到這種情況,一般要做好足夠的壓力測試,如果壓力過大,需要根據實際情況添加機器。
(2)請求並發量過高
這里還是要做好壓力測試,多模擬真實場景,並發量在最高的時候QPS多少,扛不住就要多加機器,還有就是做好讀寫比例是多少
(3)多服務實例部署的請求路由
可能這個服務部署了多個實例,那么必須保證說,執行數據更新操作,以及執行緩存更新操作的請求,都通過nginx服務器路由到相同的服務實例上
(4)熱點商品的路由問題,導致請求的傾斜
某些商品的讀請求特別高,全部打到了相同的機器的相同丟列里了,可能造成某台服務器壓力過大,因為只有在商品數據更新的時候才會清空緩存,然后才會導致讀寫並發,所以更新頻率不是太高的話,這個問題的影響並不是很大,但是確實有可能某些服務器的負載會高一些。
數據庫與緩存數據一致性解決方案流程圖
數據庫與緩存數據一致性解決方案對應代碼
商品庫存實體
package com.shux.inventory.entity; public class InventoryProduct { private Integer productId; private Long InventoryCnt; public Integer getProductId() { return productId; } public void setProductId(Integer productId) { this.productId = productId; } public Long getInventoryCnt() { return InventoryCnt; } public void setInventoryCnt(Long inventoryCnt) { InventoryCnt = inventoryCnt; } }
請求接口
public interface Request { public void process(); public Integer getProductId(); public boolean isForceFefresh(); }
數據更新請求
package com.shux.inventory.request; import org.springframework.transaction.annotation.Transactional; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; /** ********************************************** * 描述:更新庫存信息 * 1、先刪除緩存中的數據 * 2、更新數據庫中的數據 ********************************************** **/ public class InventoryUpdateDBRequest implements Request{ private InventoryProductBiz inventoryProductBiz; private InventoryProduct inventoryProduct; public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){ this.inventoryProduct = inventoryProduct; this.inventoryProductBiz = inventoryProductBiz; } @Override @Transactional public void process() { inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId()); inventoryProductBiz.updateInventoryProduct(inventoryProduct); } @Override public Integer getProductId() { // TODO Auto-generated method stub return inventoryProduct.getProductId(); } @Override public boolean isForceFefresh() { // TODO Auto-generated method stub return false; } }
查詢請求
package com.shux.inventory.request; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; /** ********************************************** * 描述:查詢緩存數據 * 1、從數據庫中查詢 * 2、從數據庫中查詢后插入到緩存中 ********************************************** **/ public class InventoryQueryCacheRequest implements Request { private InventoryProductBiz inventoryProductBiz; private Integer productId; private boolean isForceFefresh; public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) { this.productId = productId; this.inventoryProductBiz = inventoryProductBiz; this.isForceFefresh = isForceFefresh; } @Override public void process() { InventoryProduct inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId); inventoryProductBiz.setInventoryProductCache(inventoryProduct); } @Override public Integer getProductId() { // TODO Auto-generated method stub return productId; } public boolean isForceFefresh() { return isForceFefresh; } public void setForceFefresh(boolean isForceFefresh) { this.isForceFefresh = isForceFefresh; } }
spring啟動時初始化隊列線程池
package com.shux.inventory.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; import com.shux.utils.other.SysConfigUtil; /** ********************************************** * 描述:請求處理線程池,初始化隊列數及每個隊列最多能處理的數量 ********************************************** **/ public class RequestProcessorThreadPool { private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString()); private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString()); private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum); private RequestProcessorThreadPool(){ for(int i=0;i<blockingQueueNum;i++){//初始化隊列 ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(queueDataNum);//每個隊列中放100條數據 RequestQueue.getInstance().addQueue(queue); threadPool.submit(new RequestProcessorThread(queue));//把每個queue交個線程去處理,線程會處理每個queue中的數據 } } public static class Singleton{ private static RequestProcessorThreadPool instance; static{ instance = new RequestProcessorThreadPool(); } public static RequestProcessorThreadPool getInstance(){ return instance; } } public static RequestProcessorThreadPool getInstance(){ return Singleton.getInstance(); } /** * 初始化線程池 */ public static void init(){ getInstance(); } }
請求處理線程
package com.shux.inventory.thread; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import com.shux.inventory.request.InventoryUpdateDBRequest; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; /** ********************************************** * 描述:請求處理線程 ********************************************** **/ public class RequestProcessorThread implements Callable<Boolean>{ private ArrayBlockingQueue<Request> queue; public RequestProcessorThread(ArrayBlockingQueue<Request> queue){ this.queue = queue; } @Override public Boolean call() throws Exception { Request request = queue.take(); Map<Integer,Boolean> flagMap = RequestQueue.getInstance().getFlagMap(); //不需要強制刷新的時候,查詢請求去重處理 if (!request.isForceFefresh()){ if (request instanceof InventoryUpdateDBRequest) {//如果是更新請求,那就置為false flagMap.put(request.getProductId(), true); } else { Boolean flag = flagMap.get(request.getProductId()); /** * 標志位為空,有三種情況 * 1、沒有過更新請求 * 2、沒有查詢請求 * 3、數據庫中根本沒有數據 * 在最初情況,一旦庫存了插入了數據,那就好會在緩存中也會放一份數據, * 但這種情況下有可能由於redis中內存滿了,redis通過LRU算法把這個商品給清除了,導致緩存中沒有數據 * 所以當標志位為空的時候,需要從數據庫重查詢一次,並且把標志位置為false,以便后面的請求能夠從緩存中取 */ if ( flag == null) { flagMap.put(request.getProductId(), false); } /** * 如果不為空,並且flag為true,說明之前有一次更新請求,說明緩存中沒有數據了(更新緩存會先刪除緩存), * 這個時候就要去刷新緩存,即從數據庫中查詢一次,並把標志位設置為false */ if ( flag != null && flag) { flagMap.put(request.getProductId(), false); } /** * 這種情況說明之前有一個查詢請求,並且把數據刷新到了緩存中,所以這時候就不用去刷新緩存了,直接返回就可以了 */ if (flag != null && !flag) { flagMap.put(request.getProductId(), false); return true; } } } request.process(); return true; } }
請求隊列
package com.shux.inventory.request; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; /** ********************************************** * 描述:請求隊列 ********************************************** **/ public class RequestQueue { private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>(); private Map<Integer,Boolean> flagMap = new ConcurrentHashMap<>(); private RequestQueue(){ } private static class Singleton{ private static RequestQueue queue; static{ queue = new RequestQueue(); } public static RequestQueue getInstance() { return queue; } } public static RequestQueue getInstance(){ return Singleton.getInstance(); } public void addQueue(ArrayBlockingQueue<Request> queue) { queues.add(queue); } public int getQueueSize(){ return queues.size(); } public ArrayBlockingQueue<Request> getQueueByIndex(int index) { return queues.get(index); } public Map<Integer,Boolean> getFlagMap() { return this.flagMap; } }
spring 啟動初始化線程池類
package com.shux.inventory.listener; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import com.shux.inventory.thread.RequestProcessorThreadPool; /** ********************************************** * 描述:spring 啟動初始化線程池類 ********************************************** **/ public class InitListener implements ApplicationListener<ContextRefreshedEvent>{ @Override public void onApplicationEvent(ContextRefreshedEvent event) { // TODO Auto-generated method stub if(event.getApplicationContext().getParent() != null){ return; } RequestProcessorThreadPool.init(); } }
異步處理請求接口
package com.shux.inventory.biz; import com.shux.inventory.request.Request; /** ********************************************** * 描述:請求異步處理接口,用於路由隊列並把請求加入到隊列中 ********************************************** **/ public interface IRequestAsyncProcessBiz { void process(Request request); }
異步處理請求接口實現
package com.shux.inventory.biz.impl; import java.util.concurrent.ArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import com.shux.inventory.biz.IRequestAsyncProcessBiz; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; /** ********************************************** * 描述:異步處理請求,用於路由隊列並把請求加入到隊列中 ********************************************** **/ @Service("requestAsyncProcessService") public class RequestAsyncProcessBizImpl implements IRequestAsyncProcessBiz { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void process(Request request) { // 做請求的路由,根據productId路由到對應的隊列 ArrayBlockingQueue<Request> queue = getQueueByProductId(request.getProductId()); try { queue.put(request); } catch (InterruptedException e) { logger.error("產品ID{}加入隊列失敗",request.getProductId(),e); } } private ArrayBlockingQueue<Request> getQueueByProductId(Integer productId) { RequestQueue requestQueue = RequestQueue.getInstance(); String key = String.valueOf(productId); int hashcode; int hash = (key == null) ? 0 : (hashcode = key.hashCode())^(hashcode >>> 16); //對hashcode取摸 int index = (requestQueue.getQueueSize()-1) & hash; return requestQueue.getQueueByIndex(index); } }
數據更新請求controller
package com.shux.inventory.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.shux.inventory.biz.IRequestAsyncProcessBiz; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; import com.shux.inventory.request.InventoryUpdateDBRequest; import com.shux.inventory.request.Request; import com.shux.utils.other.Response; /** ********************************************** * 描述:提交更新請求 ********************************************** **/ @Controller("/inventory") public class InventoryUpdateDBController { private @Autowired InventoryProductBiz inventoryProductBiz; private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz; @RequestMapping("/updateDBInventoryProduct") @ResponseBody public Response updateDBInventoryProduct(InventoryProduct inventoryProduct){ Request request = new InventoryUpdateDBRequest(inventoryProduct,inventoryProductBiz); requestAsyncProcessBiz.process(request); return new Response(Response.SUCCESS,"更新成功"); } }
數據查詢請求controller
package com.shux.inventory.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import com.shux.inventory.biz.IRequestAsyncProcessBiz; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; import com.shux.inventory.request.InventoryQueryCacheRequest; import com.shux.inventory.request.Request; /** ********************************************** * 描述:提交查詢請求 * 1、先從緩存中取數據 * 2、如果能從緩存中取到數據,則返回 * 3、如果不能從緩存取到數據,則等待20毫秒,然后再次去數據,直到200毫秒,如果超過200毫秒還不能取到數據,則從數據庫中取,並強制刷新緩存數據 ********************************************** **/ @Controller("/inventory") public class InventoryQueryCacheController { private @Autowired InventoryProductBiz inventoryProductBiz; private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz; @RequestMapping("/queryInventoryProduct") public InventoryProduct queryInventoryProduct(Integer productId) { Request request = new InventoryQueryCacheRequest(productId,inventoryProductBiz,false); requestAsyncProcessBiz.process(request);//加入到隊列中 long startTime = System.currentTimeMillis(); long allTime = 0L; long endTime = 0L; InventoryProduct inventoryProduct = null; while (true) { if (allTime > 200){//如果超過了200ms,那就直接退出,然后從數據庫中查詢 break; } try { inventoryProduct = inventoryProductBiz.loadInventoryProductCache(productId); if (inventoryProduct != null) { return inventoryProduct; } else { Thread.sleep(20);//如果查詢不到就等20毫秒 } endTime = System.currentTimeMillis(); allTime = endTime - startTime; } catch (Exception e) { } } /** * 代碼執行到這來,只有以下三種情況 * 1、緩存中本來有數據,由於redis內存滿了,redis通過LRU算法清除了緩存,導致數據沒有了 * 2、由於之前數據庫查詢比較慢或者內存太小處理不過來隊列中的數據,導致隊列里擠壓了很多的數據,所以一直沒有從數據庫中獲取數據然后插入到緩存中 * 3、數據庫中根本沒有這樣的數據,這種情況叫數據穿透,一旦別人知道這個商品沒有,如果一直執行查詢,就會一直查詢數據庫,如果過多,那么有可能會導致數據庫癱瘓 */ inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId); if (inventoryProduct != null) { Request forcRrequest = new InventoryQueryCacheRequest(productId,inventoryProductBiz,true); requestAsyncProcessBiz.process(forcRrequest);//這個時候需要強制刷新數據庫,使緩存中有數據 return inventoryProduct; } return null; } }