1. 概述
緩存與數據庫的強一致性,也稱線性一致性,核心要求是:數據庫中的值發生變更,緩存數據要實現同步復制,並且一旦操作完成,隨后任意客戶端的查詢都必須返回這一新值。以下圖為例,一旦寫入b
完成,必須保證讀到;而寫入過程中,認為值的跳變可能發生在某一瞬間,因此讀到a或b都是可能的。數據庫與緩存作為一個整體,在向外提供服務的過程中,無論數據是否變更過,都時刻保持數據一致,因為它內部的數據仿佛
只有一份,即使並發訪問不同節點。
2. 場景
秒殺是一個比較典型的強一致場景,一般秒殺系統的庫存同時保持在數據庫與緩存中,如果查詢緩存有數據,直接可以走秒殺流程,將數據庫中的庫存數量進行扣減,同時將最新的數據更新到緩存,使緩存中數據與數據庫中數據保持強一致,這里只是拿秒殺的場景來舉例,類似秒殺的場景有很多,像搶門票系統、12306搶火車票等,資源比較少用戶比較多,需要在特定時間內進行搶購的業務場景。真實秒殺場景的設計,是在緩存中扣庫存,不會直接在數據庫中進行扣庫存,因為數據庫的性能遠遠比緩存差,所以本篇也只是拿類似秒殺這樣的場景,來闡述強一致下的設計思想與相關實現。
3. 方案
分布式系統里面,有個眾所周知的理論,就是CAP理論
,CAP即:
Consistency(一致性)
Availability(可用性)
Partition tolerance(分區容忍性)
這三個性質對應了分布式系統的三個指標。
而CAP理論說的就是一個分布式系統,不可能同時做到這三點。如果默認是分區的,那么只能選擇P的情況下,出現了兩種選擇組合,AP與CP,AP保證可用性則犧牲了一致性,CP保證了一致性則犧牲了可用性,所以我們在講緩存與數據庫強一致的同時,不可避免犧牲了系統可用性的指標,所以看到12306網站這種體驗不好,總是搶不到票,或者在一直提示排隊中這種情況,就是系統可用性不佳的表現,因為火車站的票源是個稀缺資源,而且在各個站點之間查到的數量又是動態的,在這種強一致性下的業務場景,可用性必然會出現問題。這里不深入討論12306網站具體是如何實現的,只是拿該場景做個引入。
假設現有一般搶購系統,某些商品搞促銷活動,庫存也就1000,搶完為止,在開槍時間未到來前,頁面顯示初始庫存,在搶購過程中,只要刷新頁面庫存還有,按鈕就不會置灰,還可以接着點擊搶購,直到頁面顯示庫存為0,活動結束。
這是個比較典型的讀多寫少
場景,大量請求來集中訪問,少部分請求能真正完成下單,我們很容易想到做讀寫分離
,將商品的庫存提前從數據庫預加載到緩存,用戶讀的時候,從緩存讀取數量,只要能看到數量,也就可以直接下單,至於用戶能否搶到,得看用戶運氣了,讓真正下單成功的用戶去走后續付款操作。注意,這里對於某個用戶下單成功后,后台要做的操作是先扣數據庫庫存數量,隨后實時同步
更新庫存到緩存中。如果這一步更新不及時,很有可能數據庫與緩存庫存不一致,導致緩存中的數量比實際數據庫庫存還多,最終緩存庫存減為零,而數據庫已經是負數,結果導致超賣。
3.1 數據庫與緩存雙寫+讀取操作異步串行化
當庫存發生變化后,更新數據庫,同時更新緩存,如果在讀並發高的情況下,更新數據庫與更新緩存的時間間隔中,被讀操作打斷,那么讀到的將是緩存中舊的庫存,數據庫已經是新庫存,此時會出現不一致;
為了解決這種問題,應先更新數據庫后,立即刪除緩存,這也是上一篇分布式緩存--緩存與數據庫一致性方案中極力推薦的cache aside pattern
(旁路緩存)的經典模式。
更新數據的時候,根據數據的唯一標識,將操作路由之后,發送到一個jvm內部的內存隊列中,同時刪除緩存。
讀取數據的時候,那么將重新讀取數據,並更新緩存的操作,根據唯一標識路由之后,也發送同一個jvm內部的隊列中。
一個隊列對應一個工作線程,每個工作線程串行拿到對應的操作,然后一條一條的執行,這樣的話,一個數據變更的操作先執行,刪除緩存。如果一個讀請求過來,讀到了空的緩存,就從數據庫將更新后的值加載到緩存。如果並發高的情況下,會出現多個讀操作並發的讀數據庫並加載緩存,可以先將緩存更新的請求發送到隊列中,此時會在隊列中積壓,然后同步等待緩存更新完成。
這里有一個優化點,一個隊列中,其實多個更新緩存請求串在一起是沒意義的,因此可以做過濾,如果發現隊列中已經有一個更新緩存的請求了,那么就不用再放個更新請求操作進去了,直接等待前面的更新操作請求完成即可;
如果請求還在等待時間范圍內,不斷輪詢發現可以取到值了,那么就直接返回; 如果請求等待的時間超過一定時長,那么直接嘗試從數據庫中讀取數據,並寫入緩存。
實現代碼如下:
step1
: 注冊監聽器,初始化工作線程池和內存隊列
在SpringBoot的啟動類中注冊如下監聽器類InitListener
@EnableAutoConfiguration
@SpringBootApplication
@ComponentScan
@MapperScan("com.roncoo.eshop.inventory.mapper")
public class Application {
/**
* 注冊監聽器
* @return
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ServletListenerRegistrationBean servletListenerRegistrationBean() {
ServletListenerRegistrationBean servletListenerRegistrationBean =
new ServletListenerRegistrationBean();
servletListenerRegistrationBean.setListener(new InitListener());
return servletListenerRegistrationBean;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
監聽器的實現類如下如下:
/**
* 系統初始化監聽器
*
*/
public class InitListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
// 初始化工作線程池和內存隊列
RequestProcessorThreadPool.init();
}
}
請求處理線程池的類如下,完成線程池與內存隊列的初始化:
/**
* 請求處理線程池:單例
*
*/
public class RequestProcessorThreadPool {
/**
* 線程池
*/
private ExecutorService threadPool = Executors.newFixedThreadPool(10);
public RequestProcessorThreadPool() {
RequestQueue requestQueue = RequestQueue.getInstance();
for(int i = 0; i < 10; i++) {
ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100);
requestQueue.addQueue(queue);
threadPool.submit(new RequestProcessorThread(queue));
}
}
/**
*
* 靜態內部類的方式,去初始化單例
*
*
*/
private static class Singleton {
private static RequestProcessorThreadPool instance;
static {
instance = new RequestProcessorThreadPool();
}
public static RequestProcessorThreadPool getInstance() {
return instance;
}
}
public static RequestProcessorThreadPool getInstance() {
return Singleton.getInstance();
}
/**
* 初始化方法
*/
public static void init() {
getInstance();
}
}
請求內存隊列
/**
* 請求內存隊列
*
*/
public class RequestQueue {
/**
* 內存隊列
*/
private List<ArrayBlockingQueue<Request>> queues =
new ArrayList<ArrayBlockingQueue<Request>>();
/**
* 標識位map
*/
private Map<Integer, Boolean> flagMap = new ConcurrentHashMap<Integer, Boolean>();
/**
*
* 靜態內部類的方式,去初始化單例
*
*
*/
private static class Singleton {
private static RequestQueue instance;
static {
instance = new RequestQueue();
}
public static RequestQueue getInstance() {
return instance;
}
}
public static RequestQueue getInstance() {
return Singleton.getInstance();
}
/**
* 添加一個內存隊列
* @param queue
*/
public void addQueue(ArrayBlockingQueue<Request> queue) {
this.queues.add(queue);
}
/**
* 獲取內存隊列的數量
* @return
*/
public int queueSize() {
return queues.size();
}
/**
* 獲取內存隊列
* @param index
* @return
*/
public ArrayBlockingQueue<Request> getQueue(int index) {
return queues.get(index);
}
public Map<Integer, Boolean> getFlagMap() {
return flagMap;
}
}
每個工作線程如下:
/**
* 執行請求的工作線程
*
*/
public class RequestProcessorThread implements Callable<Boolean> {
/**
* 自己監控的內存隊列
*/
private ArrayBlockingQueue<Request> queue;
public RequestProcessorThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
@Override
public Boolean call() throws Exception {
try {
while(true) {
// ArrayBlockingQueue,線程安全的內存隊列
// Blocking就是說明,如果隊列滿了,或者是空的,那么都會在執行操作的時候,阻塞住
Request request = queue.take();
boolean forceRfresh = request.isForceRefresh();
// 先做讀請求的去重
if(!forceRfresh) {
RequestQueue requestQueue = RequestQueue.getInstance();
Map<Integer, Boolean> flagMap = requestQueue.getFlagMap();
if(request instanceof ProductInventoryDBUpdateRequest) {
// 如果是一個更新數據庫的請求,那么就將那個productId對應的標識設置為true
flagMap.put(request.getProductId(), true);
} else if(request instanceof ProductInventoryCacheRefreshRequest) {
Boolean flag = flagMap.get(request.getProductId());
// 如果flag是null
if(flag == null) {
flagMap.put(request.getProductId(), false);
}
// 如果是緩存刷新的請求,那么就判斷,如果標識不為空,而且是true,就說明之前有一個這個商品的數據庫更新請求
if(flag != null && flag) {
flagMap.put(request.getProductId(), false);
}
// 如果是緩存刷新的請求,而且發現標識不為空,但是標識是false
// 說明前面已經有一個數據庫更新請求與一個緩存刷新請求了
if(flag != null && !flag) {
// 對於這種讀請求,直接就過濾掉,不要放到后面的內存隊列里面去了
return true;
}
}
}
System.out.println("===========日志===========: 工作線程處理請求,商品id=" + request.getProductId());
// 執行這個request操作
request.process();
}
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
}
step2
: 封裝兩種請求接口
/**
* 請求接口
*
*/
public interface Request {
void process();
Integer getProductId();
boolean isForceRefresh();
}
更新數據庫請求實現類
public class ProductInventoryDBUpdateRequest implements Request {
/**
* 商品庫存
*/
private ProductInventory productInventory;
/**
* 商品庫存Service
*/
private ProductInventoryService productInventoryService;
public ProductInventoryDBUpdateRequest(ProductInventory productInventory,
ProductInventoryService productInventoryService) {
this.productInventory = productInventory;
this.productInventoryService = productInventoryService;
}
@Override
public void process() {
System.out.println("===========日志===========: 數據庫更新請求開始執行,商品id=" + productInventory.getProductId() + ", 商品庫存數量=" + productInventory.getInventoryCnt());
// 修改數據庫中的庫存
productInventoryService.updateProductInventory(productInventory);
// 刪除redis中的緩存
productInventoryService.removeProductInventoryCache(productInventory);
}
/**
* 獲取商品id
*/
public Integer getProductId() {
return productInventory.getProductId();
}
@Override
public boolean isForceRefresh() {
return false;
}
}
更新緩存類請求類
/**
* 重新加載商品庫存的緩存
* @author Administrator
*
*/
public class ProductInventoryCacheRefreshRequest implements Request {
/**
* 商品id
*/
private Integer productId;
/**
* 商品庫存Service
*/
private ProductInventoryService productInventoryService;
/**
* 是否強制刷新緩存
*/
private boolean forceRefresh;
public ProductInventoryCacheRefreshRequest(Integer productId,
ProductInventoryService productInventoryService,
boolean forceRefresh) {
this.productId = productId;
this.productInventoryService = productInventoryService;
this.forceRefresh = forceRefresh;
}
@Override
public void process() {
// 從數據庫中查詢最新的商品庫存數量
ProductInventory productInventory = productInventoryService.findProductInventory(productId);
System.out.println("===========日志===========: 已查詢到商品最新的庫存數量,商品id=" + productId + ", 商品庫存數量=" + productInventory.getInventoryCnt());
// 將最新的商品庫存數量,刷新到redis緩存中去
productInventoryService.setProductInventoryCache(productInventory);
}
public Integer getProductId() {
return productId;
}
public boolean isForceRefresh() {
return forceRefresh;
}
}
step3
: 請求異步執行Service封裝
/**
* 請求異步執行的service
*
*/
public interface RequestAsyncProcessService {
void process(Request request);
}
實現類:
/**
* 請求異步處理的service實現
* @author Administrator
*
*/
@Service("requestAsyncProcessService")
public class RequestAsyncProcessServiceImpl implements RequestAsyncProcessService {
@Override
public void process(Request request) {
try {
// 做請求的路由,根據每個請求的商品id,路由到對應的內存隊列中去
ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
// 將請求放入對應的隊列中,完成路由操作
queue.put(request);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 獲取路由到的內存隊列
* @param productId 商品id
* @return 內存隊列
*/
private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId) {
RequestQueue requestQueue = RequestQueue.getInstance();
// 先獲取productId的hash值
String key = String.valueOf(productId);
int h;
int hash = (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
// 對hash值取模,將hash值路由到指定的內存隊列中,比如內存隊列大小8
// 用內存隊列的數量對hash值取模之后,結果一定是在0~7之間
// 所以任何一個商品id都會被固定路由到同樣的一個內存隊列中去的
int index = (requestQueue.queueSize() - 1) & hash;
System.out.println("===========日志===========: 路由內存隊列,商品id=" + productId + ", 隊列索引=" + index);
return requestQueue.getQueue(index);
}
}
step4
: 兩種請求controller封裝
@Controller
public class ProductInventoryController {
@Resource
private RequestAsyncProcessService requestAsyncProcessService;
@Resource
private ProductInventoryService productInventoryService;
/**
* 更新商品庫存
*/
@RequestMapping("/updateProductInventory")
@ResponseBody
public Response updateProductInventory(ProductInventory productInventory) {
System.out.println("===========日志===========: 接收到更新商品庫存的請求,商品id=" + productInventory.getProductId() + ", 商品庫存數量=" + productInventory.getInventoryCnt());
Response response = null;
try {
Request request = new ProductInventoryDBUpdateRequest(
productInventory, productInventoryService);
requestAsyncProcessService.process(request);
response = new Response(Response.SUCCESS);
} catch (Exception e) {
e.printStackTrace();
response = new Response(Response.FAILURE);
}
return response;
}
/**
* 獲取商品庫存
*/
@RequestMapping("/getProductInventory")
@ResponseBody
public ProductInventory getProductInventory(Integer productId) {
System.out.println("===========日志===========: 接收到一個商品庫存的讀請求,商品id=" + productId);
ProductInventory productInventory = null;
try {
Request request = new ProductInventoryCacheRefreshRequest(
productId, productInventoryService, false);
requestAsyncProcessService.process(request);
// 將請求扔給service異步去處理以后,就需要while(true)一會兒,在這里hang住
// 去嘗試等待前面有商品庫存更新的操作,同時緩存刷新的操作,將最新的數據刷新到緩存中
long startTime = System.currentTimeMillis();
long endTime = 0L;
long waitTime = 0L;
// 等待超過200ms沒有從緩存中獲取到結果
while(true) {
// 一般公司里面,面向用戶的讀請求控制在200ms就可以了
if(waitTime > 200) {
break;
}
// 嘗試去redis中讀取一次商品庫存的緩存數據
productInventory = productInventoryService.getProductInventoryCache(productId);
// 如果讀取到了結果,那么就返回
if(productInventory != null) {
System.out.println("===========日志===========: 在200ms內讀取到了redis中的庫存緩存,商品id=" + productInventory.getProductId() + ", 商品庫存數量=" + productInventory.getInventoryCnt());
return productInventory;
}
// 如果沒有讀取到結果,那么等待一段時間
else {
Thread.sleep(20);
endTime = System.currentTimeMillis();
waitTime = endTime - startTime;
}
}
// 直接嘗試從數據庫中讀取數據
productInventory = productInventoryService.findProductInventory(productId);
if(productInventory != null) {
// 將緩存刷新一下
// 這個過程,實際上是一個讀操作的過程,但是沒有放在隊列中串行去處理,還是有數據不一致的問題
request = new ProductInventoryCacheRefreshRequest(
productId, productInventoryService, true);
requestAsyncProcessService.process(request);
// 代碼會運行到這里,只有三種情況:
// 1、就是說,上一次也是讀請求,數據刷入了redis,但是redis LRU算法給清理掉了,標志位還是false
// 所以此時下一個讀請求是從緩存中拿不到數據的,再放一個讀Request進隊列,讓數據去刷新一下
// 2、可能在200ms內,就是讀請求在隊列中一直積壓着,沒有等待到它執行
// 所以就直接查一次庫,然后給隊列里塞進去一個刷新緩存的請求
// 3、數據庫里本身就沒有,緩存穿透,穿透redis,請求到達mysql庫
return productInventory;
}
} catch (Exception e) {
e.printStackTrace();
}
return new ProductInventory(productId, -1L);
}
}
上述實現過程中有兩個優化點
優化點1
: 讀請求去重優化
因為那個寫請求肯定會更新數據庫,然后那個讀請求肯定會從數據庫中讀取最新數據,然后刷新到緩存中,自己只要hang一會兒就可以從緩存中讀到數據了。
優化點2
: 空數據讀請求過濾優化
可能某個數據,在數據庫里面壓根兒就沒有,那么那個讀請求是不需要放入內存隊列的,而且讀請求在controller那一層,直接就可以返回了,不需要等待。
如果緩存里沒數據,就兩個情況,第一個是數據庫里就沒數據,緩存肯定也沒數據;第二個是數據庫更新操作過來了,先刪除了緩存,此時緩存是空的,但是數據庫里是有的。我們做了之前的讀請求去重優化,用了一個flag map,只要前面有數據庫更新操作,flag就肯定是存在的,你只不過可以根據true或false,判斷你前面執行的是寫請求還是讀請求。但是如果flag壓根兒就沒有呢,就說明這個數據,無論是寫請求,還是讀請求,都沒有過。那這個時候過來的讀請求,發現flag是null,就可以認為數據庫里肯定也是空的,那就不會去讀取了。或者說,我們也可以認為每個商品有一個最初始的庫存,但是因為最初始的庫存肯定會同步到緩存中去的,有一種特殊的情況,就是說,商品庫存本來在redis中是有緩存的,但是因為redis內存滿了,就給干掉了,但是此時數據庫中是有值的,那么在這種情況下,可能就是之前沒有任何的寫請求和讀請求的flag的值,此時還是需要從數據庫中重新加載一次數據到緩存中的。
3.2 方案改進
上述方案是筆者的朋友在互聯網大廠的經驗總結,在思路上是沒有問題的,但是在工業級項目的落地過程中,會有不少問題。
比如機器突然掛了,那內存隊列就會丟數據
,再比如,如果並發讀的數量很大,那么內存隊列積壓
的數據為會越來越多,導致后面的請求也有可能hang在那很長時間,一直讀不到數據。
問題1: 如果內存隊列丟數據,怎么辦?
這種情況比較常見,比如機器突然掛了,內存隊列數據丟了,該如何處理?首先明確一點,這里的請求都是同步阻塞
的,如果業務系統掛了,那上游的路由網關會出現請求異常或者超時,外部系統或者外部用戶請求也會異常或者超時,那調用端會重試請求,直到機器重啟ok,請求會再次進隊列,數據只不過重新進入隊列。
問題2:數據積壓如何處理?
這里確實會存在內存隊列積壓大量的讀請求,導致后續的請求hang在那幾秒甚至十幾秒都沒有得到處理。
問題3:業務系統需要完成強一致的需求,需要引入內存隊列,路由網關,導致大量的開發成本,並且稍微控制不好,就會出現隱藏的bug。
針對以上問題,作如下改進:
改進點1:封裝緩存代理客戶端與緩存服務端,引入RocketMQ,將消息寫入帶上時間戳版本。
封裝緩存客戶端,一方面是省去路由網關,另一方面是充當消息寫入的角色。RocketMQ替換原來的內存隊列,因為消息本身按照key分區寫入,就能保證相同的key會寫到相同的分區隊列里面。然后換成代理服務端按照消息有序消費,再寫入緩存集群,架構設計如下:
上圖中,首先事務寫入保證消息不會丟,其次寫入時帶上時間戳作為版本號,防止讀取的舊值后寫入 ,更新的新值先寫入,當寫入緩存集群中時,比較時間戳是否是較新的,防止舊值覆蓋新值。
改進點2:解決MQ吞吐問量問題,緩存代理服務端使用內存隊列。
針對改點1中的情形,為保證消息絕對有序,只有一個線程消費MQ中一個分區的消息,再寫入緩存,會帶來吞吐量的下降,因此在緩存代理服務端使用多個內存隊列,讓多個線程依次消費多個隊列,增加吞吐量。
改進點3: 增加手動ack,增加消息進入重試隊列與死信隊列的機制。
如果Redis緩存掛了,此時需要通過更新緩存后拿到結果,並手動通知ack到消息中間件,確保消息消費者處理完后,才丟棄該消息,防止消息在消費者端丟失,時間戳來保證更新緩存冪等性,此外一直更新緩存失敗的消息進入消息中間件進入重試隊列死信隊列,待下次發消息后再消費。
3.3 終極方案
通過上述改進,一套思想完備,可落地到生產級的方案基本完成,有人會說這不是分布式鎖
的思想么?說對了,多年前,分布式鎖還沒有發展成熟的時候,就是通過類似的這種消息正中間將寫入分區的操作串行化,進行消費,再通過冪等性保證最終寫入不會被亂序覆蓋,現在分布式鎖的實現已經比較成熟,完全可以用分布式鎖來解決,比如用Redis的提供的客戶端Redission來實現,不但簡化流程,而且保證只有搶到鎖的線程才可以更新數據庫與緩存,再釋放鎖,當然加鎖與釋放鎖的占用時間也是較快的,因為更新數據與寫一條緩存也就幾毫秒或者是十幾毫秒的時間,可以保證后續更新的操作在很快時間內可以再次搶到鎖。
4. 總結
本篇講述了數據庫與緩存雙寫在強一致下的實現思路與方案,從一開始的方案設計到落地,再到落地后的優化改進,最后到比較可行又簡單的方式,你會發現,好的架構不是一步到位,而是逐步演進
而來;其次幾年前的方案,也許比較合適
,但是現在看起來就會顯得過於復雜,原因是解決特定問題的專業技術已經出現,專門解決了該類問題,最終通過重構來解決之前過於復雜又容易出問題的方案,由此也不難發現,好的架構是比較簡單
的。所以我也比較贊同原阿里P9李運華在極客時間的架構專欄中提到的架構三原則
,即合適,簡單,演進
,就是在特定的場景下做適合該場景的合適架構,並且盡可能設計簡單,並通過不斷演進,來優化之前方案中的缺陷。