基於Python及zookeeper實現簡單分布式任務調度系統設計思路及核心代碼實現
by:授客 QQ:1033553122
測試環境
功能需求
實現思路
代碼實踐(關鍵技術點實現)
代碼模塊組織結構
配置文件解析
MyTCPServer.py
MyTCPClient.py
appClient.py
loadAgent.py
運行效果
測試環境
Win7 64位
Linux 64位
Python 3.3.4
kazoo-2.6.1-py2.py3-none-any.whl(windows)
kazoo-2.6.1.tar.gz (linux)
https://pypi.org/project/kazoo/#files
zookeeper-3.4.13.tar.gz
下載地址1:
http://zookeeper.apache.org/releases.html#download
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
功能需求
把不同的負載主機,注冊為zookeeper的節點,其它應用模塊請求zookeeper獲取相關節點信息(服務器ip,端口號,服務器任務執行狀態),通過服務器任務狀態選擇沒有運行指定任務的服務器執行相關任務。
針對以上需求,做一個技術預研,核心代碼實現
實現思路
負載服務器啟動時,初始化zookeeper客戶端,創建tcp服務器,注冊節點信息到zookeeper服務器(信息包含tcp服務器ip,端口,負載服務器任務執行狀態),然後定時檢測負載服務器任務執行狀態(通過檢測某個進程的名稱是否存在進行判斷),其它應用模塊通過zookeeper獲取節點信息後,通過tcp socket通信,向負載服務器發送執行命令,然後負載服務器根據這些命令進行不同的處理。
代碼實踐(關鍵技術點實現)
代碼模塊組織結構
配置文件解析
conf/agent.conf
[AGENT]
interval = 5
proc = sftp-server
[README]
interval = 更新服務器節點信息頻率(單位:秒)
proc = 需要檢測的進程名稱(程序通過查找對應進程名稱來判斷負載程序是否還在運行,從而判斷服務器狀態)
conf/tcpserver.conf
[TCPSERVER]
host=10.202.7.165
port = 8000
[README]
host = tcp服務器主機地址
port = tcp服務器監聽端口
conf/zookeeper.conf
[ZOOKEEPER]
hosts = 10.118.52.26:2181
nodeParentPath=/rootNode
[README]
hosts = zookeeper地址,如果是集群地址,即有多個,用英文逗號分隔
nodeParentPath=負載機節點所在父級路徑
MyTCPServer.py
#!/usr/bin/env python 3.4.0
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
import socketserver
from log import logger
class MyTCPHandler(socketserver.BaseRequestHandler):
    """Per-connection request handler: echoes each received message upper-cased.

    socketserver instantiates it once per client connection and calls handle().
    The conversation ends when the client sends 'bye' (answered with 'bye') or
    closes its side of the connection.
    """

    def handle(self):
        # NOTE: TCP is a byte stream; this handler treats the bytes returned by one
        # recv() as one message, which holds only for small, paced client writes.
        while True:
            # self.request is the TCP socket connected to the client
            raw = self.request.recv(1024)
            if not raw:
                # recv() returns b'' once the peer has closed the connection; without
                # this check the loop would spin forever echoing empty strings.
                logger.info('client[host:%s port:%s] closed the connection' % (self.client_address[0], self.client_address[1]))
                break
            self.data = raw.decode('utf-8').strip()
            logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))
            if self.data == 'bye':
                self.request.sendall(bytes('bye', encoding='utf-8'))
                self.request.close()
                break
            else:
                self.request.sendall(self.data.upper().encode('utf-8'))
class MyTCPServer:
    """Thin wrapper around socketserver.TCPServer that serves MyTCPHandler.

    The constructor binds and listens on (host, port). If that fails, the error
    is logged and the process exits with status 1.
    """

    def __init__(self, host, port):
        # sys.exit() instead of the builtin exit(): exit() is injected by the site
        # module for interactive use and may be missing (python -S, frozen apps).
        import sys

        self.host = host
        self.port = port
        try:
            # Create the server, binding to self.host on port 'self.port'
            self.server = socketserver.TCPServer((self.host, self.port), MyTCPHandler)
        except Exception as e:
            logger.error('初始化TCPServer失敗:%s' % e)
            sys.exit(1)

    def start(self):
        """Serve requests until interrupted; blocks the calling thread (serve_forever)."""
        self.server.serve_forever()
MyTCPClient.py
#!/usr/bin/env python 3.4.0
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
import socket
import configparser
import time
from log import logger
if __name__ == '__main__':
    # Demo client: read the server address from conf/tcpserver.conf, send a line
    # every 5 seconds, log each reply, and send 'bye' as message number 1001
    # (i == 1000); stop when the server answers 'bye' or closes the connection.
    try:
        config_parser = configparser.ConfigParser()
        config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
        host = config_parser.get('TCPSERVER', 'host')
        port = int(config_parser.get('TCPSERVER', 'port'))
        # create_connection() + `with` closes the socket on every exit path,
        # including when connect() itself fails.
        with socket.create_connection((host, port)) as sock:
            for i in range(10000):
                if i == 1000:
                    sock.sendall(bytes('bye\n', "utf-8"))
                else:
                    sock.sendall(bytes('hello world with tcp\n', "utf-8"))
                # Receive data from the server
                data = sock.recv(1024)
                if not data:
                    # b'' means the server closed the connection; stop instead of
                    # writing into a dead socket.
                    logger.info('server closed the connection')
                    break
                received = str(data, "utf-8")
                logger.info('receive data from server:%s' % received)
                if received == 'bye':
                    break
                time.sleep(5)
    except Exception as e:
        logger.error('程序運行出錯:%s' % e)
appClient.py
#!/usr/bin/env python
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
import time
from log import logger
from kazoo.client import KazooClient
from kazoo.client import KazooState
def my_listener(state):
    '''Kazoo connection-state listener: log the new state's name.

    KazooState.LOST and KazooState.SUSPENDED are logged by name; any other
    state is logged as CONNECTED (connected or reconnected).
    '''
    if state == KazooState.LOST:
        # session lost
        label = 'LOST'
    elif state == KazooState.SUSPENDED:
        # disconnected from ZooKeeper, session may still be recoverable
        label = 'SUSPENDED'
    else:
        # connected / reconnected
        label = 'CONNECTED'
    logger.info(label)
def my_event_listener(event):
    '''Watch callback passed to get_children(): log whatever event kazoo delivers.'''
    message = event
    logger.info(message)
zk_client = KazooClient(hosts='10.118.52.26:2181')
zk_client.add_listener(my_listener)
zk_client.start()
try:
    node_path = '/rootNode'
    sub_node = 'loaderAgent102027165'
    children = zk_client.get_children(node_path, watch=my_event_listener)
    logger.info('there are %s children with names %s' % (len(children), children))

    @zk_client.ChildrenWatch(node_path)
    def watch_children(children):
        """Log the child list of node_path each time it changes."""
        logger.info("Children are now: %s" % children)

    @zk_client.DataWatch("%s/%s" % (node_path, sub_node))
    def watch_node(data, state):
        """Watch the agent node for data changes; log version and data when it exists."""
        if state:
            logger.info('Version:%s, data:%s' % (state.version, data))

    # Poll the children list every 5 seconds, 1000 times.
    for _ in range(1000):
        time.sleep(5)
        children = zk_client.get_children(node_path, watch=my_event_listener)
        logger.info('there are %s children with names %s' % (len(children), children))
finally:
    # Always end the session, also on errors or Ctrl-C, instead of leaving it
    # to expire on the server side.
    zk_client.stop()
    zk_client.close()
loadAgent.py
#!/usr/bin/env python 3.4.0
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
import time
import threading
import configparser
import json
import subprocess
from kazoo.client import KazooClient
from kazoo.client import KazooState
from log import logger
from myTCPServer import MyTCPServer
# Module-level state, written by zk_conn_listener / registry_zookeeper and
# polled by the registration and status-update loops.
#   zk_conn_stat:    ZooKeeper connection state; 0 = initial (no state change seen yet),
#                    1 = LOST, 2 = SUSPENDED, 3 = CONNECTED/RECONNECTED
#   registry_status: this agent's node registration; 0 = not registered / in progress,
#                    1 = registered
zk_conn_stat = registry_status = 0
def restart_zk_client():
    '''Restart the ZooKeeper session, then register this agent again.

    Used as a thread target by zk_conn_listener when the session is LOST.
    If zk_client.restart() raises, the error is logged and re-registration is
    skipped; registry_zookeeper() catches and logs its own errors.
    '''
    try:
        zk_client.restart()
    except Exception as err:
        logger.error('重啟zookeeper客戶端異常:%s' % err)
    else:
        registry_zookeeper()
def zk_conn_listener(state):
    '''ZooKeeper connection-state listener (registered via zk_client.add_listener).

    Records the state in the global zk_conn_stat: 1 = LOST, 2 = SUSPENDED,
    3 = CONNECTED/RECONNECTED. On LOST it also resets registry_status to 0 and
    runs restart_zk_client() on a new thread, so the listener itself returns
    immediately (kazoo listeners should not block).
    '''
    global zk_conn_stat
    global registry_status
    if state == KazooState.LOST:
        logger.warn('zookeeper connection lost')
        zk_conn_stat = 1
        # The agent node was created ephemeral, so it does not survive the lost
        # session: mark the agent as unregistered.
        registry_status = 0
        # NOTE(review): kazoo's stop() (called by restart()) may itself report LOST to
        # listeners; confirm this cannot re-trigger restart_zk_client in a loop.
        thread = threading.Thread(target=restart_zk_client)
        thread.start()
    elif state == KazooState.SUSPENDED:
        logger.warn('zookeeper connection disconnected')
        zk_conn_stat = 2
    else:
        zk_conn_stat = 3
        logger.info('zookeeper connection connected/reconnected')
def registry_zookeeper():
    '''Register this load agent in ZooKeeper as an ephemeral node.

    Waits until zk_conn_listener has reported CONNECTED (zk_conn_stat == 3), makes
    sure node_parent_path exists, then creates
    "<node_parent_path>/loaderAgent<host without dots>" holding the JSON
    {"host": host, "port": port, "status": "idle"}, unless a node with that path
    already exists (e.g. one left over from a previous session). On success it sets
    the global registry_status to 1.

    Returns True on success, or False if any step raised (the error is logged).
    '''
    global registry_status
    try:
        # Do not register before the client is connected. Sleep while polling:
        # a bare `continue` spin burns a whole CPU core.
        while zk_conn_stat != 3:
            time.sleep(0.1)
        logger.info('正在注冊負載機到zookeeper...')
        zk_client.ensure_path(node_parent_path)
        node_path = '%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))
        # sort_keys + these separators keep the compact
        # '{"host":"...", "port":..., "status":"idle"}' layout while escaping values properly.
        loader_agent_info = json.dumps({'host': host, 'port': port, 'status': 'idle'},
                                       sort_keys=True, separators=(', ', ':'))
        if not zk_client.exists(node_path):
            zk_client.create(node_path, loader_agent_info.encode('utf-8'), ephemeral=True, sequence=False)
        registry_status = 1  # registration done
        logger.info('注冊負載機到zookeeper成功')
        return True
    except Exception as e:
        logger.error('注冊負載機到zookeeper失敗:%s' % e)
        return False
def start_tcpserver(tcpserver):
    '''Thread target: run tcpserver.start(), which blocks while serving; returns None.'''
    run = tcpserver.start
    run()
def get_server_status(proc_name):
    '''Return this load server's status based on whether a process named proc_name runs.

    Lists processes with `ps -e`, run directly without a shell, and returns:
      'busy'   - a process-list line contains proc_name
      'idle'   - ps succeeded and no line contains proc_name
      'unknow' - ps could not be started, exited non-zero, or timed out / failed
                 (the error is logged)
    The 'unknow' spelling is kept because it is the value published to ZooKeeper.

    NOTE(review): on Linux `ps -e` prints a possibly truncated command name; verify
    that the configured proc_name is short enough to appear in its output.
    '''
    try:
        proc = subprocess.Popen(['ps', '-e'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                universal_newlines=True)
    except OSError as e:
        # e.g. `ps` is not installed (Windows): report 'unknow' rather than 'idle'.
        logger.error('獲取負載機狀態失敗:%s' % e)
        return 'unknow'
    with proc:
        try:
            outs, errs = proc.communicate(timeout=30)
        except Exception as e:
            # Timeout or output-decoding error: kill ps; leaving `with` waits for it.
            proc.kill()
            logger.error('獲取負載機狀態失敗:%s' % e)
            return 'unknow'
    if proc.returncode != 0:
        logger.error('獲取負載機狀態失敗:%s' % errs.strip())
        return 'unknow'
    # Skip ps's header row so it can never count as a match.
    if any(proc_name in line for line in outs.splitlines()[1:]):
        return 'busy'
    return 'idle'
def update_server_status(interval, proc_name):
    '''Periodically publish this agent's status to its ZooKeeper node; never returns.

    Once per `interval` seconds the status is derived from the process list via
    get_server_status(proc_name): 'busy' if a process named proc_name exists (the
    server is occupied), 'idle' if not, 'unknow' if the check itself failed. The
    status is written as JSON {"host", "port", "status"} to
    "<node_parent_path>/loaderAgent<host without dots>".

    While the client is not CONNECTED the loop only polls. While it is connected
    but not registered, it retries registry_zookeeper(). Errors from the ZooKeeper
    calls of one round are logged, and the loop continues with the next round.
    '''
    node_path = '%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))
    while True:
        # Monotonic clock: interval math is immune to wall-clock adjustments.
        round_start = time.monotonic()
        if zk_conn_stat != 3:
            # Not connected (yet): poll again shortly instead of busy-spinning.
            time.sleep(0.5)
            continue
        if registry_status != 1:
            # Connected but not registered (first registration failed, or the session
            # was LOST): retry, otherwise the agent could stay unregistered forever.
            registry_zookeeper()
        else:
            server_status = get_server_status(proc_name)
            loader_agent_info = json.dumps({'host': host, 'port': port, 'status': server_status},
                                           sort_keys=True, separators=(', ', ':'))
            # Why check exists() first: ZooKeeper deletes ephemeral nodes with some delay.
            # If the client was closed and quickly restarted, registration may finish
            # before ZooKeeper has deleted the node created before the restart. That node
            # has exactly the same path, so the pending deletion also removes the node
            # registered after the restart. Re-register when the node is gone.
            try:
                if zk_client.exists(node_path):
                    zk_client.set(node_path, loader_agent_info.encode('utf-8'))
                else:
                    registry_zookeeper()
            except Exception as e:
                # e.g. the node vanished between exists() and set(), or the connection dropped
                logger.error('更新負載機節點信息失敗:%s' % e)
        elapsed = time.monotonic() - round_start
        if elapsed < interval:
            time.sleep(interval - elapsed)
if __name__ == '__main__':
    # sys.exit() instead of the builtin exit(): exit() is injected by the site
    # module for interactive use and may be missing (python -S, frozen apps).
    import sys

    logger.info('正在啟動代理...')
    try:
        logger.info('正在讀取zookeeper配置...')
        config_parser = configparser.ConfigParser()
        config_parser.read('./conf/zookeeper.conf', encoding='utf-8-sig')
        # Accept full-width (Chinese) commas as host separators; normalize them to ','.
        zk_hosts = config_parser.get('ZOOKEEPER', 'hosts').replace('\uff0c', ',').strip()
        node_parent_path = config_parser.get('ZOOKEEPER', 'nodeParentPath').strip()
        logger.info('正在構建並啟動zookeeper客戶端...')
        zk_client = KazooClient(hosts=zk_hosts)
        zk_client.add_listener(zk_conn_listener)
        zk_client.start()
    except Exception as e:
        logger.error('初始化zookeeper客戶端失敗: %s' % e)
        sys.exit(1)

    try:
        config_parser.clear()
        config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
        host = config_parser.get('TCPSERVER', 'host')
        port = int(config_parser.get('TCPSERVER', 'port'))
        tcp_server = MyTCPServer(host, port)
        # Daemon thread: serve_forever() never returns, so a non-daemon thread would
        # keep the process alive after sys.exit(1) below or after Ctrl-C.
        thread = threading.Thread(target=start_tcpserver, args=(tcp_server,))
        thread.daemon = True
        thread.start()
    except Exception as e:
        logger.error('TCPServer啟動失敗:%s,請檢查配置/conf/tcpserver.conf是否正確' % e)
        sys.exit(1)

    try:
        # Register this agent in ZooKeeper
        registry_zookeeper()
        config_parser.clear()
        config_parser.read('./conf/agent.conf', encoding='utf-8-sig')
        interval = int(config_parser.get('AGENT', 'interval'))
        proc = config_parser.get('AGENT', 'proc').strip()
        # Periodically publish the busy/idle status of this server node (never returns)
        update_server_status(interval, proc)
    except Exception as e:
        logger.error('zk_client運行失敗:%s,請檢查配置/conf/agent.conf是否正確' % e)
        sys.exit(1)
運行效果