對於Guava Cache本身就不多做介紹了,一個非常好用的本地cache lib,可以完全取代自己手動維護ConcurrentHashMap。
背景
目前需要開發一個接口I,對性能要求有非常高的要求,TP99.9在20ms以內。初步開發后發現耗時完全無法滿足,mysql稍微波動就超時了。
主要耗時在DB讀取,請求一次接口會讀取幾次配置表Entry表。而Entry表的信息更新又不頻繁,對實時性要求不高,所以想到了對DB做一個cache,理論上就可以大幅度提升接口性能了。
DB表結構(這里的代碼都是為了演示,不過原理、流程和實際生產環境基本是一致的)
CREATE TABLE `entry` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` int(11) NOT NULL, `value` varchar(50) NOT NULL DEFAULT '', PRIMARY KEY (`id`), UNIQUE KEY `unique_name` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
接口中的查詢是根據name進行select操作,這次的目的就是設計一個cache類,將DB查詢cache化。
基礎使用
首先,自然而然的想到了最基本的guava cache的使用,如下:
@Slf4j @Component public class EntryCache { @Autowired EntryMapper entryMapper; /** * guava cache 緩存實體 */ LoadingCache<String, Entry> cache = CacheBuilder.newBuilder() // 緩存刷新時間 .refreshAfterWrite(10, TimeUnit.MINUTES) // 設置緩存個數 .maximumSize(500) .build(new CacheLoader<String, Entry>() { @Override // 當本地緩存命沒有中時,調用load方法獲取結果並將結果緩存 public Entry load(String appKey) { return getEntryFromDB(appKey); } // 數據庫進行查詢 private Entry getEntryFromDB(String name) { log.info("load entry info from db!entry:{}", name); return entryMapper.selectByName(name); } }); /** * 對外暴露的方法 * 從緩存中取entry,沒取到就走數據庫 */ public Entry getEntry(String name) throws ExecutionException { return cache.get(name); } }
這里用了refreshAfterWrite,和expireAfterWrite區別是expireAfterWrite到期會直接刪除緩存,如果同時多個並發請求過來,這些請求都會重新去讀取DB來刷新緩存。DB速度較慢,會造成線程短暫的阻塞(相對於讀cache)。
而refreshAfterWrite,則不會刪除cache,而是只有一個請求線程會去真實的讀取DB,其他請求直接返回老值。這樣可以避免同時過期時大量請求被阻塞,提升性能。
但是還有一個問題,那就是更新線程還是會被阻塞,這樣在緩存key集體過期時,可能還會使響應時間變得不滿足要求。
后台線程刷新
就像上面所說,只要刷新緩存,就必然有線程被阻塞,這個是無法避免的。
雖然無法避免線程阻塞,但是我們可以避免阻塞用戶線程,讓用戶無感知即可。
所以,我們可以把刷新線程放到后台執行。當key過期時,有新用戶線程讀取cache時,開啟一個新線程去load DB的數據,用戶線程直接返回老的值,這樣就解決了這個問題。
代碼修改如下:
@Slf4j @Component public class EntryCache { @Autowired EntryMapper entryMapper; ListeningExecutorService backgroundRefreshPools = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())); /** * guava cache 緩存實體 */ LoadingCache<String, Entry> cache = CacheBuilder.newBuilder() // 緩存刷新時間 .refreshAfterWrite(10, TimeUnit.MINUTES) // 設置緩存個數 .maximumSize(500) .build(new CacheLoader<String, Entry>() { @Override // 當本地緩存命沒有中時,調用load方法獲取結果並將結果緩存 public Entry load(String appKey) { return getEntryFromDB(appKey); } @Override // 刷新時,開啟一個新線程異步刷新,老請求直接返回舊值,防止耗時過長 public ListenableFuture<Entry> reload(String key, Entry oldValue) throws Exception { return backgroundRefreshPools.submit(() -> getEntryFromDB(key)); } // 數據庫進行查詢 private Entry getEntryFromDB(String name) { log.info("load entry info from db!entry:{}", name); return entryMapper.selectByName(name); } }); /** * 對外暴露的方法 * 從緩存中取entry,沒取到就走數據庫 */ public Entry getEntry(String name) throws ExecutionException { return cache.get(name); } /** * 銷毀時關閉線程池 */ @PreDestroy public void destroy(){ try { backgroundRefreshPools.shutdown(); } catch (Exception e){ log.error("thread pool showdown error!e:{}",e.getMessage()); } } }
改動就是新添加了一個backgroundRefreshPools線程池,重寫了一個reload方法。
ListeningExecutorService是guava的concurrent包里的類,負責一些線程池相關的工作,感興趣的可以自己去了解一下。
在reload方法里提交一個新的線程,就可以用這個線程來刷新cache了。
如果刷新cache沒有完成的時候有其他線程來請求該key,則會直接返回老值。
同時,千萬不要忘記銷毀線程池。
初始化問題
上面兩步達到了不阻塞刷新cache的功能,但是這個前提是這些cache已經存在。
項目剛剛啟動的時候,所有的cache都是不存在的,這個時候如果大批量請求過來,同樣會被阻塞,因為沒有老的值供返回,都得等待cache的第一次load完畢。
解決這個問題的方法就是在項目啟動的過程中,將所有的cache預先load過來,這樣用戶請求剛到服務器時就會直接讀cache,不用等待。
@Slf4j @Component public class EntryCache { @Autowired EntryMapper entryMapper; ListeningExecutorService backgroundRefreshPools = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())); /** * guava cache 緩存實體 */ LoadingCache<String, Entry> cache = CacheBuilder.newBuilder() // 緩存刷新時間 .refreshAfterWrite(10, TimeUnit.MINUTES) // 設置緩存個數 .maximumSize(500) .build(new CacheLoader<String, Entry>() { @Override // 當本地緩存命沒有中時,調用load方法獲取結果並將結果緩存 public Entry load(String appKey) { return getEntryFromDB(appKey); } @Override // 刷新時,開啟一個新線程異步刷新,老請求直接返回舊值,防止耗時過長 public ListenableFuture<Entry> reload(String key, Entry oldValue) throws Exception { return backgroundRefreshPools.submit(() -> getEntryFromDB(key)); } // 數據庫進行查詢 private Entry getEntryFromDB(String name) { log.info("load entry info from db!entry:{}", name); return entryMapper.selectByName(name); } }); /** * 對外暴露的方法 * 從緩存中取entry,沒取到就走數據庫 */ public Entry getEntry(String name) throws ExecutionException { return cache.get(name); } /** * 銷毀時關閉線程池 */ @PreDestroy public void destroy(){ try { backgroundRefreshPools.shutdown(); } catch (Exception e){ log.error("thread pool showdown error!e:{}",e.getMessage()); } } @PostConstruct public void initCache() { log.info("init entry cache start!"); //讀取所有記錄 List<Entry> list = entryMapper.selectAll(); if (CollectionUtils.isEmpty(list)) { return; } for (Entry entry : list) { try { this.getEntry(entry.getName()); } catch (Exception e) { log.error("init cache error!,e:{}", e.getMessage()); } } log.info("init entry cache end!"); } }
結果
讓我們用數據看看這個cache類的表現:
200QPS,TP99.9是9ms,完美達標。
可以看出來,合理的使用緩存對接口性能還是有很大提升的。