ignite分布式計算
在ignite中,有傳統的MapReduce模型的分布式計算,也有基於分布式存儲的並置計算,當數據分散到不同的節點上時,根據提供的並置鍵,計算會傳播到數據所在的節點進行計算,再結合數據並置,相關聯的數據存儲在相同節點,這樣可以避免在計算過程中涉及到大量的數據移動,有效保證計算的性能。
ignite分布式計算的主要特點如下:
特性 | 描述 |
---|---|
自動部署 | 計算用到的類可以自動傳播,而不需要在每個節點都部署相關的類,這個可以通過配置peerClassLoadingEnabled 選項開啟計算類的自動傳播,但是緩存的實體類是無法自動傳播的。 |
平衡加載 | 數據在加載之后會在集群中進行一個再平衡的過程,保證數據均勻分布在各個節點,當有計算在集群中執行的時候,可以根據提供的並置鍵定位到數據所在節點進行計算,也就是並置計算。 |
故障轉移 | 當節點出現故障或者其它計算的時候,任務會自動轉移到集群中的其他節點執行 |
1.分布式閉包:
Ignite計算網格可以對集群或者集群組內的任何閉包進行廣播和負載平衡,包括純Java的
runnables
和callables
閉包類型 | 功能 |
---|---|
broadcast | 將任務傳播到部分指定節點或者全部節點 |
call/run | 執行單個任務或者任務集 |
apply | apply接收一個閉包和一個集合作為參數,生成與參數數量等量的任務,每個任務分別是將閉包應用在其中一個參數上,並且會返回結果集。 |
ComputeTestController.java
/** broadCast測試*/
@RequestMapping("/broadcast")
String broadcastTest(HttpServletRequest request, HttpServletResponse response) {
// IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes()); //只傳播遠程節點
IgniteCompute compute = ignite.compute();
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));
return "all executed.";
}
/** call和run測試 */
@RequestMapping("/call")
public @ResponseBody
String callTest(HttpServletRequest request, HttpServletResponse response) {
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
/** call */
System.out.println("-----------call-----------");
for(String word : "How many characters".split(" ")) {
calls.add(word::length);
// calls.add(() -> word.length());
}
Collection<Integer> res = ignite.compute().call(calls);
int total = res.stream().mapToInt(Integer::intValue).sum();
System.out.println(String.format("the total lengths of all words is [%s].", total));
/** run */
System.out.println("-----------run-----------");
for (String word : "Print words on different cluster nodes".split(" ")) {
ignite.compute().run(() -> System.out.println(word));
}
/** async call */
System.out.println("-----------async call-----------");
IgniteCompute asyncCompute = ignite.compute().withAsync();
asyncCompute.call(calls);
asyncCompute.future().listen(fut -> {
Collection<Integer> result = (Collection<Integer>)fut.get();
int t = result.stream().mapToInt(Integer::intValue).sum();
System.out.println("Total number of characters: " + total);
});
/** async run */
System.out.println("-----------async run-----------");
Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();
asyncCompute = ignite.compute().withAsync();
for (String word : "Print words on different cluster nodes".split(" ")) {
asyncCompute.run(() -> System.out.println(word));
futs.add(asyncCompute.future());
}
futs.stream().forEach(ComputeTaskFuture::get);
return "all executed.";
}
/** apply測試 */
@RequestMapping("/apply")
public @ResponseBody
String applyTest(HttpServletRequest request, HttpServletResponse response) {
/** apply */
System.out.println("-----------apply-----------");
IgniteCompute compute = ignite.compute();
Collection<Integer> res = compute.apply(
String::length,
Arrays.asList("How many characters".split(" "))
);
int total = res.stream().mapToInt(Integer::intValue).sum();
System.out.println(String.format("the total lengths of all words is [%s].", total));
/** async apply */
IgniteCompute asyncCompute = ignite.compute().withAsync();
res = asyncCompute.apply(
String::length,
Arrays.asList("How many characters".split(" "))
);
asyncCompute.future().listen(fut -> {
int t = ((Collection<Integer>)fut.get()).stream().mapToInt(Integer::intValue).sum();
System.out.println(String.format("Total number of characters: " + total));
});
return "all executed.";
}
2. MapReduce:
在ignite中MapReduce的實現是ComputeTask
,其主要方法是map()和reduce(),map()可以控制任務映射到節點的過程,而reduce()則是對最終計算結果集的一個處理。ComputeTask
有兩個主要實現ComputeTaskAdapter
和ComputeTaskSplitAdapter
, 主要的區別在於ComputeTaskAdapter
需要手動實現map()方法,而ComputeTaskSplitAdapter
可以自動映射任務。
ComputeTaskAdapter:
/**ComputeTaskAdapter*/
@RequestMapping("/taskMap")
public @ResponseBody
String taskMapTest(HttpServletRequest request, HttpServletResponse response) {
/**ComputeTaskMap*/
int cnt = ignite.compute().execute(MapExampleCharacterCountTask.class, "Hello Ignite Enable World!");
System.out.println(String.format(">>> Total number of characters in the phrase is %s.", cnt));
return "all executed.";
}
private static class MapExampleCharacterCountTask extends ComputeTaskAdapter<String, Integer> {
/**節點映射*/
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) throws IgniteException {
Map<ComputeJob, ClusterNode> map = new HashMap<>();
Iterator<ClusterNode> it = nodes.iterator();
for (final String word : arg.split(" ")) {
// If we used all nodes, restart the iterator.
if (!it.hasNext()) {
it = nodes.iterator();
}
ClusterNode node = it.next();
map.put(new ComputeJobAdapter() {
@Override
public Object execute() throws IgniteException {
System.out.println("-------------------------------------");
System.out.println(String.format(">>> Printing [%s] on this node from ignite job.", word));
return word.length();
}
}, node);
}
return map;
}
/**結果匯總*/
@Override
public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
int sum = 0;
for (ComputeJobResult res : results) {
sum += res.<Integer>getData();
}
return sum;
}
}
運行結果:
-------------------------------------
>>> Printing [Ignite] on this node from ignite job.
-------------------------------------
>>> Printing [World!] on this node from ignite job.
>>> Total number of characters in the phrase is 23.
ComputeTaskSplitAdapter:
/**ComputeTaskSplitAdapter*/
@RequestMapping("/taskSplit")
public @ResponseBody
String taskSplitTest(HttpServletRequest request, HttpServletResponse response) {
/**ComputeTaskSplitAdapter(自動映射) */
int result = ignite.compute().execute(SplitExampleDistributedCompute.class, null);
System.out.println(String.format(">>> result: [%s]", result));
return "all executed.";
}
private static class SplitExampleDistributedCompute extends ComputeTaskSplitAdapter<String, Integer> {
@Override
protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteException {
Collection<ComputeJob> jobs = new LinkedList<>();
jobs.add(new ComputeJobAdapter() {
@Override
public Object execute() throws IgniteException {
// IgniteCache<Long, Student> cache = Ignition.ignite().cache(CacheKeyConstant.STUDENT);
IgniteCache<Long, BinaryObject> cache = Ignition.ignite().cache(CacheKeyConstant.STUDENT).withKeepBinary();
/**普通查詢*/
String sql_query = "name = ? and email = ?";
// SqlQuery<Long, Student> cSqlQuery = new SqlQuery<>(Student.class, sql_query);
SqlQuery<Long, BinaryObject> cSqlQuery = new SqlQuery<>(Student.class, sql_query);
cSqlQuery.setReplicatedOnly(true).setArgs("student_54", "student_54gmail.com");
// List<Cache.Entry<Long, Student>> result = cache.query(cSqlQuery).getAll();
List<Cache.Entry<Long, BinaryObject>> result = cache.query(cSqlQuery).getAll();
System.out.println("--------------------");
result.stream().map(x -> {
Integer studId = x.getValue().field("studId");
String name = x.getValue().field("name");
return String.format("name=[%s], studId=[%s].", name, studId);
}).forEach(System.out::println);
System.out.println(String.format("the query size is [%s].", result.size()));
return result.size();
}
});
return jobs;
}
@Override
public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
int sum = results.stream().mapToInt(x -> x.<Integer>getData()).sum();
return sum;
}
}
運行結果:
--------------------
name=[student_54], studId=[54].
the query size is [1].
>>> result: [1]
MapReduce的局限性:
MapReduce適合解決並行和批處理的場景,不適合串行,迭代和遞歸一類無法並行和分割任務的場景。
分布式計算存在的問題以及注意點
在使用ignite的分布式計算功能的時候,如果用到了緩存, 並且緩存value不是平台類型(java基礎類型),則需要考慮反序列化的問題。
現有兩種解決方案:
- 部署緩存實體類包到ignite節點
緩存實體類得實現Serializable接口,並且得指定serialVersionUID
serialVersionUID表示實體類的當前版本,每個實現Serializable接口的類都有,如果沒有的設置該值,java序列化機制會幫你默認生成一個。最好在使用serializable接口時,設定serialVersionUID為某個值,不然當在傳輸的某一端修改實體類時,serialVersionUID會被虛擬機設置成一個新的值,造成兩端的serialVersionUID不一致會發生異常。
public class Student implements Serializable {
private static final long serialVersionUID = -5941489737545326242L;
....
}
將實體類打包成普通jar包,並放在$IGNITE_HOME/libs/路徑下面:
注意:打包的時候不能打包成spring-boot的可執行包,要打包成普通jar包,這樣相關類才能正常加載。當然如果集群里的節點均為應用節點,則可以不用考慮這個問題。
-
使用二進制對象對緩存進行操作
Ignite默認使用反序列化值作為最常見的使用場景,要啟用
BinaryObject
處理,需要獲得一個IgniteCache
的實例然后使用withKeepBinary()
方法。啟用之后,如果可能,這個標志會確保從緩存返回的對象都是BinaryObject
格式的。
IgniteCache<Long, BinaryObject> cache = ignite.cache("student").withKeepBinary();
BinaryObject obj = cache.get(k); //獲取二進制對象
String name = obj.<String>field("name"); //讀取二進制對象屬性值<使用field方法>
3.並置計算:
affinityCall(...)
和affinityRun(...)
方法使作業和緩存着數據的節點位於一處,換句話說,給定緩存名字和關系鍵,這些方法會試圖在指定的緩存中定位鍵所在的節點,然后在那里執行作業。
並置的兩種類型以及區別:
並置 | 特點 |
---|---|
數據並置 | 將相關的緩存數據並置到一起,確保其所有鍵會緩存在同一個節點上,避免節點間數據移動產生的網絡開銷。 |
計算並置 | 根據關系鍵和緩存名稱,定位關系鍵所在節點,並在該節點執行作業單元。 |
ComputeTestController.class
/**並置計算測試*/
@RequestMapping("/affinity")
public @ResponseBody
String affinityTest(HttpServletRequest request, HttpServletResponse response) {
/** affinityRun call */
System.out.println("-----------affinityRun call-----------");
IgniteCompute compute = ignite.compute();
// IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());
for(int key = 0; key < 100; key++) {
// final long k = key;
//生成隨機k值
final long k = IntStream.generate(() -> (int)(System.nanoTime() % 100)).limit(1).findFirst().getAsInt();
compute.affinityRun(CacheKeyConstant.STUDENT, k, () -> {
IgniteCache<Long, BinaryObject> cache = ignite.cache(CacheKeyConstant.STUDENT).withKeepBinary();
BinaryObject obj = cache.get(k);
if(obj!=null) {
System.out.println(String.format("Co-located[key= %s, value= %s]", k, obj.<String>field("name")));
}
});
}
IgniteCache<Long, BinaryObject> cache = ignite.cache(CacheKeyConstant.STUDENT).withKeepBinary();
cache.forEach(lo -> compute.affinityRun(CacheKeyConstant.STUDENT, lo.getKey(), () -> {
System.out.println(lo.getValue().<String>field("name"));
}));
return "all executed.";
}
運行結果:
-----------affinityRun call-----------
student_495
student_496
student_498
...
至此,ignite分布式計算完畢。