zookeeper client API實現(python kazoo 的實現)


這里主要分析zookeeper client API的實現方式,以python kazoo的實現代碼為藍本進行邏輯分析.

一.代碼框架及介紹

  API分為同步模式和異步模式.同步模式是在異步模式的基礎上通過一些等待,循環等方式進行實現的.

  主要實現邏輯如下:

     基本模式就是建立兩個線程,一個線程負責發送請求和接收響應.一個負責根據響應執行對應注冊的watcher.

     大部分語言的實現都是同步模式通過異步模式實現的.在不同的語言里具體有差異.

  kazoo的框架實現在client,connection,threading,serlallzation這幾個主要的類包中.

  client在kazoo項目的根目錄,connection,serlallzation在項目的protocol目錄,threading在handler目錄.

  client是主要類.所有zookeeper 接口均在這里實現.

  connection是核心實現,所有請求和返回的邏輯處理均在這里處理.

  serlallzation是請求封裝的實現.將所有請求封裝成二進制數據在這里實現.

  threading是線程實現的核心.建立線程,以及啟動線程和處理線程均在這里實現.

  https://github.com/python-zk/kazoo/blob/master/kazoo/client.py詳細代碼可以看這里.

二.詳細邏輯的實現.

  1.client的實現

    client是將底層邏輯連接和封裝起來的地方,存儲公共數據的地方.

    client的代碼實現基本是構造各種基礎變量,對象,隊列,記錄當前socket狀態,thread狀態.重置各種狀態.入口方法,和啟動,停止等功能.

    簡單分析下client下create方法.create方法基本就是檢查參數,然后將參數傳給create_async(對應的異步方法)方法,create_async將參數通過serlallzation包里的create類封裝成request對象.並將這個對象和新的async_object對象傳遞給入口函數_call._call函數將request對象和async_object對象放到_queue的隊列里.之后這個隊列由connection里的實現去發送.其他所有zookeeper api的方法都是類是create方法.都是使用同名的async方法通過調用_call來實現.

    connection的對象,threading對象都是在client __init__里初始化的.啟動線程是在client的start函數里調用threading.start和connection.start實現.

  2.connection的實現

    這里實現了整個client核心通信的部分.connection通過接收client對象參數,來使用各種client中初始化的公共數據和對象.在connection里核心函數是_connect_attempt函數.這個函數實現了數據交換和通信的核心部分.

    _connect_attempt函數主要部分如下.

            read_timeout, connect_timeout = self._connect(host, port)  #這里是從連接開始的部分貼的,並未將完整的函數貼上.需要完整的請看github
            read_timeout = read_timeout / 1000.0                       #_connect函數做了主要的握手鏈接的動作,以及一些錯誤處理.
            connect_timeout = connect_timeout / 1000.0
            retry.reset()
            self._xid = 0
            self.ping_outstanding.clear()
            with self._socket_error_handling():
                while not close_connection:                     #這里開始檢查是否有主動stop的行為.在connection.stop函數里將這個標志置為真了.
                    # Watch for something to read or send
                    jitter_time = random.randint(0, 40) / 100.0
                    # Ensure our timeout is positive
                    timeout = max([read_timeout / 2.0 - jitter_time,
                                   jitter_time])
                    s = self.handler.select([self._socket, self._read_sock],
                                            [], [], timeout)[0]  #通過select 來做數據檢查等動作.

                    if not s:                                    #這里開始檢查select的結果.結果為假的時候即為timeout這個時候發送心跳.
                        if self.ping_outstanding.is_set():
                            self.ping_outstanding.clear()
                            raise ConnectionDropped(
                                "outstanding heartbeat ping not received")
                        self._send_ping(connect_timeout)       #心跳的主要處理在這里.
                    elif s[0] == self._socket:                  #如果結果為可讀socket.則進行數據讀操作.
                        response = self._read_socket(read_timeout) #_read_socket 函數里做了返回數據檢查.根據返回數據做不同響應.
                        close_connection = response == CLOSE_RESPONSE  #這里檢查是否是server發過來的關閉連接的響應.
                    else:
                        self._send_request(read_timeout, connect_timeout) #_send_request就是發送請求的部分.client里將請求放入_queue隊列里.
            self.logger.info('Closing connection to %s:%s', host, port)   #就是_send_request函數在消費_queue這個對列的里數據.
            client._session_callback(KeeperState.CLOSED)
return STOP_CONNECTING

 

    _connect_attempt函數有被封裝在_connect_loop函數里.這個函數又被zk_loop調用.zk_loop最后在connection.start函數里被裝入由threading.spawn函數里創建的線程中一直運行.

    _read_socket函數是觸發事件的核心函數.代碼如下

    def _read_socket(self, read_timeout):
        """Called when there's something to read on the socket"""
        client = self.client

        header, buffer, offset = self._read_header(read_timeout)
        if header.xid == PING_XID:
            self.logger.log(BLATHER, 'Received Ping')
            self.ping_outstanding.clear()
        elif header.xid == AUTH_XID:
            self.logger.log(BLATHER, 'Received AUTH')

            request, async_object, xid = client._pending.popleft()
            if header.err:
                async_object.set_exception(AuthFailedError())
                client._session_callback(KeeperState.AUTH_FAILED)
            else:
                async_object.set(True)
        elif header.xid == WATCH_XID:                           #在這里通過返回xid判斷是否是事件通知.
            self._read_watch_event(buffer, offset)              #在_read_watch_event函數里將用戶設置的函數從client._datawatch隊列里放進事件等
        else:                                                   #待的進程里去.然后由等待事件的進程來執行.用戶設置的事件是在_read_sponse函數中被裝
            self.logger.log(BLATHER, 'Reading for header %r', header) #入對應的列表里的.

return self._read_response(header, buffer, offset)

     _read_watch_event函數通過將watch放入handler的事件的對列中去.代碼如下.

    def _read_watch_event(self, buffer, offset):
        client = self.client
        watch, offset = Watch.deserialize(buffer, offset)
        path = watch.path

        self.logger.debug('Received EVENT: %s', watch)

        watchers = []

        if watch.type in (CREATED_EVENT, CHANGED_EVENT):              #在這里判斷事件類型.並從不同的類型隊列中放入watchers列表里.
            watchers.extend(client._data_watchers.pop(path, []))
        elif watch.type == DELETED_EVENT:
            watchers.extend(client._data_watchers.pop(path, []))
            watchers.extend(client._child_watchers.pop(path, []))
        elif watch.type == CHILD_EVENT:
            watchers.extend(client._child_watchers.pop(path, []))
        else:
            self.logger.warn('Received unknown event %r', watch.type)
            return

        # Strip the chroot if needed
        path = client.unchroot(path)
        ev = WatchedEvent(EVENT_TYPE_MAP[watch.type], client._state, path) #這個對象是事件函數的參數.

        # Last check to ignore watches if we've been stopped
        if client._stopped.is_set():
            return

        # Dump the watchers to the watch thread
        for watch in watchers:
            client.handler.dispatch_callback(Callback('watch', watch, (ev,))) #然后通過這個方法將需要觸發的事件放入事件線程中監聽的隊列里.
#從這里可以看出watch函數有一個參數.就是watchedEvent的對象.

    再看看handler.dispatch_callback是如何處理的.代碼如下:

def dispatch_callback(self, callback):
        """Dispatch to the callback object
        The callback is put on separate queues to run depending on the
        type as documented for the :class:`SequentialThreadingHandler`.
        """
        self.callback_queue.put(lambda: callback.func(*callback.args))  #可以看見這個函數非常簡單.關鍵就這一句話.就是通過lambda封裝成一句話的函數,然后把這個對象放到callback_queue隊列里.這個隊列在事件線程中循環監聽.

    看看事件監聽線程是如何做的.代碼如下.

def _create_thread_worker(self, queue):  #這個函數在handler.start里被調用.事件的隊列也由調用函數傳遞進來.
        def _thread_worker():  # pragma: nocover
            while True:
                try:
                    func = queue.get()     #這里將事件從隊列里拿出來.然后在下面執行.
                    try:
                        if func is _STOP:
                            break
                        func()           #這里執行事件.
                    except Exception:
                        log.exception("Exception in worker queue thread")
                    finally:
                        queue.task_done()
                except self.queue_empty:
                    continue
        t = self.spawn(_thread_worker)
        return t
def start(self):
        """Start the worker threads."""
        with self._state_change:
            if self._running:
                return

            # Spawn our worker threads, we have
            # - A callback worker for watch events to be called
            # - A completion worker for completion events to be called
            for queue in (self.completion_queue, self.callback_queue):  #可以看見在這里將callback_queue對列里的對象傳遞給了_create_thread_worker函數.然后由函數執行.
                w = self._create_thread_worker(queue)
                self._workers.append(w)
            self._running = True
            python2atexit.register(self.stop)

    到這里為止,整個事件被觸發到裝載執行都已經非常清晰了.接下來看看如何被從最初的位置被放進類型隊列里的.

            watcher = getattr(request, 'watcher', None)  #這是_read_response的最后一部分代碼.可以看見是通過判斷request的類型來判斷需要觸發的事件類型是字節點類型還是數據類型. 
            if not client._stopped.is_set() and watcher:
                if isinstance(request, GetChildren):
                    client._child_watchers[request.path].add(watcher)
                else:
                    client._data_watchers[request.path].add(watcher)

    基本上kazoo的實現核心邏輯就是這樣.通過client將所有共享數據初始化.然后把用戶的請求封裝成接口.在接口中將請求放入發送隊列.然后有connection包類將隊列里的請求發送出去.發送出去之后等待服務器響應,並根據響應做不同動作.並且接收到用戶這個請求的正常響應之后.將用戶注冊的事件裝入對應的類型隊列中.然后等待事件通知到達的時候將事件從類型隊列中拿出來放入一直等待執行事件的線程隊列中.然后由事件線程執行事件.

    在client的__init__函數里可以清晰的看到這些底層邏輯可以另外提供.只需按照接口名稱進行實現即可.

    再詳細的細節部分,可以自行參考對應的github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM