apache ignite系列(五):分布式計算


ignite分布式計算

在ignite中,有傳統的MapReduce模型的分布式計算,也有基於分布式存儲的並置計算,當數據分散到不同的節點上時,根據提供的並置鍵,計算會傳播到數據所在的節點進行計算,再結合數據並置,相關聯的數據存儲在相同節點,這樣可以避免在計算過程中涉及到大量的數據移動,有效保證計算的性能。

ignite分布式計算的主要特點如下:

特性 描述
自動部署 計算用到的類可以自動傳播,而不需要在每個節點都部署相關的類,這個可以通過配置peerClassLoadingEnabled選項開啟計算類的自動傳播,但是緩存的實體類是無法自動傳播的。
平衡加載 數據在加載之后會在集群中進行一個再平衡的過程,保證數據均勻分布在各個節點,當有計算在集群中執行的時候,可以根據提供的並置鍵定位到數據所在節點進行計算,也就是並置計算。
故障轉移 當節點出現故障或者其它計算的時候,任務會自動轉移到集群中的其他節點執行

1.分布式閉包:

Ignite計算網格可以對集群或者集群組內的任何閉包進行廣播和負載平衡,包括純Java的runnablescallables

閉包類型 功能
broadcast 將任務傳播到部分指定節點或者全部節點
call/run 執行單個任務或者任務集
apply apply接收一個閉包和一個集合作為參數,生成與參數數量等量的任務,每個任務分別是將閉包應用在其中一個參數上,並且會返回結果集。

ComputeTestController.java


    /** broadCast測試*/
    @RequestMapping("/broadcast")
    String broadcastTest(HttpServletRequest request, HttpServletResponse response) {
//        IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());  //只傳播遠程節點
        IgniteCompute compute = ignite.compute();
        compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));
        return "all executed.";
    }

    /** call和run測試 */
    @RequestMapping("/call")
    public @ResponseBody
    String callTest(HttpServletRequest request, HttpServletResponse response) {
        Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

        /** call */
        System.out.println("-----------call-----------");
        for(String word : "How many characters".split(" ")) {
            calls.add(word::length);
//            calls.add(() -> word.length());
        }
        Collection<Integer> res = ignite.compute().call(calls);
        int total = res.stream().mapToInt(Integer::intValue).sum();
        System.out.println(String.format("the total lengths of all words is [%s].", total));

        /** run */
        System.out.println("-----------run-----------");
        for (String word : "Print words on different cluster nodes".split(" ")) {
            ignite.compute().run(() -> System.out.println(word));
        }

        /** async call */
        System.out.println("-----------async call-----------");
        IgniteCompute asyncCompute =  ignite.compute().withAsync();
        asyncCompute.call(calls);
        asyncCompute.future().listen(fut -> {
            Collection<Integer> result = (Collection<Integer>)fut.get();
            int t = result.stream().mapToInt(Integer::intValue).sum();
            System.out.println("Total number of characters: " + total);
        });

        /** async run */
        System.out.println("-----------async run-----------");
        Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();
        asyncCompute = ignite.compute().withAsync();
        for (String word : "Print words on different cluster nodes".split(" ")) {
            asyncCompute.run(() -> System.out.println(word));
            futs.add(asyncCompute.future());
        }
        futs.stream().forEach(ComputeTaskFuture::get);

        return "all executed.";
    }

    /** apply測試 */
    @RequestMapping("/apply")
    public @ResponseBody
    String applyTest(HttpServletRequest request, HttpServletResponse response) {
        /** apply */
        System.out.println("-----------apply-----------");
        IgniteCompute compute = ignite.compute();
        Collection<Integer> res = compute.apply(
                String::length,
                Arrays.asList("How many characters".split(" "))
        );
        int total = res.stream().mapToInt(Integer::intValue).sum();
        System.out.println(String.format("the total lengths of all words is [%s].", total));

        /** async apply */
        IgniteCompute asyncCompute = ignite.compute().withAsync();
        res = asyncCompute.apply(
                String::length,
                Arrays.asList("How many characters".split(" "))
        );
        asyncCompute.future().listen(fut -> {
            int t = ((Collection<Integer>)fut.get()).stream().mapToInt(Integer::intValue).sum();
            System.out.println(String.format("Total number of characters: " + total));
        });

        return "all executed.";
    }

2. MapReduce:

在ignite中MapReduce的實現是ComputeTask,其主要方法是map()和reduce(),map()可以控制任務映射到節點的過程,而reduce()則是對最終計算結果集的一個處理。ComputeTask有兩個主要實現ComputeTaskAdapterComputeTaskSplitAdapter, 主要的區別在於ComputeTaskAdapter需要手動實現map()方法,而ComputeTaskSplitAdapter可以自動映射任務。

ComputeTaskAdapter

    /**ComputeTaskAdapter*/
    @RequestMapping("/taskMap")
    public @ResponseBody
    String taskMapTest(HttpServletRequest request, HttpServletResponse response) {
        /**ComputeTaskMap*/
        int cnt = ignite.compute().execute(MapExampleCharacterCountTask.class, "Hello Ignite Enable World!");

        System.out.println(String.format(">>> Total number of characters in the phrase is %s.", cnt));

        return "all executed.";
    }

    private static class MapExampleCharacterCountTask extends ComputeTaskAdapter<String, Integer> {
        /**節點映射*/
        @Override
        public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) throws IgniteException {
            Map<ComputeJob, ClusterNode> map = new HashMap<>();
            Iterator<ClusterNode> it = nodes.iterator();
            for (final String word : arg.split(" ")) {
                // If we used all nodes, restart the iterator.
                if (!it.hasNext()) {
                    it = nodes.iterator();
                }
                ClusterNode node = it.next();
                map.put(new ComputeJobAdapter() {
                    @Override
                    public Object execute() throws IgniteException {
                        System.out.println("-------------------------------------");
                        System.out.println(String.format(">>> Printing [%s] on this node from ignite job.", word));
                        return word.length();
                    }
                }, node);
            }
            return map;
        }
        /**結果匯總*/
        @Override
        public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
            int sum = 0;
            for (ComputeJobResult res : results) {
                sum += res.<Integer>getData();
            }
            return sum;
        }
    }

運行結果:

-------------------------------------
>>> Printing [Ignite] on this node from ignite job.
-------------------------------------
>>> Printing [World!] on this node from ignite job.
>>> Total number of characters in the phrase is 23.

ComputeTaskSplitAdapter

    /**ComputeTaskSplitAdapter*/
    @RequestMapping("/taskSplit")
    public @ResponseBody
    String taskSplitTest(HttpServletRequest request, HttpServletResponse response) {
        /**ComputeTaskSplitAdapter(自動映射) */
        int result = ignite.compute().execute(SplitExampleDistributedCompute.class, null);
        System.out.println(String.format(">>> result: [%s]", result));

        return "all executed.";
    }

    private static class SplitExampleDistributedCompute extends ComputeTaskSplitAdapter<String, Integer> {

        @Override
        protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteException {
            Collection<ComputeJob> jobs = new LinkedList<>();
            jobs.add(new ComputeJobAdapter() {
                @Override
                public Object execute() throws IgniteException {
//                    IgniteCache<Long, Student> cache = Ignition.ignite().cache(CacheKeyConstant.STUDENT);
                    IgniteCache<Long, BinaryObject> cache = Ignition.ignite().cache(CacheKeyConstant.STUDENT).withKeepBinary();
                    /**普通查詢*/
                    String sql_query = "name = ? and email = ?";
//                    SqlQuery<Long, Student> cSqlQuery = new SqlQuery<>(Student.class, sql_query);
                    SqlQuery<Long, BinaryObject> cSqlQuery = new SqlQuery<>(Student.class, sql_query);
                    cSqlQuery.setReplicatedOnly(true).setArgs("student_54", "student_54gmail.com");
//                  List<Cache.Entry<Long, Student>> result = cache.query(cSqlQuery).getAll();
                    List<Cache.Entry<Long, BinaryObject>> result = cache.query(cSqlQuery).getAll();
                    System.out.println("--------------------");
                    result.stream().map(x -> {
                        Integer studId = x.getValue().field("studId");
                        String name = x.getValue().field("name");
                        return String.format("name=[%s], studId=[%s].", name, studId);
                    }).forEach(System.out::println);
                    System.out.println(String.format("the query size is [%s].", result.size()));
                    return result.size();
                }
            });
            return jobs;
        }

        @Override
        public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
            int sum = results.stream().mapToInt(x -> x.<Integer>getData()).sum();
            return sum;
        }
    }

運行結果:

--------------------
name=[student_54], studId=[54].
the query size is [1].
>>> result: [1]

MapReduce的局限性:

MapReduce適合解決並行和批處理的場景,不適合串行,迭代和遞歸一類無法並行和分割任務的場景。

分布式計算存在的問題以及注意點
   在使用ignite的分布式計算功能的時候,如果用到了緩存, 並且緩存value不是平台類型(java基礎類型),則需要考慮反序列化的問題。

現有兩種解決方案:

  • 部署緩存實體類包到ignite節點

緩存實體類得實現Serializable接口,並且得指定serialVersionUID

serialVersionUID表示實體類的當前版本,每個實現Serializable接口的類都有,如果沒有的設置該值,java序列化機制會幫你默認生成一個。最好在使用serializable接口時,設定serialVersionUID為某個值,不然當在傳輸的某一端修改實體類時,serialVersionUID會被虛擬機設置成一個新的值,造成兩端的serialVersionUID不一致會發生異常。

public class Student implements Serializable {

    private static final long serialVersionUID = -5941489737545326242L;
    ....
}

將實體類打包成普通jar包,並放在$IGNITE_HOME/libs/路徑下面:

注意:打包的時候不能打包成spring-boot的可執行包,要打包成普通jar包,這樣相關類才能正常加載。當然如果集群里的節點均為應用節點,則可以不用考慮這個問題。

  • 使用二進制對象對緩存進行操作

    Ignite默認使用反序列化值作為最常見的使用場景,要啟用BinaryObject處理,需要獲得一個IgniteCache的實例然后使用withKeepBinary()方法。啟用之后,如果可能,這個標志會確保從緩存返回的對象都是BinaryObject格式的。

 IgniteCache<Long, BinaryObject> cache = ignite.cache("student").withKeepBinary();
 BinaryObject obj = cache.get(k);  //獲取二進制對象
 String name = obj.<String>field("name");  //讀取二進制對象屬性值<使用field方法>

3.並置計算:

affinityCall(...)affinityRun(...)方法使作業和緩存着數據的節點位於一處,換句話說,給定緩存名字和關系鍵,這些方法會試圖在指定的緩存中定位鍵所在的節點,然后在那里執行作業。

並置的兩種類型以及區別:

並置 特點
數據並置 將相關的緩存數據並置到一起,確保其所有鍵會緩存在同一個節點上,避免節點間數據移動產生的網絡開銷。
計算並置 根據關系鍵和緩存名稱,定位關系鍵所在節點,並在該節點執行作業單元。

ComputeTestController.class

    /**並置計算測試*/
    @RequestMapping("/affinity")
    public @ResponseBody
    String affinityTest(HttpServletRequest request, HttpServletResponse response) {

        /** affinityRun call */
        System.out.println("-----------affinityRun call-----------");
        IgniteCompute compute = ignite.compute();
//        IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());
        for(int key = 0; key < 100; key++) {
//            final long k = key;
            //生成隨機k值
            final long k = IntStream.generate(() -> (int)(System.nanoTime() % 100)).limit(1).findFirst().getAsInt();
            compute.affinityRun(CacheKeyConstant.STUDENT, k, () -> {
                IgniteCache<Long, BinaryObject> cache = ignite.cache(CacheKeyConstant.STUDENT).withKeepBinary();
                BinaryObject obj = cache.get(k);
                if(obj!=null) {
                    System.out.println(String.format("Co-located[key= %s, value= %s]", k, obj.<String>field("name")));
                }
            });
        }

        IgniteCache<Long, BinaryObject> cache = ignite.cache(CacheKeyConstant.STUDENT).withKeepBinary();
        cache.forEach(lo -> compute.affinityRun(CacheKeyConstant.STUDENT, lo.getKey(), () -> {
            System.out.println(lo.getValue().<String>field("name"));
        }));

        return "all executed.";
    }

運行結果:

-----------affinityRun call-----------
student_495
student_496
student_498
...

至此,ignite分布式計算完畢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM