java實現數據緩存


摘抄自java並發實戰


有時候需要對數據緩存。用Map緩存數據比較合適。但是由於對吞吐量,一致性,計算性能的要求,對數據進行緩存的設計還是需要慎重考慮的。

一、利用HashMap加同步

(1)說明

把HashMap當作緩存容器。每緩存一個key的時候,都進行同步。

(2)代碼

 1 package memory;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 /**
 7  * Created by adrian.wu on 2018/12/12.
 8  */
 9 public class MemoryFirst<K, V> implements Computable<K, V> {
10     private final Map<K, V> cache = new HashMap<>();
11     private final Computable<K, V> c;
12 
13     public MemoryFirst(Computable<K, V> c) {
14         this.c = c;
15     }
16 
17     @Override
18     public synchronized V compute(K arg) throws InterruptedException {
19         V result = cache.get(arg);
20 
21         if (result == null) {
22             result = c.compute(arg);
23             cache.put(arg, result);
24         }
25 
26         return result;
27     }
28 }

(3)缺點

由於HashMap並非線程安全,因此每一次計算都使用同步機制確保線程安全。很明顯,這種方式伸縮性比較差。因為一個線程正在計算結果,其它所有線程都在等待,即使對應的arg是不同的。

 

二、用ConcurrentHashMap代替HashMap

(1)說明

ConcurrentHashMap是線程安全的,並且同步並非對整個Map進行同步而是對每一個分段進行同步,所以並發性也可以大大提升。

(2)代碼

 1 package memory;
 2 
 3 import java.util.Map;
 4 import java.util.concurrent.ConcurrentHashMap;
 5 
 6 /**
 7  * Created by adrian.wu on 2018/12/12.
 8  */
 9 public class MemorySecond<K, V> implements Computable<K, V> {
10     private final Map<K, V> cache = new ConcurrentHashMap<>();
11     private final Computable<K, V> c;
12 
13     public MemorySecond(Computable<K, V> c) {
14         this.c = c;
15     }
16 
17     @Override
18     public V compute(K arg) throws InterruptedException {
19         V result = cache.get(arg);
20 
21         if (result == null) {
22             result = c.compute(arg);
23             cache.put(arg, result);
24         }
25 
26         return result;
27     }
28 }

(3)缺點

相比第一個設計方案。這種方案已經有很大的提升了。但是如果一個compute的計算開銷很大,恰巧有另一個同一個arg的線程同時請求compute,則會造成重復計算,重復put的情況。所以我們希望如果有一個線程正在計算的時候另一個線程正在等待而不是重復計算。

 

三、利用FutureTask解決第二個設計的問題

(1)說明

利用FutrueTask, 如果get到結果則返回,如果正在計算則利用FutureTask的特性阻塞。否則計算。

(2)代碼

 1 package memory;
 2 
 3 import org.slf4j.Logger;
 4 
 5 import java.util.Map;
 6 import java.util.concurrent.*;
 7 
 8 import static memory.ErrorHandler.launderThrowable;
 9 
10 /**
11  * Created by adrian.wu on 2018/12/12.
12  */
13 public class MemoryThird<K, V> implements Computable<K, V> {
14     private final Map<K, Future<V>> cache = new ConcurrentHashMap<>();
15     private final Computable<K, V> c;
16 
17     public MemoryThird(Computable<K, V> c) {
18         this.c = c;
19     }
20 
21     @Override
22     public V compute(final K arg) throws InterruptedException {
23         Future<V> f = cache.get(arg);
24         if (f == null) {
25             Callable<V> eval = new Callable<V>() {
26                 @Override
27                 public V call() throws Exception {
28                     return c.compute(arg);
29                 }
30             };
31 
32             FutureTask<V> ft = new FutureTask<>(eval);
33             f = ft;
34             cache.put(arg, ft);
35             ft.run(); // start compute
36         }
37         try {
38             return f.get();
39         } catch (ExecutionException e) {
40             throw launderThrowable(e.getCause());
41         }
42     }
43 }

(3)缺點

只有一個缺陷,仍然存在兩個線程計算出相同值的漏洞。就是由於compute方法中的if代碼塊仍然是非原子的“先檢查,再執行”,因此仍然有可能兩個線程在同一時間計算一個不存在的arg。原因是第23行的get方法和34行的put方法是對底層的Map操作,所以無法保證原子性。由於cache里面的是future而不是真正的值,所以將有可能導致緩存污染(cache pollution)問題,即如果某個計算過程被取消或者失敗,那么緩存存入的Future是有缺陷的。

 

四、最終設計方案

(1)說明

使用putIfAbsent代替put,以保證原子性。如果發現Future計算被取消或失敗則刪除,從而緩存不會消耗過多內存。

(2)代碼

 1 package memory;
 2 
 3 import java.util.Map;
 4 import java.util.concurrent.*;
 5 
 6 import static memory.ErrorHandler.launderThrowable;
 7 
 8 /**
 9  * Created by adrian.wu on 2018/12/12.
10  */
11 public class Memory<K, V> implements Computable<K, V> {
12     private Map<K, Future<V>> cache = new ConcurrentHashMap<>();
13     private Computable<K, V> c;
14 
15     public Memory(Computable<K, V> c) {
16         this.c = c;
17     }
18 
19     @Override
20     public V compute(K arg) throws InterruptedException {
21         while (true) {
22             Future<V> f = cache.get(arg);
23 
24             if (f == null) {
25                 Callable<V> eval = new Callable<V>() {
26                     @Override
27                     public V call() throws Exception {
28                         return c.compute(arg);
29                     }
30                 };
31 
32                 FutureTask<V> ft = new FutureTask<>(eval);
33                 
34                 f = cache.putIfAbsent(arg, ft); //double check
35                 if (f == null) {
36                     f = ft;
37                     ft.run(); //start compute
38                 }
39             }
40             try {
41                 return f.get();
42             } catch (CancellationException e) {
43                 cache.remove(arg);
44             } catch (ExecutionException e) {
45                 throw launderThrowable(e.getCause());
46             }
47         }
48     }
49 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM