Guiding questions:
1. How does ZooKeeper implement a distributed lock?
2. What is the herd effect?
3. How is a ZooKeeper lock released?
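The article on ZooKeeper application scenarios describes the problem of synchronizing configuration files across a distributed cluster. Suppose 100 machines try to modify the same file on one machine at the same time: how do you keep the writes from corrupting the file? This is the simplest use case for a distributed lock, and this article shows how to implement one with ZooKeeper (zk). The steps for a write lock are listed below.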
Distributed write lock
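Create a PERSISTENT znode, /Locks/write_lock. Each client then proceeds as follows:
- The client creates a SEQUENCE|EPHEMERAL znode whose name starts with lockid, for example /Locks/write_lock/lockid0000000001.
- It calls getChildren() without setting a Watcher to get the list of znodes under /Locks/write_lock.
- It checks whether the znode it created is the smallest one in the list. If it is, the client holds the lock; if not, it continues with the next step.
- It calls exists() with a Watcher on the znode that immediately precedes its own in the sorted list (the largest znode that is still smaller than its own, not the smallest znode overall). If exists() returns false, it goes back to the getChildren() step.
- If exists() returns true, it waits for the zk notification, and in the callback function it goes back to the getChildren() step.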
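The decision made after each getChildren() call can be sketched as a small pure function. This is only an illustration under stated assumptions: the names `sequence_number` and `next_action` are hypothetical and are not part of any ZooKeeper client. It relies on the fact that ZooKeeper appends a 10-digit, zero-padded counter to sequential znode names.

```python
def sequence_number(name):
    # Sequential znodes end with a 10-digit, zero-padded counter.
    return int(name[-10:])


def next_action(children, my_node):
    """Decide what a client should do after listing the lock directory.

    Returns ("acquired", None) if my_node is the smallest child, otherwise
    ("wait", predecessor), where predecessor is the znode to watch.
    """
    ordered = sorted(children, key=sequence_number)
    index = ordered.index(my_node)
    if index == 0:
        return ("acquired", None)
    # Watch only the znode directly in front of ours, not the smallest one.
    return ("wait", ordered[index - 1])


children = ["lockid0000000003", "lockid0000000001", "lockid0000000007"]
print(next_action(children, "lockid0000000001"))
print(next_action(children, "lockid0000000007"))
```

The second call returns lockid0000000003 rather than lockid0000000001 as the znode to watch, which is the point of the exists() step: each waiter depends on exactly one predecessor.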
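To release the lock, delete the znode or simply close the connection: because the znode is EPHEMERAL, ZooKeeper removes it when the session ends.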
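Both ways of releasing the lock can be illustrated with a toy in-memory model. This is not a ZooKeeper client: the class `ToyLockDir` and its methods are hypothetical stand-ins that only mimic how ephemeral sequential znodes behave under /Locks/write_lock.

```python
class ToyLockDir:
    """In-memory stand-in for /Locks/write_lock (illustration only)."""

    def __init__(self):
        self.counter = 0
        self.owner_of = {}  # znode name -> id of the session that created it

    def create_ephemeral_sequential(self, session_id, prefix="lockid"):
        name = "%s%010d" % (prefix, self.counter)
        self.counter += 1
        self.owner_of[name] = session_id
        return name

    def delete(self, name):
        # Explicit release: the holder removes its own znode.
        self.owner_of.pop(name, None)

    def close_session(self, session_id):
        # Implicit release: all ephemeral znodes of the session disappear.
        for name in [n for n, s in self.owner_of.items() if s == session_id]:
            del self.owner_of[name]

    def holder(self):
        # The lock belongs to the smallest remaining znode, if there is one.
        return min(self.owner_of) if self.owner_of else None


lock_dir = ToyLockDir()
a = lock_dir.create_ephemeral_sequential("session-A")
b = lock_dir.create_ephemeral_sequential("session-B")
c = lock_dir.create_ephemeral_sequential("session-C")
lock_dir.delete(a)                 # A releases explicitly, B now holds the lock
lock_dir.close_session("session-B")  # B disconnects, C now holds the lock
print(lock_dir.holder())
```

In either case the next-smallest znode becomes the holder, so a crashed client cannot keep the lock forever.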
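Note: getChildren() in the steps above is called without a Watcher in order to avoid the herd effect. If getChildren() set a Watcher, every client would be notified whenever any child znode was added or removed. Throughout the competition for the lock the same check would then run over and over, and in the vast majority of cases a client would only find out that it is still not the smallest node and go back to waiting for the next notification, which is clearly wasteful. Clients would receive many event notifications that have nothing to do with them. In a large cluster this puts a heavy load on the server, and if several clients disconnect at the same moment, the server has to send a flood of event notifications to all the remaining clients. This is the so-called herd effect.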
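A rough count shows the size of the difference. The sketch below is a simplified model, not a measurement: the function `count_notifications` is hypothetical, and it assumes that all clients are already queued and that they take the lock one after another in sequence order.

```python
def count_notifications(num_clients, watch_parent):
    """Count watch events delivered while num_clients take the lock in turn."""
    notifications = 0
    waiting = num_clients - 1  # everyone except the current holder
    while waiting > 0:
        # The current holder releases the lock by deleting its znode.
        if watch_parent:
            notifications += waiting  # every waiter watches the parent znode
        else:
            notifications += 1        # only the direct successor is watching
        waiting -= 1                  # the successor becomes the holder
    return notifications


print(count_notifications(100, watch_parent=True))   # 4950
print(count_notifications(100, watch_parent=False))  # 99
```

With a Watcher on the parent, the number of notifications grows quadratically with the number of clients; with a Watcher on the predecessor only, it grows linearly.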
The code implementation follows.
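import logging
import sys
import threading
import time
# znode paths always use "/", so take join/basename from posixpath
from posixpath import basename, join

# Assumption: ZKClient and watchmethod come from a small helper module
# (zkclient.py) that wraps the legacy `zookeeper` (zkpython) binding.
# The original listing does not show where they are imported from.
from zkclient import ZKClient, zookeeper, watchmethod

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)


class GJZookeeper(object):

    ZK_HOST = "localhost:2181"
    ROOT = "/Locks"
    WORKERS_PATH = join(ROOT, "write_lock")
    MASTERS_NUM = 1
    TIMEOUT = 10000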

    def __init__(self, verbose=True):
        self.VERBOSE = verbose
        self.masters = []
        self.is_master = False
        self.path = None

        self.zk = ZKClient(self.ZK_HOST, timeout=self.TIMEOUT)
        self.say("login ok!")
        # create the parent znodes if they are missing
        self.__init_zk()
        # create this client's lock znode and start competing for the lock
        self.register()

    def __init_zk(self):
        """
        create the zookeeper nodes if they do not exist
            |--Locks
                |--write_lock
        """
        nodes = (self.ROOT, self.WORKERS_PATH)
        for node in nodes:
            if not self.zk.exists(node):
                try:
                    self.zk.create(node, "")
                except zookeeper.NodeExistsException:
                    # another client created the node between exists() and create()
                    pass

    def register(self):
        """
        register a node for this worker
            |--Locks
                |--write_lock
                    |--lockid000000000x ==> hostname
        """
        import socket

        hostname = socket.gethostname()
        self.path = self.zk.create(
            self.WORKERS_PATH + "/lockid",
            hostname,
            flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
        self.lockid = basename(self.path)
        self.say("register ok! I'm %s" % self.path)
        # try to take the lock
        self.get_lock()

    def get_lock(self):
        """
        list the child znodes and try to take the lock
        """
        @watchmethod
        def watcher(event):
            self.say("child changed, try to get lock again.")
            self.get_lock()

        # no watcher on get_children(): watching the parent causes the herd effect
        children = self.zk.get_children(self.WORKERS_PATH)
        children.sort()
        min_lock_id = children[0]
        self.say("%s's children: %s" % (self.WORKERS_PATH, children))
        if self.lockid == min_lock_id:
            self.get_lock_success()
            return True

        # not the smallest: watch only the znode immediately before ours
        index = children.index(self.lockid)
        new_lockid_watch = join(self.WORKERS_PATH, children[index - 1])
        self.say("Add watch on %s" % new_lockid_watch)
        res = self.zk.exists(new_lockid_watch, watcher)
        if not res:
            # The predecessor is already gone. That does not mean we hold the
            # lock, because smaller znodes may still exist, so check again.
            return self.get_lock()
        # The predecessor still holds or waits for the lock; retry when it is released.
        self.say("I can not get the lock this time,wait for the next time")
        return False
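
    def get_lock_success(self):
        self.say("I get the lock !!!")
        self.write_file()
        # release the lock by deleting our own znode
        self.zk.delete(join(self.WORKERS_PATH, self.lockid))
        self.say("I release the lock !!!")
        # sys.exit() raises SystemExit in the calling thread only
        sys.exit(1)

    def write_file(self):
        # append our lock id; the lock ensures one writer at a time
        with open("lock.log", "a") as fd:
            fd.write("%s\n" % self.lockid)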

    def say(self, msg):
        """
        print messages to screen
        """
        if self.VERBOSE:
            if self.path:
                log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
            else:
                log.info(msg)


def start_get_lock():
    gj_zookeeper = GJZookeeper()


def main():
    th1 = threading.Thread(target=start_get_lock, name="thread_1", args=())
    th1.start()
    th1.join()


if __name__ == "__main__":
    main()
    # keep the process alive so that a pending watcher callback can still fire
    time.sleep(1000)
Reposted from: http://www.aboutyun.com/forum.php?mod=viewthread&tid=9267&ctid=16
