#encoding=utf8 #架構:后端服務 --> zookeeper -> 注冊模型服務 import socket import time from kazoo import security from kazoo.client import KazooClient from kazoo.client import EventType, WatchedEvent from kazoo.exceptions import NoNodeError, SessionExpiredError, ConnectionLossException, NoAuthError import random from functools import wraps class ExceptionHandler(object): def __init__(self): #KazooTimeoutError pass def __call__(self, func): @wraps(func) def wrapped_function(*args, **kwargs): is_success = False while not is_success: try: is_success = func(*args, **kwargs) except (ConnectionLossException, SessionExpiredError) as e: is_success = False except NoAuthError as e: raise e except Exception as e: is_success = False time.sleep(1) return wrapped_function class ZKModelWatcher(object): def __init__(self, hosts, acl_user, acl_pass, model_name, model_port): hosts_arr = hosts.split(',') self.acl = security.make_digest_acl(username=acl_user, password=acl_pass, read=True, create=True) self.auth_data = auth_data = [("digest", f"{acl_user}:{acl_pass}")] self._a_host = hosts_arr[0].split(':')[0] self._a_port = int(hosts_arr[0].split(':')[1]) self._hosts = hosts #zookeeper集群連接地址 self.model_name = model_name #注冊模型服務名稱,用於zookeeper根節點名稱 self.model_port = model_port #注冊模型服務端口號 self._zkc = KazooClient(hosts=self._hosts) def get_host_ip(self): ''' 用於獲取注冊模型服務的ip地址(能夠動態改變) 后端服務與zookeeper在同一網段中,注冊模型服務的請求地址與獲取服務的網段一致 ''' try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((self._a_host, self._a_port)) ip = s.getsockname()[0] finally: s.close() return ip def create_model_node(self): #創建臨時、序列化節點。 ip = self.get_host_ip() res = self._zkc.create(f'/{self.model_name}/model', bytes('%s:%s' % (ip, self.model_port), encoding='utf8') ,makepath=True, sequence=True, ephemeral=True) return res def restart_zk(self): try: self._zkc.stop() except: pass self._zkc = KazooClient(hosts=self._hosts, default_acl=[self.acl], auth_data=self.auth_data) self._zkc.start() @ExceptionHandler() def register(self, data, stat, event): print(event) if event and event.type == EventType.NONE: self.restart_zk() res = self.create_model_node() self._zkc.DataWatch(path=f'{res}', func=self.register) if not self._zkc.exists(path=f'{res}'): return False print(res) elif event and event.type == EventType.DELETED: res = self.create_model_node() self._zkc.DataWatch(path=f'{res}', func=self.register) if not self._zkc.exists(path=f'{res}'): return False print(res) return True def get_model_host(self): #后端服務通過zookeeper獲取注冊模型服務的地址,如果注冊模型服務存在多個,則需要隨機選擇 name = self.model_name is_success = False while not is_success: children = self._zkc.get_children(f'/{name}') if len(children) <= 0: raise Exception('沒有可以運行的服務') try: index = random.randint(0, len(children) - 1) host = self._zkc.get(f'/{name}/{children[index]}') is_success = True return host[0].decode() except NoNodeError as e: is_success = False # 選中的節點已經失效的情況 @ExceptionHandler() def run(self): self.restart_zk() res = self.create_model_node() self._zkc.DataWatch(path=f'{res}', func=self.register) if not self._zkc.exists(path=f'{res}'): return False self.get_model_host() return True def close(self): try: self._zkc.stop() self._zkc.close() except Exception as e: print(str(e)) if __name__ == '__main__': acl_username = 'user' acl_password = 'pass' zw = ZKModelWatcher('127.0.0.1:2181,127.0.0.1:12181,127.0.0.1:22181', acl_username, acl_password, 'model', 5000) zw.run() time.sleep(4000) #此處可以開啟注冊模型服務 zw.close()
參考資料:
1、Zookeeper簡介 - https://blog.51cto.com/hmtk520/2105110
2、使用kazoo連接zookeeper並監聽節點數量以及值變化 - https://blog.csdn.net/pysense/article/details/100709138
3、python kazoo 監視zookeeper節點數據發生變化 - https://blog.csdn.net/KWSY2008/article/details/52042303?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control
4、關於Zookeeper中Session Expired和Watch - https://www.coder4.com/archives/3181
5、ZK session客戶端過期(Expired)過程 - https://blog.csdn.net/lovingprince/article/details/6885746
6、zookeeper Session Expired - https://blog.csdn.net/specialsun/article/details/84812575
7、zookeeper curator處理會話過期session expired - https://www.cnblogs.com/kangoroo/p/7538314.html
8、python獲取自身ip - https://www.cnblogs.com/hei-hei-hei/p/10489924.html