Kazoo Python Zookeeper 選主


本文講述基於zookeeper選主與故障切換的方法。我們的例子使用的是python。

使用的庫是kazoo,安裝方式

pip install kazoo 

應用場景:

  • 多個實例部署,但不是“去中心化”的部署方式;
  • 有且只有一個節點作為master,履行master的職責,在例子中是注冊調度器;
  • 其他實例作為slave,不提供調度功能,但是在master節點掛掉之后,可以重新進行選主調度。

 

1、注冊調度器

我們只給出偽代碼,簡單的打印調度器注冊結果。

# -*- coding:utf-8 -*-


# 調度器注冊和關閉
# 模擬主節點的職責
class MyScheduler(object):

    # 注冊調度器
    def init_scheduler(self):
        print '########## 開啟調度器成功 ############'

    # 關閉調度器
    def stop_scheduler(self):
        print '########## 關閉調度器成功 ############'

 

2、選主與故障切換代碼

1)使用add_listener注冊監聽器,監聽zookeeper會話超時,Session Expired,否則在會話超時的場景中會出現鎖不一致的問題,可以參看這篇文章

  • 會話超時之后,我們要重新建立Session,在我們的例子里,會循環直到Session重新建立。
  • 重新注冊Watcher,因為Watcher會隨着Session失效而失效。在我們的例子里,通過執行get_children重新注冊了Watcher。

2)向zookeeper注冊自己,使用參數makepath=True級聯創建節點;使用參數ephemeral=True, sequence=True參數,也就是創建臨時有序節點:

  • 當實例掛掉,session斷開,注冊的節點會自行消失
  • 有序節點,會節點后添加一個有序編號

3)注冊Watcher,觀察/dmonitor/master的子節點,當發生以下事件時:

  • Created:新增子節點
  • Deleted:刪除子節點
  • Changed:子節點數據變化
  • Child:子節點的下一級節點

進行重新選主。

# -*- coding:utf-8 -*-

import socket
import traceback
from kazoo.client import KazooClient
from kazoo.client import KazooState
from MyScheduler import MyScheduler


class HAMaster(object):

    def __init__(self):
        self.path = '/dmonitor/master'
        self.scheduler = MyScheduler()
        self.zk = KazooClient('10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181', timeout=10)
        self.zk.start()
        self.zk.add_listener(self.my_listener)
        self.is_leader = False

    def create_instance(self):
        instance = self.path + '/' + socket.gethostbyname(socket.gethostname()) + '-'
        self.zk.create(path=instance, value="", ephemeral=True, sequence=True, makepath=True)

    # 選主邏輯: master節點下, 所有ephemeral+sequence類型的節點中, 編號最大的獲得領導權.
    def choose_master(self):
        print "########## 選主開始 ############"
        instance_list = self.zk.get_children(path=self.path, watch=self.my_watcher)
        instance = max(instance_list).split('-')[0]
        # 本實例獲得領導權
        if instance == socket.gethostbyname(socket.gethostname()):
            if not self.is_leader:
                self.scheduler.init_scheduler()
                self.is_leader = True
                print "######### 我被選為master, 我以前不是master, 注冊調度 ##########"
            else:
                print "######### 我被選為master, 我以前是master, 不再注冊調度 ##########"
        # 本實例沒有獲得領導權
        else:
            if self.is_leader:
                self.scheduler.stop_scheduler()
                self.is_leader = False
                print "######### 我被選為slave, 我以前不是slave, 關閉調度 ##########"
            else:
                print "######### 我被選為slave, 我以前是slave, 不再關閉調度 ##########"
        print "########## 選主完成 ############"

    def my_listener(self, state):
        if state == KazooState.LOST:
            print "########## 會話超時:KazooState.LOST ############"
            while True:
                try:
                    self.create_instance()
                    self.zk.get_children(path=self.path, watch=self.my_watcher)
                    print "########## 會話超時:重建會話完成! ############"
                    break
                except Exception, _:
                    traceback.print_exc()
        elif state == KazooState.SUSPENDED:
            print "########## 會話超時:KazooState.SUSPENDED ############"
        elif state == KazooState.CONNECTED:
            print "########## 會話超時:KazooState.CONNECTED ############"
        else:
            print "########## 會話超時:非法狀態 ############"

    def my_watcher(self, event):
        if event.state == "CONNECTED" and event.type == "CREATED" or event.type == "DELETED" or event.type == "CHANGED" or event.type == "CHILD":
            print "########## 監聽到子節點變化事件 ############"
            self.choose_master()
        else:
            print "########## 監聽到未識別的事件 ############"

 

 

3、測試一下

測試代碼如下

# -*- coding:utf-8 -*-
import time
from HAMaster import HAMaster

ha = HAMaster()
# 向zk注冊自己
ha.create_instance()
# 進行選主
ha.choose_master()

while 1:
    time.sleep(10)

運行這個腳本,模擬如下幾個場景:

1)初始選主

 順序在兩台機器上啟動測試腳本。

client10

[data_monitor@bigdata-arch-client10 zookeeper]$ python run.py 

client11

[data_monitor@bigdata-arch-client11 zookeeper]$ python run.py 

client10輸出

[data_monitor@bigdata-arch-client10 zookeeper]$ python run.py 
########## 選主開始 ############
########## 開啟調度器成功 ############
######### 我被選為master, 我以前不是master, 注冊調度 ##########
########## 選主完成 ############
########## 監聽到子節點變化事件 ############
########## 選主開始 ############
######### 我被選為master, 我以前是master, 不再注冊調度 ##########
########## 選主完成 ############

client11輸出

[data_monitor@bigdata-arch-client11 zookeeper]$ python run.py 
########## 選主開始 ############
######### 我被選為slave, 我以前是slave, 不再關閉調度 ##########
########## 選主完成 ############

可以看到,client10倍選為主節點並注冊調度器,client11作為slave節點。

 

2)主實例掛掉

我們把client10上的實例kill掉。

client10上不再輸出

client11上總體輸出如下

[data_monitor@bigdata-arch-client11 zookeeper]$ python run.py 
########## 選主開始 ############
######### 我被選為slave, 我以前是slave, 不再關閉調度 ##########
########## 選主完成 ############
########## 監聽到子節點變化事件 ############
########## 選主開始 ############
########## 開啟調度器成功 ############
######### 我被選為master, 我以前不是master, 注冊調度 ##########
########## 選主完成 ############

可以看到監聽到節點的變化,並進行了重新選主,client11被選為主節點(只剩他了)並注冊了調度器。

 

tips:觀察到節點變化的實效性是通過timeout=10參數控制的,也就是超過10s session不能維持就會認為實例掛了,zookeeper會刪除節點。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM