Curator leader 選舉(一)


要想使用Leader選舉功能,需要添加recipes包,可以在maven中添加如下依賴:

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.9.0</version>
</dependency>

 

當然了,由於recipes需要使用framework,所以你肯定還要添加如下依賴:


<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.9.0</version>
</dependency>

 

 

最后,為了簡化測試也為了便於學習,可以添加test依賴:


<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.9.0</version>
</dependency>

 

LeaderLatch使用流程

  recipes包里面提供了Leader選舉實現,Spark中的master選舉使用的就是reciples包里面的LeaderLatch,使用他們可以極大的簡化代碼,使你將注意力更多的放在核心業務邏輯上。Leader選舉的實現在org.apache.curator.framework.recipes.leader包中,這個包提供了兩組Leader選舉:

  1.LeaderLatch,LeaderLatchListener

  2.LeaderSelector,LeaderSelectorListener,LeaderSelectorListenerAdapter

這兩組類都可以實現Leader選舉,spark 使用的是第一種。再這篇文章里,只介紹第一種。

 

第一組使用起來非常簡單,使用思路大致如下:假設你有3個節點,姑且叫做node0,node1,node2。你需要為每一個node創建一個CuratorFramework,LeaderLatch,LeaderLatchListener,如下:

 

node0:

  1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

  2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

 

node1:

  1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

  2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

 

node2:

  1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

  2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

 

你首先要創建CuratorFramework,然后並啟動它,一個CuratorFramework就是一個ZooKeeper客戶端。然后創建LeaderLatch,並制定剛才創建的CuratorFramework和一個leaderPath,leaderPath是一個ZooKeepe路徑,node0,node1,node2中的leaderPath必須一致。創建好LeaderLatch之后,需要為他注冊一個LeaderLatchListener回掉,如果某個node成為leader,那么會調用這個node的LeaderLatchListener的isLeader(),因此你可以在這里寫自己的業務邏輯。最后,調用LeaderLatch的start(),這個LeaderLatch將參加選舉了。

 

可以參考如下代碼:

import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class LeaderDemo {
    public static void main(String[]args) throws Exception{
        List<LeaderLatch>leaders=new ArrayList<LeaderLatch>();
        List<CuratorFramework>clients=new ArrayList<CuratorFramework>();
        
        TestingServer server=new TestingServer();
        
        
        try{
            for(int i=0;i<10;i++){
              CuratorFramework client=CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(20000,3));
              clients.add(client);
              
              LeaderLatch leader=new LeaderLatch(client,"/francis/leader");
              leader.addListener(new LeaderLatchListener(){

                @Override
                public void isLeader() {
                    // TODO Auto-generated method stub
                    System.out.println("I am Leader");
                }

                @Override
                public void notLeader() {
                    // TODO Auto-generated method stub
                    System.out.println("I am not Leader");
                }});
              
              
              leaders.add(leader);
        
              client.start();
              leader.start();
            }
            
            Thread.sleep(Integer.MAX_VALUE);
        }finally{
            
            for(CuratorFramework client:clients){
              CloseableUtils.closeQuietly(client);    
            }
            
            for(LeaderLatch leader:leaders){
                CloseableUtils.closeQuietly(leader);
            }
            
            CloseableUtils.closeQuietly(server);
        }
        
        
        Thread.sleep(Integer.MAX_VALUE);
    }

}

 

LeaderLatch和LeaderLatchListener方法介紹

LeaderLatch提供了如下方法:

 start()/close():啟動/停止LeaderLatch

 addListener(LeaderLatchListener)/removeListener(LeaderLatchListener):添加/移除LeaderLatchListener

 hasLeadership():如果LeaderLatch是Leader,那么返回true,否則false。

 getLeader():

 await:等待Leaderlatch成為Leader。

 

LeaderLatchListener提供了如下方法:

  isLeader():當LeaderLatch的hasLeaderShip()從false到true后,就會調用isLeader(),表明這個LeaderLatch成為leader了。

  notLeader():當LeaderLatch的hahLeaderShip從true到false后,就會調用notLeader(),表明這個LeaderLatch不再是leader了。

 

LeaderLatch在Master-Slave中的應用

在一個典型的master-slave場景下。你可以在isLeader中做如下處理:

  1.每一個master類都有一個state屬性,初始值為standby.

  2.在isLeader中,從持久話引擎中讀取要恢復的數據到一個臨時的內存緩存中

  3.將這個master的state修改為recovering

  4.通知所有worker將其內部的master修改為當前master。

  5.將臨時內存緩存中的數據恢復到master內部。

  6.將master狀態修改為alive,然后這個master就可以對外服務了。

注意第5步,由於將持久話引擎中的數據添加到了master內部的內存中,所以需要確保之多恢復一次語義。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM