熱數據緩存
這是使用緩存最頻繁最直接的方式,即我們把需要頻繁訪問DB的數據加載到內存里面,以提高響應速度。通常我們的做法是使用一個ConcuccrentHashMap<Request, AtomicInteger>來記錄一天當中每個請求的次數,每天凌晨取出昨天訪問最頻繁的K個請求(K取多少個取決你的可用內存有多少),從DB中讀取這些請求的返回結果放到一個ConcuccrentHashMap<Request, Response>容器中,然后把所有請求計數清0,重新開始計數。
LRU緩存
熱數據緩存適用於那些熱數據比較明顯且穩定的業務場景,而對於那些熱數據不穩定的應用場景我們需要發明一種動態的熱數據識別方式。我們都知道常用的內存換頁算法有2種:LFU和LRU。
LFU(Least Frequently Used)是把那些最近最不經常使用的頁面置換出去,這跟上面講的熱數據緩存是一個道理,缺點有2個:
- 需要維護一個計數器,記住每個頁面的使用次數。
- 上一個時間段頻繁使用的,在下一個時間段不一定還頻繁。
LRU(Least Recently Used)策略是把最近最長時間未使用的頁面置換出去。實現起來很簡單,只需要一個鏈表結構,每次訪問一個元素時把它移到鏈表的尾部,當鏈表已滿需要刪除元素時就刪除頭部的元素,因為頭部的元素就是最近最長時間未使用的元素。

1 import java.util.ArrayList; 2 import java.util.Collection; 3 import java.util.LinkedHashMap; 4 import java.util.Map; 5 import java.util.concurrent.locks.ReadWriteLock; 6 import java.util.concurrent.locks.ReentrantReadWriteLock; 7 8 /** 9 * 利用LinkedHashMap實現一個定長容量的,先進先出的隊列。當指定按訪問順序排序時,就實際上是一個最近最少使用LRU隊列<br> 10 * <br> 11 * 根據鏈表中元素的順序可以分為:按插入順序的鏈表,和按訪問順序(調用get方法)的鏈表。<br> 12 * 默認是按插入順序排序,如果指定按訪問順序排序,那么調用get方法后,會將這次訪問的元素移至鏈表尾部。<br> 13 * 不斷訪問可以形成按訪問順序排序的鏈表。<br> 14 * 可以重寫removeEldestEntry方法返回true值指定插入元素時移除最老的元素。<br> 15 * 16 * @Author:zhangchaoyang 17 * @Since:2014-9-5 18 * @Version:1.0 19 */ 20 public class LRUCache<K, V> extends LinkedHashMap<K, V> { 21 22 private static final long serialVersionUID = -2045058079564141163L; 23 24 private final int maxCapacity; 25 26 // 本類中設置裝載因子實際沒有意義,因為容量超過maxCapacity時就會把元素移除掉 27 private static final float DEFAULT_LOAD_FACTOR = 1f; 28 29 private final ReadWriteLock lock = new ReentrantReadWriteLock(); 30 31 public LRUCache(int maxCapacity) { 32 super(maxCapacity, DEFAULT_LOAD_FACTOR, true);// 第3個參數false表示維持插入順序,這樣最早插入的將最先被移除。true表示維持訪問順序,調用get方法后,會將這次訪問的元素移至鏈表尾部,刪除老元素時會刪除表頭元素。 33 this.maxCapacity = maxCapacity; 34 } 35 36 @Override 37 protected boolean removeEldestEntry(java.util.Map.Entry<K, V> eldest) { 38 return size() > maxCapacity;// 到達maxCapacity時就移除老元素,這樣實現定長的LinkedHashMap 39 } 40 41 @Override 42 public boolean containsKey(Object key) { 43 try { 44 lock.readLock().lock(); 45 return super.containsKey(key); 46 } finally { 47 lock.readLock().unlock(); 48 } 49 } 50 51 @Override 52 public V get(Object key) { 53 try { 54 lock.readLock().lock(); 55 return super.get(key); 56 } finally { 57 lock.readLock().unlock(); 58 } 59 } 60 61 @Override 62 public V put(K key, V value) { 63 try { 64 lock.writeLock().lock(); 65 return super.put(key, value); 66 } finally { 67 lock.writeLock().unlock(); 68 } 69 } 70 71 public int size() { 72 try { 73 lock.readLock().lock(); 74 return super.size(); 75 } finally { 76 lock.readLock().unlock(); 77 } 78 } 79 80 public void clear() { 81 try { 82 lock.writeLock().lock(); 83 super.clear(); 84 } finally { 85 lock.writeLock().unlock(); 86 } 87 } 88 89 public Collection<Map.Entry<K, V>> getAll() { 90 try { 91 lock.readLock().lock(); 92 return new ArrayList<Map.Entry<K, V>>(super.entrySet()); 93 } finally { 94 lock.readLock().unlock(); 95 } 96 } 97 }
TimeOut緩存
Timeout緩存常用於那些跟用戶關聯的請求數據,比如用戶在翻頁查看一個列表數據時,他第一次看N頁的數據時,服務器是從DB中讀取的相應數據,當他看第N+1頁的數據時應該把第N頁的數據放入緩存,因為用戶可能呆會兒還會回過頭來看第N頁的數據,這時候服務器就可以直接從緩存中獲取數據。如果用戶在5分鍾內還沒有回過頭來看第N頁的數據,那么我們認為他再看第N頁的概率就非常低了,此時可以把第N頁的數據從緩存中移除,實際上相當於我們為緩存設置了一個超時時間。
我想了一種Timeout緩存的實現方法。還是用ConcurrentHashMap來存放key-value,另建一棵小頂堆,每個節點上存放key以及key的到期時間,建堆時依據到期時間來建。開一個后台線程不停地掃描堆頂元素,拿當前的時間戳去跟堆頂的到期時間比較,如果當前時間晚於堆頂的到期時間則刪除堆頂,把堆頂里存放的key從ConcurrentHashMap中刪除。刪除堆頂的時間復雜度為$O(log_2{N})$,具體步驟如下:
-
用末元素替換堆頂元素root
-
臨時保存root節點。從上往下遍歷樹,用子節點中較小那個替換父節點。最后把root放到葉節點上
下面的代碼是直接基於java中的java.util.concurrent.Delayed實現的,Delayed是不是基於上面的小頂堆的思想我也沒去深入研究。
TimeoutCache.java

1 import java.io.IOException; 2 import java.util.concurrent.ConcurrentHashMap; 3 import java.util.concurrent.ConcurrentMap; 4 import java.util.concurrent.DelayQueue; 5 import java.util.concurrent.TimeUnit; 6 7 import org.apache.commons.logging.Log; 8 import org.apache.commons.logging.LogFactory; 9 10 /** 11 * 可以為每個元素設置存活時間的緩存容器 12 * 13 * @Author:orisun 14 * @Since:2015-10-9 15 * @Version:1.0 16 */ 17 public class TimeoutCache<K, V> { 18 19 private static final Log logger = LogFactory.getLog(TimeoutCache.class); 20 private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>(); 21 private DelayQueue<DelayItem<Pair<K, V>>> queue = new DelayQueue<DelayItem<Pair<K, V>>>(); 22 private Thread daemonThread; 23 24 public TimeoutCache() { 25 Runnable daemonTask = new Runnable() { 26 public void run() { 27 daemonCheck(); 28 } 29 }; 30 daemonThread = new Thread(daemonTask); 31 daemonThread.setDaemon(true); 32 daemonThread.setName("TimeoutCache Daemon Check"); 33 daemonThread.start(); // 啟動后台線程,對容器中的元素不停地進行輪循,將過期的元素移除出出去 34 } 35 36 private void daemonCheck() { 37 logger.info("check timeout element of cache started"); 38 for (;;) { 39 try { 40 DelayItem<Pair<K, V>> delayItem = queue.take();// 如果所有元素都沒有超時,該行代碼會阻塞 41 if (delayItem != null) { 42 Pair<K, V> pair = delayItem.getItem(); 43 cacheObjMap.remove(pair.first, pair.second); // 超時對象,從容器中移除 44 } 45 } catch (InterruptedException e) { 46 logger.error("take timeout element from cache failed", e); 47 break; // 檢測到中斷時就退出循環 48 } 49 } 50 logger.info("check timeout element of cache stopped."); 51 } 52 53 /** 54 * 以覆蓋的方式向緩存中添加對象,緩存以<key,value>的形式存在.<br> 55 * 注意:value如果是List,則它不是由通過List.subList()得來的 56 * 。因為List.subList()返回的是一個RandomAccessSubList實例 57 * ,在反序列化時ObjectOutputStream.writeObject(RandomAccessSubList)會出錯 58 * 59 * @param key 60 * @param value 61 * @param time 62 * 對象在緩存中的生存時間 63 * @param unit 64 * 時間單位 65 */ 66 public void put(K key, V value, long time, TimeUnit unit) { 67 V oldValue = cacheObjMap.put(key, value); 68 if (oldValue != null) 69 queue.remove(key); 70 71 long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit); 72 queue.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), 73 nanoTime)); 74 } 75 76 /** 77 * 根據key從緩存中取得對應的value,如果key不存在則返回null<br> 78 * 取出的是value的深拷貝 79 * 80 * @param key 81 * @return 82 */ 83 @SuppressWarnings("unchecked") 84 public V get(K key) { 85 try { 86 return (V) JavaSerializer.deepCopy(cacheObjMap.get(key)); 87 } catch (ClassNotFoundException | IOException e) { 88 e.printStackTrace(); 89 return null; 90 } 91 } 92 93 }
DelayItem.java

1 import java.util.concurrent.Delayed; 2 import java.util.concurrent.TimeUnit; 3 import java.util.concurrent.atomic.AtomicLong; 4 5 /** 6 * 7 * @Author:orisun 8 * @Since:2015-10-9 9 * @Version:1.0 10 */ 11 public class DelayItem<T> implements Delayed { 12 13 private static final long ORIGIN = System.nanoTime();// 記錄進入隊列的時刻 14 private static final AtomicLong sequencer = new AtomicLong(0); 15 private final long sequenceNumber; 16 private final long time; 17 private final T item; 18 19 final static long now() { 20 return System.nanoTime() - ORIGIN; 21 } 22 23 /** 24 * 25 * @param submit 26 * 隊列中的元素類型 27 * @param timeout 28 * 元素在隊列中存活的時間,單位:毫秒 29 */ 30 public DelayItem(T submit, long timeout) { 31 this.time = now() + timeout;// 出隊時刻 32 this.item = submit;// 入隊元素 33 this.sequenceNumber = sequencer.getAndIncrement();// 在隊列中的編號 34 } 35 36 public T getItem() { 37 return this.item; 38 } 39 40 @Override 41 public long getDelay(TimeUnit unit) { 42 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); 43 return d; 44 } 45 46 @Override 47 public int compareTo(Delayed other) { 48 if (other == this) 49 return 0; 50 if (other instanceof DelayItem) { 51 DelayItem<?> x = (DelayItem<?>) other; 52 long diff = time - x.time; 53 if (diff < 0) 54 return -1; 55 else if (diff > 0) 56 return 1; 57 else if (sequenceNumber < x.sequenceNumber) // 如果是同時進入隊列的,則先進者先出 58 return -1; 59 else 60 return 1; 61 } 62 long d = (getDelay(TimeUnit.NANOSECONDS) - other 63 .getDelay(TimeUnit.NANOSECONDS)); 64 return (d == 0) ? 0 : ((d < 0) ? -1 : 1); 65 } 66 }
JavaSerializer.java

1 import java.io.ByteArrayInputStream; 2 import java.io.ByteArrayOutputStream; 3 import java.io.IOException; 4 import java.io.ObjectInputStream; 5 import java.io.ObjectOutputStream; 6 7 public class JavaSerializer { 8 9 public static Object deepCopy(Object obj) throws IOException, 10 ClassNotFoundException { 11 // 將該對象序列化成流,因為寫在流里的是對象的一個拷貝,而原對象仍然存在於JVM里面。所以利用這個特性可以實現對象的深拷貝 12 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 13 ObjectOutputStream oos = new ObjectOutputStream(bos); 14 oos.writeObject(obj);// 要寫入ObjectOutputStream的話必須實現Serializable接口 15 // 將流序列化成對象 16 ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); 17 ObjectInputStream ois = new ObjectInputStream(bis); 18 return ois.readObject(); 19 } 20 }
Redis省內存的技巧
redis自帶持久化功能,當它決定要把哪些數據換出內存寫入磁盤時,使用的也是LRU算法。同時redis也有timeout機制,但它不像上面的TimeoutCache.java類一樣開個無限循環的線程去掃描到期的元素,而是每次get元素時判斷一個該元素有沒有到期,所以redis中一個元素的存活時間遠遠超出了設置的時間是很正常的。
本節想講的重點其實是redis省內存的技巧,這也是實踐中經常遇到的問題,因為內存總是很昂貴的,運維大哥總是很節約的。在我們的推薦系數中使用Redis來存儲信息的索引,沒有使用Lucene是因為Lucene不支持分布式,但是省內存的技巧都是從Lucene那兒學來的。
首先,如果你想為redis節省內存那你就不能再用<String,String>類型的key-value結構,必須全部將它們序列化成二進制的形式。我寫了一個工具類,實現各種數據類型和byte[]的互相置換。
DataTransform.java

1 import java.nio.ByteBuffer; 2 import java.util.ArrayList; 3 import java.util.List; 4 5 /** 6 * 各種數據類型的相互轉換<br> 7 * <ul> 8 * <li>{@code <<} 左移,符號位不動 9 * <li>{@code >>} 右移,符號位不動 10 * <li>{@code >>>} 循環右移,符號位要跟着移,高位用0填充 11 * </ul> 12 * 位移運算只對32位和64位值有意義。位移運算返回一個新值,但是不改變原值。 13 * 14 * @Author:zhangchaoyang 15 * @Since:2014-7-9 16 * @Version: 17 */ 18 public class DataTransform { 19 20 private static final char[] Digit = { '0', '1', '2', '3', '4', '5', '6', 21 '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; 22 23 /** 24 * byte數組轉換成int 25 * 26 * @param bRefArr 27 * byte數組 28 * @param LowEndian 29 * byte數組是否按小端字節序存儲 30 * @return int值 31 * @throws ArgumentException 32 * byte數組長度超過4時拋出該異常 33 */ 34 public static int bytesToInt(byte[] bRefArr, boolean LowEndian) 35 throws ArgumentException { 36 int len = bRefArr.length; 37 if (len > 4) { 38 throw new ArgumentException("字節數組長度不能超過4"); 39 } 40 41 int iOutcome = 0; 42 byte bLoop; 43 for (int i = 0; i < len; i++) { 44 bLoop = bRefArr[i]; 45 int shift; 46 if (LowEndian) { 47 shift = i; 48 } else { 49 shift = len - 1 - i; 50 } 51 iOutcome += (bLoop & 0xFF) << (8 * shift);// 之所以要跟0xFF進行與運行是為了把bLoop轉換成int,去除符號位的影響 52 } 53 return iOutcome; 54 } 55 56 /** 57 * byte數組轉換成long 58 * 59 * @param bRefArr 60 * byte數組 61 * @param LowEndian 62 * byte數組是否按小端字節序存儲 63 * @return long值 64 * @throws ArgumentException 65 * byte數組長度超過8時拋出該異常 66 */ 67 public static long bytesToLong(byte[] bRefArr, boolean LowEndian) 68 throws ArgumentException { 69 int len = bRefArr.length; 70 if (len > 8) { 71 throw new ArgumentException("字節數組長度不能超過8"); 72 } 73 74 long iOutcome = 0; 75 byte bLoop; 76 for (int i = 0; i < len; i++) { 77 bLoop = bRefArr[i]; 78 int shift; 79 if (LowEndian) { 80 shift = i; 81 } else { 82 shift = len - 1 - i; 83 } 84 iOutcome += (bLoop & 0xFFL) << (8 * shift);// 之所以要跟0xFFL進行與運行是為了把bLoop轉換成long,去除符號位的影響 85 } 86 return iOutcome; 87 } 88 89 /** 90 * byte數組轉換成double 91 * 92 * @param bRefArr 93 * byte數組 94 * @param LowEndian 95 * byte數組是否按小端字節序存儲 96 * @return double值 97 * @throws ArgumentException 98 * byte數組長度超過8時拋出該異常 99 */ 100 public static double bytesToDouble(byte[] bRefArr, boolean LowEndian) 101 throws ArgumentException { 102 long l = bytesToLong(bRefArr, LowEndian); 103 return Double.longBitsToDouble(l); 104 } 105 106 /** 107 * int轉換為byte數組,采用大端字節序會更快一些 108 * 109 * @param number 110 * int數 111 * @param LowEndian 112 * byte數組是否按小端字節序存儲 113 * @return byte數組 114 */ 115 public static byte[] intToBytes(int number, boolean LowEndian) { 116 int len = 4; 117 byte[] rect = new byte[len]; 118 for (int i = 0; i < len; i++) { 119 rect[i] = (byte) (number >>> (len - 1 - i) * 8); 120 } 121 if (LowEndian) { 122 for (int i = 0; i < len / 2; i++) { 123 byte swap = rect[i]; 124 rect[i] = rect[len - i - 1]; 125 rect[len - i - 1] = swap; 126 } 127 } 128 return rect; 129 } 130 131 /** 132 * 仿照Lucene的可變長度整型:最高位表示是否還有字節要讀取,低七位就是就是具體的有效位,添加到結果數據中.<br> 133 * 比如00000001 最高位表示0,那么說明這個數就是一個字節表示,有效位是后面的七位0000001,值為1。10000010 00000001 134 * 第一個字節最高位為1 135 * ,表示后面還有字節,第二位最高位0表示到此為止了,即就是兩個字節,那么具體的值注意,是從最后一個字節的七位有效數放在最前面,依次放置 136 * ,最后是第一個自己的七位有效位,所以這個數表示 0000001 0000010,換算成整數就是130。<br> 137 * 用VInt來表示Integer.MAX_VALUE時需要5個字節. 138 * 139 * @param num 140 * @return 141 */ 142 public static byte[] vintToByte(int num) { 143 ByteBuffer buffer = ByteBuffer.allocate(32); 144 while ((num & ~0x7F) != 0) { 145 buffer.put((byte) ((num & 0x7F) | 0x80)); 146 num >>>= 7;// 等價於num=num>>>7; 147 } 148 buffer.put((byte) num); 149 byte[] rect = new byte[buffer.position()]; 150 buffer.flip(); 151 buffer.get(rect); 152 return rect; 153 } 154 155 public static byte[] vintArrToByteArr(int[] arr) { 156 ByteBuffer buffer = ByteBuffer.allocate(32 * arr.length); 157 for (int ele : arr) { 158 byte[] brr = vintToByte(ele); 159 buffer.put(brr); 160 } 161 byte[] rect = new byte[buffer.position()]; 162 buffer.flip(); 163 buffer.get(rect); 164 return rect; 165 } 166 167 /** 168 * 仿照Lucene的可變長度整型 169 * 170 * @see #vintToByte 171 * @param bytes 172 * @return 173 */ 174 public static int byteToVInt(byte[] bytes) { 175 int i = 0; 176 byte b = bytes[i++]; 177 int num = b & 0x7F; 178 for (int shift = 7; (b & 0x80) != 0; shift += 7) { 179 b = bytes[i++]; 180 num |= (b & 0x7F) << shift; 181 } 182 return num; 183 } 184 185 public static int[] byteArrToVIntArr(byte[] bytes) { 186 List<Integer> list = new ArrayList<Integer>(); 187 int i = 0; 188 while (i < bytes.length) { 189 byte b = bytes[i++]; 190 int num = b & 0x7F; 191 for (int shift = 7; (b & 0x80) != 0; shift += 7) { 192 b = bytes[i++]; 193 num |= (b & 0x7F) << shift; 194 } 195 list.add(num); 196 } 197 int[] rect = new int[list.size()]; 198 for (int j = 0; j < rect.length; j++) { 199 rect[j] = list.get(j); 200 } 201 return rect; 202 } 203 204 /** 205 * 仿照Lucene的可變長度整型 206 * 207 * @see #vintToByte 208 * @param num 209 * @return 210 */ 211 public static byte[] vlongToByte(long num) { 212 ByteBuffer buffer = ByteBuffer.allocate(64); 213 while ((num & ~0x7F) != 0) { 214 buffer.put((byte) ((num & 0x7F) | 0x80)); 215 num >>>= 7; 216 } 217 buffer.put((byte) num); 218 byte[] rect = new byte[buffer.position()]; 219 buffer.flip(); 220 buffer.get(rect); 221 return rect; 222 } 223 224 /** 225 * 仿照Lucene的可變長度整型 226 * 227 * @see #vintToByte 228 * @param bytes 229 * @return 230 */ 231 public static long byteToVLong(byte[] bytes) { 232 int i = 0; 233 byte b = bytes[i++]; 234 long num = b & 0x7FL; 235 for (int shift = 7; (b & 0x80) != 0; shift += 7) { 236 b = bytes[i++]; 237 num |= (b & 0x7FL) << shift; 238 } 239 return num; 240 } 241 242 /** 243 * long轉換為byte數組 244 * 245 * @param number 246 * long數 247 * @param LowEndian 248 * byte數組是否按小端字節序存儲 249 * @return byte數組,長度為8 250 */ 251 public static byte[] longToBytes(long number, boolean LowEndian) { 252 int len = 8; 253 byte[] rect = new byte[len]; 254 for (int i = 0; i < len; i++) { 255 rect[i] = (byte) (number >>> (len - 1 - i) * 8); 256 } 257 if (LowEndian) { 258 for (int i = 0; i < len / 2; i++) { 259 byte swap = rect[i]; 260 rect[i] = rect[len - i - 1]; 261 rect[len - i - 1] = swap; 262 } 263 } 264 return rect; 265 } 266 267 /** 268 * double轉換為byte數組 269 * 270 * @param number 271 * double數值 272 * @param LowEndian 273 * byte數組是否按小端字節序存儲 274 * @return byte數組,長度為8 275 */ 276 public static byte[] doubleToBytes(double number, boolean LowEndian) { 277 long l = Double.doubleToLongBits(number); 278 return longToBytes(l, LowEndian); 279 } 280 281 /** 282 * IP轉換成int值,int在全域上和IP是一一對應的 283 * 284 * @param ip 285 * @return 286 * @throws ArgumentException 287 * IP范圍超界時拋出該異常 288 */ 289 public static int ip2int(String ip) throws ArgumentException { 290 String[] arr = ip.trim().split("\\."); 291 int part1 = Integer.parseInt(arr[0]); 292 int part2 = Integer.parseInt(arr[1]); 293 int part3 = Integer.parseInt(arr[2]); 294 int part4 = Integer.parseInt(arr[3]); 295 if (part1 >= 0 && part1 < 256 && part2 >= 0 && part2 < 256 296 && part3 >= 0 && part3 < 256 && part4 >= 0 && part4 < 256) { 297 // 左移,正數左移之后有可能把最高位變為1,從而成為負數 298 int rect = part1 << 24; 299 rect += part2 << 16; 300 rect += part3 << 8; 301 rect += part4; 302 return rect; 303 } else { 304 throw new ArgumentException("IP范圍超界"); 305 } 306 } 307 308 /** 309 * int值轉換成IP,int在全域上和IP是一一對應的 310 * 311 * @param number 312 * @return 313 */ 314 public static String int2ip(int number) { 315 StringBuilder sb = new StringBuilder(); 316 int part1 = number >>> 24;// 右移,如果是負數最高位的1會向右移,且最高位變為0 317 int part2 = (0x00ff0000 & number) >>> 16;// 位移的優先級高於與運算的優先級 318 int part3 = (0x0000ff00 & number) >>> 8; 319 int part4 = 0x000000ff & number; 320 sb.append(String.valueOf(part1)); 321 sb.append("."); 322 sb.append(String.valueOf(part2)); 323 sb.append("."); 324 sb.append(String.valueOf(part3)); 325 sb.append("."); 326 sb.append(String.valueOf(part4)); 327 return sb.toString(); 328 } 329 330 /** 331 * 一個將字節轉化為十六進制ASSIC碼的函數 332 * 333 * @param ib 334 * @return 335 */ 336 public static String byteHEX(byte ib) { 337 char[] ob = new char[2]; 338 ob[0] = Digit[(ib >>> 4) & 0X0F]; 339 ob[1] = Digit[ib & 0X0F]; 340 String s = new String(ob); 341 return s; 342 } 343 344 public static String byteHEX(byte[] bytes) { 345 StringBuilder sb = new StringBuilder(); 346 for (byte ib : bytes) { 347 char[] ob = new char[2]; 348 ob[0] = Digit[(ib >>> 4) & 0X0F]; 349 ob[1] = Digit[ib & 0X0F]; 350 String s = new String(ob); 351 sb.append(s); 352 } 353 return sb.toString(); 354 } 355 356 /** 357 * 把一個byte表示成二進制的字符串字面值 358 * 359 * @param ib 360 * @return 361 */ 362 public static String byteLiteral(byte ib) { 363 StringBuilder sb = new StringBuilder(); 364 for (int i = 7; i >= 0; i--) { 365 int v = (ib >>> i) & 0x01; 366 if (v == 0) { 367 sb.append("0"); 368 } else { 369 sb.append("1"); 370 } 371 } 372 return sb.toString(); 373 } 374 375 public static String byteLiteral(byte[] ib) { 376 StringBuilder sb = new StringBuilder(); 377 for (int i = 0; i < ib.length; i++) { 378 sb.append(byteLiteral(ib[i])); 379 } 380 return sb.toString(); 381 } 382 }
請留意一下上述代碼中出現了VInt和VLong兩種類型,具體看注釋。
倒排索引常見的形式為:term --> [infoid1,infoid2,infoid3...],針對這種形式的索引我們看下如何節省內存。首先value要采用redis中的list結構,而且是list<byte[]>而非list<String>(想省內存就要杜絕使用String,上面已經說過了)。假如infoid是個int,置換成byte[]就要占4個字節,而絕大部分情況下infoid都1000萬以內的數字,因此使用VInt只需要3個字節。內存還可以進一步壓縮。鏈表的第1個infoid我們存儲它的VInt形式,后面的infoid與infoid1相減,差值也是個1000萬以內的數字而且有可能非常小,我們采用VInt存儲這個差值最多需要3個字節,有可能只需要1個字節。訪問鏈表中的任意一個元素時都需要先把首元素取出來。
另一種常見的索引形式為:infoid --> infoDetail,infoDetail中包含很多字段,譬如city、valid、name等,通常情況下人們會使用Redis的hash結構來存儲實體,而我們現在要做的就是把infoDetail這個實體序列化成盡可能短的字節流。首先city代表城市,本來是個String類型,而city這個東西是可以窮舉的,我們事先對所有city進行編號,在redis中只存儲city編號即可。valid表示信息是否過期是個bool類型,在java中存儲一個bool也需要1個字節,這顯然很浪費,本來一個bit就夠了嘛,同時city又用不滿一個int,所以可以讓valid跟city擠一擠,把city左移一位,把valid塞到city的末位上去。

1 import java.nio.ByteBuffer; 2 3 /** 4 * 5 *@Author:orisun 6 *@Since:2016-5-14 7 *@Version:1.0 8 */ 9 public class Info { 10 11 private int city; 12 private boolean valid; 13 private String name; 14 15 public byte[] serialize() { 16 ByteBuffer buffer = ByteBuffer.allocate(10); 17 int cv = (city << 1) + (valid ? 1 : 0); 18 byte[] cv_b = DataTransform.intToBytes(cv, false); 19 buffer.put(cv_b); 20 buffer.put(name.getBytes()); 21 byte[] rect = new byte[buffer.position()]; 22 buffer.flip(); 23 buffer.get(rect); 24 return rect; 25 } 26 27 public static Info deserialize(byte[] value) { 28 if (value == null || value.length <= 4) { 29 return null; 30 } 31 Info inst = new Info(); 32 try { 33 int cv = DataTransform.bytesToInt(new byte[] { (byte) value[0], 34 value[1], value[2], value[3] }, false); 35 inst.setValid(cv % 2 != 0); 36 inst.setCity(cv >> 1); 37 inst.setName(new String(value, 4, value.length - 4)); 38 } catch (ArgumentException e) { 39 e.printStackTrace(); 40 } 41 return inst; 42 } 43 44 public int getCity() { 45 return city; 46 } 47 48 public void setCity(int city) { 49 this.city = city; 50 } 51 52 public boolean isValid() { 53 return valid; 54 } 55 56 public void setValid(boolean valid) { 57 this.valid = valid; 58 } 59 60 public String getName() { 61 return name; 62 } 63 64 public void setName(String name) { 65 this.name = name; 66 } 67 68 public static void main(String[] args) { 69 Info inst1 = new Info(); 70 inst1.setCity(100); 71 inst1.setValid(true); 72 inst1.setName("pc"); 73 Info inst2 = Info.deserialize(inst1.serialize()); 74 assert inst1.getCity() == inst2.getCity(); 75 assert inst1.getName().equals(inst2.getName()); 76 assert inst1.isValid() ^ inst2.isValid(); 77 } 78 }