Python 基於Python及zookeeper實現簡單分布式任務調度系統設計思路及核心代碼實現


基於Python及zookeeper實現簡單分布式任務調度系統設計思路及核心代碼實現

 

by:授客 QQ:1033553122

 

測試環境 

功能需求 

實現思路 

代碼實踐(關鍵技術點實現) 

代碼模塊組織結構 

配置文件解析 

MyTCPServer.py 

MyTCPClient.py 

appClient.py 

loadAgent.py 

運行效果

 

 

 

測試環境

Win7 64位

 

Linux 64位

 

Python 3.3.4

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)

kazoo-2.6.1.tar.gz (linux)

https://pypi.org/project/kazoo/#files

 

zookeeper-3.4.13.tar.gz

下載地址:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

 

功能需求

把不同的負載主機,注冊為zookeeper的節點,其它應用模塊請求zookeeper獲取相關節點信息(服務器ip,端口號,服務器任務執行狀態),通過服務器任務狀態選擇沒有運行指定任務的服務器執行相關任務。

 

針對以上需求,做一個技術預研,核心代碼實現

 

 

實現思路

負載服務器啟動時,初始化zookeeper客戶端,創建tcp服務器,注冊節點信息到zookeeper服務器(信息包含tcp服務器ip,端口,負載服務器任務執行狀態),然后定時檢測負載服務器任務執行狀態(通過檢測某個進程的名稱是否存在進行判斷),其它應用模塊通過zookeeper獲取節點信息后,通過tcp socket通信,向負載服務器發送執行命令,然后負載服務器根據這些命令進行不同的處理。

 

 

代碼實踐(關鍵技術點實現)

代碼模塊組織結構

 

 

 

配置文件解析

conf/agent.conf

[AGENT]

interval = 5

proc = sftp-server

 

 

[README]

interval = 更新服務器節點信息頻率(單位:秒)

proc = 需要檢測的進程名稱(程序通過查找對應進程名稱來判斷負載程序是否還在運行,從而判斷服務器狀態)

 

conf/tcpserver.conf

[TCPSERVER]

host=10.202.7.165

port = 8000

 

[README]

host = tcp服務器主機地址

port = tcp服務器監聽端口

 

conf/zookeeper.conf

[ZOOKEEPER]

hosts = 10.118.52.26:2181

nodeParentPath=/rootNode

 

[README]

hosts = zookeeper地址,如果是集群地址,即有多個,用英文逗號分隔

nodeParentPath=負載機節點所在父級路徑

 

MyTCPServer.py

 

#!/usr/bin/env python3

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socketserver

from log import logger

 

 

class MyTCPHandler(socketserver.BaseRequestHandler):
    """Request handler that echoes every received message back in upper case.

    It is instantiated once per connection to the server.  The handler keeps
    reading from the connection until the client sends ``bye`` (which is
    answered with ``bye``) or closes its end of the socket.
    """

    def handle(self):
        """Serve one client connection until it says ``bye`` or disconnects."""
        while True:
            # self.request is the TCP socket connected to the client
            raw = self.request.recv(1024)
            if not raw:
                # recv() returns b'' once the peer has closed the connection.
                # Without this check the loop would spin forever on a dead
                # socket, logging and "answering" empty messages.
                logger.info('client[host:%s port:%s] closed the connection' % (self.client_address[0], self.client_address[1]))
                break

            self.data = raw.decode('utf-8').strip()
            logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))

            if self.data == 'bye':
                # Acknowledge the goodbye, then hang up.
                self.request.sendall(bytes('bye', encoding='utf-8'))
                self.request.close()
                break

            self.request.sendall(self.data.upper().encode('utf-8'))

 

class MyTCPServer:
    """Small wrapper around a socketserver TCP server that uses MyTCPHandler."""

    def __init__(self, host, port):
        """Bind the server to ``host``:``port``.

        If the server cannot be created the error is logged and the process
        is terminated with exit status 1 (SystemExit is raised).
        """
        self.host = host
        self.port = port
        try:
            # MyTCPHandler.handle() loops until its client says 'bye', so a
            # plain TCPServer could only talk to one client at a time.  The
            # threading variant (still a TCPServer subclass) gives every
            # connection its own thread.
            self.server = socketserver.ThreadingTCPServer((self.host, self.port), MyTCPHandler)
            # Handler threads must not keep the process alive on shutdown.
            self.server.daemon_threads = True
        except Exception as e:
            logger.error('初始化TCPServer失敗:%s' % e)
            # The bare exit() builtin is injected by the site module and is
            # not guaranteed to exist; sys.exit() always is.
            import sys
            sys.exit(1)

    def start(self):
        """Serve requests until the program is interrupted (e.g. Ctrl-C)."""
        self.server.serve_forever()

 

MyTCPClient.py

 

#!/usr/bin/env python3

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socket

import configparser

import time

 

from log import logger

 

 

if __name__ == '__main__':
    # Demo client: reads the server address from conf/tcpserver.conf, then
    # sends a greeting every 5 seconds and logs each reply.  On iteration
    # 1000 it sends 'bye' and stops once the server answers 'bye'.
    try:
        config_parser = configparser.ConfigParser()
        config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
        host = config_parser.get('TCPSERVER', 'host')
        port = int(config_parser.get('TCPSERVER', 'port'))

        # The context manager closes the socket on every path, including a
        # failed connect() (the previous flag-based cleanup leaked it then).
        # SOCK_STREAM means a TCP socket.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Connect to server and send data
            sock.connect((host, port))

            for i in range(10000):
                message = 'bye\n' if i == 1000 else 'hello world with tcp\n'
                sock.sendall(bytes(message, "utf-8"))

                # Receive data from the server
                received = str(sock.recv(1024), "utf-8")
                logger.info('receive data from server:%s' % received)
                if received == 'bye':
                    break
                if not received:
                    # An empty read means the server closed the connection;
                    # sending more data would only fail.
                    break

                time.sleep(5)
    except Exception as e:
        logger.error('程序運行出錯:%s' % e)

 

 

appClient.py

 

#!/usr/bin/env python

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import time

from log import logger

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

 

def my_listener(state):
    """Log the name of the connection state reported to this listener.

    ``state`` is compared against KazooState.LOST and KazooState.SUSPENDED;
    any other value is logged as CONNECTED.
    """
    if state == KazooState.LOST:
        # Session was lost.
        label = 'LOST'
    elif state == KazooState.SUSPENDED:
        # Disconnected from ZooKeeper.
        label = 'SUSPENDED'
    else:
        # Connected or reconnected to ZooKeeper.
        label = 'CONNECTED'
    logger.info(label)

 

def my_event_listener(event):
    """Log the event object handed to this watch callback.

    Passed as ``watch=`` to ``zk_client.get_children`` in this script.
    """
    logger.info(event)

 

 

# Create the ZooKeeper client, register my_listener for connection state
# changes and open the connection.
# NOTE(review): the address is hard-coded here; it matches the sample value
# shown for conf/zookeeper.conf.
zk_client = KazooClient(hosts='10.118.52.26:2181')
zk_client.add_listener(my_listener)
zk_client.start()

# Parent node whose children are listed, and the child node whose data is
# watched further down in this script.
node_path = '/rootNode'
sub_node = 'loaderAgent102027165'

# List the children once, with my_event_listener supplied as the watch callback.
children = zk_client.get_children(node_path, watch=my_event_listener)
logger.info('there are %s children with names %s' % (len(children), children))

 

 

@zk_client.ChildrenWatch(node_path)
def watch_children(children):
    """Log the list of children received for ``node_path``."""
    logger.info("Children are now: %s" % children)

 

 

@zk_client.DataWatch("%s/%s" % (node_path, sub_node))
def watch_node(data, state):
    """Watch the node's data and log its version and content.

    Nothing is logged when ``state`` is falsy (presumably when the node does
    not exist - confirm against the kazoo DataWatch documentation).
    """
    if not state:
        return
    logger.info('Version:%s, data:%s' % (state.version, data))

 

# Poll the children of node_path every 5 seconds, 1000 times, logging the
# result each time.  The try/finally makes sure the client is stopped and
# closed even when a poll raises (e.g. the connection is gone).
try:
    for i in range(1000):
        time.sleep(5)
        children = zk_client.get_children(node_path, watch=my_event_listener)
        logger.info('there are %s children with names %s' % (len(children), children))
finally:
    zk_client.stop()
    zk_client.close()

 

 

 

 

 

loadAgent.py

#!/usr/bin/env python3

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import time

import threading

import configparser

import json

import subprocess

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

from log import logger

 

from myTCPServer import MyTCPServer

 

# Module-level state
zk_conn_stat = 0 # ZooKeeper connection state: 1-LOST, 2-SUSPENDED, 3-CONNECTED/RECONNECTED
registry_status = 0 # registration state of this server's node in ZooKeeper: 0-not registered / registering, 1-registered

 

def restart_zk_client():
    """Restart the ZooKeeper client session and register this node again.

    Any exception raised on the way is logged instead of being propagated.
    """
    try:
        zk_client.restart()
        registry_zookeeper()
    except Exception as err:
        logger.error('重啟zookeeper客戶端異常:%s' % err)

 

 

def zk_conn_listener(state):
    """ZooKeeper connection state listener.

    Records the reported state in the module-level ``zk_conn_stat``
    (1 = LOST, 2 = SUSPENDED, 3 = anything else, i.e. connected/reconnected).
    On LOST it also resets ``registry_status`` to 0 and starts a thread that
    runs restart_zk_client().
    """
    global zk_conn_stat
    global registry_status

    # NOTE(review): logger.warning assumes ``logger`` is a standard
    # logging.Logger, where warn() is only a deprecated alias - confirm
    # against the project's log module.
    if state == KazooState.LOST:
        logger.warning('zookeeper connection lost')
        zk_conn_stat = 1
        registry_status = 0  # the node has to be registered again
        # The restart runs on its own thread, presumably so that this
        # callback returns quickly - confirm against the kazoo listener docs.
        thread = threading.Thread(target=restart_zk_client)
        thread.start()
    elif state == KazooState.SUSPENDED:
        # Disconnected from ZooKeeper.
        logger.warning('zookeeper connection disconnected')
        zk_conn_stat = 2
    else:
        # Connected/reconnected to ZooKeeper.
        zk_conn_stat = 3
        logger.info('zookeeper connection connected/reconnected')

 

def registry_zookeeper():
    """Register this load server as an ephemeral node in ZooKeeper.

    Waits until ``zk_conn_stat`` is 3 (connected), makes sure the parent path
    exists and creates ``<node_parent_path>/loaderAgent<host without dots>``
    holding host, port and the initial status "idle", unless that node is
    already there.  On success ``registry_status`` is set to 1.

    Returns:
        True when registration succeeded, False when any exception occurred
        (the exception is logged).
    """
    global registry_status

    try:
        # Do not register before the client is connected.  Sleep between
        # checks: the previous bare ``continue`` loop kept a CPU core fully
        # busy while waiting.
        while zk_conn_stat != 3:
            time.sleep(0.1)

        logger.info('正在注冊負載機到zookeeper...')
        zk_client.ensure_path(node_parent_path)

        agent_node_path = '%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))
        loader_agent_info = '{"host":"%s", "port":%s, "status":"idle"}' % (host, port)

        if not zk_client.exists(agent_node_path):
            zk_client.create(agent_node_path, loader_agent_info.encode('utf-8'), ephemeral=True, sequence=False)

        registry_status = 1  # registration finished
        logger.info('注冊負載機到zookeeper成功')
        return True
    except Exception as e:
        logger.error('注冊負載機到zookeeper失敗:%s' % e)
        return False

 

 

def start_tcpserver(tcpserver):
    """Start the given TCP server by calling its ``start()`` method.

    Used as the target of the server thread created in the main section.
    """
    tcpserver.start()

 

 

def get_server_status(proc_name):
    """Derive the server status from whether a process name is running.

    Runs ``ps -e`` and looks for ``proc_name`` as a plain substring of any
    output line.

    Args:
        proc_name: name of the process to look for.

    Returns:
        'busy' when the name is found, 'idle' when ``ps`` succeeded and the
        name is absent, 'unknow' when the process list could not be obtained
        (``ps`` missing, failing or timing out); the failure is logged.
    """
    try:
        # Argument list + no shell: proc_name comes from a config file and
        # must not be interpolated into a shell command line.  Matching is
        # done in Python instead of piping through grep.
        with subprocess.Popen(['ps', '-e'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              universal_newlines=True) as proc:
            try:
                outs, errs = proc.communicate(timeout=30)
            except Exception:
                proc.kill()
                raise
    except Exception as e:
        logger.error('獲取負載機狀態失敗:%s' % e)
        return 'unknow'

    if proc.returncode != 0:
        # A failed ps used to produce empty output and was misreported as
        # 'idle'; an unreadable process list is an unknown state.
        logger.error('獲取負載機狀態失敗:%s' % errs)
        return 'unknow'

    if any(proc_name in line for line in outs.splitlines()):
        return 'busy'
    return 'idle'

 

 

def update_server_status(interval, proc_name):
    """Periodically detect the server status and publish it to ZooKeeper.

    The status comes from get_server_status(proc_name): 'busy' when the
    process exists, 'idle' when it does not, 'unknow' when the check failed.
    Nothing is published while the client is not connected
    (``zk_conn_stat != 3``) or the node is not registered
    (``registry_status != 1``).

    Args:
        interval: minimum number of seconds between two updates.
        proc_name: process name to look for.

    This function loops forever; exceptions raised by the ZooKeeper client
    calls are not caught here.
    """
    agent_node_path = '%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))

    while True:
        if zk_conn_stat != 3 or registry_status != 1:
            # Not connected or not registered yet.  Sleep briefly instead of
            # spinning: the previous bare ``continue`` kept a CPU core fully
            # busy while waiting.
            time.sleep(0.1)
            continue

        # Monotonic clock: unaffected by wall-clock adjustments and not
        # truncated to whole seconds like mktime(localtime()).
        started = time.monotonic()

        server_status = get_server_status(proc_name)
        loader_agent_info = '{"host":"%s", "port":%s, "status":"%s"}' % (host, port, server_status)

        # Why the exists() check (original author's explanation): ZooKeeper
        # deletes ephemeral nodes with some delay.  If the client is closed
        # and quickly restarted and re-registered, the ephemeral node created
        # before the restart may still be present when registration finishes;
        # the new node has exactly the same path, so the pending delete also
        # removes the node registered after the restart.  Register again
        # whenever the node is missing.
        if zk_client.exists(agent_node_path):
            zk_client.set(agent_node_path, loader_agent_info.encode('utf-8'))
        else:
            registry_zookeeper()

        elapsed = time.monotonic() - started
        if elapsed < interval:
            time.sleep(interval - elapsed)

 

 

if __name__ == '__main__':
    # Agent entry point: start the ZooKeeper client, start the TCP server on
    # its own thread, register this node, then keep publishing the server
    # status.  The names zk_client, node_parent_path, host and port are read
    # as module globals by the functions above.
    import sys  # sys.exit() instead of the site-only exit() builtin

    logger.info('正在啟動代理...')

    try:
        logger.info('正在讀取zookeeper配置...')
        config_parser = configparser.ConfigParser()
        config_parser.read('./conf/zookeeper.conf', encoding='utf-8-sig')
        # NOTE(review): the previous replace(',', ',') was a no-op.  The
        # config notes ask for English commas between hosts, so the first
        # argument was presumably the full-width comma U+FF0C - confirm.
        zk_hosts = config_parser.get('ZOOKEEPER', 'hosts').replace('\uff0c', ',').strip()
        node_parent_path = config_parser.get('ZOOKEEPER', 'nodeParentPath').strip()

        logger.info('正在構建並啟動zookeeper客戶端...')
        zk_client = KazooClient(hosts=zk_hosts)
        zk_client.add_listener(zk_conn_listener)
        zk_client.start()
    except Exception as e:
        logger.error('初始化zookeeper客戶端失敗: %s' % e)
        sys.exit(1)

    try:
        config_parser.clear()
        config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
        host = config_parser.get('TCPSERVER', 'host')
        port = int(config_parser.get('TCPSERVER', 'port'))
        tcp_server = MyTCPServer(host, port)
        # serve_forever() blocks, so the server gets its own thread.
        thread = threading.Thread(target=start_tcpserver, args=(tcp_server,))
        thread.start()
    except Exception as e:
        logger.error('TCPServer啟動失敗:%s,請檢查配置/conf/tcpserver.conf是否正確' % e)
        sys.exit(1)

    try:
        # Register this node in ZooKeeper.
        registry_zookeeper()

        config_parser.clear()
        config_parser.read('./conf/agent.conf', encoding='utf-8-sig')
        interval = int(config_parser.get('AGENT', 'interval'))
        proc = config_parser.get('AGENT', 'proc').strip()

        # Periodically update the busy/idle status of this server's node.
        update_server_status(interval, proc)
    except Exception as e:
        logger.error('zk_client運行失敗:%s,請檢查配置/conf/agent.conf是否正確' % e)
        sys.exit(1)

 

 

 

運行效果

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM