使用 ZooKeeper 實現服務動態 IP 注冊和發現,並能在網絡波動後重連(重新注冊)


#encoding=utf8
# Architecture: backend service --> ZooKeeper --> registered model service
import socket
import time
from kazoo import security
from kazoo.client import KazooClient
from kazoo.client import EventType, WatchedEvent
from kazoo.exceptions import NoNodeError, SessionExpiredError, ConnectionLossException, NoAuthError
import random
from functools import wraps
 
 
class ExceptionHandler(object):
    """Decorator factory that retries a call until it returns a truthy value.

    The wrapped callable is invoked repeatedly.  A falsy return value or any
    exception other than ``NoAuthError`` counts as a failed attempt and causes
    another try after ``retry_interval`` seconds.  ``NoAuthError`` is re-raised
    immediately, since retrying with the same credentials cannot succeed.

    Args:
        retry_interval: Seconds to wait between attempts (default 1).
            Waiting after every failed attempt keeps the loop from spinning
            at full speed while the connection is down.
    """

    def __init__(self, retry_interval=1):
        # Timeouts (e.g. KazooTimeoutError) are handled by the generic
        # ``except Exception`` branch in the wrapper below.
        self.retry_interval = retry_interval

    def __call__(self, func):
        """Wrap ``func`` so that it is retried until it succeeds.

        Returns:
            A function with the same signature as ``func`` that returns the
            first truthy value ``func`` produces.

        Raises:
            NoAuthError: propagated unchanged from ``func``.
        """
        @wraps(func)
        def wrapped_function(*args, **kwargs):
            while True:
                try:
                    result = func(*args, **kwargs)
                except NoAuthError:
                    # Bare raise keeps the original traceback intact.
                    raise
                except Exception as e:
                    # Covers ConnectionLossException / SessionExpiredError as
                    # well as any other failure; report it instead of
                    # swallowing it silently, then retry.
                    print(f'{func.__name__} failed, retrying: {e!r}')
                    result = False
                if result:
                    return result
                time.sleep(self.retry_interval)
        return wrapped_function
 
class ZKModelWatcher(object):
    """Register a model service in ZooKeeper and look up registered services.

    Each service instance publishes ``ip:port`` as the data of an ephemeral,
    sequential node under ``/<model_name>/``.  A kazoo ``DataWatch`` on that
    node calls :meth:`register`, which creates a fresh node when the watch
    reports a ``DELETED`` event or an ``EventType.NONE`` event (presumably
    delivered on connection/session state changes -- confirm against kazoo).
    """

    def __init__(self, hosts, acl_user, acl_pass, model_name, model_port):
        """Store the connection settings and build an unstarted client.

        Args:
            hosts: Comma-separated ``host:port`` list of the ZooKeeper
                ensemble.  The first entry is also used to detect the local
                IP address, so it must contain a port.
            acl_user: User name for the digest ACL / authentication.
            acl_pass: Password for the digest ACL / authentication.
            model_name: Service name; used as the root node name.
            model_port: Port the registered model service listens on.
        """
        first_endpoint = hosts.split(',')[0].split(':')
        self.acl = security.make_digest_acl(username=acl_user, password=acl_pass, read=True, create=True)
        self.auth_data = [("digest", f"{acl_user}:{acl_pass}")]
        self._a_host = first_endpoint[0]
        self._a_port = int(first_endpoint[1])
        self._hosts = hosts  # ZooKeeper ensemble connection string
        self.model_name = model_name  # service name, used as the root node name
        self.model_port = model_port  # port of the registered model service
        # Same configuration as the clients built by restart_zk(), so the
        # instance behaves consistently before and after a restart.
        self._zkc = self._new_client()

    def _new_client(self):
        """Build an unstarted KazooClient carrying this watcher's ACL and credentials."""
        return KazooClient(hosts=self._hosts, default_acl=[self.acl], auth_data=self.auth_data)

    def get_host_ip(self):
        """Return the local IP address used to reach the first ZooKeeper host.

        The address is read back from a UDP socket connected to that host, so
        it follows the machine's current network configuration.  The backend
        service and ZooKeeper are expected to be on the same network segment,
        so this address is the one backends should use to reach the service.
        """
        # The context manager closes the socket even on failure, and avoids
        # touching an unbound name if socket creation itself raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect((self._a_host, self._a_port))
            return s.getsockname()[0]

    def create_model_node(self):
        """Create an ephemeral, sequential node holding ``ip:port``.

        Returns:
            The path of the created node as returned by kazoo.
        """
        address = f'{self.get_host_ip()}:{self.model_port}'
        return self._zkc.create(
            f'/{self.model_name}/model',
            address.encode('utf8'),
            makepath=True,
            sequence=True,
            ephemeral=True,
        )

    def restart_zk(self):
        """Discard the current client and start a fresh one."""
        try:
            self._zkc.stop()
            # close() releases the resources of the stopped client before it
            # is dropped; see the kazoo KazooClient.close documentation.
            self._zkc.close()
        except Exception as e:
            # Best effort: a broken old client must not prevent reconnecting.
            print(str(e))
        self._zkc = self._new_client()
        self._zkc.start()

    def _register_node(self):
        """Create this service's node and watch it.

        Returns:
            The node path, or ``None`` if the node no longer exists right
            after creation.
        """
        path = self.create_model_node()
        self._zkc.DataWatch(path=path, func=self.register)
        if not self._zkc.exists(path=path):
            return None
        return path

    @ExceptionHandler()
    def register(self, data, stat, event):
        """DataWatch callback that re-registers the service when needed.

        Args:
            data: Node data passed by the watch (unused).
            stat: Node stat passed by the watch (unused).
            event: The watch event, or a falsy value when there is none.

        Returns:
            ``True`` when nothing had to be done or re-registration succeeded,
            ``False`` when the new node vanished immediately (the decorator
            then retries).
        """
        print(event)
        if not event or event.type not in (EventType.NONE, EventType.DELETED):
            return True
        if event.type == EventType.NONE:
            # A NONE event gets a brand-new client before registering again.
            self.restart_zk()
        path = self._register_node()
        if path is None:
            return False
        print(path)
        return True

    def get_model_host(self):
        """Return the ``ip:port`` string of a randomly chosen registered service.

        Raises:
            Exception: if no service node exists under the root node.
        """
        root = f'/{self.model_name}'
        while True:
            children = self._zkc.get_children(root)
            if not children:
                raise Exception('沒有可以運行的服務')
            try:
                data, _stat = self._zkc.get(f'{root}/{random.choice(children)}')
            except NoNodeError:
                # The chosen node disappeared in the meantime; pick again.
                continue
            return data.decode()

    @ExceptionHandler()
    def run(self):
        """Connect, register this service and verify that a lookup works.

        Returns:
            ``True`` on success, ``False`` when the freshly created node is
            already gone (the decorator then retries).
        """
        self.restart_zk()
        if self._register_node() is None:
            return False
        self.get_model_host()
        return True

    def close(self):
        """Stop and close the client, printing any error instead of raising."""
        try:
            self._zkc.stop()
            self._zkc.close()
        except Exception as e:
            print(str(e))
 
if __name__ == '__main__':
    # Demo: register a service named 'model' on port 5000 against a local
    # three-node ZooKeeper ensemble, keep it registered, then clean up.
    acl_username = 'user'
    acl_password = 'pass'
    zw = ZKModelWatcher('127.0.0.1:2181,127.0.0.1:12181,127.0.0.1:22181', acl_username, acl_password, 'model', 5000)
    try:
        zw.run()
        time.sleep(4000)  # the registered model service can be started here
    finally:
        # Always release the client, even if run() fails or the sleep is
        # interrupted (e.g. Ctrl-C).
        zw.close()

 

參考資料:

1、Zookeeper簡介 - https://blog.51cto.com/hmtk520/2105110

2、使用kazoo連接zookeeper並監聽節點數量以及值變化 - https://blog.csdn.net/pysense/article/details/100709138

3、python kazoo 監視zookeeper節點數據發生變化 - https://blog.csdn.net/KWSY2008/article/details/52042303?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control

4、關於Zookeeper中Session Expired和Watch - https://www.coder4.com/archives/3181

5、ZK session客戶端過期(Expired)過程 - https://blog.csdn.net/lovingprince/article/details/6885746

6、zookeeper Session Expired - https://blog.csdn.net/specialsun/article/details/84812575

7、zookeeper curator處理會話過期session expired - https://www.cnblogs.com/kangoroo/p/7538314.html

8、python獲取自身ip - https://www.cnblogs.com/hei-hei-hei/p/10489924.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM