問題導讀:
1.zookeeper如何實現分布式鎖?
2.什么是羊群效應?
3.zookeeper如何釋放鎖?
在zookeeper應用場景有關於分布式集群配置文件同步問題的描述,設想一下如果有100台機器同時對同一台機器上某個文件進行修改,如何才能保證文本不會被寫亂,這就是最簡單的分布式鎖,本文介紹利用zk實現分布式鎖。下面是寫鎖的實現步驟
分布式寫鎖
create一個PERSISTENT類型的znode,/Locks/write_lock
- 客戶端創建SEQUENCE|EPHEMERAL類型的znode,名字是lockid開頭,創建的znode是/Locks/write_lock/lockid0000000001
- 調用getChildren()不要設置Watcher獲取/Locks/write_lock下的znode列表
- 判斷自己步驟2創建znode是不是znode列表中最小的一個,如果是就代表獲得了鎖,如果不是往下走
- 調用exists()判斷步驟2自己創建的節點編號小1的znode節點(也就是獲取的znode節點列表中最小的znode),並且設置Watcher,如果exists()返回false,執行步驟3
- 如果exists()返回true,那么等待zk通知,從而在回掉函數里返回執行步驟3
釋放鎖就是刪除znode節點或者斷開連接就行
*注意:上面步驟2中getChildren()不設置Watcher的原因是,防止羊群效應,如果getChildren()設置了Watcher,那么集群一抖動都會收到通知。在整個分布式鎖的競爭過程中,大量重復運行,並且絕大多數的運行結果都是判斷出自己並非是序號最小的節點,從而繼續等待下一次通知—,這個顯然看起來不怎么科學。客戶端無端的接受到過多的和自己不相關的事件通知,這如果在集群規模大的時候,會對Server造成很大的性能影響,並且如果一旦同一時間有多個節點的客戶端斷開連接,這個時候,服務器就會像其余客戶端發送大量的事件通知——這就是所謂的羊群效應。
下面是代碼實現
import sys class GJZookeeper(object): ZK_HOST = "localhost:2181" ROOT = "/Locks" WORKERS_PATH = join(ROOT,"write_lock") MASTERS_NUM = 1 TIMEOUT = 10000 def __init__(self, verbose = True): self.VERBOSE = verbose self.masters = [] self.is_master = False self.path = None self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT) self.say("login ok!") # init self.__init_zk() # register self.register() def __init_zk(self): """ create the zookeeper node if not exist |--Locks |--write_lock """ nodes = (self.ROOT, self.WORKERS_PATH) for node in nodes: if not self.zk.exists(node): try: self.zk.create(node, "") except: pass def register(self): """ register a node for this worker |--Locks |--write_lock |--lockid000000000x ==> hostname """ import socket hostname = socket.gethostname() self.path = self.zk.create(self.WORKERS_PATH + "/lockid", hostname, flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE) self.lockid = basename(self.path) self.say("register ok! I'm %s" % self.path) # check who is the master self.get_lock() def get_lock(self): """ get children znode try to get lock """ @watchmethod def watcher(event): self.say("child changed, try to get lock again.") self.get_lock() children = self.zk.get_children(self.WORKERS_PATH) children.sort() min_lock_id = children[0] self.say("%s's children: %s" % (self.WORKERS_PATH, children)) if cmp(self.lockid,min_lock_id) == 0: self.get_lock_success() return True elif cmp(self.lockid,min_lock_id) > 0: index = children.index(self.lockid) new_lockid_watch = join(self.WORKERS_PATH,children[index-1]) self.say("Add watch on %s"%new_lockid_watch) res = self.zk.exists(new_lockid_watch,watcher) if not res : """代表沒有存在之前小的鎖,但是這並不意味着能拿到鎖了,因為還可能有比當前還小的鎖,還沒輪到,要重新執行一遍""" # self.get_lock_success() return False else : """現在的鎖有人在使用,等他釋放了再搶""" self.say("I can not get the lock this time,wait for the next time") return False def get_lock_success(self): self.say("I get the lock !!!") self.write_file() self.zk.delete(join(self.WORKERS_PATH,self.lockid)) self.say("I release the lock !!!") sys.exit(1) def write_file(self): fd = open("lock.log",'a') fd.write("%s\n"%self.lockid) fd.close() def say(self, msg): """ print messages to screen """ if self.VERBOSE: if self.path: log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg)) else: log.info(msg) def start_get_lock(): gj_zookeeper = GJZookeeper() def main(): th1 = threading.Thread(target = start_get_lock, name = "thread_1", args = ()) th1.start() th1.join() if __name__ == "__main__": main() time.sleep(1000)
文章轉自:http://www.aboutyun.com/forum.php?mod=viewthread&tid=9267&ctid=16