之前的文章我們其實已經用到了兩種不同的方式訪問Ignite中的數據。一種方式是第一篇文章中提到通過JDBC客戶端用SQL訪問數據,在這篇文章中我們也會看到不使用JDBC,如何通過Ignite API用SQL訪問數據。還有用一種方式我稱之為cache API, 即用get/put來訪問數據。Ignite實現了JCache(JSR 107)標准,所以除了基本的cache操作外,我們也會介紹一些cache的原子操作和EntryProcessor的使用。
Cache API
Ignite提供了類似Map的API用來操作緩存上的數據,只不過Ignite的實現把這個Map上的數據分布在多個節點上,並且保證了這些操作是多線程/進程安全的。我們可以簡單的在多個節點上使用get/put往Ignite緩存里讀寫數據,而把數據同步,並發控制等復雜問題留給Ignite來解決。除了get/put操作外,Ignite還提供了其他的原子操作以及異步操作,比如getAndPutIfAbsent, getAndPutAsync, putIfAbsent, putIfAbsentAsync, getAndReplace, getAndReplaceAsync等,完整的API列表可以看這里。
Ignite也支持在JCache標准中定義的entry processor。我沒仔細讀過JCache中對entry processor的定義,但根據Ignite的文檔和使用經驗,相比於基本的緩存get/put操作,entry processor有下面幾個特性/優點:
- 相比於get/put等基本操作,在entry processor中我們可以實現更為復雜的cache更新邏輯,比如我們可以讀出緩存中的某個值,然后做一些自定義計算后,再更新緩存中的值。
- 和get/put/putIfAbsent等操作一樣,在entry processor中所有的操作是原子性的, 即保證了entry processor中定義的操作要么都成功,要么都失敗。如果不用entry processor,為了達到相同目的,我們需要對需要要更新的緩存數據加鎖,更新緩存數據,最后釋放鎖。而有了entry proce,我們可以更專注於緩存更新的邏輯,而不用考慮如何加解鎖。
- Entry processor允許在數據節點上直接進行操作。分布式緩存中,如果更新的緩存數據需要根據已經在緩存中的數據計算得到,往往需要在多個節點之間傳送的緩存數據。而entry processor是把操作序列化后發送到緩存數據所在的節點,比起序列化緩存數據,要更高效。
Entry Processor代碼示例
下面我們改造一下之前的例子,看看在Ignite中如何實現並調用一個entry processor。在這個例子中,cache中key的值依舊是城市的名字,但是value的值不再是簡單的城市所在省份的名字,而是一個City類的實例。下面是City類的定義:
public class City {
private String cityName;
private String provinceName;
private long population;
public City(String cityName, String provinceName, long population) {
this.cityName = cityName;
this.provinceName = provinceName;
this.population = population;
}
...
}
在City類中,我們放了一個population的成員變量,用來表示該城市的人口數量。在主程序中,我們創建多個線程,通過entry processor不斷修改不同城市的人口數量。每個entry processor做的事情也很簡單: 讀取當前人口數量加1,再把新值更新到cache中。下面是主程序的代碼
public class IgniteEntryProcessorExample {
public static void main(String[] args) {
// start an ignite cluster
Ignite ignite = startCluster(args);
CacheConfiguration<String, City> cacheCfg = new CacheConfiguration<>();
cacheCfg.setName("CITY");
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
cacheCfg.setBackups(1);
IgniteCache<String, City> cityProvinceCache = ignite.getOrCreateCache(cacheCfg);
// let's create a city and put it in the cache
City markham = new City("Markham", "Ontario", 0);
cityProvinceCache.put(markham.getCityName(), markham);
System.out.println("Insert " + markham.toString());
// submit two tasks to increase population
ExecutorService service = Executors.newFixedThreadPool(2);
IncreaseCityPopulationTask task1 = new IncreaseCityPopulationTask(cityProvinceCache, markham.getCityName(), 10000);
IncreaseCityPopulationTask task2 = new IncreaseCityPopulationTask(cityProvinceCache, markham.getCityName(), 20000);
Future<?> result1 = service.submit(task1);
Future<?> result2 = service.submit(task2);
System.out.println("Submit two tasks to increase the population");
service.shutdown();
try {
service.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// get the population and check whether it is 30000
City city = cityProvinceCache.get(markham.getCityName());
if (city.getPopulation() != 30000) {
System.out.println("Oops, the population is " + city.getPopulation() + " instead of 30000");
} else {
System.out.println("Yeah, the population is " + city.getPopulation());
}
}
public static class IncreaseCityPopulationTask implements Runnable {
private IgniteCache<String, City> cityProvinceCache;
private String cityName;
private long population;
public IncreaseCityPopulationTask(IgniteCache<String, City> cityProvinceCache,
String cityName, long population) {
this.cityProvinceCache = cityProvinceCache;
this.cityName = cityName;
this.population = population;
}
@Override
public void run() {
long p = 0;
while(p++ < population) {
cityProvinceCache.invoke(cityName, new EntryProcessor<String, City, Object>() {
@Override
public Object process(MutableEntry<String, City> mutableEntry, Object... objects)
throws EntryProcessorException {
City city = mutableEntry.getValue();
if (city != null) {
city.setPopulation(city.getPopulation() + 1);
mutableEntry.setValue(city);
}
return null;
}
});
}
}
}
private static Ignite startCluster(String[] args) {
...
}
}
- 4~10行,和之前的例子一樣,我們啟動一個Ignite節點,並且創建一個名為“CITY”的cache,cache的key是城市的名字(String),cache的value是一個City的對象實例。
- 13~15行,我們創建了一個名字為“Markham”的City實例,它的初始population值是0。
- 18~30行,我們創建了2個線程,每個線程啟動后都會調用IncreaseCityPopulationTask的Run()函數,不同的是在線程創建時我們指定了不同的population增加次數,一個增加10000次,一個增加20000次。
- 在33~38行,我們從cache中取回名為"Markham"的實例,並檢查它最終的人口數量是不是30000。如果兩個線程之間的操作(讀cache,增加人口,寫cache)是原子操作的話,那么最終結果應該是30000。
- 57~68是Entry Processor的具體用法,通過cityProvinceCache.invoke()函數就可以調用entry processor,invoke()函數的第一參數是entry processor要作用的數據的key。第二個參數是entry processor的一個實例,該實例必須要實現接口類EntryProcessor的process()函數。在第二個參數之后,還可以傳入多個參數,調用時這些參數會傳給process()函數。
- 在process()函數的中,第一個參數mutableEntry包含了process()函數作用的數據的key和value,可以通過MutableEntry.getKey()和MutableEntry.getValue()得到(如果該key的value不存在cache中,getValue()會返回null)。第二個之后的objects參數,是調用invoke()函數時除了key和EntryProcessor之外,傳入的參數。
- 在entry processor中可以實現一些復雜的邏輯,然后調用MutableEntry.setValue()對value值進行修改。如果需要刪除value,調用MutableEntry.remove()。
- EntryProcessor()被調用時,cache中對應的key值會被加鎖,所以對同一個鍵值的不同entry processor之間是互斥的,保證了一個entry processor中的所有操作是原子操作。
- 另外,有一點需要注意的是,在entry processor中的操作需要時無狀態的,因為同一個entry processor有可能會在primary和backup節點上執行多次,所以要保證entry processor中的操作只和cache中的當前值相關,如果還和當前節點的一些參數和狀態相關,會導致在不同節點上運行entry processor后寫入cache的值不一致。詳情見invoke()函數的文檔。
總結
這篇文章我們介紹了Ignite Cache基本的put/get()操作外的其他操作,比如異步的操作和entry processor**這篇文章里用到的例子的完整代碼和maven工程可以在這里找到。
下一篇文章,我們會繼續看看如何使用Ignite的SQL API對cache進行查詢和修改。