更多MyCat源碼分析,請戳MyCat源碼分析系列
BufferPool
MyCat的緩沖區采用的是java.nio.ByteBuffer,由BufferPool類統一管理,相關的設置在SystemConfig中。先明確一下相關概念和配置:
- 每個Buffer單元稱之為一個chunk,默認chunk的大小(DEFAULT_BUFFER_CHUNK_SIZE)為4096字節
- BufferPool的總大小為DEFAULT_BUFFER_CHUNK_SIZE * processors * 1000,其中processors為處理器數量(Runtime.getRuntime().availableProcessors())
- 緩沖區有兩種類型:本地緩存線程緩沖區和其他緩沖區,其中本地緩存線程指的是線程名以"$_"開頭的線程(坦白說,我並不清楚這類線程是如何產生的,求各位指導)
BufferPool中核心的變量如下:
private final ThreadLocalBufferPool localBufferPool; private final int chunkSize; private final ConcurrentLinkedQueue<ByteBuffer> items = new ConcurrentLinkedQueue<ByteBuffer>(); private final long threadLocalCount; private final long capactiy;
這些變量代表的含義分別如下:
- chunkSize:每個chunk的大小
- capacity:chunk的個數,計算方式為BufferPool的總大小/chunkSize
- items:ByteBuffer隊列,初始大小為capacity,其中每個ByteBuffer由ByteBuffer.allocateDirect(chunkSize)創建
- threadLocalCount:本地線程數量,由capacity的某個比例計算得出,看起來相當於每個處理器分到的chunk個數
- localBufferPool:本地線程緩沖區,類型為繼承了ThreadLocal<BufferQueue>的ThreadLocalBufferPool,BufferQueue中包含了類似的ByteBuffer鏈表items,其容量固定為threadLocalCount
接下來重點介紹分配Buffer和回收Buffer的過程。
1. 分配Buffer
分配Buffer時可以指定Buffer的大小,也可缺省該值,分別對應兩個方法public ByteBuffer allocate(int size)和public ByteBuffer allocate(),實現如下:
public ByteBuffer allocate(int size) { if (size <= this.chunkSize) { return allocate(); } else { LOGGER.warn("allocate buffer size large than default chunksize:" + this.chunkSize + " he want " + size); return createTempBuffer(size); } } public ByteBuffer allocate() { ByteBuffer node = null; if (isLocalCacheThread()) { // allocate from threadlocal node = localBufferPool.get().poll(); if (node != null) { return node; } } node = items.poll(); if (node == null) { //newCreated++; newCreated.incrementAndGet(); node = this.createDirectBuffer(chunkSize); } return node; }
- allocate():當執行線程為本地緩存線程時(isLocalCacheThread()返回true),先嘗試從localBufferPool中獲取一個可用的ByteBuffer;反之,從items中獲取一個可用的ByteBuffer,若還是失敗,則調用createDirectBuffer(size)創建新的ByteBuffer
private ByteBuffer createDirectBuffer(int size) { // for performance return ByteBuffer.allocateDirect(size); }
- allocate(size):如果用戶指定的size不大於chunkSize,則調用allocate()進行分配;反之則調用createTempBuffer(size)創建臨時緩沖區,代碼如下:
private ByteBuffer createTempBuffer(int size) { return ByteBuffer.allocate(size); }
2. 回收Buffer
回收Buffer時調用方法recycle(),相關代碼如下:
public void recycle(ByteBuffer buffer) { if (!checkValidBuffer(buffer)) { return; } if (isLocalCacheThread()) { BufferQueue localQueue = localBufferPool.get(); if (localQueue.snapshotSize() < threadLocalCount) { localQueue.put(buffer); } else { // recyle 3/4 thread local buffer items.addAll(localQueue.removeItems(threadLocalCount * 3 / 4)); items.offer(buffer); sharedOptsCount++; } } else { sharedOptsCount++; items.offer(buffer); } } private boolean checkValidBuffer(ByteBuffer buffer) { // 拒絕回收null和容量大於chunkSize的緩存 if (buffer == null || !buffer.isDirect()) { return false; } else if (buffer.capacity() > chunkSize) { LOGGER.warn("cant' recycle a buffer large than my pool chunksize " + buffer.capacity()); return false; } totalCounts++; totalBytes += buffer.limit(); buffer.clear(); return true; }
首先調用checkValidBuffer()進行Buffer的有效性檢測,該檢測的目的是判斷Buffer是否滿足被回收(后續重用)的條件,以下3種情況不符合:
- Buffer為null
- Buffer不是Direct Buffer,即在分配時是通過createTempBuffer()創建出來的,而不是createDirectBuffer()
- Buffer的容量大於chunkSize
滿足回收條件后,判斷執行線程如果是本地緩存線程(isLocalCacheThread()返回true),若localBufferPool還有空余容量則將其放入,反之將localBufferPool中3/4的Buffer轉移到items中並放入該Buffer;如果不是本地緩存線程直接放入items中
緩沖區的分配與回收機制如上所述,但單獨設置所謂的本地緩存線程緩沖區的意義以及回收時出現的3/4轉移的設置本人暫不清楚。
緩存機制
MyCat的緩存機制用於路由信息計算時為某些特定場景節省二次計算的開銷,直接從相應的緩存中獲取結果。
配置文件為cacheservice.properties,里面可以配置各類緩存統一的類型、大小、過期時間等,也可為每張表獨立設置參數,其中提供3類緩存類型:ehcache、leveldb和mapdb。
緩存池為CachePool,它是一個接口,具體每個CachePool實現類由對應CachePoolFactory創建:
public interface CachePool { public void putIfAbsent(Object key, Object value); public Object get(Object key); public void clearCache(); public CacheStatic getCacheStatic(); public long getMaxSize(); }
CacheService作為緩存服務類存在,其init()方法負責讀取緩存配置文件並創建相應的CachePoolFactory和CachePool:
private void init() throws Exception { Properties props = new Properties(); props.load(CacheService.class .getResourceAsStream("/cacheservice.properties")); final String poolFactoryPref = "factory."; final String poolKeyPref = "pool."; final String layedPoolKeyPref = "layedpool."; String[] keys = props.keySet().toArray(new String[0]); Arrays.sort(keys); for (String key : keys) { if (key.startsWith(poolFactoryPref)) { createPoolFactory(key.substring(poolFactoryPref.length()), (String) props.get(key)); } else if (key.startsWith(poolKeyPref)) { String cacheName = key.substring(poolKeyPref.length()); String value = (String) props.get(key); String[] valueItems = value.split(","); if (valueItems.length < 3) { throw new java.lang.IllegalArgumentException( "invalid cache config ,key:" + key + " value:" + value); } String type = valueItems[0]; int size = Integer.valueOf(valueItems[1]); int timeOut = Integer.valueOf(valueItems[2]); createPool(cacheName, type, size, timeOut); } else if (key.startsWith(layedPoolKeyPref)) { String cacheName = key.substring(layedPoolKeyPref.length()); String value = (String) props.get(key); String[] valueItems = value.split(","); int index = cacheName.indexOf("."); if (index < 0) {// root layer String type = valueItems[0]; int size = Integer.valueOf(valueItems[1]); int timeOut = Integer.valueOf(valueItems[2]); createLayeredPool(cacheName, type, size, timeOut); } else { // root layers' children String parent = cacheName.substring(0, index); String child = cacheName.substring(index + 1); CachePool pool = this.allPools.get(parent); if (pool == null || !(pool instanceof LayerCachePool)) { throw new java.lang.IllegalArgumentException( "parent pool not exists or not layered cache pool:" + parent + " the child cache is:" + child); } int size = Integer.valueOf(valueItems[0]); int timeOut = Integer.valueOf(valueItems[1]); ((DefaultLayedCachePool) pool).createChildCache(child, size, timeOut); } } } }
MyCat設置了3種緩存,分別是SQLRouteCache、TableId2DataNodeCache和ER_SQL2PARENTID:
- SQLRouteCache:
- 根據SQL語句查找路由信息的緩存,CachePool類型,key為虛擬庫名+SQL語句,value為路由信息RouteResultSet
- 該緩存只針對select語句,如果執行了之前已經執行過的某個SQL語句(緩存命中),那路由信息就不需要重復計算,直接從緩存中獲取,RouteService的route()方法中有關於此緩存的相關代碼片段:
/** * SELECT 類型的SQL, 檢測 */ if (sqlType == ServerParse.SELECT) { cacheKey = schema.getName() + stmt; rrs = (RouteResultset) sqlRouteCache.get(cacheKey); if (rrs != null) { return rrs; } } if (rrs!=null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) { sqlRouteCache.putIfAbsent(cacheKey, rrs); }
- TableId2DataNodeCache:
- 表主鍵到datanode的緩存,LayerCachePool類型,為雙層CachePool,第一層:key為虛擬庫名+表名,value為CachePool;第二層:key為主鍵值,value為datanode名
- 設置該緩存的目的在於當分片字段與主鍵字段不同時,直接通過主鍵值查詢是無法定位具體分片的(只能全分片下發),所以設置之后就可以利用主鍵值查找到分片名
- 該緩存的放入過程在MultiNodeQueryHandler的rowResponse()中,代碼片段如下:
-
// cache primaryKey-> dataNode if (primaryKeyIndex != -1) { RowDataPacket rowDataPkg = new RowDataPacket(fieldCount); rowDataPkg.read(row); String primaryKey = new String(rowDataPkg.fieldValues.get(primaryKeyIndex)); LayerCachePool pool = MycatServer.getInstance() .getRouterservice().getTableId2DataNodeCache(); pool.putIfAbsent(priamaryKeyTable, primaryKey, dataNode); }
- ER_SQL2PARENTID:ER關系專用,子表插入數據時根據父子關聯字段確定子表分片,下次可以直接從緩存中獲取所在分片,key為虛擬庫名+SQL語句,value是datanode名
緩存查看:通過9066管理端口連接MyCat,執行命令mysql> show @@cache;可以觀察目前系統中設置的各類緩存,以及數量、訪問次數和命中情況等
為尊重原創成果,如需轉載煩請注明本文出處:
http://www.cnblogs.com/fernandolee24/p/5198192.html,特此感謝
