分布式一致性算法(2)--Paxos java代码的实现


java代码的实现   https://github.com/wangjiuong/Distributed/tree/master/MyPaxosDemo

 

对于部分代码实现 参考了文章 http://blog.csdn.net/21aspnet/article/details/50700123

 

Client的主要流程如下:

 @Override public void run() {
        int halfCount = ((int) acceptors.size() / 2) + 1;
        while (true) {
            round++;
            System.out.println(
                "Thread Name" + Thread.currentThread().getName() + " " + name + " round   "
                    + (round) + "   " + proposal);
            List<Acceptor> onPrepareSuccess = new ArrayList<Acceptor>();
            HashMap<Proposal, Integer> ProposalCount = new HashMap<>();
            for (Acceptor acceptor : acceptors) {
                Promise prepareResult = acceptor.onPrepare(proposal);
                if (prepareResult != null) {
                    if (prepareResult.isAcctped()) {
                        System.out.println(
                            "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + "   accepted by " + acceptor);
                        //决策者已经接受该提议
                        onPrepareSuccess.add(acceptor);
                    } else {
                        System.out.println(
                            "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + "   refused by " + acceptor);
                        //决策者拒绝了该提议,
                        if (prepareResult.getStatus() == ProPosalStatus.ACCESPTED) {
                            //表示该节点已经确认了某一个提案,将其保存下来
                            System.out.println(
                                "Thread Name" + Thread.currentThread().getName() + " find one accepted Proposal " + prepareResult+" and  save it ");
                            Proposal acceptedAcceptorProposal = prepareResult.getProposal();
/*                            if (proposal.getSerialId() <= acceptedAcceptorProposal.getSerialId()) {
                                System.out.println(
                                    "Thread Name" + Thread.currentThread().getName() + " current serial is less accepted proposal,update the serial ID ");
                                //表明当前正在提交的提案比已经确认的提案要小,那么将当前提案的序列增加1
                                proposal.setSerialId(acceptedAcceptorProposal.getSerialId() + 1);
                            }*/
                            int count = 1;
                            if (ProposalCount.containsKey(acceptedAcceptorProposal)) {
                                count = ProposalCount.get(acceptedAcceptorProposal) + 1;
                            }
                            ProposalCount.put(acceptedAcceptorProposal, count);
                        } /*else if (prepareResult.getProposal().getSerialId() >= proposal
                            .getSerialId()) {
                            //当前决策者的提案大于本client的提案
                            proposal.setSerialId(prepareResult.getProposal().getSerialId() + 1);
                            break;
                        }*/
                    }
                }
            }
            info();
            boolean existVote = false;
            boolean continuePrePare = true;
            if (onPrepareSuccess.size() < halfCount) {
                //在prePare阶段没有超过一半的投票
                proposal = Util.nextProposal(proposal);
                for (Map.Entry<Proposal, Integer> entry : ProposalCount.entrySet()) {
                    if (entry.getValue() >= halfCount) {
                        //表明该提案已经超过一半人同意,得到后直接退出流程
                        proposal = entry.getKey();
                        existVote = true;
                        break;
                    }
                }
            } else {
                //在prePare阶段有超过一半的投票
                continuePrePare = false;
            }
            if (existVote) {
                //已经找到当前达成一致的提案
                break;
            } else if (continuePrePare) {
                //继续投票
                generatorNextProposal(ProposalCount);
                continue;
            }

            List<Acceptor> onAcceptSuccess = new ArrayList<>();

            for (Acceptor acceptor : acceptors) {
                Promise acceptorResult = acceptor.onAccept(proposal);
                if (null != acceptorResult) {
                    if (acceptorResult.isAcctped()) {
                        System.out.println(
                            "Thread Name" + Thread.currentThread().getName() + " onAccept success" + proposal);
                        onAcceptSuccess.add(acceptor);
                    }
                }
            }
            System.out.println(
                "Thread Name" + Thread.currentThread().getName() + " Size " + onAcceptSuccess.size() + " onAcceptSuccess " + onAcceptSuccess);
            if (onAcceptSuccess.size() < halfCount) {
                proposal = Util.nextProposal(proposal);
                continue;
            } else {
          //表示超过一半的决策者接受本议题,那么就退出
// proposal = onAcceptSuccess.get(0).getAcceptedProposal(); break; } } System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + name + " " + proposal.getSubject() + " has accepted "); latch.countDown(); }

 

产生新的议题:

private void generatorNextProposal(HashMap<Proposal, Integer> ProposalCount){
        if(ProposalCount.isEmpty()){
            //将序列增加1
            proposal.setSerialId(proposal.getSerialId()+1);
        }else{
            System.out.println(
                "Thread Name" + Thread.currentThread().getName() + "  generator from accepted proposal ");
            List<Proposal> proposals=new ArrayList<>();
            for (Map.Entry<Proposal, Integer> entry : ProposalCount.entrySet()) {
                if(!proposals.contains(entry.getKey())) {
                    proposals.add(entry.getKey());
                }
            }
            Collections.sort(proposals);
            Proposal maxVote = proposals.get(proposals.size() - 1);
            int serialId = maxVote.getSerialId();
            String name = maxVote.getName();
            String subject = maxVote.getSubject();
            proposal.setSerialId(serialId+1);
            proposal.setName(name);
            proposal.setSubject(subject);
        }
    }

决策者的流程如下:

    public synchronized Promise onPrepare(Proposal proposal) {
        System.out.println(
            "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name
                + "   Para proposal " + proposal + "     lastPrePare proposal " + lastPrePare);
        //假设这个过程有50%的几率失败
        if (Math.random() - 0.5 > 0) {
            Util.printInfo(
                "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name,
                "PREPARE", "NO RESPONSE");
            return null;
        }
        if (proposal == null)
            throw new IllegalArgumentException("null proposal");

        sleepRandom();
        if (!isAccepted) {
            //当前没有确认
            if (proposal.getSerialId() > lastPrePare.getSerialId()) {
                //本次提交的议题的serialID大于保存中的serialID,那么就替换掉
                Promise response = new Promise(true, ProPosalStatus.PREPARE, proposal);
                lastPrePare = proposal;
                Util.printInfo(
                    "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name,
                    "PREPARE", "OK");
                System.out.println(
                    "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name
                        + "     current proposal " + lastPrePare);
                return response;
            } else {
                //返回保存的最大号的议题
                Util.printInfo(
                    "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name,
                    "PREPARE", "REJECTED");
                return new Promise(false, ProPosalStatus.PREPARE, lastPrePare);
            }
        } else {
            //已经确认某一个提议,
            if (acceptedProposal.getName().equals(proposal.getName())) {
                //表示是同一个议题的不同serialID的提交
                if (acceptedProposal.getSerialId() < proposal.getSerialId()) {
                    //表示已经确认的提案的提交人已经有的更新,那么就去除已经确认,重新设置状态为PrePare
                    isAccepted = false;
                    lastPrePare = proposal;
                    acceptedProposal = null;
                    return new Promise(true, ProPosalStatus.PREPARE, proposal);
                } else {
                    return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal);
                }
            } else {
                //当前已经确认的提案与当前的提案不是来自同一个人
                //那么就将确认的提议返回,并且保存当前最大的proposal
                lastPrePare = proposal;
                return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal);
            }
        }
    }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM