在互聯網項目中,一般以堆內緩存的使用居多,無論是Guava,Memcache,還是JDK自帶的HashMap,ConcurrentHashMap等,都是在堆內內存中做數據計算操作。這樣做的好處顯而易見,用戶完全不必在意數據的分配,溢出,回收等操作,全部交由JVM來進行處理。由於JVM提供了諸多的垃圾回收算法,可以保證在不影響甚至微影響系統的前提下,做到堆內內存接近完美的管控。君不見,小如圖書管理這樣的系統,大如整個電商交易平台,都在JVM的加持下,服務於幾個,十幾個,乃至於上億用戶,而在這些系統中,堆內緩存組件所帶來的收益可是居功至偉。在自下而上的互聯網架構中,堆內緩存就像把衛這宮廷入口的劍士,神聖而庄嚴,真可謂誰敢橫刀立馬,唯我堆內緩存將軍。
堆內緩存的劣勢
但是,事物都是有兩面性的,堆內緩存在JVM的管理下,縱然無可挑剔,但是在GC過程中產生的程序小停頓和程序大停頓,則像一把利劍一樣,斬斷了對構造出完美高並發系統的念想。簡單的以HashMap這個JDK自帶的緩存組件為例,benchmark結果如下:
Benchmark Mode Cnt Score Error Units localCacheBenchmark.testlocalCacheSet thrpt 20 85056.759 ± 126702.544 ops/s
其插入速度最快為85056.759+126702.544=211759.303ops,最慢為0,也就是每秒插入速度最快為20w,最慢為0。之所以為0,是因為HashMap中的數據在快速的增長過程中,引起了頻繁的GC操作,為了給當前HashMap騰出足夠的空間進行插入操作,不得不釋放一些對象。頻繁的GC,勢必對插入速度有不小的影響,造成應用的偶爾性暫停。所以這也能解釋為啥最慢的時候,ops為0了。 同時從benchmark數據,我們可以看到誤差率為126702.544ops,比正常操作的85056.756要大很多,說明GC的影響,對HashMap的插入操作影響特別的大。
由於GC的存在,堆內緩存操作的ops會受到不小的影響,會造成原本小流量下10ms能夠完成的內存計算,大流量下500ms還未完成。如果內存計算過於龐雜,則造成整體流程的ops吞吐量降低,也是極有可能的事兒。所以從這里可以看出,堆內緩存組件,在高並發的壓力下,如果計算量巨大,尤其是寫操作巨大,使其不會成為護城的利劍,反而成了性能的幫凶,何其可懼。
堆外緩存的優勢
為了緩解在高並發,高寫入操作下,堆內緩存組件造成的頻繁GC問題,堆外緩存應運而生。從前面的描述我們知道,堆內緩存是受JVM管控的,所以我們不必擔心垃圾回收的問題。但是堆外緩存是不受JVM管控的,所以也不受GC的影響導致的應用暫停問題。但是由於堆外緩存的使用,是以byte數組來進行的,所以需要自己進行序列化反序列化操作。目前已知的知名開源項目中,netty4的buffer pool采用了堆外緩存實現,具體的比對信息可以參考此處,具體的比對信息截圖如下:
帶有Direct字眼的即為offheap堆外Buffer,x軸為分配的內存大小,Y軸為耗時。從上面可以看出,小塊內存分配,JVM要稍微優秀一點;但是大塊內存分配,明顯的堆外緩存要優秀一些。由於堆外Buffer操作不受GC影響,實際上性能更好一些。但是需要的垃圾回收管控也需要自己去做,要麻煩很多。
堆外緩存實現原理
說到堆外緩存實現原理,不可不提到sun.misc.Unsafe這個package包。此包提供了底層的Unsafe操作方法,讓我們可以直接在堆外內存做數據分配操作。由於是底層包,所以用戶層面很少用到,只是一些jdk里面的核心類庫會用到。其實例的初始化方式如下:
public static Unsafe getUnsafe() { Class cc = sun.reflect.Reflection.getCallerClass(2); if (cc.getClassLoader() != null) throw new SecurityException("Unsafe"); return theUnsafe; }
可以看出是一個單例模式。讓我們來嘗試使用一下(下面代碼是先分配了一個100bytes的空間,得到分配好的地址,然后在此地址里面放入1,最后將此地址里面的數據取出,打印出來):
long address = unsafe.allocateMemory(100); unsafe.putLong(address,1); System.out.println(unsafe.getLong(address));
但是在運行的過程中,我們卻遇到了如下的錯誤:
java.lang.SecurityException: Unsafe at sun.misc.Unsafe.getUnsafe(Unsafe.java:90) at UnsafeTest.testUnsafe(UnsafeTest.java:18) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) ....... Process finished with exit code -1
可以看出,由於安全性的原因,我們是無法直接使用Unsafe的實例來進行數據操作的,主要原因是因為cc.getClassLoader()對theUnsafe實例做了過濾限制。但是我們可以直接用theUnsafe來實現,由於是private修飾,我們可以用反射來將private修飾改成public修飾,讓其暴露出來供我們使用:
Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); Unsafe unsafe = (Unsafe) f.get(null); long address = unsafe.allocateMemory(100); unsafe.putLong(address,1); System.out.println(unsafe.getLong(address));
這樣就可以了,能夠正確的獲取運行結果。從這里我們可以看出,堆外內存必須自己分配地址空間,那么對應的,自己需要控制好地址邊界,如果控制不好,經典的OOM Exception將會出現。這也是比堆內內存使用麻煩的地方。
上面的代碼展示,其實已經說明了Unsafe方法的基本使用方式。如果想查看更多的Unsafe實現方式,個人推薦可以看看Cassandra源碼中的中的Object mapper - Caffinitas里面關於Unsafe的實現。此類的名稱為Uns.java,由於類精簡,個人認為很值得一看,我貼出部分代碼來:
static { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); unsafe = (Unsafe) field.get(null); if (unsafe.addressSize() > 8) throw new RuntimeException("Address size " + unsafe.addressSize() + " not supported yet (max 8 bytes)"); if (__DEBUG_OFF_HEAP_MEMORY_ACCESS) LOGGER.warn("Degraded performance due to off-heap memory allocations and access guarded by debug code enabled via system property " + OHCacheBuilder.SYSTEM_PROPERTY_PREFIX + "debugOffHeapAccess=true"); IAllocator alloc; String allocType = __ALLOCATOR != null ? __ALLOCATOR : "jna"; switch (allocType) { case "unsafe": alloc = new UnsafeAllocator(); LOGGER.info("OHC using sun.misc.Unsafe memory allocation"); break; case "jna": default: alloc = new JNANativeAllocator(); LOGGER.info("OHC using JNA OS native malloc/free"); } allocator = alloc; } catch (Exception e) { throw new AssertionError(e); } } 。。。。。。 static long getLongFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 8 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getLong(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); } static int getIntFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 4 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getInt(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); } static short getShortFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 2 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getShort(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); }
堆外緩存實現進階
寫到這里,原理什么的大概都懂了,我們准備進階一下,寫個基於Off-heap堆外緩存的Int數組,由於On-heap Array的空間請求分配到了堆上,所以這里自然而然的就把空間分配到了堆外。代碼如下:
public class OffheapIntArray { /** * 此list分配的地址 */ private long address; /** * 默認分配空間大小 */ private static final int defaultSize = 1024; /** * 帶參構造 * 由於Integer類型在java中占用4個字節,所以在分配地址的時候,一個integer,需要分配 4*8 = 32 bytes的空間 * @param size * @throws NoSuchFieldException * @throws IllegalAccessException */ public OffheapIntArray(Integer size) throws NoSuchFieldException, IllegalAccessException { if (size == null) { address = alloc(defaultSize * 4 * 8); } else { address = alloc(size * 4 * 8); } } public int get(int index) throws NoSuchFieldException, IllegalAccessException { return getUnsafe().getInt(address + index * 4 * 8); } public void set(int index, int value) throws NoSuchFieldException, IllegalAccessException { getUnsafe().putInt(address + index * 4 * 8, value); } private Unsafe getUnsafe() throws IllegalAccessException, NoSuchFieldException { Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); return (Unsafe) f.get(null); } private long alloc(int size) throws NoSuchFieldException, IllegalAccessException { long address = getUnsafe().allocateMemory(size); return address; } public void free() throws NoSuchFieldException, IllegalAccessException { if (address == 0) { return; } getUnsafe().freeMemory(address); } }
我們來簡單的測試一下:
@Test public void testOffheap() throws NoSuchFieldException, IllegalAccessException { OffheapIntArray offheapArray = new OffheapIntArray(10); offheapArray.set(0,11111); offheapArray.set(1,1112); offheapArray.set(2,1113); offheapArray.set(3,1114); System.out.println(offheapArray.get(0)); System.out.println(offheapArray.get(1)); System.out.println(offheapArray.get(2)); System.out.println(offheapArray.get(3)); offheapArray.free(); }
輸出結果如下:
11111 1112 1113 1114
可以看到得到了正確的輸出結果。當然我這里只是簡單的模擬使用。具體的使用方式,推薦如下兩篇文章,可以對堆外內存的使用有更近一步的認識:
Java Magic. Part 4: sun.misc.Unsafe
堆外緩存組件實戰
知道了堆外緩存的簡單使用后,這里我們要更近一步,使用現有的堆外緩存組件到項目中。
目前在市面上,有諸多的緩存組件,比如mapdb,ohc,ehcache3等,但是由於ehcache3收費,所以這里不做討論,主要討論mapdb和ohc這兩個。我們先通過benchmark來篩選一下二者的性能差異,由於這兩個緩存組件提供的都是基於key-value模型的數據存儲,所以benchmark的指標有9個,分別是get,set方法,hget,hset方法(value存儲的是hashmap),sadd,smember方法(value存儲的是set),zadd,zrange方法(value存儲的是treeset)。
benchmark結果如下:
Benchmark Mode Cnt Score Error Units OffheapCacheBenchmark.testMapdbGet thrpt 20 69699.610 ± 4578.888 ops/s OffheapCacheBenchmark.testMapdbHGet thrpt 20 63663.523 ± 3595.413 ops/s OffheapCacheBenchmark.testMapdbHGetAll thrpt 20 64235.582 ± 4009.039 ops/s OffheapCacheBenchmark.testMapdbHSet thrpt 20 25777.077 ± 480.461 ops/s OffheapCacheBenchmark.testMapdbSAdd thrpt 20 335.973 ± 39.353 ops/s OffheapCacheBenchmark.testMapdbSet thrpt 20 39417.070 ± 830.689 ops/s OffheapCacheBenchmark.testMapdbSmember thrpt 20 67432.314 ± 2799.983 ops/s OffheapCacheBenchmark.testMapdbZAdd thrpt 20 21220.595 ± 1128.103 ops/s OffheapCacheBenchmark.testMapdbZRange thrpt 20 45425.162 ± 4533.071 ops/s Benchmark Mode Cnt Score Error Units OhcheapOHCBenchmark.testOhcGet thrpt 20 1196976.452 ± 27291.669 ops/s OhcheapOHCBenchmark.testOhcHGet thrpt 20 348383.355 ± 23304.696 ops/s OhcheapOHCBenchmark.testOhcHGetAll thrpt 20 350798.417 ± 11870.685 ops/s OhcheapOHCBenchmark.testOhcHSet thrpt 20 349370.322 ± 8619.813 ops/s OhcheapOHCBenchmark.testOhcSAdd thrpt 20 11700.160 ± 611.794 ops/s OhcheapOHCBenchmark.testOhcSet thrpt 20 538314.544 ± 132111.037 ops/s OhcheapOHCBenchmark.testOhcSmember thrpt 20 458817.772 ± 15817.159 ops/s OhcheapOHCBenchmark.testOhcZAdd thrpt 20 323979.906 ± 9842.344 ops/s OhcheapOHCBenchmark.testOhcZRange thrpt 20 192776.479 ± 12988.484 ops/s
從上面的結果可以看出,ohc屬於性能怪獸類型,性能十倍於mapdb。而且由於ohc本身支持entry過期,但是mapdb不支持。所以這里綜合一下,選擇ohc作為我們的堆外緩存組件。需要說明一下的是,在我進行benchmark測試過程中,堆外緩存中會進行大量的數據讀寫操作,但是這些讀寫ops整體非常平穩,從error和score的對比就可以看出。不會出現應用暫停的情況。說明GC對堆外緩存的影響是非常小的。
整體類結構圖如下(考慮到擴展性,暫時將mapdb加入到了結構圖中):
從整體的類組織結構圖看來,使用了策略模式+模板模式組合的方式來進行。 屏蔽不同cache底層接口的不一致性,用的是策略模式;為不同的堆外緩存組件提供一致的操作方法用的是模板模式。組合起來使用就使得開發和擴展顯得非常容易。
部分類的封裝方式如下:
public class OhcCacheStrategy implements CacheStrategy { /** * 日志 */ private static Logger logger = LoggerFactory.getLogger(OhcCacheStrategy.class); /** * 緩存組件 */ public OHCache<byte[], byte[]> dataCache; /** * 過期時間組件 */ public OHCache<byte[], byte[]> expireCache; /** * 緩存table最大容量 */ private long level2cacheMax = 1024000L; /** * 鎖 */ private final Object lock = new Object(); /** * 鍵過期回調 */ public ExpirekeyAction expirekeyAction; /** * db引擎初始化 */ @PostConstruct public void initOhcEngine() { try { dataCache = OHCacheBuilder.<byte[], byte[]>newBuilder() .keySerializer(new OhcSerializer()) .valueSerializer(new OhcSerializer()) .segmentCount(2 * 4) .hashTableSize((int) level2cacheMax / 102400) .capacity(2 * 1024 * 1024 * 1024L) .defaultTTLmillis(OffheapCacheConst.EXPIRE_DEFAULT_SECONDS * 1000) .timeouts(true) .timeoutsSlots(64) .timeoutsPrecision(512) .eviction(Eviction.LRU) .build(); logger.error("ohc data cache init ok..."); expireCache = OHCacheBuilder.<byte[], byte[]>newBuilder() .keySerializer(new OhcSerializer()) .valueSerializer(new OhcSerializer()) .segmentCount(1) .hashTableSize((int) level2cacheMax / 102400) .capacity(2 * 1024 * 1024 * 1024L) .defaultTTLmillis(OffheapCacheConst.EXPIRE_DEFAULT_SECONDS * 1000) .timeouts(true) .timeoutsSlots(64) .timeoutsPrecision(512) .eviction(Eviction.NONE) .build(); logger.error("ohc expire cache init ok..."); } catch (Exception ex) { logger.error(OffheapCacheConst.PACKAGE_CONTAINER_OHC + OffheapCacheConst.ENGINE_INIT_FAIL, ex); AlarmUtil.alarm(OffheapCacheConst.PACKAGE_CONTAINER_OHC + OffheapCacheConst.ENGINE_INIT_FAIL, ex.getMessage()); throw ex; } } @Override public <T> boolean putEntry(String key, T entry, long expireAt) { synchronized (lock) { byte[] entryKey = SerializationUtils.serialize(key); byte[] entryVal = SerializationUtils.serialize((Serializable) entry); //緩存數據入庫 if (dataCache.put(entryKey, entryVal, expireAt)) { //過期時間入庫 putExpire(key, expireAt); //返回執行結果 return true; } return false; } } @Override public <T> T queryEntry(String key) { byte[] result = dataCache.get(SerializationUtils.serialize(key)); if (result == null) { return null; } return SerializationUtils.deserialize(result); } @Override public long queryExpireTime(String key) { byte[] entryKey = SerializationUtils.serialize(key); return expireCache.get(entryKey) == null ? 0 : SerializationUtils.deserialize(expireCache.get(entryKey)); } @Override public boolean removeEntry(String key) { byte[] entryKey = SerializationUtils.serialize(key); if (dataCache.remove(entryKey)) { removeExpire(key); return true; } return false; } @Override public boolean removeAll() { Iterable<byte[]> dataKey = () -> dataCache.keyIterator(); dataCache.removeAll(dataKey); Iterable<byte[]> expireKey = () -> expireCache.keyIterator(); expireCache.removeAll(expireKey); return true; } @Override public List<String> queryKeys() { List<String> list = new ArrayList<>(); Iterator<byte[]> iterator = expireCache.keyIterator(); while (iterator.hasNext()) { list.add(SerializationUtils.deserialize(iterator.next())); } return list; } /** * key過期時間同步入庫 * * @param key * @param expireAt */ private void putExpire(String key, long expireAt) { try { expireCache.put(SerializationUtils.serialize(key), SerializationUtils.serialize(expireAt)); } catch (Exception ex) { logger.error("key[" + key + "]過期時間入庫失敗..."); } } /** * 同步清理過期鍵 * * @param key */ private void removeExpire(String key) { try { if (expireCache.remove(SerializationUtils.serialize(key))) { if (expirekeyAction != null) { expirekeyAction.keyExpiredNotification(key); } } } catch (Exception ex) { logger.error("key[" + key + "]過期時間清除失敗..."); } } }
上面這個類是堆外緩存的核心策略類。所有其他的數據模型讀寫操作都可以依據此類來擴展,比如類似redis的sortedset,value可以存儲一個Treeset即可。需要說明一下,上面代碼中,dataCache主要用於存儲數據部分,expireCache主要用於存儲鍵過期時間。以便於可以實現鍵主動過期和被動過期功能。用戶添加刪除鍵的時候,會同步刪除expireCache中的鍵,以便於二者能夠統一。由於ohc本身並未實現keyExpireCallback,所以這里我實現了這個功能,只要有鍵被移除(主動刪除還是被動刪除,都會觸發通知),就會通知用戶,用戶可以按照如下方式使用:
@PostConstruct public void Init() { ohcCacheTemplate.registerExpireKeyAction(key -> { logger.error("key " + key + " expired..."); }); }
鍵被動過期功能,模仿了redis的鍵被動驅逐方式,實現如下:
public class OffheapCacheWorker { /** * 帶參注入 * * @param cacheStrategy */ public OffheapCacheWorker(CacheStrategy cacheStrategy) { this.cacheStrategy = cacheStrategy; this.offheapCacheHelper = new OffheapCacheHelper(); } /** * 日志 */ private static Logger logger = LoggerFactory.getLogger(OffheapCacheWorker.class); /** * 緩存幫助類 */ private OffheapCacheHelper offheapCacheHelper; /** * 緩存構建器 */ private CacheStrategy cacheStrategy; /** * 過期key檢測線程 */ private Thread expireCheckThread; /** * 線程狀態 */ private volatile boolean started; /** * 線程開啟 * * @throws IOException */ public synchronized void start() { if (started) { return; } expireCheckThread = new Thread("expire key check thread") { @Override public void run() { logger.error("expire key check thread start..."); while (!Thread.currentThread().isInterrupted()) { try { processLoop(); } catch (RuntimeException suppress) { logger.error("Thread `" + getName() + "` occured a error, suppressed.", suppress); throw suppress; } catch (Exception exception) { logger.error("Thread `" + getName() + "` occured a error, exception.", exception); } } logger.info("Thread `{}` was stopped normally.", getName()); } }; expireCheckThread.start(); started = true; } /** * 線程停止 * * @throws IOException */ public synchronized void stop() throws IOException { started = false; if (expireCheckThread != null) { expireCheckThread.interrupt(); } } /** * 過期鍵驅逐 * 模仿的redis鍵過期機制
*/ private void processLoop() throws InterruptedException { //每次采集樣本數 int sampleCheckNumber = 20; //過期key計數 int sampleExpiredCount = 0; //抽樣次數迭代 int sampleCheckIteration = 0; //緩存的key List<String> keys = cacheStrategy.queryKeys(); //抽樣開始時間 long start = System.currentTimeMillis(); //循環開始 do { //鍵數量 long expireContainerSize = keys.size(); //默認為鍵數量 long loopCheckNumber = expireContainerSize; //每次檢查的鍵數量,如果超過樣本數,則以樣本數為准 if (loopCheckNumber > sampleCheckNumber) { loopCheckNumber = sampleCheckNumber; } //開始檢測 while (loopCheckNumber-- > 0) { //取隨機下標 int rndNum = offheapCacheHelper.getRandomNumber(toIntExact(expireContainerSize) + 1); //取隨機鍵 String rndKey = keys.get(rndNum); //獲取過期時間 long expireTime = cacheStrategy.queryExpireTime(rndKey); //過期時間比對 if (expireTime <= System.currentTimeMillis()) { //鍵驅逐 boolean result = cacheStrategy.removeEntry(rndKey); if (result) { expireContainerSize--; sampleExpiredCount++; } } } //抽樣次數遞增 sampleCheckIteration++; //抽樣達到16次(16的倍數,&0xf都為0)且本批次耗時超過0.5秒,將退出,避免阻塞正常業務操作 if ((sampleCheckIteration % 16) == 0 && (System.currentTimeMillis() - start) > 300) { logger.error("清理數據庫過期鍵操作耗時過長,退出,預備重新開始..."); return; } } while (sampleExpiredCount > sampleCheckNumber / 4); Thread.sleep(1500); } }
鍵被動驅逐,會隨機抽取20個key檢測,如果過期鍵小於5個,則直接進行下一次抽樣。否則將進行鍵驅逐操作。一旦抽樣次數達到限定次數且鍵驅逐耗時過長,為了不影響業務,將會退出本次循環,繼續下一次循環操作。此worker在后台運行,實測6W個過期key一起過期,cpu占用控制在10%,60w個過期key基本上一起過期,cpu占用控制在60%左右。達到預期效果。在大量的讀寫操作過程中,可以看到堆內內存幾乎沒有變化。
寫到最后,上面就是這次我要介紹的堆外緩存的整體內容了,從Unsafe講到原理,從實現講到ohc,希望大家能夠提出更好的東西來,多謝。