//此文基於《Java並發編程實踐》
我們都知道在應用程序中合理地使用緩存,能更快的訪問我們之前的計算結果,從而提高吞吐量。例如Redis和Memcached基於內存的數據存儲系統等。此篇文章介紹如何實現簡單緩存。
首先定義一個Computable接口A是輸入,V是輸出。
1 package simplecache; 2 3 /** 4 * Created by yulinfeng on 12/25/16. 5 */ 6 public interface Computable<A, V> { 7 V compute(A arg) throws InterruptedException; 8 }
實現這個接口,也即是在ExpensiveFunction做具體的計算過程。
1 package simplecache; 2 3 /** 4 * Created by yulinfeng on 12/25/16. 5 */ 6 public class ExpensiveFunction implements Computable<String, Integer> { 7 @Override 8 public Integer compute(String arg) throws InterruptedException { 9 //計算 10 return new Integer(arg); 11 } 12 }
接着將創建一個Computable包裝器,幫助記住之前的計算結果,並將緩存過程封裝起來(Memoization)。
1.利用簡單HashMap實現緩存
1 package simplecache; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 /** 7 * Created by yulinfeng on 12/25/16. 8 */ 9 public class Memoizer1<A, V> implements Computable<A, V> { 10 private final Map<A, V> cache = new HashMap<A, V>(); 11 private final Computable<A, V> c; 12 13 public Memoizer1(Computable<A, V> c){ 14 this.c = c; 15 } 16 17 @Override 18 public synchronized V compute(A arg) throws InterruptedException { 19 V result = cache.get(arg); 20 if (null == result){ 21 result = c.compute(arg); 22 cache.put(arg, result); 23 } 24 return result; 25 } 26 }
我們首先利用最簡單的HashMap實現緩存,由於HashMap並不是線程安全的,所以在compute方法使用synchronized關鍵字,同步以實現線程安全。可見使用synchronized同步方法如此大粒度的同步必然會帶來並發性的降低,因為每次只有一個線程執行compute方法,其余線程只能排隊等待。
2.利用並發容器ConcurrentHashMap
第1種方法能實現緩存,且能實現線程安全的緩存,不過帶來的問題就是並發性降低。我們使用並發包中的ConcurrentHashMap並發容器。
1 package simplecache; 2 3 import java.util.Map; 4 import java.util.concurrent.ConcurrentHashMap; 5 6 /** 7 * Created by yulinfeng on 12/25/16. 8 */ 9 public class Memoizer2<A, V> implements Computable<A, V> { 10 private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); 11 private final Computable<A, V> c; 12 13 public Memoizer2(Computable<A, V> c){ 14 this.c = c; 15 } 16 17 @Override 18 public V compute(A arg) throws InterruptedException { 19 V result = cache.get(arg); 20 if (null == result){ 21 result = c.compute(arg); 22 cache.put(arg, rsult); 23 } 24 return result; 25 } 26 }
毫無疑問,利用ConcurrentHashMap會比簡單HashMap帶來更好的並發性,同時它也是線程安全的。不過在有一種條件下,這種方式會帶來一個新的問題,當這個計算過程比較復雜,計算時間比較長時,線程T1正在計算沒有結束,此時線程T2並不知道此時T1已經在計算了,所以它同樣會再次進行計算,這種條件下相當於一個值被計算了2次。我們應該想要達到的效果應該是T1正在計算,而此時T2能發現T1正在計算相同值,此時應該阻塞等待T1計算完畢返回計算結果,而不是T2也去做一次計算。FutureTask表示一個計算過程,這個計算過程可能已經計算完成,也有可能正在計算。如果有結果可用,那么FutureTask.get將立即返回結果,否則會一直阻塞直到計算結束返回結果。這正好我們想要達到的效果。
3.緩存的最佳實踐——ConcurrentHashMap+FutureTask
1 package simplecache; 2 3 import java.util.Map; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.Future; 6 7 /** 8 * Created by yulinfeng on 12/25/16. 9 */ 10 public class Memoizer3<A, V> implements Computable<A, V> { 11 private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); 12 private final Computable<A, V> c; 13 14 public Memoizer3(Computable<A, V> c) { 15 this.c = c; 16 } 17 18 @Override 19 public V compute(final A arg) throws InterruptedException { 20 Future<V> f = cache.get(arg); 21 if (null == f){ 22 Callable<V> eval = new Callable<V>() { 23 @Override 24 public V call() throws InterruptedException { 25 return c.compute(arg); 26 } 27 }; 28 FutureTask<V> ft = new FutureTask<V>(eval); 29 cache.put(arg, ft); 30 ft.run(); //調用執行c.compute 31 } 32 try { 33 return f.get(); 34 } catch (ExecutionException e) { 35 e.printStackTrace(); 36 } 37 } 38 }
不了解FutureTask可以去補補了,但記住上面所說“FutureTask表示一個計算過程,這個計算過程可能已經計算完成,也有可能正在計算。如果有結果可用,那么FutureTask.get將立即返回結果,否則會一直阻塞直到計算結束返回結果。”,但這並不算是最完美的實現,在compute方法中出現了if的復合操作,也就是說在期間還是很有可能出現如同ConcurrentHashMap一樣的重復計算,只是概率降低了而已。幸好,ConcurrentHashMap為我們提供了putIfAbsent的原子方法,從而完美的避免了這個問題。
1 package simplecache; 2 3 import java.util.concurrent.*; 4 5 /** 6 * Created by yulinfeng on 12/25/16. 7 */ 8 public class Memoizer<A, V> implements Computable<A, V> { 9 private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); 10 private final Computable<A, V> c; 11 12 public Memoizer(Computable<A, V> c){ 13 this.c = c; 14 } 15 16 @Override 17 public V compute(final A arg) throws InterruptedException { 18 while (true) { 19 Future<V> f = cache.get(arg); 20 if (null == f) { 21 Callable<V> eval = new Callable<V>() { 22 @Override 23 public V call() throws Exception { 24 return c.compute(arg); 25 } 26 }; 27 FutureTask<V> ft = new FutureTask<V>(eval); 28 f = cache.putIfAbsent(arg, ft); 29 if (null == f){ 30 f = ft; 31 ft.run(); 32 } 33 } 34 try { 35 return f.get(); 36 } catch (CancellationException e){ 37 e.printStackTrace(); 38 } catch (ExecutionException e) { 39 e.printStackTrace(); 40 } 41 } 42 } 43 }
這樣我們利用ConcurrentHashMap的並發性已經putIfAbsent原子性,以及FutureTask的特性實現了一個簡單緩存。