redis系列之數據庫與緩存數據一致性解決方案


解決方案總結:

由於數據庫層面的讀寫並發,引發的數據庫與緩存數據不一致的問題(本質是后發生的讀請求先返回了),可能通過兩個小的改動解決:
  1)修改服務Service連接池,id取模選取服務連接,能夠保證同一個數據的讀寫都落在同一個后端服務上

    “同一個數據的訪問一定落到同一個服務上”
    獲取Service連接的CPool.GetServiceConnection()【返回任何一個可用Service連接】改為CPool.GetServiceConnection(longid)【返回id取模相關聯的Service連接】這樣的話,就能夠保證同一個數據例如uid的請求落到同一個服務Service上

  2)修改數據庫DB連接池,id取模選取DB連接,能夠保證同一個數據的讀寫在數據庫層面是串行的
    “讓同一數據請求落到同一個數據庫鏈接中“讓同一個數據的訪問能串行化”
    “讓同一個數據的訪問通過同一條DB連接執行”
    “在DB連接池層面稍微修改,按數據取連接即可”
    獲取DB連接的CPool.GetDBConnection()【返回任何一個可用DB連接】改為CPool.GetDBConnection(longid)【返回id取模相關聯的DB連接】

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

redis系列之數據庫與緩存數據一致性解決方案

數據庫與緩存讀寫模式策略

寫完數據庫后是否需要馬上更新緩存還是直接刪除緩存?
(1)、如果寫數據庫的值與更新到緩存值是一樣的,不需要經過任何的計算,可以馬上更新緩存,但是如果對於那種寫數據頻繁而讀數據少的場景並不合適這種解決方案,因為也許還沒有查詢就被刪除或修改了,這樣會浪費時間和資源
(2)、如果寫數據庫的值與更新緩存的值不一致,寫入緩存中的數據需要經過幾個表的關聯計算后得到的結果插入緩存中,那就沒有必要馬上更新緩存,只有刪除緩存即可,等到查詢的時候在去把計算后得到的結果插入到緩存中即可。
所以一般的策略是當更新數據時,先刪除緩存數據,然后更新數據庫,而不是更新緩存,等要查詢的時候才把最新的數據更新到緩存

數據庫與緩存雙寫情況下導致數據不一致問題

場景一

當更新數據時,如更新某商品的庫存,當前商品的庫存是100,現在要更新為99,先更新數據庫更改成99,然后刪除緩存,發現刪除緩存失敗了,這意味着數據庫存的是99,而緩存是100,這導致數據庫和緩存不一致。

場景一解決方案

 
 
這種情況應該是先刪除緩存,然后在更新數據庫,如果刪除緩存失敗,那就不要更新數據庫,如果說刪除緩存成功,而更新數據庫失敗,那查詢的時候只是從數據庫里查了舊的數據而已,這樣就能保持數據庫與緩存的一致性。

場景二

在高並發的情況下,如果當刪除完緩存的時候,這時去更新數據庫,但還沒有更新完,另外一個請求來查詢數據,發現緩存里沒有,就去數據庫里查,還是以上面商品庫存為例,如果數據庫中產品的庫存是100,那么查詢到的庫存是100,然后插入緩存,插入完緩存后,原來那個更新數據庫的線程把數據庫更新為了99,導致數據庫與緩存不一致的情況

場景二解決方案

 
遇到這種情況,可以用隊列的去解決這個問,創建幾個隊列,如20個,根據商品的ID去做hash值,然后對隊列個數取摸,當有數據更新請求時,先把它丟到隊列里去,當更新完后在從隊列里去除,如果在更新的過程中,遇到以上場景,先去緩存里看下有沒有數據,如果沒有,可以先去隊列里看是否有相同商品ID在做更新,如果有也把查詢的請求發送到隊列里去,然后同步等待緩存更新完成。
這里有一個優化點,如果發現隊列里有一個查詢請求了,那么就不要放新的查詢操作進去了,用一個while(true)循環去查詢緩存,循環個200MS左右,如果緩存里還沒有則直接取數據庫的舊數據,一般情況下是可以取到的。
 
在高並發下解決場景二要注意的問題
(1)讀請求時長阻塞
 由於讀請求進行了非常輕度的異步化,所以一定要注意讀超時的問題,每個讀請求必須在超時間內返回,該解決方案最大的風險在於可能數據更新很頻繁,導致隊列中擠壓了大量的更新操作在里面,然后讀請求會發生大量的超時,最后導致大量的請求直接走數據庫,像遇到這種情況,一般要做好足夠的壓力測試,如果壓力過大,需要根據實際情況添加機器。
(2)請求並發量過高
 這里還是要做好壓力測試,多模擬真實場景,並發量在最高的時候QPS多少,扛不住就要多加機器,還有就是做好讀寫比例是多少
(3)多服務實例部署的請求路由
可能這個服務部署了多個實例,那么必須保證說,執行數據更新操作,以及執行緩存更新操作的請求,都通過nginx服務器路由到相同的服務實例上
(4)熱點商品的路由問題,導致請求的傾斜
某些商品的讀請求特別高,全部打到了相同的機器的相同丟列里了,可能造成某台服務器壓力過大,因為只有在商品數據更新的時候才會清空緩存,然后才會導致讀寫並發,所以更新頻率不是太高的話,這個問題的影響並不是很大,但是確實有可能某些服務器的負載會高一些。
 

數據庫與緩存數據一致性解決方案流程圖

 

 

數據庫與緩存數據一致性解決方案對應代碼

 

商品庫存實體

package com.shux.inventory.entity;  

public class InventoryProduct {  
    private Integer productId;  
    private Long InventoryCnt;  
      
    public Integer getProductId() {  
        return productId;  
    }  
    public void setProductId(Integer productId) {  
        this.productId = productId;  
    }  
    public Long getInventoryCnt() {  
        return InventoryCnt;  
    }  
    public void setInventoryCnt(Long inventoryCnt) {  
        InventoryCnt = inventoryCnt;  
    }  
      
}  

請求接口

public interface Request {  
    public void process();  
    public Integer getProductId();  
    public boolean isForceFefresh();  
}  

數據更新請求

package com.shux.inventory.request;  
  
import org.springframework.transaction.annotation.Transactional;  
  
import com.shux.inventory.biz.InventoryProductBiz;  
import com.shux.inventory.entity.InventoryProduct;  
  
/** 
 ********************************************** 
 *  描述:更新庫存信息 
 *  1、先刪除緩存中的數據 
 *  2、更新數據庫中的數據 
 ********************************************** 
**/  
public class InventoryUpdateDBRequest implements Request{  
    private InventoryProductBiz inventoryProductBiz;  
    private InventoryProduct inventoryProduct;  
      
    public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){  
        this.inventoryProduct = inventoryProduct;  
        this.inventoryProductBiz = inventoryProductBiz;  
    }  
    @Override  
    @Transactional  
    public void process() {  
        inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId());  
        inventoryProductBiz.updateInventoryProduct(inventoryProduct);  
    }  
    @Override  
    public Integer getProductId() {  
        // TODO Auto-generated method stub  
        return inventoryProduct.getProductId();  
    }  
    @Override  
    public boolean isForceFefresh() {  
        // TODO Auto-generated method stub  
        return false;  
    }  
  
}  

查詢請求

package com.shux.inventory.request;  
  
import com.shux.inventory.biz.InventoryProductBiz;  
import com.shux.inventory.entity.InventoryProduct;  
  
/** 
 ********************************************** 
 *  描述:查詢緩存數據 
 *  1、從數據庫中查詢 
 *  2、從數據庫中查詢后插入到緩存中 

 ********************************************** 
**/  
public class InventoryQueryCacheRequest implements Request {  
    private InventoryProductBiz inventoryProductBiz;  
    private Integer productId;  
    private boolean isForceFefresh;  
      
    public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) {  
        this.productId = productId;  
        this.inventoryProductBiz = inventoryProductBiz;  
        this.isForceFefresh = isForceFefresh;  
    }  
    @Override  
    public void process() {  
      InventoryProduct  inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);  
      inventoryProductBiz.setInventoryProductCache(inventoryProduct);  
    }  
    @Override  
    public Integer getProductId() {  
        // TODO Auto-generated method stub  
        return productId;  
    }  
    public boolean isForceFefresh() {  
        return isForceFefresh;  
    }  
    public void setForceFefresh(boolean isForceFefresh) {  
        this.isForceFefresh = isForceFefresh;  
    }  
      
  
}  

spring啟動時初始化隊列線程池

package com.shux.inventory.thread;  
  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
import com.shux.inventory.request.Request;  
import com.shux.inventory.request.RequestQueue;  
import com.shux.utils.other.SysConfigUtil;  
  
/** 
 ********************************************** 
 *  描述:請求處理線程池,初始化隊列數及每個隊列最多能處理的數量 

 ********************************************** 
**/  
public class RequestProcessorThreadPool {  
    private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString());  
    private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString());  
    private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum);  
    private RequestProcessorThreadPool(){  
        for(int i=0;i<blockingQueueNum;i++){//初始化隊列  
            ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(queueDataNum);//每個隊列中放100條數據  
            RequestQueue.getInstance().addQueue(queue);  
            threadPool.submit(new RequestProcessorThread(queue));//把每個queue交個線程去處理,線程會處理每個queue中的數據  
        }  
    }  
    public static class Singleton{  
       private static RequestProcessorThreadPool instance;  
       static{  
           instance = new RequestProcessorThreadPool();  
       }  
       public static RequestProcessorThreadPool getInstance(){  
           return instance;  
       }  
    }  
    public static RequestProcessorThreadPool getInstance(){  
        return Singleton.getInstance();  
    }  
    /** 
     * 初始化線程池 
     */  
    public static void init(){  
        getInstance();  
    }  
}  

請求處理線程

package com.shux.inventory.thread;  
  
import java.util.Map;  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.Callable;  
  
import com.shux.inventory.request.InventoryUpdateDBRequest;  
import com.shux.inventory.request.Request;  
import com.shux.inventory.request.RequestQueue;  
  
/** 
 ********************************************** 
 *  描述:請求處理線程 

 ********************************************** 
**/  
public class RequestProcessorThread implements Callable<Boolean>{  
    private ArrayBlockingQueue<Request> queue;  
    public RequestProcessorThread(ArrayBlockingQueue<Request> queue){  
        this.queue = queue;  
    }  
    @Override  
    public Boolean call() throws Exception {  
       Request request = queue.take();  
       Map<Integer,Boolean> flagMap = RequestQueue.getInstance().getFlagMap();  
       //不需要強制刷新的時候,查詢請求去重處理  
           if (!request.isForceFefresh()){  
               if (request instanceof InventoryUpdateDBRequest) {//如果是更新請求,那就置為false  
                   flagMap.put(request.getProductId(), true);  
               } else {  
                   Boolean flag = flagMap.get(request.getProductId());  
                   /** 
                    * 標志位為空,有三種情況 
                    * 1、沒有過更新請求 
                    * 2、沒有查詢請求 
                    * 3、數據庫中根本沒有數據 
                    * 在最初情況,一旦庫存了插入了數據,那就好會在緩存中也會放一份數據, 
                    * 但這種情況下有可能由於redis中內存滿了,redis通過LRU算法把這個商品給清除了,導致緩存中沒有數據 
                    * 所以當標志位為空的時候,需要從數據庫重查詢一次,並且把標志位置為false,以便后面的請求能夠從緩存中取 
                    */  
                   if ( flag == null) {  
                       flagMap.put(request.getProductId(), false);  
                   }  
                   /** 
                    * 如果不為空,並且flag為true,說明之前有一次更新請求,說明緩存中沒有數據了(更新緩存會先刪除緩存), 
                    * 這個時候就要去刷新緩存,即從數據庫中查詢一次,並把標志位設置為false 
                    */  
                   if ( flag != null && flag) {  
                       flagMap.put(request.getProductId(), false);  
                   }  
                   /** 
                    * 這種情況說明之前有一個查詢請求,並且把數據刷新到了緩存中,所以這時候就不用去刷新緩存了,直接返回就可以了 
                    */  
                   if (flag != null && !flag) {  
                       flagMap.put(request.getProductId(), false);  
                       return true;  
                   }   
               }  
           }  
           request.process();  
        return true;  
    }  
      
}  

請求隊列

package com.shux.inventory.request;  
  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Map;  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.ConcurrentHashMap;  
  
/** 
 ********************************************** 
 *  描述:請求隊列 
 ********************************************** 
**/  
public class RequestQueue {  
    private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>();  
      
    private Map<Integer,Boolean> flagMap = new ConcurrentHashMap<>();  
    private RequestQueue(){  
          
    }  
    private static class Singleton{  
        private static RequestQueue queue;  
        static{  
            queue = new RequestQueue();  
        }  
        public static RequestQueue getInstance() {  
            return queue;  
        }  
    }  
      
    public static RequestQueue getInstance(){  
        return Singleton.getInstance();  
    }  
    public void addQueue(ArrayBlockingQueue<Request> queue) {  
        queues.add(queue);  
    }  
      
    public int getQueueSize(){  
        return queues.size();  
    }  
    public ArrayBlockingQueue<Request> getQueueByIndex(int index) {  
        return queues.get(index);  
    }  
      
    public Map<Integer,Boolean> getFlagMap() {  
        return this.flagMap;  
    }  
}  

spring 啟動初始化線程池類

package com.shux.inventory.listener;  
  
import org.springframework.context.ApplicationListener;  
import org.springframework.context.event.ContextRefreshedEvent;  
  
import com.shux.inventory.thread.RequestProcessorThreadPool;  
  
/** 
 ********************************************** 
 *  描述:spring 啟動初始化線程池類 

 ********************************************** 
**/  
public class InitListener implements  ApplicationListener<ContextRefreshedEvent>{  
  
    @Override  
    public void onApplicationEvent(ContextRefreshedEvent event) {  
        // TODO Auto-generated method stub  
        if(event.getApplicationContext().getParent() != null){  
            return;  
        }  
        RequestProcessorThreadPool.init();  
    }  
}  

異步處理請求接口

package com.shux.inventory.biz;  
  
import com.shux.inventory.request.Request;  
  
/** 
 ********************************************** 
 *  描述:請求異步處理接口,用於路由隊列並把請求加入到隊列中 

 ********************************************** 
**/  
public interface IRequestAsyncProcessBiz {  
    void process(Request request);  
}  

異步處理請求接口實現

package com.shux.inventory.biz.impl;  
  
import java.util.concurrent.ArrayBlockingQueue;  
  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.stereotype.Service;  
  
import com.shux.inventory.biz.IRequestAsyncProcessBiz;  
import com.shux.inventory.request.Request;  
import com.shux.inventory.request.RequestQueue;  
  
  
/** 
 ********************************************** 
 *  描述:異步處理請求,用於路由隊列並把請求加入到隊列中 

 ********************************************** 
**/  
@Service("requestAsyncProcessService")  
public class RequestAsyncProcessBizImpl implements IRequestAsyncProcessBiz {  
    private Logger logger = LoggerFactory.getLogger(getClass());  
    @Override  
    public void process(Request request) {  
        // 做請求的路由,根據productId路由到對應的隊列  
        ArrayBlockingQueue<Request> queue = getQueueByProductId(request.getProductId());  
        try {  
            queue.put(request);  
        } catch (InterruptedException e) {  
            logger.error("產品ID{}加入隊列失敗",request.getProductId(),e);  
        }  
    }  
  
    private ArrayBlockingQueue<Request> getQueueByProductId(Integer productId) {  
        RequestQueue  requestQueue = RequestQueue.getInstance();  
        String key = String.valueOf(productId);  
        int hashcode;  
        int hash = (key == null) ? 0 : (hashcode = key.hashCode())^(hashcode >>> 16);  
        //對hashcode取摸  
        int index = (requestQueue.getQueueSize()-1) & hash;  
        return requestQueue.getQueueByIndex(index);  
    }  
      
   
  
}  

數據更新請求controller

package com.shux.inventory.controller;  
  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Controller;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.ResponseBody;  
  
import com.shux.inventory.biz.IRequestAsyncProcessBiz;  
import com.shux.inventory.biz.InventoryProductBiz;  
import com.shux.inventory.entity.InventoryProduct;  
import com.shux.inventory.request.InventoryUpdateDBRequest;  
import com.shux.inventory.request.Request;  
import com.shux.utils.other.Response;  
  
/** 
 ********************************************** 
 *  描述:提交更新請求 

 ********************************************** 
**/  
@Controller("/inventory")  
public class InventoryUpdateDBController {  
    private @Autowired InventoryProductBiz inventoryProductBiz;  
    private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;  
    @RequestMapping("/updateDBInventoryProduct")  
    @ResponseBody  
    public Response updateDBInventoryProduct(InventoryProduct  inventoryProduct){  
        Request request = new InventoryUpdateDBRequest(inventoryProduct,inventoryProductBiz);  
        requestAsyncProcessBiz.process(request);  
       return new Response(Response.SUCCESS,"更新成功");  
    }  
}  

數據查詢請求controller

package com.shux.inventory.controller;  
  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Controller;  
import org.springframework.web.bind.annotation.RequestMapping;  
  
import com.shux.inventory.biz.IRequestAsyncProcessBiz;  
import com.shux.inventory.biz.InventoryProductBiz;  
import com.shux.inventory.entity.InventoryProduct;  
import com.shux.inventory.request.InventoryQueryCacheRequest;  
import com.shux.inventory.request.Request;  
  
/** 
 ********************************************** 
 *  描述:提交查詢請求 
 *  1、先從緩存中取數據 
 *  2、如果能從緩存中取到數據,則返回 
 *  3、如果不能從緩存取到數據,則等待20毫秒,然后再次去數據,直到200毫秒,如果超過200毫秒還不能取到數據,則從數據庫中取,並強制刷新緩存數據 

 ********************************************** 
**/  
@Controller("/inventory")  
public class InventoryQueryCacheController {  
    private @Autowired InventoryProductBiz inventoryProductBiz;  
    private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;  
    @RequestMapping("/queryInventoryProduct")  
    public InventoryProduct queryInventoryProduct(Integer productId) {  
         Request request = new InventoryQueryCacheRequest(productId,inventoryProductBiz,false);  
         requestAsyncProcessBiz.process(request);//加入到隊列中  
         long startTime = System.currentTimeMillis();  
         long allTime = 0L;  
         long endTime = 0L;  
         InventoryProduct inventoryProduct = null;  
         while (true) {  
             if (allTime > 200){//如果超過了200ms,那就直接退出,然后從數據庫中查詢  
                 break;  
             }  
             try {  
                 inventoryProduct = inventoryProductBiz.loadInventoryProductCache(productId);  
                 if (inventoryProduct != null) {  
                     return inventoryProduct;  
                 } else {  
                     Thread.sleep(20);//如果查詢不到就等20毫秒  
                 }   
                 endTime = System.currentTimeMillis();  
                 allTime = endTime - startTime;  
             } catch (Exception e) {  
             }   
         }  
         /** 
          * 代碼執行到這來,只有以下三種情況 
          * 1、緩存中本來有數據,由於redis內存滿了,redis通過LRU算法清除了緩存,導致數據沒有了 
          * 2、由於之前數據庫查詢比較慢或者內存太小處理不過來隊列中的數據,導致隊列里擠壓了很多的數據,所以一直沒有從數據庫中獲取數據然后插入到緩存中 
          * 3、數據庫中根本沒有這樣的數據,這種情況叫數據穿透,一旦別人知道這個商品沒有,如果一直執行查詢,就會一直查詢數據庫,如果過多,那么有可能會導致數據庫癱瘓 
          */  
         inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);  
         if (inventoryProduct != null) {  
             Request forcRrequest = new InventoryQueryCacheRequest(productId,inventoryProductBiz,true);  
             requestAsyncProcessBiz.process(forcRrequest);//這個時候需要強制刷新數據庫,使緩存中有數據  
             return inventoryProduct;  
         }  
         return null;  
           
     }  
}  

參考鏈接:https://www.cnblogs.com/cxxjohnson/p/8519616.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM