ZooKeeper use case: implementing a distributed lock


Questions covered:
1. How does ZooKeeper implement a distributed lock?
2. What is the herd effect?
3. How is the lock released in ZooKeeper?






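The ZooKeeper use-case article on synchronizing configuration files across a distributed cluster raises this question: if 100 machines modify the same file on one machine at the same time, how do we make sure the file does not get garbled? Solving this is the simplest form of distributed lock. This article shows how to implement a distributed lock with ZooKeeper (zk). The steps for a write lock are as follows.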

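Distributed write lock

  1. Create a PERSISTENT znode, /Locks/write_lock.
  2. Each client creates a SEQUENCE|EPHEMERAL znode under it whose name starts with lockid. ZooKeeper appends a sequence number, so the created znode is, for example, /Locks/write_lock/lockid0000000001.
  3. Call getChildren(), without setting a Watcher, to get the list of znodes under /Locks/write_lock.
  4. Check whether the znode you created in step 2 is the smallest one in that list. If it is, you hold the lock; if not, continue.
  5. Call exists() with a Watcher on the znode immediately preceding yours, i.e. the largest znode in the list that is still smaller than yours. (Sequence numbers can have gaps, so this is not necessarily your own number minus 1, and it is not the smallest znode in the list.) If exists() returns false, go back to step 3.
  6. If exists() returns true, wait for the ZooKeeper notification; the watch callback then goes back to step 3.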
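The steps above can be sketched without a running ZooKeeper server. The following is a simulation under stated assumptions, not a ZooKeeper client: InMemoryLockDir and LockClient are hypothetical names, the "server" is a plain in-memory set, and watch callbacks run synchronously. It shows only the ordering logic of steps 2 to 6, in particular that each waiter watches just its immediate predecessor and re-checks (step 3) when that node disappears.

```python
import itertools


class InMemoryLockDir(object):
    """Hypothetical in-memory stand-in for /Locks/write_lock (no real ZooKeeper).

    Child names get a zero-padded 10-digit suffix, like the SEQUENCE flag.
    Watches are one-shot and fire only when the watched node is deleted.
    """

    def __init__(self):
        self.children = set()
        self.watches = {}                # node name -> list of callbacks
        self._seq = itertools.count()

    def create_sequential(self, prefix):
        name = "%s%010d" % (prefix, next(self._seq))
        self.children.add(name)
        return name

    def get_children(self):
        return list(self.children)       # note: no watch is set here

    def exists(self, name, watch=None):
        found = name in self.children
        if found and watch is not None:  # simplification: only watch existing nodes
            self.watches.setdefault(name, []).append(watch)
        return found

    def delete(self, name):
        self.children.discard(name)
        for callback in self.watches.pop(name, []):   # fire each watch once
            callback(name)


class LockClient(object):
    """One lock contender following steps 2-6 (hypothetical helper)."""

    def __init__(self, lock_dir, order):
        self.dir = lock_dir
        self.order = order               # shared list: who got the lock, in order
        self.holding = False
        self.node = lock_dir.create_sequential("lockid")   # step 2
        self.try_acquire()

    def try_acquire(self, _event=None):
        children = sorted(self.dir.get_children())         # step 3
        if self.node not in children:    # we already deleted our own node
            return
        if children[0] == self.node:                        # step 4
            self.holding = True
            self.order.append(self.node)
            return
        # step 5: watch the node just before ours, not the smallest node
        predecessor = children[children.index(self.node) - 1]
        if not self.dir.exists(predecessor, watch=self.try_acquire):
            self.try_acquire()           # predecessor already gone: redo step 3
        # step 6: otherwise wait; the watch re-runs try_acquire on deletion

    def release(self):
        # Deleting our node releases the lock, or gives up waiting for it.
        self.holding = False
        self.dir.delete(self.node)


if __name__ == "__main__":
    lock_dir, order = InMemoryLockDir(), []
    clients = [LockClient(lock_dir, order) for _ in range(3)]
    print([c.holding for c in clients])   # [True, False, False]
    clients[0].release()
    print([c.holding for c in clients])   # [False, True, False]
```

If a waiting client deletes its node before its turn, the next client's watch fires, it re-runs step 3, finds a different predecessor and keeps waiting. That is why step 5 falls back to step 3 instead of assuming the lock is now free.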


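Releasing the lock just means deleting your znode, or ending the client session: the znode is EPHEMERAL, so ZooKeeper deletes it automatically when the session is closed or expires.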
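Because the znode is EPHEMERAL, a client that crashes does not hold the lock forever. In ZooKeeper the ephemeral node is removed when the session ends (closed explicitly, or expired after the session timeout), not on every brief network hiccup. The sketch below models only that ownership rule; Session and lock_holder are hypothetical helpers, not part of any ZooKeeper client library.

```python
class Session(object):
    """Hypothetical model of a client session that owns EPHEMERAL znodes."""

    def __init__(self, store):
        self.store = store          # shared dict: znode path -> owning session
        self.owned = []

    def create_ephemeral(self, path):
        self.store[path] = self
        self.owned.append(path)
        return path

    def close(self):
        # In this model, closing (or expiring) the session removes every
        # ephemeral znode it created, without an explicit delete().
        for path in self.owned:
            self.store.pop(path, None)
        self.owned = []


def lock_holder(store, lock_dir="/Locks/write_lock"):
    """Return the smallest child of lock_dir (the current holder), or None."""
    children = sorted(p for p in store if p.startswith(lock_dir + "/"))
    return children[0] if children else None


if __name__ == "__main__":
    store = {}
    a, b = Session(store), Session(store)
    a.create_ephemeral("/Locks/write_lock/lockid0000000001")
    b.create_ephemeral("/Locks/write_lock/lockid0000000002")
    print(lock_holder(store))   # /Locks/write_lock/lockid0000000001
    a.close()                   # e.g. the holder crashed and its session expired
    print(lock_holder(store))   # /Locks/write_lock/lockid0000000002
```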

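*Note: getChildren() in step 3 deliberately sets no Watcher, to avoid the herd effect. If getChildren() set a Watcher, every client would be notified of every change to the children of /Locks/write_lock. During lock contention each client would then re-run its check over and over, and almost every run would conclude that it is still not the smallest znode and go back to waiting for the next notification, which is clearly wasteful. Clients receive many notifications that have nothing to do with them; in a large cluster this puts a heavy load on the server, and if clients on several nodes disconnect at the same time, the server sends a burst of notifications to all remaining clients. This is the so-called herd effect.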
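A rough count makes the difference concrete. The model below is a simplifying assumption rather than a measurement: n clients are already queued, the holder releases, the next one takes over, and so on. With a watch on the parent (via getChildren()) every waiting client is notified of each deletion; with a watch on the predecessor (via exists()) only the next client is. The function name notifications is hypothetical.

```python
def notifications(n_clients, watch_parent):
    """Count watch notifications while n_clients take the lock in turn.

    watch_parent=True:  every waiter watches /Locks/write_lock via getChildren(),
                        so each release notifies all remaining waiters.
    watch_parent=False: every waiter watches only its predecessor via exists(),
                        so each release notifies exactly one client.
    """
    waiting = n_clients - 1              # everyone except the first holder
    total = 0
    while waiting > 0:                   # the current holder releases the lock
        total += waiting if watch_parent else 1
        waiting -= 1                     # the next client becomes the holder
    return total


if __name__ == "__main__":
    for n in (10, 100, 1000):
        print(n, notifications(n, True), notifications(n, False))
```

In this model the parent-watch count grows as n(n-1)/2 while the predecessor-watch count grows as n-1: with 1000 contenders that is 499500 notifications versus 999.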
Here is the implementation:

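import logging
import socket
import threading
from os.path import basename, join

# zookeeper: the zkpython bindings (Python 2 era).
# zkclient: assumed to be the thin zkpython wrapper module this example is
# built on (it provides ZKClient and the watchmethod decorator), available
# on the import path.
import zookeeper
from zkclient import ZKClient, watchmethod

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

class GJZookeeper(object):

    ZK_HOST = "localhost:2181"
    ROOT = "/Locks"
    WORKERS_PATH = join(ROOT, "write_lock")
    MASTERS_NUM = 1
    TIMEOUT = 10000

    def __init__(self, verbose=True):
        self.VERBOSE = verbose
        self.masters = []
        self.is_master = False
        self.path = None
        # set once this client has written the file and released the lock
        self.done = threading.Event()

        self.zk = ZKClient(self.ZK_HOST, timeout=self.TIMEOUT)
        self.say("login ok!")
        # init
        self.__init_zk()
        # register
        self.register()

    def __init_zk(self):
        """
        create the zookeeper node if not exist
        |--Locks
                |--write_lock
        """
        nodes = (self.ROOT, self.WORKERS_PATH)
        for node in nodes:
            if not self.zk.exists(node):
                try:
                    # step 1: PERSISTENT node (default flags)
                    self.zk.create(node, "")
                except Exception:
                    # most likely another client created it concurrently
                    pass

    def register(self):
        """
        register a node for this worker
        |--Locks
                |--write_lock
                            |--lockid000000000x ==> hostname
        """
        hostname = socket.gethostname()
        # step 2: EPHEMERAL|SEQUENCE child; ZooKeeper appends the sequence number
        self.path = self.zk.create(self.WORKERS_PATH + "/lockid", hostname,
                                   flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
        self.lockid = basename(self.path)
        self.say("register ok! I'm %s" % self.path)
        # try to acquire the lock
        self.get_lock()

    def get_lock(self):
        """
        get children znode try to get lock
        """
        @watchmethod
        def watcher(event):
            # fires when the watched predecessor znode changes (is deleted)
            self.say("predecessor changed, try to get lock again.")
            self.get_lock()

        # step 3: list the children WITHOUT a watch (avoids the herd effect)
        children = self.zk.get_children(self.WORKERS_PATH)
        # zero-padded sequence suffixes make lexical order equal creation order
        children.sort()
        min_lock_id = children[0]
        self.say("%s's children: %s" % (self.WORKERS_PATH, children))
        if self.lockid == min_lock_id:
            # step 4: our znode is the smallest, so we hold the lock
            self.get_lock_success()
            return True
        # step 5: watch the znode immediately before ours, not the smallest one
        index = children.index(self.lockid)
        new_lockid_watch = join(self.WORKERS_PATH, children[index - 1])
        self.say("Add watch on %s" % new_lockid_watch)
        if not self.zk.exists(new_lockid_watch, watcher):
            # The predecessor vanished before the watch was set. That does not
            # mean we hold the lock (smaller znodes may still exist), so run
            # step 3 again.
            return self.get_lock()
        # step 6: the lock is in use; the watcher calls get_lock() again later
        self.say("I can not get the lock this time, wait for the next time")
        return False

    def get_lock_success(self):
        self.say("I get the lock !!!")
        self.write_file()
        # release: delete our ephemeral znode (ending the session would also do it)
        self.zk.delete(join(self.WORKERS_PATH, self.lockid))
        self.say("I release the lock !!!")
        # wake up start_get_lock() instead of calling sys.exit() from a callback
        self.done.set()

    def write_file(self):
        with open("lock.log", "a") as fd:
            fd.write("%s\n" % self.lockid)

    def say(self, msg):
        """
        print messages to screen
        """
        if self.VERBOSE:
            if self.path:
                log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
            else:
                log.info(msg)

def start_get_lock():
    gj_zookeeper = GJZookeeper()
    # Keep the session (and process) alive until the lock has been obtained
    # and released; the watcher runs on a ZooKeeper client thread.
    gj_zookeeper.done.wait()

def main():
    th1 = threading.Thread(target=start_get_lock, name="thread_1")
    th1.start()
    th1.join()

if __name__ == "__main__":
    main()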

Reposted from: http://www.aboutyun.com/forum.php?mod=viewthread&tid=9267&ctid=16

 

