本篇將介紹Hazelcast實現的分布式Map的原理和使用方法。
分布式Map基礎功能
Map是我們再最常用的數據接口之一,時常用於存儲某種關系值。在前面介紹Hazelcast的文章中已經用Map舉了很多分布式環境使用的例子。下面我們將由淺入深的介紹Hazelcast的Map。
在Hazelcast中自定義了一個名為IMap的接口,該接口自java.util.concurrent.ConcurrentMap接口,所以可以通過常規的Map::get和Map::put方法來控制集群Map的數據。先看一個Map使用簡單的例子:
(再次申明,文中所有的例子的代碼都在:https://github.com/chkui/hazelcast-demo,git clone到本地用Maven導入即可運行)
首先創建一個服務端節點,並向節點中的Map添加數據。
// // 服務端節點 public class ServiceNode { public static void main(String[] args) { // 獲取Hazelcast實例 HazelcastInstance ins = Hazelcast.newHazelcastInstance(); // 從集群中讀取Map實例 Map<Integer, String> map = ins.getMap("default map"); // 向集群中添加數據 System.out.println("Begin insert data"); map.put(1, "Cait Cassiopeia"); map.put(2, "Annie"); map.put(3, "Evelynn"); map.put(4, "Ashe"); System.out.println("End"); } }
然后創建一個客戶端節點,從節點的Map讀取數據。
// // 客戶端節點 public class ClientNode { public static void main(String[] args) { // 獲取Hazelcast實例 HazelcastInstance ins = Hazelcast.newHazelcastInstance(); // 從集群中讀取Map實例 Map<Integer, String> map = ins.getMap("default map"); // 輸出map中數據 map.forEach((k,v)->{ System.out.println("Pos:" + k + ". name:" + v); }); } }
這就是使用集群Map的過程,和常規Map並沒有什么差異。在使用集群Map時,最主要是了解Map的各種配置對Map功能的影響,以及Hazelcast為Map提供了哪些擴展接口。下面將會結合配置文檔,說明每一個配置參數的功效。
先看分布式Map的基礎配置參數:
<map name="default"> <backup-count>0</backup-count> <async-backup-count>1</async-backup-count> <read-backup-data>true</read-backup-data> <in-memory-format>BINARY</in-memory-format> <eviction-policy>LRU</eviction-policy> <time-to-live-seconds>0</time-to-live-seconds> <max-idle-seconds>0</max-idle-seconds> <min-eviction-check-millis>150</min-eviction-check-millis> <max-size policy="PER_NODE">5000</max-size> <eviction-percentage>25</eviction-percentage> </map>
下面這個是一個使用代碼配置的例子:
// // 代碼設置Map public class StaticMapConfig { public static void main(String[] args) { MapConfig mapConfig = new MapConfig(); mapConfig.setName("cacheMap")// 設置Map名稱 .setInMemoryFormat(InMemoryFormat.BINARY)// 設置內存格式 .setBackupCount(1);// 設置副本個數 mapConfig.getMapStoreConfig()// .setWriteDelaySeconds(60)// .setWriteBatchSize(1000);// 設置緩存格式 mapConfig.addMapIndexConfig(new MapIndexConfig().setAttribute("id").setOrdered(true));// 增加索引 mapConfig.addMapIndexConfig(new MapIndexConfig().setAttribute("name").setOrdered(true)); } }
下面我們將一一介紹每個配置的含義。
backup-count
備份副本個數[0~Integer.MAX_VALUE]。 前面的博文已經介紹,集群中分布式存儲的數據都會被均勻的存儲在每個節點上。我們使用Map進行分布式數據存儲時,每個節點會按條目(Entry)數將數據進行分布,並且每條數據都會有備份。例如集群中的一個Map有1000條數據,此時有2個節點,那么每個節點會存儲1000條數——500條主數據和500條備份數據,以此類推,當有5個節點時,每個節點200條主數據加200條備份數據。
backup-count 就是用來定義備份副本個數的,默認為1。當設置為0時,集群中不會有任何數據副本。這個參數需要根據數據的業務需要來定義,值越大,需要備份的副本就越多,集群中需要處理的數據就越多,會導致性能降低。
async-backup-count
異步備份副本的個數[0~Integer.MAX_VALUE]。這個參數和backup-count類似,也是指定備份副本的個數,區別在於這里指定的副本,是異步備份的。例如,我們執行map.put(key,value)時,Hazelcast會先向主表添加數據。這時如果指定了backup-count = 1,會先更新副本數據,然后再return到調用put方法的線程,在添加數據完成之前,調用線程都是被阻塞的。如果指定了async-backup-count = 1,那么當添加主表數據成功后,會直接返回給調用線程,然后再去異步執行數據備份。使用同步方法還是異步方法,需要根據業務數據的重要性來決定,如果是一定不能丟失的數據,最好用同步方法,如果備份出現異常,會馬上通知到調用線程然后執行補償操作。
read-backup-data
副本直讀數據[true|false]。當我們的集群中有一個map的備份數據后,這些備份數據也是分散存儲在各個節點的。當某個節點需要讀取數據時,read-backup-data設置為false表示只能從主表數據讀取,設置為true表示可以從備份副本中讀取數據。設置為true,可以提升數據的讀取數據,因為在某個節點要讀取某條數據時,該節點正好有該條數據,可以減少網絡交互消耗。但是設置為trure可能會導致“數據臟讀”。
in-memory-format
內存數據格式[BINARY|OBJECT]。
BINARY:這是默認配置。數據將被序列化成二進制的方式存儲。如果在應用中Map的主要執行的都是像get和put這樣的常規操作,建議使用這個配置。
OBJECT:數據將以非序列化的結構存儲。這個配置有利於當Map中存儲的對象比較復雜,對其進行序列化的成本較高時。當需要對存儲復雜對象的Map條目進行大量查詢時,建議使用OBJECT。
用一個場景來說明他們的區別。我們的對象都是存儲在每個節點中的,當某個節點需要get不在本地的一條數據時,Hazelcast需要去其他節點獲取數據。此時如果以二進制的方式存儲,不用進行序列化,直接將數據進行傳輸,而如果以對象的方式存儲,在傳輸之前,需要進行一次序列化操作,然后再傳遞數據。
eviction-policy
數據釋放策略[NONE|LRU|LFU]。這是Map作為緩存的一個參數,用於指定數據的回收算法。默認為NONE。
NONE:當設置為NONE時,不會發生數據回收,同時max-size會失效。但是仍然可以使用time-to-live-seconds和max-idle-seconds參數來控制數據留存時間。
LRU:“最近最少使用“策略。
LFU:“最不常用的使用”策略。
time-to-live-seconds(TTL)
數據留存時間[0~Integer.MAX_VALUE]。緩存相關參數,單位秒,默認為0。這個參數決定了一條數據在map中的停留時間。當數據在Map中留存超過這個時間並且沒有被更新時,它會根據指定的回收策略從Map中移除。值為0時,意味着無求大。
max-idle-seconds
數據的最大空閑時間[0~Integer.MAX_VALUE]。緩存相關參數,單位秒,默認值為0。當條目的空閑時間大於這個數值時,將會被自動釋放。條目的空閑是指沒get、put、EntryProcessor.process或containsKey方法被調用。默認值為0,意味着無求大。
min-eviction-check-millis
分區數據釋放檢查周期[0~Integer.MAX_VALUE]。緩存相關參數,單位秒,默認值為100。前面提到了Hazelcast會對map存儲的數據進行釋放。為了移除這些數據,有一個輪詢工作在不間斷的執行。換一種說法,就是數據釋放的頻率。當設置為0時,每一次數據的put操作,都會導致一次數據釋放執行。
max-size
Map中存儲條目的最大值[0~Integer.MAX_VALUE]。默認值為0。當條目數量接近最大值時,map將基於配置的策略進行數據釋放。如果期望max-size生效,必須eviction-policy將設置為NONE之外的其他值。max-size中包含一個屬性參數——policy,他定義了max-size對應的存儲策略,回收機制會根據這個策略檢查數據。其值有[PER_NODE|PER_PARTITION|USED_HEAP_SIZE|USED_HEAP_PERCENTAGE|FREE_HEAP_SIZE|FREE_HEAP_PERCENTAGE]。
PER_NODE:max-size指定單個集群成員中map條目的最大數量。這是max-size的默認策略。如果使用這個配置,需要注意max-size的值必須大於分區的數量(默認為271)。
PER_PARTITION:max-size指定每個分區存儲的map條目最大數。這個策略建議不要在小規模的集群中使用,因為小規模的集群,單個節點包含了大量的分區,在執行回收策略時,會去按照分區的划分組個檢查回收條件,導致效率低下。
USED_HEAP_SIZE:指在每個Hazelcast實例中,max-size指定map所占用的內存堆的(以megabytes計算,兆字節)最大值。需要注意這個策略不能工作在in-memory-format=OBJECT,因為當數據被設置為OBJECT時,無法確定所占用的內存大小。
USED_HEAP_PERCENTAGE:每個Hazelcast實例中,max-size指定map占用內存堆的百分比。例如,JVM被設置有1000MB,而這個值設置為max-size=10,當map條目數占用的堆數據超過100MB時,Hazelcast開始執行數據釋放工作。需要注意的是當使用這個策略時,不能將in-memory-format設置為OBJECT,理由同上。
FREE_HEAP_SIZE:max-size指定了單個JVM的堆最小空閑空間,單位為megabytes。
FREE_HEAP_PERCENTAGE:max-size指定單個JVM的最小空閑空間的百分比。例如JVM分配了1000MB的空間,這個值設置為10,當空閑堆只有100MB時,會引發map的數據清除放行為。
eviction-percentage
數據清理的百分比[0-100]。當觸發數據清除條件,這個參數所配置的百分比MAP條目將被釋放。例如設置為25,25%的條目將會被清除。將這個值設置較小時會導致Map中只有較少的條目被釋放,導致Hazelcast頻繁的執行數據清除操作。如果map的條目數據經常被添加,請將這個比率提高,默認為25。
Near Cache
Near cache是Hazelcast分布式Map重要的功能之一。根據前面的知識我們知道,Hazelcast的所有數據都是按照分區存儲在每個集群節點之上的。假設集群中的一個節點需要根據key讀取某條數據,而這些數據被放置在其他的節點。這樣每次Map.get操作都會導致一次網絡數據傳輸,如果節點分布較廣、傳輸能力參差不齊,會導致大量的網絡擁塞,進而影響每個節點的執行。尤其是某個map的讀操作遠遠多於寫操作時,我們可以考慮使用Near cache功能。Near cache會將那些被某個節點經常使用的數據存儲到當前節點或“附近”節點,以此來減少過多的網絡傳輸工作。使用Near cache也會導致一些問題出現,在使用之前,必須了解一下問題:
- 使用Near cache功能會導致集群中的成員額外存儲緩存數據,會增加內存的消耗。
- Near cache會破壞數據一致性性,可能會出現“臟讀”現象,因此在頻繁寫或數據一致性要求較高的應用中不建議使用。
- 建議在高頻讀操作的Map中啟用Near cache功能,這樣可以極大的提升執行效率。
Near cache的配置都在near-cache元素中。下面介紹Near cache的相關參數。
<map name="my-read-mostly-map"> <near-cache name="default"> <in-memory-format>BINARY</in-memory-format> <max-size>5000</max-size> <time-to-live-seconds>0</time-to-live-seconds> <max-idle-seconds>60</max-idle-seconds> <eviction-policy>LRU</eviction-policy> <invalidate-on-change>true</invalidate-on-change> <cache-local-entries>false</cache-local-entries> </near-cache> </map>
in-memory-format
與Map的in-memory-format配置一樣,指定了Map在Near cache中的存儲格式。參見前文介紹的in-memory-format功能。
max-size
Near cache緩存中存儲的最大條目數[0~Integer.MAX_VALUE]。Near cache會根據eviction-policy指定的策略來釋放數據。默認為0,表示不限定最大條目數。
time-to-live-seconds
單條數據在Near cache中的最大駐留時間[0~Integer.MAX_VALUE]。單位秒,默認為0。如果存儲在Near cache中的某條數據在Near cache中的駐留時間(沒有被更新)超過這個時間,則在執行數據回收時會被釋放掉。值為0時表示永遠不會過期。
max-idle-seconds
單條數據在Near cache中的最大失效時間[0~Integer.MAX_VALUE]。單位秒,默認值為0。如果存儲在Near cache中的某條數據在指定時間內沒有被讀取,則認為該條數據失效。此時在執行數據回收時會釋放掉該條數據。值為0時表示用於不會失效。
eviction-policy
數據釋放策略,見前面 Map釋放策略 的說明。
invalidate-on-change
設定當Near cache中的某條數據被更新或移除時,是否對其執行釋放[true|false]。默認為true。
cache-local-entries
指定那些已經被存儲在當前節點的數據條目,是否也進行Near cache緩存[true|false]。這個參數最大的作用在於,可以將Near cache的內存格式設定成和Map存儲格式不一樣的方式。默認為fasle。
MapStore數據持久化
后續的篇幅將介紹Hazelcast分布式Map的一些基礎功能。這里先介紹如何對數據庫進行數據讀寫。Hazelcast分布式Map的持久化數據讀寫通過MapStore來實現。請看下面這個例子:
先是一個配置文件,后面在說他的意義:
<!--https://github.com/chkui/hazelcast-demo/blob/master/src/main/java/org/palm/hazelcast/map/store/mapStoreConfig.xml --> <?xml version="1.0" encoding="UTF-8"?> <hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd" xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <map name="demo"> <map-store enabled="true" initial-mode="EAGER"> <class-name>org.palm.hazelcast.map.store.MapStoreExample</class-name> <write-delay-seconds>60</write-delay-seconds> <write-batch-size>1000</write-batch-size> <write-coalescing>true</write-coalescing> </map-store> </map> </hazelcast>
然后定義一個當Map發生數據讀寫時對數據庫進行操作的MapStore類:
//https://github.com/chkui/hazelcast-demo/blob/master/src/main/java/org/palm/hazelcast/map/store/MapStoreExample.java public class MapStoreExample implements MapStore<Integer, String> { Map<Integer, String> store; public MapStoreExample(){ store = new HashMap<Integer, String>(); store.put(1, "Azeroth"); store.put(2, "Duskwood"); store.put(3, "Elwynn Forest"); store.put(4, "Deadwind Pass"); store.put(5, "Dead Mines"); store.put(6, "Grand Hamlet"); store.put(7, "Dark Portal"); store.put(8, "Ashenvale"); store.put(9, "Felwood"); store.put(10, "Orgrimmar"); } @Override public String load(Integer key) {//讀取 if(store.size() < key){ key = 0; } return store.get(key); } @Override public Map<Integer, String> loadAll(Collection<Integer> arg0) {//讀取所有 return store; } @Override public Iterable<Integer> loadAllKeys() {// 讀取所有鍵值 return store.keySet(); } @Override public void delete(Integer key) {// 刪除鍵值 store.remove(key); } @Override public void deleteAll(Collection<Integer> list) {// 刪除所有鍵值 list.forEach(key->store.remove(key)); } @Override public void store(Integer key, String value) {// 存儲鍵值 store.put(key, value); } @Override public void storeAll(Map<Integer, String> map) {// 存儲所有鍵值 store.putAll(map); } }
最后時一個測試main:
//https://github.com/chkui/hazelcast-demo/blob/master/src/main/java/org/palm/hazelcast/map/store/MapStoreExampleMain.java public class MapStoreExampleMain { public static void main(String[] args) throws FileNotFoundException { //加載配置 Config config = new ClasspathXmlConfig("org/palm/hazelcast/map/store/mapStoreConfig.xml"); //創建Hazelcast實力 HazelcastInstance ins = Hazelcast.newHazelcastInstance(config); //獲取Map Map<Integer, String> map = ins.getMap("demo"); println(map.get(1));//輸出第一條數據 map.put(11, "Moonbrook");//添加一條數據 println(map.get(11));//輸出第一條數據 map.remove(11);//移除添加的數據 println(map.get(11));//輸出被移除的數據 } static private void println(Object obj){ System.out.println(obj); } }
仔細看代碼例子的兄弟應該明白怎么回事了吧。配置文件中<map-store>元素定義了Mapstore的行為,<class-name>定義了當發生數據讀寫時要調用的存儲類,該類需要實現MapStore接口。MapStore接口定義了當對Map進行put、get、remove操作時會被調用實現類的store、load、delete方法,我們可以通過自己的代碼來完成對數據庫的寫入和讀取操作。
MapStore支持Read-Through、Write-Through、Write-Behind模式,下面簡單介紹一下這三種模式:
1.Read-Through:當應用系統向緩存系統請求數據時(例如使用key=x向緩存請求數據);如果緩存中並沒有對應的數據存在(key=x的value不存在),緩存系統將向底層數據源的讀取數據。如果數據在緩存中存在(命中key=x),則直接返回緩存中存在的數據。這就是所謂的Read-throug。
2.Write-Through:當應用系統對緩存中的數據進行更新時(例如調用put方法更新或添加條目),緩存系統會同步更新緩存數據和底層數據源。
3.Write-Behind:當應用系統對緩存中的數據進行更新時(例如調用put方法更新或添加條目),緩存系統會在指定的時間后向底層數據源更新數據。
write-delay-seconds
我們可以使用<write-delay-seconds>來指定Map是使用Write-Through模式還是Write-Behind模式。
當設定<write-delay-seconds>為0時,表示當執行Map::put時立刻調用注冊的Mapstore的store方法,直到自定義的代碼執行完畢返回后,Map::put方法才會返回,整個過程都會阻塞線程。這樣就實現了Write-Through模式。
當設置<write-delay-seconds>大於0時,表示延遲指定的時間后(秒)再異步調用Mapstore::store方法。整個過程不會產生阻塞,數據被添加到Map里后就返回給調用者。這就實現了Write-Behind模式。
使用什么模式,請根據你的業務要求設置。下面是其他幾個參數的含義。
write-batch-size
批量更新參數[0~Integer.MAX_VALUE]。這個參數用於指定當累計多少次更新數據之后再調用Mapstore::store一次性寫入數據庫。例如設置為50,只有調用50次Map::put方法后,Hazelcast才會去調用一次storeAll,並且傳入所有的更新數據。如果運行我上面提供的例子,你會發現MapStoreExample的store和remove方法並沒有被調用。
write-coalescing
標記是否執行所有更新[true|false]。默認為true。用於標記Mapstore::store是否獲取所有的更新。在Write-Behind模式下,在延遲的時間中,可能已經對某個key對應的value值進行了多次更新,若參數設定為true,表示只傳遞最后一次更新給Mapstore::store(Mapstore::storeAll)。如果設置為false,會將所有的更新傳遞給Mapstore::storeAll。
enabled屬性
表示是否啟用Mapstore[true|false]。
initial-mode屬性
初始化模式[LAZY|EAGER]。默認為LAZY,在此參數下,會對Mapstore執行異步初始化。當設置為“EAGER”時, 初始化過程將被阻塞,直到加載完成。
除了上面的配置,我們可以通過Mapstore::loadAllKeys方法來設定當Map初始化時要加載的數據。如果loadAllKeys返回null,則不預加載任何數據。因此我們可以在loadAllKeys方法中指定當Map初始化時需要先加載的數據。
Map攔截器(Interceptors)
我們可以為Map的所有方法添加攔截器,攔截器類似於AOP機制。對某些方法增加攔截器后,當這些方法被調用時,會根據攔截器的配置進入攔截器。攔截器的代碼都是會組賽線程的,也就是說我們我們在攔截器中處理完某些事物后,必須馬上返回。因為組賽了線程,攔截器的功能強大,可以對返回值、更新值進行任何修改。
攔截器采取鏈式操作,也就是說可以為一個方法增加多個攔截器,這些攔截器會根據添加的次序逐個被調用。Hazelcast會根據攔截器的hashCode()方法來判斷是否是同一個攔截器,如果是具有相同的hashcode,則認為是同一個攔截器,不會被添加。因此建議最好根據需要重載hashcode方法,以免重復添加相同的攔截器。
下面是一個代碼的例子,首先是實現了一個攔截器:
// https://github.com/chkui/hazelcast-demo/blob/master/src/main/java/org/palm/hazelcast/map/interceptor/InterceptorExample.java public class InterceptorExample implements MapInterceptor, Serializable { private static final long serialVersionUID = -7591859317633144192L; @Override /**攔截get。可以返回其他值來替換原有值。返回null將不影響原有操作。 */ public Object interceptGet(Object obj) { prinfln("get : " + obj); return obj; } @Override /**在get操作執行完畢后被調用。*/ public void afterGet(Object obj) { prinfln("after get : " + obj); } @Override /**攔截put。返回的值會設置到map中。返回null時原有的put數據不會發生任何改變。 拋出異常會取消put操作。 */ public Object interceptPut(Object oldValue, Object newValue ) { prinfln("put old value : " + oldValue); prinfln("put new value : " + newValue); return newValue; } @Override /**在put操作執行完畢后被調用。*/ public void afterPut(Object obj) { prinfln("after put : " + obj); } @Override /**攔截remove。返回被刪除對象或null將繼續執行刪除。 拋出異常會取消remove操作。 */ public Object interceptRemove(Object obj) { prinfln("remove : " + obj); return null; } @Override /**在remove操作執行完畢后被調用。*/ public void afterRemove(Object obj) { prinfln("afrer remove : " + obj); } private void prinfln(Object obj){ System.out.println(obj); } }
Map攔截器的攔截器需要實現MapInterceptor接口。這些接口提供了針對get、put、remove實現了攔截。然后下面是一個使用的例子:
public class InterceptorDemo { public static void main(String[] args) { HazelcastInstance ins = Hazelcast.newHazelcastInstance(); IMap<Integer, String> imap = ins.getMap(""); imap.addInterceptor(new InterceptorExample());// 添加攔截器 imap.put(1, "Mei"); imap.put(1, "Tracer"); imap.put(1, "D.va"); imap.put(1, "Mercy"); imap.get(1); imap.remove(1); System.out.println(imap.get(1)); } }
我們使用IMap接口來獲取map實例。然后使用IMap::addInterceptor方法來增加前面實現的攔截器。隨后,所有針對這個Map的get、put、remove都會進入我們設定的攔截器。IMap::removeInterceptor可以用來移除一個攔截器。
Map事件監聽器
除了攔截器,Hazelcast還有監聽器。監聽器和攔截器的區別在於:攔截器會嵌入到業務流程中去,攔截器可以在處理數據的過程中改變數據和行為。而監聽器並不會攝入到處理邏輯中,他只是觀察到發生某個事件后,通知我們注冊的監聽器。下面還是同一個代碼的例子說明監聽器,先創建一個監聽器:
// https://github.com/chkui/hazelcast-demo/blob/master/src/main/java/org/palm/hazelcast/map/listener/ListenerExample.java public class ListenerExample implements EntryAddedListener<Integer, String>, EntryRemovedListener<Integer, String>, EntryUpdatedListener<Integer, String> { @Override public void entryUpdated(EntryEvent<Integer, String> entry) {//監聽更新數據 print("put entry. key = " + entry.getKey() + ". value = " + entry.getValue()); } @Override public void entryRemoved(EntryEvent<Integer, String> entry) {//監聽移除數據 print("remove entry. key = " + entry.getKey() + ". value = " + entry.getValue()); } @Override public void entryAdded(EntryEvent<Integer, String> entry) {//監聽新增數據 print("add entry. key = " + entry.getKey() + ". value = " + entry.getValue()); } private void print(Object obj){ System.out.println(obj); } }
一個監聽器可以實現多個監聽接口,除了例子中的接口,還有EntryEvictedListener(釋放單條數據)、MapEvictedListener(清除Map數據)、MapClearedListener(清空Map數據)等。
至此,Hazelcasl分布式Map的基本功能就介紹完畢了。