java代码的实现 https://github.com/wangjiuong/Distributed/tree/master/MyPaxosDemo
对于部分代码实现 参考了文章 http://blog.csdn.net/21aspnet/article/details/50700123
Client的主要流程如下:
@Override public void run() { int halfCount = ((int) acceptors.size() / 2) + 1; while (true) { round++; System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + name + " round " + (round) + " " + proposal); List<Acceptor> onPrepareSuccess = new ArrayList<Acceptor>(); HashMap<Proposal, Integer> ProposalCount = new HashMap<>(); for (Acceptor acceptor : acceptors) { Promise prepareResult = acceptor.onPrepare(proposal); if (prepareResult != null) { if (prepareResult.isAcctped()) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + " accepted by " + acceptor); //决策者已经接受该提议 onPrepareSuccess.add(acceptor); } else { System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + prepareResult + " refused by " + acceptor); //决策者拒绝了该提议, if (prepareResult.getStatus() == ProPosalStatus.ACCESPTED) { //表示该节点已经确认了某一个提案,将其保存下来 System.out.println( "Thread Name" + Thread.currentThread().getName() + " find one accepted Proposal " + prepareResult+" and save it "); Proposal acceptedAcceptorProposal = prepareResult.getProposal(); /* if (proposal.getSerialId() <= acceptedAcceptorProposal.getSerialId()) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " current serial is less accepted proposal,update the serial ID "); //表明当前正在提交的提案比已经确认的提案要小,那么将当前提案的序列增加1 proposal.setSerialId(acceptedAcceptorProposal.getSerialId() + 1); }*/ int count = 1; if (ProposalCount.containsKey(acceptedAcceptorProposal)) { count = ProposalCount.get(acceptedAcceptorProposal) + 1; } ProposalCount.put(acceptedAcceptorProposal, count); } /*else if (prepareResult.getProposal().getSerialId() >= proposal .getSerialId()) { //当前决策者的提案大于本client的提案 proposal.setSerialId(prepareResult.getProposal().getSerialId() + 1); break; }*/ } } } info(); boolean existVote = false; boolean continuePrePare = true; if (onPrepareSuccess.size() < halfCount) { //在prePare阶段没有超过一半的投票 proposal = Util.nextProposal(proposal); for (Map.Entry<Proposal, Integer> entry : ProposalCount.entrySet()) { if (entry.getValue() >= halfCount) { //表明该提案已经超过一半人同意,得到后直接退出流程 proposal = entry.getKey(); existVote = true; break; } } } else { //在prePare阶段有超过一半的投票 continuePrePare = false; } if (existVote) { //已经找到当前达成一致的提案 break; } else if (continuePrePare) { //继续投票 generatorNextProposal(ProposalCount); continue; } List<Acceptor> onAcceptSuccess = new ArrayList<>(); for (Acceptor acceptor : acceptors) { Promise acceptorResult = acceptor.onAccept(proposal); if (null != acceptorResult) { if (acceptorResult.isAcctped()) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " onAccept success" + proposal); onAcceptSuccess.add(acceptor); } } } System.out.println( "Thread Name" + Thread.currentThread().getName() + " Size " + onAcceptSuccess.size() + " onAcceptSuccess " + onAcceptSuccess); if (onAcceptSuccess.size() < halfCount) { proposal = Util.nextProposal(proposal); continue; } else {
//表示超过一半的决策者接受本议题,那么就退出 // proposal = onAcceptSuccess.get(0).getAcceptedProposal(); break; } } System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + name + " " + proposal.getSubject() + " has accepted "); latch.countDown(); }
产生新的议题:
private void generatorNextProposal(HashMap<Proposal, Integer> ProposalCount){ if(ProposalCount.isEmpty()){ //将序列增加1 proposal.setSerialId(proposal.getSerialId()+1); }else{ System.out.println( "Thread Name" + Thread.currentThread().getName() + " generator from accepted proposal "); List<Proposal> proposals=new ArrayList<>(); for (Map.Entry<Proposal, Integer> entry : ProposalCount.entrySet()) { if(!proposals.contains(entry.getKey())) { proposals.add(entry.getKey()); } } Collections.sort(proposals); Proposal maxVote = proposals.get(proposals.size() - 1); int serialId = maxVote.getSerialId(); String name = maxVote.getName(); String subject = maxVote.getSubject(); proposal.setSerialId(serialId+1); proposal.setName(name); proposal.setSubject(subject); } }
决策者的流程如下:
public synchronized Promise onPrepare(Proposal proposal) { System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name + " Para proposal " + proposal + " lastPrePare proposal " + lastPrePare); //假设这个过程有50%的几率失败 if (Math.random() - 0.5 > 0) { Util.printInfo( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name, "PREPARE", "NO RESPONSE"); return null; } if (proposal == null) throw new IllegalArgumentException("null proposal"); sleepRandom(); if (!isAccepted) { //当前没有确认 if (proposal.getSerialId() > lastPrePare.getSerialId()) { //本次提交的议题的serialID大于保存中的serialID,那么就替换掉 Promise response = new Promise(true, ProPosalStatus.PREPARE, proposal); lastPrePare = proposal; Util.printInfo( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name, "PREPARE", "OK"); System.out.println( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name + " current proposal " + lastPrePare); return response; } else { //返回保存的最大号的议题 Util.printInfo( "Thread Name" + Thread.currentThread().getName() + " " + "ACCEPTER_" + name, "PREPARE", "REJECTED"); return new Promise(false, ProPosalStatus.PREPARE, lastPrePare); } } else { //已经确认某一个提议, if (acceptedProposal.getName().equals(proposal.getName())) { //表示是同一个议题的不同serialID的提交 if (acceptedProposal.getSerialId() < proposal.getSerialId()) { //表示已经确认的提案的提交人已经有的更新,那么就去除已经确认,重新设置状态为PrePare isAccepted = false; lastPrePare = proposal; acceptedProposal = null; return new Promise(true, ProPosalStatus.PREPARE, proposal); } else { return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal); } } else { //当前已经确认的提案与当前的提案不是来自同一个人 //那么就将确认的提议返回,并且保存当前最大的proposal lastPrePare = proposal; return new Promise(false, ProPosalStatus.ACCESPTED, acceptedProposal); } } }