Redis面試題記錄--緩存雙寫情況下導致數據不一致問題


轉載自:https://blog.csdn.net/lzhcoder/article/details/79469123

https://blog.csdn.net/u013374645/article/details/91409150

 

1.最經典的緩存+數據庫讀寫的模式,cache aside pattern

1.1、Cache Aside Pattern

(1)讀的時候,先讀緩存,緩存沒有的話,那么就讀數據庫,然后取出數據后放入緩存,同時返回響應

(2)更新的時候,先刪除緩存,然后再更新數據庫

1.2、為什么是刪除緩存,而不是更新緩存呢?

(1)、如果寫數據庫的值與更新到緩存值是一樣的,不需要經過任何的計算,可以馬上更新緩存,但是如果對於那種寫數據頻繁而讀數據少的場景並不合適這種解決方案,因為也許還沒有查詢就被刪除或修改了,這樣會浪費時間和資源

(2)、如果寫數據庫的值與更新緩存的值不一致,寫入緩存中的數據需要經過幾個表的關聯計算后得到的結果插入緩存中,那就沒有必要馬上更新緩存,只要刪除緩存即可,等到查詢的時候在去把計算后得到的結果插入到緩存中即可。其實刪除緩存,而不是更新緩存,就是一個lazy計算的思想。

1.3、先刪除緩存,再更新數據庫 && 先更新數據庫,再刪除緩存的比較

第一種方案的分析見2.1和2.2

下面討論第二種方案,該設計模式產生雙寫不一致的可能情況:

①一個是讀操作,但是沒有命中緩存,然后就到數據庫中取數據,此時來了一個寫操作,寫完數據庫后,讓緩存失效,然后,之前的那個讀操作再把老的數據放進去,所以,會造成臟數據。

該情況出現的概率可能非常低,因為這個條件需要發生在讀緩存時緩存失效,而且並發着有一個寫操作。而實際上數據庫的寫操作會比讀操作慢得多,而且還要鎖表,而讀操作必需在寫操作前進入數據庫操作,而又要晚於寫操作更新緩存,所有的這些條件都具備的概率基本並不大。 

②刪除緩存失敗

 

2.在庫存服務中實現緩存與數據庫雙寫一致性保障方案實戰

實時性比較高的數據緩存,選擇的就是庫存的服務

庫存可能會修改,每次修改都要去更新這個緩存數據; 每次庫存的數據,在緩存中一旦過期,或者是被清理掉了,前端的nginx服務都會發送請求給庫存服務,去獲取相應的數據

庫存這一塊,寫數據庫的時候,直接更新redis緩存

實際上沒有這么的簡單,這里,其實就涉及到了一個問題,數據庫與緩存雙寫,數據不一致的問題

圍繞和結合實時性較高的庫存服務,把數據庫與緩存雙寫不一致問題以及其解決方案,給大家講解一下

數據庫與緩存雙寫不一致,很常見的問題,大型的緩存架構中,第一個解決方案

 

2.1、最初級的緩存不一致問題以及解決方案

問題:先修改數據庫,再刪除緩存,如果刪除緩存失敗了,那么會導致數據庫中是新數據,緩存中是舊數據,數據出現不一致

解決思路:

先刪除緩存,再修改數據庫。如果刪除緩存成功,修改數據庫失敗了,那么數據庫中是舊數據,緩存中是空的,那么數據不會不一致,如果刪除緩存失敗不執行修改數據庫(cache aside pattern)

 

2.2、比較復雜的數據不一致問題分析

數據發生了變更,先刪除了緩存,然后要去修改數據庫,此時還沒修改

一個請求過來,去讀緩存,發現緩存空了,去查詢數據庫,查到了修改前的舊數據,放到了緩存中

數據變更的程序完成了數據庫的修改

完了,數據庫和緩存中的數據不一樣了。。。。

 

2.3、為什么上億流量高並發場景下,緩存會出現這個問題?

只有在對一個數據在並發的進行讀寫的時候,才可能會出現這種問題

其實如果說你的並發量很低的話,特別是讀並發很低,每天訪問量就1萬次,那么很少的情況下,會出現剛才描述的那種不一致的場景

但是問題是,如果每天的是上億的流量,每秒並發讀是幾萬,每秒只要有數據更新的請求,就可能會出現上述的數據庫+緩存不一致的情況

高並發了以后,問題是很多的

 

2.4、數據庫與緩存更新與讀取操作進行異步串行化

更新數據的時候(寫請求),根據數據的唯一標識,將操作路由之后,發送到AarrayBlockQueue中

讀取數據的時候,如果發現數據不在緩存中,那么將讀取mysql數據+更新緩存的操作(讀請求),根據產品id路由之后,也發送同一個AarrayBlockQueue中

一個隊列對應一個工作線程,每個工作線程串行拿到對應的操作,然后一條一條的執行

這樣的話,一個數據變更的操作,先執行,刪除緩存,然后再去更新數據庫,但是還沒完成更新

此時如果一個讀請求過來,讀到了空的緩存,那么可以先將緩存更新的請求發送到隊列中,此時會在隊列中積壓,然后同步等待緩存更新完成

這里有一個優化點,一個隊列中同一個商品,每次調用獲取庫存的接口時,都會發起一個讀取mysql數據+更新緩存的異步請求,這些請求串在一起是沒意義的,因此可以做過濾,如果發現隊列中同一個商品已經有一個更新緩存的請求了,那么就不用再放這個商品的更新請求操作進去了,直接等待前面的更新操作請求完成即可(代碼中利用flagMap實現過濾)

注解:同一個商品在同一個內存隊列中,保證刪除緩存,和修改數據庫的操作不會有其他線程干擾

如果請求還在等待時間范圍內,不斷輪詢發現可以取到值了,那么就直接返回; 如果請求等待的時間超過一定時長,那么這一次直接從數據庫中讀取當前的舊值

 

2.4.1、線程池+內存隊列初始化

/**注冊監聽器**/
 
@Bean
public ServletListenerRegistrationBean servletListenerRegistrationBean(){
    ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
    servletListenerRegistrationBean.setListener(new InitListener());
    return servletListenerRegistrationBean;
}

 

  

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
 
import com.roncoo.eshop.inventory.thread.RequestProcessorThreadPool;
 
/**
 * 系統初始化監聽器
 * @author Administrator
 *
 */
public class InitListener implements ServletContextListener {
 
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // 初始化工作線程池和內存隊列
        RequestProcessorThreadPool.init();
    }
    
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        
    }
 
}

 

  

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.request.RequestQueue;
 
/**
 * 請求處理線程池:單例
 * @author Administrator
 *
 */
public class RequestProcessorThreadPool {
    
    // 在實際項目中,你設置線程池大小是多少,每個線程監控的那個內存隊列的大小是多少
    // 都可以做到一個外部的配置文件中
    // 我們這了就給簡化了,直接寫死了,好吧
    
    /**
     * 線程池
     */
    private ExecutorService threadPool = Executors.newFixedThreadPool(10);
    
    public RequestProcessorThreadPool() {
        RequestQueue requestQueue = RequestQueue.getInstance();
        
        for(int i = 0; i < 10; i++) {
            ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100);
            requestQueue.addQueue(queue);  
            threadPool.submit(new RequestProcessorThread(queue));  
        }
    }
 
    /**
     * 單例有很多種方式去實現:我采取絕對線程安全的一種方式
     * 
     * 靜態內部類的方式,去初始化單例
     * 
     * @author Administrator
     *
     */
    private static class Singleton {
        
        private static RequestProcessorThreadPool instance;
        
        static {
            instance = new RequestProcessorThreadPool();
        }
        
        public static RequestProcessorThreadPool getInstance() {
            return instance;
        }
        
    }
    
    /**
     * jvm的機制去保證多線程並發安全
     * 
     * 內部類的初始化,一定只會發生一次,不管多少個線程並發去初始化
     * 
     * @return
     */
    public static RequestProcessorThreadPool getInstance() {
        return Singleton.getInstance();
    }
    
    /**
     * 初始化的便捷方法
     */
    public static void init() {
        getInstance();
    }
    
}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 請求內存隊列
 * @author Administrator
 *
 */
public class RequestQueue {
 
    /**
     * 內存隊列
     */
    private List<ArrayBlockingQueue<Request>> queues = 
            new ArrayList<ArrayBlockingQueue<Request>>();
    /**
     * 標識位map
     */
    private Map<Integer, Boolean> flagMap = new ConcurrentHashMap<Integer, Boolean>();
    
    /**
     * 單例有很多種方式去實現:我采取絕對線程安全的一種方式
     * 
     * 靜態內部類的方式,去初始化單例
     * 
     * @author Administrator
     *
     */
    private static class Singleton {
        
        private static RequestQueue instance;
        
        static {
            instance = new RequestQueue();
        }
        
        public static RequestQueue getInstance() {
            return instance;
        }
        
    }
    
    /**
     * jvm的機制去保證多線程並發安全
     * 
     * 內部類的初始化,一定只會發生一次,不管多少個線程並發去初始化
     * 
     * @return
     */
    public static RequestQueue getInstance() {
        return Singleton.getInstance();
    }
    
    /**
     * 添加一個內存隊列
     * @param queue
     */
    public void addQueue(ArrayBlockingQueue<Request> queue) {
        this.queues.add(queue);
    }
    
    /**
     * 獲取內存隊列的數量
     * @return
     */
    public int queueSize() {
        return queues.size();
    }
    
    /**
     * 獲取內存隊列
     * @param index
     * @return
     */
    public ArrayBlockingQueue<Request> getQueue(int index) {
        return queues.get(index);
    }
    
    public Map<Integer, Boolean> getFlagMap() {
        return flagMap;
    }
    
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
 
import com.roncoo.eshop.inventory.request.Request;
 
/**
 * 執行請求的工作線程
 * @author Administrator
 *
 */
public class RequestProcessorThread implements Callable<Boolean> {
    
    /**
     * 自己監控的內存隊列
     */
    private ArrayBlockingQueue<Request> queue;
 
    public RequestProcessorThread(ArrayBlockingQueue<Request> queue) {
        this.queue = queue;
    }
    
    @Override
    public Boolean call() throws Exception {
        try {
            while(true) {
                // ArrayBlockingQueue
                // Blocking就是說明,如果隊列滿了,或者是空的,那么都會在執行操作的時候,阻塞住
                Request request = queue.take();
                System.out.println("===========日志===========: 工作線程處理請求,商品id=" + request.getProductId()); 
                // 執行這個request操作
                request.process();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }
 
}

 

2.4.2、兩種請求對象封裝

import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.service.ProductInventoryService;
 
/**
 * 重新加載商品庫存的緩存
 * @author Administrator
 *
 */
public class ProductInventoryCacheRefreshRequest implements Request {
 
    /**
     * 商品id
     */
    private Integer productId;
    /**
     * 商品庫存Service
     */
    private ProductInventoryService productInventoryService;
    
    public ProductInventoryCacheRefreshRequest(Integer productId,
            ProductInventoryService productInventoryService) {
        this.productId = productId;
        this.productInventoryService = productInventoryService;
    }
    
    @Override
    public void process() {
        // 從數據庫中查詢最新的商品庫存數量
        ProductInventory productInventory = productInventoryService.findProductInventory(productId);
        System.out.println("===========日志===========: 已查詢到商品最新的庫存數量,商品id=" + productId + ", 商品庫存數量=" + productInventory.getInventoryCnt());  
        // 將最新的商品庫存數量,刷新到redis緩存中去
        productInventoryService.setProductInventoryCache(productInventory); 
    }
    
    public Integer getProductId() {
        return productId;
    }
    
}
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.service.ProductInventoryService;
 
/**
 * 比如說一個商品發生了交易,那么就要修改這個商品對應的庫存
 * 
 * 此時就會發送請求過來,要求修改庫存,那么這個可能就是所謂的data update request,數據更新請求
 * 
 * cache aside pattern
 * 
 * (1)刪除緩存
 * (2)更新數據庫
 * 
 * @author Administrator
 *
 */
public class ProductInventoryDBUpdateRequest implements Request {
 
    /**
     * 商品庫存
     */
    private ProductInventory productInventory;
    /**
     * 商品庫存Service
     */
    private ProductInventoryService productInventoryService;
    
    public ProductInventoryDBUpdateRequest(ProductInventory productInventory,
            ProductInventoryService productInventoryService) {
        this.productInventory = productInventory;
        this.productInventoryService = productInventoryService;
    }
    
    @Override
    public void process() {
        System.out.println("===========日志===========: 數據庫更新請求開始執行,商品id=" + productInventory.getProductId() + ", 商品庫存數量=" + productInventory.getInventoryCnt());  
        // 刪除redis中的緩存
        productInventoryService.removeProductInventoryCache(productInventory);
        // 為了模擬演示先刪除了redis中的緩存,然后還沒更新數據庫的時候,讀請求過來了,這里可以人工sleep一下
//        try {
//            Thread.sleep(20000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        } 
        // 修改數據庫中的庫存
        productInventoryService.updateProductInventory(productInventory);  
    }
    
    /**
     * 獲取商品id
     */
    public Integer getProductId() {
        return productInventory.getProductId();
    }
    
}

 

2.4.3、請求異步執行Service封裝

2.4.4、讀請求去重優化

對一個商品的庫存的數據庫更新操作已經在內存隊列中了

然后對這個商品的庫存的讀取操作,要求讀取數據庫的庫存數據,然后更新到緩存中,多個讀

這多個讀,其實只要有一個讀請求操作壓到隊列里就可以了

其他的讀操作,全部都wait那個讀請求的操作,刷新緩存,就可以讀到緩存中的最新數據

2.4.5、空數據讀請求過濾優化

如果讀請求發現redis緩存中沒有數據,就會發送讀請求給庫存服務,但是此時緩存中為空,可能是因為寫請求先刪除了緩存,也可能是數據庫里壓根兒沒這條數據

如果是數據庫中壓根兒沒這條數據的場景,那么就不應該將讀請求操作給壓入隊列中,而是直接返回空就可以了

都是為了減少內存隊列中的請求積壓,內存隊列中積壓的請求越多,就可能導致每個讀請求hang住的時間越長,也可能導致多個讀請求被hang住

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
 
import org.springframework.stereotype.Service;
 
import com.roncoo.eshop.inventory.request.ProductInventoryCacheRefreshRequest;
import com.roncoo.eshop.inventory.request.ProductInventoryDBUpdateRequest;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.request.RequestQueue;
import com.roncoo.eshop.inventory.service.RequestAsyncProcessService;
 
/**
 * 請求異步處理的service實現
 * @author Administrator
 *
 */
@Service("requestAsyncProcessService")  
public class RequestAsyncProcessServiceImpl implements RequestAsyncProcessService {
    
    @Override
    public void process(Request request) {
        try {
            // 先做讀請求的去重
            RequestQueue requestQueue = RequestQueue.getInstance();
            Map<Integer, Boolean> flagMap = requestQueue.getFlagMap();
            
            if(request instanceof ProductInventoryDBUpdateRequest) {
                // 如果是一個更新數據庫的請求,那么就將那個productId對應的標識設置為true
                flagMap.put(request.getProductId(), true);
            } else if(request instanceof ProductInventoryCacheRefreshRequest) {
                Boolean flag = flagMap.get(request.getProductId());
                
                // 如果flag是null
                if(flag == null) {
                    flagMap.put(request.getProductId(), false);
                }
                
                // 如果是緩存刷新的請求,那么就判斷,如果標識不為空,而且是true,就說明之前有一
                                //個這個商品的數據庫更新請求
                if(flag != null && flag) {
                    flagMap.put(request.getProductId(), false);
                }
                
                // 如果是緩存刷新的請求,而且發現標識不為空,但是標識是false
                // 說明前面已經有一個緩存刷新請求了
                if(flag != null && !flag) {
                    // 對於這種讀請求,直接就過濾掉,不要放到后面的內存隊列里面去了
                    return;
                }
            }
            
            // 做請求的路由,根據每個請求的商品id,路由到對應的內存隊列中去
            ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
            // 將請求放入對應的隊列中,完成路由操作
            queue.put(request);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 獲取路由到的內存隊列
     * @param productId 商品id
     * @return 內存隊列
     */
    private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId) {
        RequestQueue requestQueue = RequestQueue.getInstance();
        
        // 先獲取productId的hash值
        String key = String.valueOf(productId);
        int h;
        int hash = (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
        
        // 對hash值取模,將hash值路由到指定的內存隊列中,比如內存隊列大小8
        // 用內存隊列的數量對hash值取模之后,結果一定是在0~7之間
        // 所以任何一個商品id都會被固定路由到同樣的一個內存隊列中去的
        int index = (requestQueue.queueSize() - 1) & hash;
        
        System.out.println("===========日志===========: 路由內存隊列,商品id=" + productId + ", 隊列索引=" + index);  
        
        return requestQueue.getQueue(index);
    }
 
}

 

2.4.6、兩種請求Controller接口封裝

如果請求還在等待時間范圍內,不斷輪詢發現可以取到值了,那么就直接返回; 如果請求等待的時間超過一定時長,那么這一次直接從數據庫中讀取當前的舊值

 
    /**
     * 獲取商品庫存
     */
    @RequestMapping("/getProductInventory")
    @ResponseBody
    public ProductInventory getProductInventory(Integer productId) {
        System.out.println("===========日志===========: 接收到一個商品庫存的讀請求,商品id=" + productId);  
        
        ProductInventory productInventory = null;
        
        try {
            Request request = new ProductInventoryCacheRefreshRequest(
                    productId, productInventoryService);
            requestAsyncProcessService.process(request);
            
            // 將請求扔給service異步去處理以后,就需要while(true)一會兒,在這里hang住
            // 去嘗試等待前面有商品庫存更新的操作,同時緩存刷新的操作,將最新的數據刷新到緩存中
            long startTime = System.currentTimeMillis();
            long endTime = 0L;
            long waitTime = 0L;
            
            // 等待超過200ms沒有從緩存中獲取到結果
            while(true) {
//                if(waitTime > 25000) {
//                    break;
//                }
                
                // 一般公司里面,面向用戶的讀請求控制在200ms就可以了
                if(waitTime > 200) {
                    break;
                }
                
                // 嘗試去redis中讀取一次商品庫存的緩存數據
                productInventory = productInventoryService.getProductInventoryCache(productId);
                
                // 如果讀取到了結果,那么就返回
                if(productInventory != null) {
                    System.out.println("===========日志===========: 在200ms內讀取到了redis中的庫存緩存,商品id=" + productInventory.getProductId() + ", 商品庫存數量=" + productInventory.getInventoryCnt());  
                    return productInventory;
                }
                
                // 如果沒有讀取到結果,那么等待一段時間
                else {
                    Thread.sleep(20);
                    endTime = System.currentTimeMillis();
                    waitTime = endTime - startTime;
                }
            }
            
            // 直接嘗試從數據庫中讀取數據
            productInventory = productInventoryService.findProductInventory(productId);
            if(productInventory != null) {
                // 將緩存刷新一下
                productInventoryService.setProductInventoryCache(productInventory); 
                return productInventory;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        return new ProductInventory(productId, -1L);  
    }
    /**
     * 更新商品庫存
     */
    @RequestMapping("/updateProductInventory")
    @ResponseBody
    public Response updateProductInventory(ProductInventory productInventory) {
        // 為了簡單起見,我們就不用log4j那種日志框架去打印日志了
        // 其實log4j也很簡單,實際企業中都是用log4j去打印日志的,自己百度一下
        System.out.println("===========日志===========: 接收到更新商品庫存的請求,商品id=" + productInventory.getProductId() + ", 商品庫存數量=" + productInventory.getInventoryCnt());
        
        Response response = null;
        
        try {
            Request request = new ProductInventoryDBUpdateRequest(
                    productInventory, productInventoryService);
            requestAsyncProcessService.process(request);
            response = new Response(Response.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            response = new Response(Response.FAILURE);
        }
        
        return response;
    }

2.5、高並發的場景下,該解決方案要注意的問題

(1)讀請求長時阻塞

   由於讀請求進行了非常輕度的異步化,所以一定要注意讀超時的問題,每個讀請求必須在超時時間范圍內返回

該解決方案,最大的風險點在於說,可能數據更新很頻繁,導致隊列中積壓了大量更新操作在里面,然后讀請求會發生大量的超時,最后導致大量的請求直接走數據庫

務必通過一些模擬真實的測試,看看更新數據的頻繁是怎樣的

另外一點,因為一個隊列中,可能會積壓針對多個數據項的更新操作,因此需要根據自己的業務情況進行測試,可能需要部署多個服務,每個服務分攤一些數據的更新操作

如果一個內存隊列里居然會擠壓100個商品的庫存修改操作,每隔庫存修改操作要耗費10ms區完成,那么最后一個商品的讀請求,可能等待10 * 100 = 1000ms = 1s后,才能得到數據

這個時候就導致讀請求的長時阻塞

一定要做根據實際業務系統的運行情況,去進行一些壓力測試,和模擬線上環境,去看看最繁忙的時候,內存隊列可能會擠壓多少更新操作,可能會導致最后一個更新操作對應的讀請求,會hang多少時間,如果讀請求在200ms返回,如果你計算過后,哪怕是最繁忙的時候,積壓10個更新操作,最多等待200ms,那還可以的

如果一個內存隊列可能積壓的更新操作特別多,那么你就要加機器,讓每個機器上部署的服務實例處理更少的數據,那么每個內存隊列中積壓的更新操作就會越少

其實根據之前的項目經驗,一般來說數據的寫頻率是很低的,因此實際上正常來說,在隊列中積壓的更新操作應該是很少的

針對讀高並發,讀緩存架構的項目,一般寫請求相對讀來說,是非常非常少的,每秒的QPS能到幾百就不錯了

一秒,500的寫操作,5份,每200ms,就100個寫操作

單機器,20個內存隊列,每個內存隊列,可能就積壓5個寫操作,每個寫操作性能測試后,一般在20ms左右就完成

 

那么針對每個內存隊列中的數據的讀請求,也就最多hang一會兒,200ms以內肯定能返回了

寫QPS擴大10倍,但是經過剛才的測算,就知道,單機支撐寫QPS幾百沒問題,那么就擴容機器,擴容10倍的機器,10台機器,每個機器20個隊列,200個隊列

大部分的情況下,應該是這樣的,大量的讀請求過來,都是直接走緩存取到數據的

少量情況下,可能遇到讀跟數據更新沖突的情況,如上所述,那么此時更新操作如果先入隊列,之后可能會瞬間來了對這個數據大量的讀請求,但是因為做了去重的優化,所以也就一個更新緩存的操作跟在它后面

等數據更新完了,讀請求觸發的緩存更新操作也完成,然后臨時等待的讀請求全部可以讀到緩存中的數據

(2)讀請求並發量過高

這里還必須做好壓力測試,確保恰巧碰上上述情況的時候,還有一個風險,就是突然間大量讀請求會在幾十毫秒的延時hang在服務上,看服務能不能抗的住,需要多少機器才能抗住最大的極限情況的峰值

但是因為並不是所有的數據都在同一時間更新,緩存也不會同一時間失效,所以每次可能也就是少數數據的緩存失效了,然后那些數據對應的讀請求過來,並發量應該也不會特別大

按1:99的比例計算讀和寫的請求,每秒5萬的讀QPS,可能只有500次更新操作

如果一秒有500的寫QPS,那么要測算好,可能寫操作影響的數據有500條,這500條數據在緩存中失效后,可能導致多少讀請求,發送讀請求到庫存服務來,要求更新緩存

 

一般來說,1:1,1:2,1:3,每秒鍾有1000個讀請求,會hang在庫存服務上,每個讀請求最多hang多少時間,200ms就會返回

在同一時間最多hang住的可能也就是單機200個讀請求,同時hang住

單機hang200個讀請求,還是ok的

1:20,每秒更新500條數據,這500秒數據對應的讀請求,會有20 * 500 = 1萬

1萬個讀請求全部hang在庫存服務上,就死定了

(3)多服務實例部署的請求路由

后端的nginx服務器,就稱之為應用服務器; 最前端的nginx服務器,被稱之為分發服務器

分發層nginx,負責流量分發的邏輯和策略,這個里面它可以根據你自己定義的一些規則,比如根據productId去進行hash,然后對后端的nginx數量取模

將某一個商品的訪問的請求,就固定路由到一個后端的nginx服務器上去,固定商品id,走固定的緩存服務實例。保證說只會從redis中獲取一次緩存數據,后面全都是走nginx本地緩存了

機器級別的請求路由問題

 

(4)熱點商品的路由問題,導致請求的傾斜

萬一某個商品的讀寫請求特別高,全部打到相同的機器的相同的隊列里面去了,可能造成某台機器的壓力過大

就是說,因為只有在商品數據更新的時候才會清空緩存,然后才會導致讀寫並發,所以更新頻率不是太高的話,這個問題的影響並不是特別大

但是的確可能某些機器的負載會高一些

2.6、總結

一般來說,就是如果你的系統不是嚴格要求緩存+數據庫必須一致性的話,緩存可以稍微的跟數據庫偶爾有不一致的情況,最好不要做這個方案,讀請求和寫請求串行化,串到一個內存隊列里去,這樣就可以保證一定不會出現不一致的情況

串行化之后,就會導致系統的吞吐量會大幅度的降低,用比正常情況下多幾倍的機器去支撐線上的一個請求。

最后附上代碼地址https://gitee.com/lzhcode/eshop-inventory

 

 

2.4.1、線程池+內存隊列初始化

  1.  
    /**注冊監聽器**/
  2.  
     
  3.  
    @Bean
  4.  
    public ServletListenerRegistrationBean servletListenerRegistrationBean(){
  5.  
        ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
  6.  
        servletListenerRegistrationBean.setListener( new InitListener());
  7.  
        return servletListenerRegistrationBean;
  8.  
    }
  1.  
    import javax.servlet.ServletContextEvent;
  2.  
    import javax.servlet.ServletContextListener;
  3.  
     
  4.  
    import com.roncoo.eshop.inventory.thread.RequestProcessorThreadPool;
  5.  
     
  6.  
    /**
  7.  
    * 系統初始化監聽器
  8.  
    * @author Administrator
  9.  
    *
  10.  
    */
  11.  
    public class InitListener implements ServletContextListener {
  12.  
     
  13.  
    @Override
  14.  
    public void contextInitialized(ServletContextEvent sce) {
  15.  
    // 初始化工作線程池和內存隊列
  16.  
    RequestProcessorThreadPool.init();
  17.  
    }
  18.  
     
  19.  
    @Override
  20.  
    public void contextDestroyed(ServletContextEvent sce) {
  21.  
     
  22.  
    }
  23.  
     
  24.  
    }

 

  1.  
    import java.util.concurrent.ArrayBlockingQueue;
  2.  
    import java.util.concurrent.ExecutorService;
  3.  
    import java.util.concurrent.Executors;
  4.  
     
  5.  
    import com.roncoo.eshop.inventory.request.Request;
  6.  
    import com.roncoo.eshop.inventory.request.RequestQueue;
  7.  
     
  8.  
    /**
  9.  
    * 請求處理線程池:單例
  10.  
    * @author Administrator
  11.  
    *
  12.  
    */
  13.  
    public class RequestProcessorThreadPool {
  14.  
     
  15.  
    // 在實際項目中,你設置線程池大小是多少,每個線程監控的那個內存隊列的大小是多少
  16.  
    // 都可以做到一個外部的配置文件中
  17.  
    // 我們這了就給簡化了,直接寫死了,好吧
  18.  
     
  19.  
    /**
  20.  
    * 線程池
  21.  
    */
  22.  
    private ExecutorService threadPool = Executors.newFixedThreadPool( 10);
  23.  
     
  24.  
    public RequestProcessorThreadPool() {
  25.  
    RequestQueue requestQueue = RequestQueue.getInstance();
  26.  
     
  27.  
    for( int i = 0; i < 10; i++) {
  28.  
    ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>( 100);
  29.  
    requestQueue.addQueue(queue);
  30.  
    threadPool.submit( new RequestProcessorThread(queue));
  31.  
    }
  32.  
    }
  33.  
     
  34.  
    /**
  35.  
    * 單例有很多種方式去實現:我采取絕對線程安全的一種方式
  36.  
    *
  37.  
    * 靜態內部類的方式,去初始化單例
  38.  
    *
  39.  
    * @author Administrator
  40.  
    *
  41.  
    */
  42.  
    private static class Singleton {
  43.  
     
  44.  
    private static RequestProcessorThreadPool instance;
  45.  
     
  46.  
    static {
  47.  
    instance = new RequestProcessorThreadPool();
  48.  
    }
  49.  
     
  50.  
    public static RequestProcessorThreadPool getInstance() {
  51.  
    return instance;
  52.  
    }
  53.  
     
  54.  
    }
  55.  
     
  56.  
    /**
  57.  
    * jvm的機制去保證多線程並發安全
  58.  
    *
  59.  
    * 內部類的初始化,一定只會發生一次,不管多少個線程並發去初始化
  60.  
    *
  61.  
    * @return
  62.  
    */
  63.  
    public static RequestProcessorThreadPool getInstance() {
  64.  
    return Singleton.getInstance();
  65.  
    }
  66.  
     
  67.  
    /**
  68.  
    * 初始化的便捷方法
  69.  
    */
  70.  
    public static void init() {
  71.  
    getInstance();
  72.  
    }
  73.  
     
  74.  
    }
  1.  
    import java.util.ArrayList;
  2.  
    import java.util.List;
  3.  
    import java.util.Map;
  4.  
    import java.util.concurrent.ArrayBlockingQueue;
  5.  
    import java.util.concurrent.ConcurrentHashMap;
  6.  
     
  7.  
    /**
  8.  
    * 請求內存隊列
  9.  
    * @author Administrator
  10.  
    *
  11.  
    */
  12.  
    public class RequestQueue {
  13.  
     
  14.  
    /**
  15.  
    * 內存隊列
  16.  
    */
  17.  
    private List<ArrayBlockingQueue<Request>> queues =
  18.  
    new ArrayList<ArrayBlockingQueue<Request>>();
  19.  
    /**
  20.  
    * 標識位map
  21.  
    */
  22.  
    private Map<Integer, Boolean> flagMap = new ConcurrentHashMap<Integer, Boolean>();
  23.  
     
  24.  
    /**
  25.  
    * 單例有很多種方式去實現:我采取絕對線程安全的一種方式
  26.  
    *
  27.  
    * 靜態內部類的方式,去初始化單例
  28.  
    *
  29.  
    * @author Administrator
  30.  
    *
  31.  
    */
  32.  
    private static class Singleton {
  33.  
     
  34.  
    private static RequestQueue instance;
  35.  
     
  36.  
    static {
  37.  
    instance = new RequestQueue();
  38.  
    }
  39.  
     
  40.  
    public static RequestQueue getInstance() {
  41.  
    return instance;
  42.  
    }
  43.  
     
  44.  
    }
  45.  
     
  46.  
    /**
  47.  
    * jvm的機制去保證多線程並發安全
  48.  
    *
  49.  
    * 內部類的初始化,一定只會發生一次,不管多少個線程並發去初始化
  50.  
    *
  51.  
    * @return
  52.  
    */
  53.  
    public static RequestQueue getInstance() {
  54.  
    return Singleton.getInstance();
  55.  
    }
  56.  
     
  57.  
    /**
  58.  
    * 添加一個內存隊列
  59.  
    * @param queue
  60.  
    */
  61.  
    public void addQueue(ArrayBlockingQueue<Request> queue) {
  62.  
    this.queues.add(queue);
  63.  
    }
  64.  
     
  65.  
    /**
  66.  
    * 獲取內存隊列的數量
  67.  
    * @return
  68.  
    */
  69.  
    public int queueSize() {
  70.  
    return queues.size();
  71.  
    }
  72.  
     
  73.  
    /**
  74.  
    * 獲取內存隊列
  75.  
    * @param index
  76.  
    * @return
  77.  
    */
  78.  
    public ArrayBlockingQueue<Request> getQueue(int index) {
  79.  
    return queues.get(index);
  80.  
    }
  81.  
     
  82.  
    public Map<Integer, Boolean> getFlagMap() {
  83.  
    return flagMap;
  84.  
    }
  85.  
     
  86.  
    }

 

  1.  
    import java.util.concurrent.ArrayBlockingQueue;
  2.  
    import java.util.concurrent.Callable;
  3.  
     
  4.  
    import com.roncoo.eshop.inventory.request.Request;
  5.  
     
  6.  
    /**
  7.  
    * 執行請求的工作線程
  8.  
    * @author Administrator
  9.  
    *
  10.  
    */
  11.  
    public class RequestProcessorThread implements Callable<Boolean> {
  12.  
     
  13.  
    /**
  14.  
    * 自己監控的內存隊列
  15.  
    */
  16.  
    private ArrayBlockingQueue<Request> queue;
  17.  
     
  18.  
    public RequestProcessorThread(ArrayBlockingQueue<Request> queue) {
  19.  
    this.queue = queue;
  20.  
    }
  21.  
     
  22.  
    @Override
  23.  
    public Boolean call() throws Exception {
  24.  
    try {
  25.  
    while( true) {
  26.  
    // ArrayBlockingQueue
  27.  
    // Blocking就是說明,如果隊列滿了,或者是空的,那么都會在執行操作的時候,阻塞住
  28.  
    Request request = queue.take();
  29.  
    System.out.println( "===========日志===========: 工作線程處理請求,商品id=" + request.getProductId());
  30.  
    // 執行這個request操作
  31.  
    request.process();
  32.  
    }
  33.  
    } catch (Exception e) {
  34.  
    e.printStackTrace();
  35.  
    }
  36.  
    return true;
  37.  
    }
  38.  
     
  39.  
    }

2.4.2、兩種請求對象封裝

  1.  
    import com.roncoo.eshop.inventory.model.ProductInventory;
  2.  
    import com.roncoo.eshop.inventory.service.ProductInventoryService;
  3.  
     
  4.  
    /**
  5.  
    * 重新加載商品庫存的緩存
  6.  
    * @author Administrator
  7.  
    *
  8.  
    */
  9.  
    public class ProductInventoryCacheRefreshRequest implements Request {
  10.  
     
  11.  
    /**
  12.  
    * 商品id
  13.  
    */
  14.  
    private Integer productId;
  15.  
    /**
  16.  
    * 商品庫存Service
  17.  
    */
  18.  
    private ProductInventoryService productInventoryService;
  19.  
     
  20.  
    public ProductInventoryCacheRefreshRequest(Integer productId,
  21.  
    ProductInventoryService productInventoryService) {
  22.  
    this.productId = productId;
  23.  
    this.productInventoryService = productInventoryService;
  24.  
    }
  25.  
     
  26.  
    @Override
  27.  
    public void process() {
  28.  
    // 從數據庫中查詢最新的商品庫存數量
  29.  
    ProductInventory productInventory = productInventoryService.findProductInventory(productId);
  30.  
    System.out.println( "===========日志===========: 已查詢到商品最新的庫存數量,商品id=" + productId + ", 商品庫存數量=" + productInventory.getInventoryCnt());
  31.  
    // 將最新的商品庫存數量,刷新到redis緩存中去
  32.  
    productInventoryService.setProductInventoryCache(productInventory);
  33.  
    }
  34.  
     
  35.  
    public Integer getProductId() {
  36.  
    return productId;
  37.  
    }
  38.  
     
  39.  
    }
  1.  
    import com.roncoo.eshop.inventory.model.ProductInventory;
  2.  
    import com.roncoo.eshop.inventory.service.ProductInventoryService;
  3.  
     
  4.  
    /**
  5.  
    * 比如說一個商品發生了交易,那么就要修改這個商品對應的庫存
  6.  
    *
  7.  
    * 此時就會發送請求過來,要求修改庫存,那么這個可能就是所謂的data update request,數據更新請求
  8.  
    *
  9.  
    * cache aside pattern
  10.  
    *
  11.  
    * (1)刪除緩存
  12.  
    * (2)更新數據庫
  13.  
    *
  14.  
    * @author Administrator
  15.  
    *
  16.  
    */
  17.  
    public class ProductInventoryDBUpdateRequest implements Request {
  18.  
     
  19.  
    /**
  20.  
    * 商品庫存
  21.  
    */
  22.  
    private ProductInventory productInventory;
  23.  
    /**
  24.  
    * 商品庫存Service
  25.  
    */
  26.  
    private ProductInventoryService productInventoryService;
  27.  
     
  28.  
    public ProductInventoryDBUpdateRequest(ProductInventory productInventory,
  29.  
    ProductInventoryService productInventoryService) {
  30.  
    this.productInventory = productInventory;
  31.  
    this.productInventoryService = productInventoryService;
  32.  
    }
  33.  
     
  34.  
    @Override
  35.  
    public void process() {
  36.  
    System.out.println( "===========日志===========: 數據庫更新請求開始執行,商品id=" + productInventory.getProductId() + ", 商品庫存數量=" + productInventory.getInventoryCnt());
  37.  
    // 刪除redis中的緩存
  38.  
    productInventoryService.removeProductInventoryCache(productInventory);
  39.  
    // 為了模擬演示先刪除了redis中的緩存,然后還沒更新數據庫的時候,讀請求過來了,這里可以人工sleep一下
  40.  
    // try {
  41.  
    // Thread.sleep(20000);
  42.  
    // } catch (InterruptedException e) {
  43.  
    // e.printStackTrace();
  44.  
    // }
  45.  
    // 修改數據庫中的庫存
  46.  
    productInventoryService.updateProductInventory(productInventory);
  47.  
    }
  48.  
     
  49.  
    /**
  50.  
    * 獲取商品id
  51.  
    */
  52.  
    public Integer getProductId() {
  53.  
    return productInventory.getProductId();
  54.  
    }
  55.  
     
  56.  
    }
  57.  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM