分布式一致性算法(2)--Paxos java代碼的實現


java代碼的實現   https://github.com/wangjiuong/Distributed/tree/master/MyPaxosDemo

 

部分代碼的實現參考了文章 http://blog.csdn.net/21aspnet/article/details/50700123

 

Client的主要流程如下:

 @Override public void run() {
        int halfCount = ((int) acceptors.size() / 2) + 1;
        while (true) {
            round++;
            System.out.println(
                "Thread Name" + Thread.currentThread().getName() + " " + name + " round   "
                    + (round) + "   " + proposal);
            List<Acceptor> onPrepareSuccess = new ArrayList<Acceptor>();
            HashMap<Proposal, Integer> ProposalCount = new HashMap<>();
            for (Acceptor acceptor : acceptors) {
                Promise prepareResult = acceptor.onPrepare(proposal);
                if (prepareResult != null) {
                    if (prepareResult.isAcctped()) {
                        System.out.println(
                            "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + "   accepted by " + acceptor);
                        //決策者已經接受該提議
                        onPrepareSuccess.add(acceptor);
                    } else {
                        System.out.println(
                            "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + "   refused by " + acceptor);
                        //決策者拒絕了該提議,
                        if (prepareResult.getStatus() == ProPosalStatus.ACCESPTED) {
                            //表示該節點已經確認了某一個提案,將其保存下來
                            System.out.println(
                                "Thread Name" + Thread.currentThread().getName() + " find one accepted Proposal " + prepareResult+" and  save it ");
                            Proposal acceptedAcceptorProposal = prepareResult.getProposal();
/*                            if (proposal.getSerialId() <= acceptedAcceptorProposal.getSerialId()) {
                                System.out.println(
                                    "Thread Name" + Thread.currentThread().getName() + " current serial is less accepted proposal,update the serial ID ");
                                //表明當前正在提交的提案比已經確認的提案要小,那么將當前提案的序列增加1
                                proposal.setSerialId(acceptedAcceptorProposal.getSerialId() + 1);
                            }*/
                            int count = 1;
                            if (ProposalCount.containsKey(acceptedAcceptorProposal)) {
                                count = ProposalCount.get(acceptedAcceptorProposal) + 1;
                            }
                            ProposalCount.put(acceptedAcceptorProposal, count);
                        } /*else if (prepareResult.getProposal().getSerialId() >= proposal
                            .getSerialId()) {
                            //當前決策者的提案大於本client的提案
                            proposal.setSerialId(prepareResult.getProposal().getSerialId() + 1);
                            break;
                        }*/
                    }
                }
            }
            info();
            boolean existVote = false;
            boolean continuePrePare = true;
            if (onPrepareSuccess.size() < halfCount) {
                //在prePare階段沒有超過一半的投票
                proposal = Util.nextProposal(proposal);
                for (Map.Entry<Proposal, Integer> entry : ProposalCount.entrySet()) {
                    if (entry.getValue() >= halfCount) {
                        //表明該提案已經超過一半人同意,得到后直接退出流程
                        proposal = entry.getKey();
                        existVote = true;
                        break;
                    }
                }
            } else {
                //在prePare階段有超過一半的投票
                continuePrePare = false;
            }
            if (existVote) {
                //已經找到當前達成一致的提案
                break;
            } else if (continuePrePare) {
                //繼續投票
                generatorNextProposal(ProposalCount);
                continue;
            }

            List<Acceptor> onAcceptSuccess = new ArrayList<>();

            for (Acceptor acceptor : acceptors) {
                Promise acceptorResult = acceptor.onAccept(proposal);
                if (null != acceptorResult) {
                    if (acceptorResult.isAcctped()) {
                        System.out.println(
                            "Thread Name" + Thread.currentThread().getName() + " onAccept success" + proposal);
                        onAcceptSuccess.add(acceptor);
                    }
                }
            }
            System.out.println(
                "Thread Name" + Thread.currentThread().getName() + " Size " + onAcceptSuccess.size() + " onAcceptSuccess " + onAcceptSuccess);
            if (onAcceptSuccess.size() < halfCount) {
                proposal = Util.nextProposal(proposal);
                continue;
            } else {
          //表示超過一半的決策者接受本議題,那么就退出
// proposal = onAcceptSuccess.get(0).getAcceptedProposal(); break; } } System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + name + " " + proposal.getSubject() + " has accepted "); latch.countDown(); }

 

產生新的議題:

/**
 * Prepares the client's {@code proposal} for the next prepare round.
 *
 * <p>If no acceptor reported an already-accepted proposal, only the serial id is incremented.
 * Otherwise the reported proposal that sorts last in {@code Proposal}'s natural ordering is
 * used as a template: the client's proposal takes its name and subject, and a serial id one
 * greater than the template's.
 *
 * @param ProposalCount accepted proposals reported by acceptors, mapped to how many acceptors
 *                      reported each one; only the keys are used here
 */
private void generatorNextProposal(HashMap<Proposal, Integer> ProposalCount){
        if (ProposalCount.isEmpty()) {
            // Nothing has been accepted anywhere: just bump the serial id.
            proposal.setSerialId(proposal.getSerialId() + 1);
            return;
        }
        System.out.println(
            "Thread Name" + Thread.currentThread().getName() + "  generator from accepted proposal ");
        // Map keys are already distinct, so copy them directly instead of de-duplicating by hand.
        List<Proposal> candidates = new ArrayList<>(ProposalCount.keySet());
        Collections.sort(candidates);
        Proposal maxVote = candidates.get(candidates.size() - 1);
        // Read every value before mutating, in case maxVote is the same object as proposal.
        int nextSerialId = maxVote.getSerialId() + 1;
        String adoptedName = maxVote.getName();
        String adoptedSubject = maxVote.getSubject();
        proposal.setSerialId(nextSerialId);
        proposal.setName(adoptedName);
        proposal.setSubject(adoptedSubject);
    }

決策者的流程如下:

    /**
     * Handles a prepare request from a client.
     *
     * <p>Roughly half of all calls are dropped at random to simulate message loss; those calls
     * return {@code null}. Otherwise:
     * <ul>
     *   <li>If nothing has been accepted yet, the proposal is promised only when its serial id
     *       is greater than that of the last prepared proposal; if not, the last prepared
     *       proposal is returned in a rejecting promise.</li>
     *   <li>If a proposal has already been accepted and the new one carries the same name with
     *       a greater serial id, the accepted state is cleared and the new proposal is
     *       promised.</li>
     *   <li>In every other case the request is rejected and the accepted proposal is returned
     *       with status {@code ACCESPTED}.</li>
     * </ul>
     *
     * @param proposal the proposal being prepared; must not be {@code null}
     * @return the promise describing the outcome, or {@code null} when the request was dropped
     * @throws IllegalArgumentException if {@code proposal} is {@code null}
     */
    public synchronized Promise onPrepare(Proposal proposal) {
        // Validate first so a null argument always fails, instead of sometimes being hidden by
        // the simulated message loss below.
        if (proposal == null) {
            throw new IllegalArgumentException("null proposal");
        }

        String logPrefix =
            "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name;
        System.out.println(
            logPrefix + "   Para proposal " + proposal + "     lastPrePare proposal " + lastPrePare);

        // Simulate an unreliable channel: this step fails about 50% of the time.
        if (Math.random() - 0.5 > 0) {
            Util.printInfo(logPrefix, "PREPARE", "NO RESPONSE");
            return null;
        }

        sleepRandom();

        if (!isAccepted) {
            // Nothing accepted yet.
            if (proposal.getSerialId() > lastPrePare.getSerialId()) {
                // The incoming serial id is higher than the stored one: replace it and promise.
                Promise response = new Promise(true, ProPosalStatus.PREPARE, proposal);
                lastPrePare = proposal;
                Util.printInfo(logPrefix, "PREPARE", "OK");
                System.out.println(logPrefix + "     current proposal " + lastPrePare);
                return response;
            }
            // Reject and report the highest-numbered proposal stored so far.
            Util.printInfo(logPrefix, "PREPARE", "REJECTED");
            return new Promise(false, ProPosalStatus.PREPARE, lastPrePare);
        }

        // A proposal has already been accepted.
        if (acceptedProposal.getName().equals(proposal.getName())) {
            // Same name as the accepted proposal, submitted with a different serial id.
            if (acceptedProposal.getSerialId() < proposal.getSerialId()) {
                // The submitter has a newer version: clear the accepted state and go back to
                // the prepared state with the new proposal.
                isAccepted = false;
                lastPrePare = proposal;
                acceptedProposal = null;
                return new Promise(true, ProPosalStatus.PREPARE, proposal);
            }
            return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal);
        }

        // The accepted proposal has a different name: return the accepted proposal and record
        // the incoming one.
        // NOTE(review): the original comment said this keeps the *largest* proposal, but the
        // assignment is unconditional and does not compare serial ids. Confirm the intent.
        lastPrePare = proposal;
        return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal);
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM