Java 代碼的實現:https://github.com/wangjiuong/Distributed/tree/master/MyPaxosDemo
部分代碼實現參考了文章:http://blog.csdn.net/21aspnet/article/details/50700123
Client(提議者,Proposer)的主要流程如下:
@Override public void run() { int halfCount = ((int) acceptors.size() / 2) + 1; while (true) { round++; System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + name + " round " + (round) + " " + proposal); List<Acceptor> onPrepareSuccess = new ArrayList<Acceptor>(); HashMap<Proposal, Integer> ProposalCount = new HashMap<>(); for (Acceptor acceptor : acceptors) { Promise prepareResult = acceptor.onPrepare(proposal); if (prepareResult != null) { if (prepareResult.isAcctped()) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + " accepted by " + acceptor); //決策者已經接受該提議 onPrepareSuccess.add(acceptor); } else { System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + " refused by " + acceptor); //決策者拒絕了該提議, if (prepareResult.getStatus() == ProPosalStatus.ACCESPTED) { //表示該節點已經確認了某一個提案,將其保存下來 System.out.println( "Thread Name" + Thread.currentThread().getName() + " find one accepted Proposal " + prepareResult+" and save it "); Proposal acceptedAcceptorProposal = prepareResult.getProposal(); /* if (proposal.getSerialId() <= acceptedAcceptorProposal.getSerialId()) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " current serial is less accepted proposal,update the serial ID "); //表明當前正在提交的提案比已經確認的提案要小,那么將當前提案的序列增加1 proposal.setSerialId(acceptedAcceptorProposal.getSerialId() + 1); }*/ int count = 1; if (ProposalCount.containsKey(acceptedAcceptorProposal)) { count = ProposalCount.get(acceptedAcceptorProposal) + 1; } ProposalCount.put(acceptedAcceptorProposal, count); } /*else if (prepareResult.getProposal().getSerialId() >= proposal .getSerialId()) { //當前決策者的提案大於本client的提案 proposal.setSerialId(prepareResult.getProposal().getSerialId() + 1); break; }*/ } } } info(); boolean existVote = false; boolean continuePrePare = true; if (onPrepareSuccess.size() < halfCount) { //在prePare階段沒有超過一半的投票 proposal = Util.nextProposal(proposal); for (Map.Entry<Proposal, 
Integer> entry : ProposalCount.entrySet()) { if (entry.getValue() >= halfCount) { //表明該提案已經超過一半人同意,得到后直接退出流程 proposal = entry.getKey(); existVote = true; break; } } } else { //在prePare階段有超過一半的投票 continuePrePare = false; } if (existVote) { //已經找到當前達成一致的提案 break; } else if (continuePrePare) { //繼續投票 generatorNextProposal(ProposalCount); continue; } List<Acceptor> onAcceptSuccess = new ArrayList<>(); for (Acceptor acceptor : acceptors) { Promise acceptorResult = acceptor.onAccept(proposal); if (null != acceptorResult) { if (acceptorResult.isAcctped()) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " onAccept success" + proposal); onAcceptSuccess.add(acceptor); } } } System.out.println( "Thread Name" + Thread.currentThread().getName() + " Size " + onAcceptSuccess.size() + " onAcceptSuccess " + onAcceptSuccess); if (onAcceptSuccess.size() < halfCount) { proposal = Util.nextProposal(proposal); continue; } else {
//表示超過一半的決策者接受本議題,那么就退出 // proposal = onAcceptSuccess.get(0).getAcceptedProposal(); break; } } System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + name + " " + proposal.getSubject() + " has accepted "); latch.countDown(); }
產生新的議題:
private void generatorNextProposal(HashMap<Proposal, Integer> ProposalCount){ if(ProposalCount.isEmpty()){ //將序列增加1 proposal.setSerialId(proposal.getSerialId()+1); }else{ System.out.println( "Thread Name" + Thread.currentThread().getName() + " generator from accepted proposal "); List<Proposal> proposals=new ArrayList<>(); for (Map.Entry<Proposal, Integer> entry : ProposalCount.entrySet()) { if(!proposals.contains(entry.getKey())) { proposals.add(entry.getKey()); } } Collections.sort(proposals); Proposal maxVote = proposals.get(proposals.size() - 1); int serialId = maxVote.getSerialId(); String name = maxVote.getName(); String subject = maxVote.getSubject(); proposal.setSerialId(serialId+1); proposal.setName(name); proposal.setSubject(subject); } }
決策者(Acceptor)的流程如下:
public synchronized Promise onPrepare(Proposal proposal) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name + " Para proposal " + proposal + " lastPrePare proposal " + lastPrePare); //假設這個過程有50%的幾率失敗 if (Math.random() - 0.5 > 0) { Util.printInfo( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name, "PREPARE", "NO RESPONSE"); return null; } if (proposal == null) throw new IllegalArgumentException("null proposal"); sleepRandom(); if (!isAccepted) { //當前沒有確認 if (proposal.getSerialId() > lastPrePare.getSerialId()) { //本次提交的議題的serialID大於保存中的serialID,那么就替換掉 Promise response = new Promise(true, ProPosalStatus.PREPARE, proposal); lastPrePare = proposal; Util.printInfo( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name, "PREPARE", "OK"); System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name + " current proposal " + lastPrePare); return response; } else { //返回保存的最大號的議題 Util.printInfo( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name, "PREPARE", "REJECTED"); return new Promise(false, ProPosalStatus.PREPARE, lastPrePare); } } else { //已經確認某一個提議, if (acceptedProposal.getName().equals(proposal.getName())) { //表示是同一個議題的不同serialID的提交 if (acceptedProposal.getSerialId() < proposal.getSerialId()) { //表示已經確認的提案的提交人已經有的更新,那么就去除已經確認,重新設置狀態為PrePare isAccepted = false; lastPrePare = proposal; acceptedProposal = null; return new Promise(true, ProPosalStatus.PREPARE, proposal); } else { return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal); } } else { //當前已經確認的提案與當前的提案不是來自同一個人 //那么就將確認的提議返回,並且保存當前最大的proposal lastPrePare = proposal; return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal); } } }