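Concept
I/O multiplexing means that a process tells the kernel which I/O conditions it is interested in, and the kernel notifies the process as soon as one or more of those conditions is ready (for example, data is ready to be read).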
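The three functions
1. select
The process tells the kernel which events on which file descriptors to monitor (by default at most 1024 fds, the value of FD_SETSIZE). While no event occurs on any of those descriptors the process is blocked; when an event occurs on one or more of them the process is woken up.
When we call select():
1 A switch into kernel mode takes place
2 The fd sets are copied from user space to kernel space
3 The kernel iterates over all the fds and checks whether the requested event has occurred on each
4 If nothing has occurred, the process is blocked; when a device driver raises an interrupt or the timeout expires, the process is woken up and the fds are scanned again
5 The fds found ready during the scan are returned
6 The fd sets are copied from kernel space back to user space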
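fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist[, timeout])
Parameters: four are accepted (the first three are required)
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an "exceptional condition"
timeout: timeout in seconds
Return value: three lists
select() monitors file descriptors and blocks while none of the requested conditions holds. When the state of some descriptor changes, it returns three lists:
1. fds in rlist that have become readable are placed in fd_r_list
2. fds in wlist that have become writable are placed in fd_w_list (a connected socket with free send-buffer space is almost always writable, so in practice this is usually every fd in wlist)
3. fds in xlist on which an exceptional condition has occurred are placed in fd_e_list
4. If timeout is omitted, select blocks until one of the monitored handles changes. If timeout = n (a number of seconds; floats are accepted), select blocks for at most n seconds and then returns three empty lists if nothing changed; if a monitored handle changes in the meantime, it returns immediately.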
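The call described above can be tried out without a network. The following is a minimal sketch, assuming a Unix-like system where socket.socketpair() is available; the helper names wait_readable and demo are made up for illustration and are not part of the select module.

```python
import select
import socket


def wait_readable(socks, timeout):
    """Return the sockets from `socks` that select() reports as readable."""
    readable, _, _ = select.select(socks, [], [], timeout)
    return readable


def demo():
    # Two connected sockets stand in for a client/server connection.
    a, b = socket.socketpair()
    try:
        before = wait_readable([a], 0.1)   # nothing sent yet: select times out
        b.sendall(b"ping")
        after = wait_readable([a], 1.0)    # data is now pending on `a`
        return before, after == [a], a.recv(16)
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    print(demo())
```

The first call shows the timeout case (an empty list comes back), the second shows select returning as soon as the descriptor becomes readable.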
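A server has to call select() again and again in its event loop, which means:
1 With many file descriptors, copying the fd sets between user space and kernel space on every call becomes expensive
2 With many file descriptors, the kernel's linear scan over all of them also wastes time
3 select supports at most 1024 file descriptors by default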
Reference: http://www.cnblogs.com/Anker/archive/2013/08/14/3258674.html
2. poll
Reference: http://www.cnblogs.com/Anker/archive/2013/08/15/3261006.html
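The text only links to a reference for poll. As a rough sketch of what the interface looks like from Python (assuming a Unix-like system; poll_once is a hypothetical helper name), poll takes registrations instead of three fd lists and has no FD_SETSIZE limit, although the kernel still scans every registered fd on each call:

```python
import select
import socket


def poll_once(timeout_ms=1000):
    """Register one socket for POLLIN, make it readable, and poll once."""
    a, b = socket.socketpair()
    try:
        poller = select.poll()
        poller.register(a.fileno(), select.POLLIN)
        b.sendall(b"x")
        # poll() takes milliseconds and returns a list of (fd, eventmask) pairs.
        events = poller.poll(timeout_ms)
        return [(fd == a.fileno(), bool(mask & select.POLLIN))
                for fd, mask in events]
    finally:
        a.close()
        b.close()
```

Note that the timeout of select.poll().poll() is given in milliseconds, unlike select.select() and epoll.poll(), which take seconds.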
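3. epoll
Reference: http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.html
epoll is an improvement on select and poll. Compared with them it has the following advantages:
1. The number of socket descriptors (FDs) a process can monitor is not capped (it is limited only by the operating system's maximum number of open file handles)
The biggest flaw of select is that the number of FDs a single process can monitor is limited by FD_SETSIZE, which defaults to 1024. epoll has no such limit: its upper bound is the operating system's maximum number of open file handles, which is far larger than 1024.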
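2. I/O efficiency does not degrade linearly as the number of FDs grows
epoll's answer lies in epoll_ctl. An fd is copied into the kernel when it is registered with the epoll handle, not again on every epoll_wait, so each fd is copied only once over its whole lifetime.
Another fatal weakness of traditional select/poll shows up with a very large set of sockets: because of network latency or idle links, only a small fraction of them is "active" at any moment, yet every select/poll call scans the entire set linearly, so efficiency falls off linearly. epoll does not have this problem because it only deals with the "active" sockets. In the kernel, epoll is implemented with a callback on each fd: only an active socket invokes its callback, idle sockets do not. In this respect epoll behaves like a pseudo-AIO.
In more detail, epoll_ctl visits the given fd once (this visit is unavoidable) and attaches a callback to it. When the device becomes ready and wakes the waiters on its wait queue, the callback runs and adds the ready fd to a ready list. All epoll_wait really has to do is check whether that ready list contains anything.
3. Cheaper message passing between kernel and user space
select, poll and epoll all need the kernel to pass fd notifications to user space, so avoiding unnecessary memory copies matters. Many articles claim that epoll achieves this by having the kernel and user space share a block of memory through mmap; the mainline Linux implementation does not do that. The saving comes from the fact that epoll_wait copies only the entries for ready fds to user space, instead of copying the whole fd set in and out on every call as select and poll do.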
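4. The epoll API is simpler
epoll is not the only way to overcome the shortcomings of select/poll; it is just the Linux solution. FreeBSD has kqueue, and /dev/poll is the older Solaris mechanism. In the author's view these are progressively harder to use, and epoll is the simplest of them.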
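The second advantage in the list above (only active descriptors are reported, however many are registered) can be observed from Python. This is a small sketch, assuming Linux (select.epoll does not exist elsewhere); the function name active_only and its parameters are invented for the example.

```python
import select
import socket


def active_only(total=50, active_index=7):
    """Register `total` idle sockets, wake one, and see what epoll reports."""
    pairs = [socket.socketpair() for _ in range(total)]
    ep = select.epoll()
    try:
        for reader, _writer in pairs:
            ep.register(reader.fileno(), select.EPOLLIN)
        # Make exactly one of the registered sockets readable.
        pairs[active_index][1].sendall(b"!")
        events = ep.poll(1.0)
        reported = [fd for fd, _mask in events]
        return reported, pairs[active_index][0].fileno()
    finally:
        ep.close()
        for reader, writer in pairs:
            reader.close()
            writer.close()
```

With select or poll the application would get back (or have to scan) information about all 50 descriptors; here the returned list contains only the one that became ready.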
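epoll in detail (in Python)
Python's select module is dedicated to I/O multiplexing. It provides select, poll and epoll (poll is available on most Unix systems, epoll only on Linux; Windows supports only select), and it also provides kqueue (on FreeBSD and other BSD systems).
select.epoll(sizehint=-1, flags=0) creates an epoll object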
epoll.close() Close the control file descriptor of the epoll object.
epoll.closed True if the epoll object is closed.
epoll.fileno() Return the file descriptor number of the control fd.
epoll.fromfd(fd) Create an epoll object from a given file descriptor.
epoll.register(fd[, eventmask]) Register a file descriptor and the events of interest with the epoll object.
epoll.modify(fd, eventmask) Modify the events of a registered file descriptor.
epoll.unregister(fd) Remove a registered file descriptor from the epoll object.
epoll.poll(timeout=-1, maxevents=-1) Wait for events; timeout is in seconds (float). Blocks until an event occurs on a registered fd and returns a list of (fd, event) tuples of the form [(fd1, event1), (fd2, event2), ... (fdn, eventn)].
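The methods listed above fit together as a register / poll / modify / unregister cycle. Here is a minimal sketch of that cycle, assuming Linux; epoll_lifecycle is a name chosen for this example only.

```python
import select
import socket


def epoll_lifecycle():
    """Walk one socket through register, poll, modify and unregister."""
    a, b = socket.socketpair()
    ep = select.epoll()
    seen = []
    try:
        ep.register(a.fileno(), select.EPOLLIN)   # watch for readability
        b.sendall(b"hi")
        seen.append(ep.poll(1.0))                 # one (fd, EPOLLIN) tuple
        a.recv(16)                                # consume the data

        ep.modify(a.fileno(), select.EPOLLOUT)    # now watch for writability
        seen.append(ep.poll(1.0))                 # one (fd, EPOLLOUT) tuple

        ep.unregister(a.fileno())                 # stop watching
        seen.append(ep.poll(0.1))                 # nothing registered: []
        return a.fileno(), seen
    finally:
        ep.close()
        a.close()
        b.close()
```

Each element of `seen` is the plain list that epoll.poll() returns, so it can be iterated with `for fd, event in events:` exactly as in the server code.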
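Events:
EPOLLIN = 1 ---- available for read
EPOLLPRI = 2 ---- urgent data available for read
EPOLLOUT = 4 ---- available for write
EPOLLERR = 8 ---- an error occurred
EPOLLHUP = 16 ---- hang-up
EPOLLRDNORM = 64 ---- equivalent to EPOLLIN
EPOLLRDBAND = 128 ---- priority band data can be read
EPOLLWRNORM = 256 ---- equivalent to EPOLLOUT
EPOLLWRBAND = 512 ---- priority data may be written
EPOLLMSG = 1024 ---- ignored
EPOLLONESHOT = 1073741824 ---- one-shot behaviour: after one event is pulled out, the fd is disabled internally
EPOLLET = 2147483648 ---- the default is level-triggered; setting this flag selects edge-triggered behaviour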
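Because the event value returned by epoll.poll() is a bit mask built from the constants above, it is tested with `&`, not `==`. The helper below is an illustrative sketch (describe_events is not part of the select module) that turns a mask back into flag names; it assumes Linux, and uses getattr with a default so that a flag missing from a given Python build is simply skipped.

```python
import select

# Flag names in ascending order of their numeric value.
_FLAG_NAMES = [
    "EPOLLIN", "EPOLLPRI", "EPOLLOUT", "EPOLLERR", "EPOLLHUP",
    "EPOLLRDNORM", "EPOLLRDBAND", "EPOLLWRNORM", "EPOLLWRBAND",
    "EPOLLMSG", "EPOLLONESHOT", "EPOLLET",
]


def describe_events(mask):
    """Return the names of all epoll flags that are set in `mask`."""
    return [name for name in _FLAG_NAMES
            if mask & getattr(select, name, 0)]
```

For instance, a socket whose peer has gone away while data is still buffered may be reported with both EPOLLIN and EPOLLHUP set, which is why handlers check each bit separately.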
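Level-triggered and edge-triggered:
Level_triggered (sometimes called condition-triggered): when a read or write event occurs on a monitored file descriptor, epoll.poll() notifies the handler so that it can read or write. If the handler does not read or write all the data in one go (for example because its buffer is too small), the next call to epoll.poll() reports the same descriptor again so that it can continue. If you never read or write, it keeps notifying you. If the system holds many ready descriptors that you do not intend to service, they are returned on every call, which makes it much more expensive for the handler to find the ready descriptors it actually cares about. The advantage is obvious: it is stable and reliable.
Edge_triggered (sometimes called state-triggered): when a read or write event occurs on a monitored file descriptor, epoll.poll() notifies the handler so that it can read or write. If the handler does not read or write all the data (for example because its buffer is too small), the next call to epoll.poll() does not report the descriptor again; you are notified only once, until a new read or write event arrives on that descriptor. This mode is more efficient than level-triggered mode because the system is not flooded with ready descriptors you do not care about. The drawback is that it is less forgiving: unless the handler uses non-blocking I/O and keeps reading or writing until the call would block, leftover data can sit unnoticed.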
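The difference between the two modes can be made visible by polling several times without ever reading the pending data. The sketch below assumes Linux; count_notifications is a hypothetical helper written for this illustration.

```python
import select
import socket


def count_notifications(extra_flags, polls=3):
    """Poll `polls` times without reading and count how often the fd is reported."""
    a, b = socket.socketpair()
    ep = select.epoll()
    try:
        ep.register(a.fileno(), select.EPOLLIN | extra_flags)
        b.sendall(b"data")
        # The data is deliberately never read from `a`.
        return sum(1 for _ in range(polls) if ep.poll(0.2))
    finally:
        ep.close()
        a.close()
        b.close()


if __name__ == "__main__":
    print("level-triggered:", count_notifications(0))
    print("edge-triggered: ", count_notifications(select.EPOLLET))
```

In level-triggered mode every poll reports the unread socket; with EPOLLET only the first poll does, because no new data arrives afterwards. This is why edge-triggered handlers loop on recv() until it would block.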

#!/usr/bin/python
# coding:utf-8
# Server
import select
import socket

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

sk = socket.socket()
sk.bind(('192.168.110.100', 8080))
sk.listen(5)
sk.setblocking(0)  # put the listening socket in non-blocking mode

epoll = select.epoll()  # create an epoll object
# Watch the listening socket for read events (an incoming connection produces one)
epoll.register(sk.fileno(), select.EPOLLIN)

try:
    connections = {}
    requests = {}
    responses = {}
    while True:
        events = epoll.poll()  # wait for something we care about to happen
        # events is a list of (fileno, event code) tuples; fileno is an integer file descriptor
        for fileno, event in events:
            if fileno == sk.fileno():
                # Event on the server socket: a new connection has arrived
                connection, address = sk.accept()
                connection.setblocking(0)  # non-blocking mode for the new socket
                epoll.register(connection.fileno(), select.EPOLLIN)  # register its read (EPOLLIN) event
                connections[connection.fileno()] = connection  # remember the connection by fd
                requests[connection.fileno()] = b''            # data received so far
                responses[connection.fileno()] = response      # data still to be sent
            elif event & select.EPOLLIN:
                # Read event: read data from the client
                requests[fileno] += connections[fileno].recv(1024)
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                    # End marker seen: the complete HTTP request has been received.
                    # Stop watching for reads and watch for writes (EPOLLOUT) instead;
                    # a write event fires when data can be sent back to the client.
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-' * 40 + '\n' + requests[fileno].decode()[:-2])
            elif event & select.EPOLLOUT:
                # Write event on a client socket: we can send data to the client
                byteswritten = connections[fileno].send(responses[fileno])  # returns the number of bytes sent
                responses[fileno] = responses[fileno][byteswritten:]  # keep only the unsent remainder
                if len(responses[fileno]) == 0:
                    # Everything has been sent: stop watching read and write events
                    epoll.modify(fileno, 0)
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:
                # The client has disconnected
                epoll.unregister(fileno)     # stop watching the fd
                connections[fileno].close()  # close the connection
                del connections[fileno]      # drop the connection object
finally:
    epoll.unregister(sk.fileno())
    epoll.close()
    sk.close()  # the original called serversocket.close(), but the socket is named sk

#!/usr/bin/python
# coding:utf-8
# Client
import socket

obj = socket.socket()
obj.connect(('192.168.110.100', 8080))
obj.sendall(b'hello\n\r\n')  # the trailing '\n\r\n' is the end marker the server looks for
print(obj.recv(1024))
obj.close()
Real-world code:

#!/usr/bin/python
# coding:utf-8
# Python 2 code (print statements, `except Exception, e`, Queue, unicode)
import select
import socket
import sys
import Queue
import time
import threading
import logging
import datetime
import re, os
import hashlib

sys.path.append('../')
import multiprocessing
from SQLdb import SQLdb
from mylog import MyLog as Log
from communication_packet import Communication_Packet, Communication_Packet_Flags, Error_Info_Flags
from encryption import PrpCrypt
import pdb

'''
Constant        Meaning
EPOLLIN         Available for read
EPOLLOUT        Available for write
EPOLLPRI        Urgent data for read
EPOLLERR        Error condition happened on the assoc. fd
EPOLLHUP        Hang up happened on the assoc. fd
EPOLLET         Set Edge Trigger behavior, the default is Level Trigger behavior
EPOLLONESHOT    Set one-shot behavior. After one event is pulled out, the fd is internally disabled
EPOLLRDNORM     Equivalent to EPOLLIN
EPOLLRDBAND     Priority data band can be read.
EPOLLWRNORM     Equivalent to EPOLLOUT
EPOLLWRBAND     Priority data may be written.
EPOLLMSG        Ignored.
'''


class Server(object):
    def __init__(self, server_IP=None, server_port=None):
        # def __init__(self,server_address = ('112.33.9.154',11366)):
        '''
        Initialise the server's global state
        '''
        # Default mode: debug
        self.log = Log()
        self.log.openConsole()  # enable console output
        self.logger = self.log.getLog()
        self.dbname = 'sync_test'
        # By default, use the local host IP and port 11366.
        # Each value is defaulted on its own so that neither attribute is left unset.
        if server_IP is None:
            try:
                self.server_IP = self.getlocalIP()
                self.logger.info('Current server_IP: %s' % self.server_IP)
            except:
                self.logger.critical('Get server IP Error!')
                raise
        else:
            self.server_IP = server_IP
        if server_port is None:
            self.server_port = 11366
        else:
            self.server_port = server_port
        self.server_address = (self.server_IP, self.server_port)  # server address
        self.ListenNum = 100      # maximum number of monitored socket connections
        self.connections = {}     # current connections
        self.requests = {}        # request data of each connection
        self.addresses = {}       # client addresses
        self.errorInfo = {}       # error information; sent back to the client when something fails
        self.responseInfo = {}
        self.readthreadRecord = {}
        self.lock = threading.Lock()  # thread lock used for data synchronisation
        self.db = SQLdb()  # database handle used for database synchronisation
        self.setDB('localhost', 'root', '123456', 'sync_test')
        self.readthreadlock = threading.Lock()
        self.EOF = '\n\r\n'
        self.servernum = 'serverxxxxx'
        self.key = '91keytest'
        # set communication user id
        Communication_Packet.set_userid(self.servernum)
        self.encryption = PrpCrypt()

    def setServerAddree(self, server_ip, server_port):
        ''' Set server address '''
        self.server_address = (server_ip, server_port)

    def setDB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)

    def getlocalIP(self):
        ''' Use the address of the first network interface as the bind IP '''
        try:
            s = os.popen('ifconfig').read()
        except:
            raise
        else:
            ip = re.findall('inet addr:(?<![\.\d])(?:\d{1,3}\.){3}\d{1,3}(?![\.\d])', s)[0].split(':')[1]
            return ip

    def __init_server(self):
        ''' Initialise the server socket and the epoll object '''
        try:
            # Create a TCP/IP socket
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # Put the socket in non-blocking mode
            self.server.setblocking(0)
            # Bind the socket to the port
            self.logger.info('starting up on %s port %s' % self.server_address)
            try:
                self.server.bind(self.server_address)
            except:
                echo = os.popen('''lsof -i :11366 |grep "(LISTEN)" | awk '{printf($1)}' ''').read()
                print 'Port %s is in use by process %s!' % (self.server_port, echo)
                self.logger.error('Bind on %s port %s Error!' % self.server_address)
                raise
            # Listen for incoming connections
            # self.server.listen(self.ListenNum)
            self.server.listen(1)
            # Set up the epoll
            self.epoll = select.epoll()
            # Register the read event for the server socket in edge-triggered mode
            self.epoll.register(self.server.fileno(), select.EPOLLIN | select.EPOLLET)
        except:
            raise Exception('__init_server Error')

    def __jionthreads(self):
        ''' join the threading '''
        # self.logger.debug('Current threadtast is %d ' % len(threading.enumerate()))
        main_thread = threading.currentThread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            else:
                t.join(0)  # non-blocking join
        # self.logger.debug('After joined the threads... %d '% len(threading.enumerate()))

    def __format_str(self, businessFlag, data, endFlag=True, errorFlag=False, hasnewconf=False, versioninfo=''):
        ''' Format the data to be sent '''
        formatstr = {'BUSINESS_TYPE': businessFlag, 'DATA': data, 'ENDFLAG': endFlag, 'ERRORFLAG': errorFlag,
                     'HASNWECONF': hasnewconf, 'VERSIONINFO': versioninfo}
        return str(formatstr) + self.EOF

    def get_table_filed(self, table_name, db, lock, db_name):
        # from the db get the table field!
        query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s' and TABLE_SCHEMA = '%s';" % (
            table_name, db_name)
        with self.lock:
            detection_version_fields = self.db.fechdb(query_detection_version_field_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            # Record Error and end task
            DB_ERROR = False
            self.logger.error('----Get %s fileds Error! End this task-----' % table_name)
            return
        else:
            # query result is Unicode,so we need to encode to utf-8
            table_field = [field[0].encode('utf-8') for field in detection_version_fields]
            return table_field

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
        # The 32-character MD5 of the payload is split in two:
        # the first 16 characters precede the payload, the last 16 follow it.
        print len(receving_data)
        pre_md5 = receving_data[:16]
        suffix_md5 = receving_data[-16:]
        message_md5 = pre_md5 + suffix_md5
        message = receving_data[16:-16]
        cur_md5 = self.calc_md5(message)
        print cur_md5, message_md5
        if message_md5 == cur_md5:
            return True, message
        else:
            return False, message

    def validating_content_token(self, content):
        receive_content = content['Business']['Content']
        if not isinstance(receive_content, str):
            receive_content = str(receive_content)
        receive_md5 = content['Info_Status']['Security_Token']
        receive_time = content['Info_Status']['Time']
        cur_md5 = self.calc_md5(receive_content + receive_time + self.key)
        if cur_md5 == receive_md5:
            return True
        else:
            return False

    def packaging_message(self, Message=None, Error=None, Info_Status=None):
        # ----Pack send message
        # Init Communication_Packet
        cm_packet = Communication_Packet()
        # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True):
        cm_packet.set_content_Business(targv=Message)
        # def set_content_Error(self,error_flag = False,error_type = None,error_info = None):
        cm_packet.set_content_Error(targv=Error)
        now_time = str(datetime.datetime.now())  # get current time
        # Business+time+key calculate token
        security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key)
        # def set_content_Info_Status(self,info_type,security_token,time,is_end):
        # Need to replace the security_token
        Info_Status = list(Info_Status)  # convert the tuple to a list
        Info_Status[1] = security_token
        Info_Status[2] = now_time
        Info_Status = tuple(Info_Status)  # convert back to a tuple to pass as the argument
        cm_packet.set_content_Info_Status(targv=Info_Status)
        try:
            send_data = cm_packet.content
        except Exception, e:
            raise e
        else:
            # we Encryption data
            self.logger.debug(type(send_data))
            encryption_send_data = self.encryption.encrypt(str(send_data))
            # caculate md5
            encrypt_send_md5 = self.calc_md5(encryption_send_data)
            complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF
            return complete_send_data

    def unpackaging_message(self, unpacking_str):
        if not isinstance(unpacking_str, str):
            raise ValueError  # the original raised exceptions.ValueError without importing exceptions
        else:
            unpacking_str = unpacking_str.strip(self.EOF)
            flag, message = self.validating_message_token(unpacking_str)
            if flag:
                decrypt_str = self.encryption.decrypt(message)
                try:
                    message_dict = eval(decrypt_str)
                except Exception, e:
                    self.logger.error('Eval decrypt_str Error!')
                    raise e
                else:
                    if self.validating_content_token(message_dict):
                        return message_dict
                    else:
                        self.logger.error('Message is tampered!')
                        return None
            else:
                self.logger.error('Message is tampered!')
                return None

    def init_detection_nums(self):
        # get detect_point_nums from server db
        query_sql = "select distinct(sync_point_no) from sync_control"
        BD_ERROR = False
        with self.lock:
            point_nums = self.db.fechdb(query_sql)
            BD_ERROR = self.db.Error
        if BD_ERROR:
            self.logger.error('-----get detect_point_nums error-----')
        if point_nums:
            nums_list = [p[0] for p in point_nums]
            return nums_list
        else:
            return None

    def verification_sync_point_no(self, detection_num):
        detect_point_nums = self.init_detection_nums()
        if not detect_point_nums:
            self.logger.error('-----db config error!-----')
            # a bare `raise` has no active exception here, so raise one explicitly
            raise Exception('db config error')
        if detection_num in detect_point_nums:
            return True
        else:
            return False

    def read_sync_contol_configure(self, table_name, sync_type, sync_point_no, sync_user='server'):
        # Read the control configuration from loacal db,if have not,we sync it from server,then read it again
        qeury_sql = "select * from sync_control where sync_table = '%s' and sync_type = '%s' and sync_user = '%s' and sync_point_no = '%s';" % (
            table_name, sync_type, sync_user, sync_point_no)
        DB_ERROR = False
        # set table name
        sync_table_name = 'sync_control'
        # get `sync_control` table fields
        table_field = self.get_table_filed(sync_table_name, self.lock, self.db, self.dbname)
        if not table_field:
            # the original format string had no %s placeholder and would raise TypeError
            self.logger.error('-----Get %s fields Error!-----' % table_name)
            return None
        with self.lock:
            control_configure = self.db.fechdb(qeury_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get control configure Error!-----')
            return None
        if control_configure:
            # Get the configure from db! and On the basis of classification of table name and sync type(uploat or download)
            # format the configure to a list
            control_configure_list = []
            for iter_conf in control_configure:
                control_configure_item = dict.fromkeys(table_field)
                lenth = len(table_field)
                # set value for everyone key
                for i in range(lenth):
                    val = iter_conf[i]
                    if isinstance(val, unicode):
                        val = val.encode('utf-8')
                    control_configure_item[table_field[i]] = val
                control_configure_list.append(control_configure_item)
            return control_configure_list

    def parsing_config(self, config):
        # parsing the config to get the sql
        # may modify the logic of the code
        p_conf = dict()
        table_name = config['sync_table']  # Get the download table name
        p_conf['table_name'] = table_name
        table_field = config['sync_field']  # Get sync table field!
        p_conf['sync_type'] = config['sync_type']
        p_conf['sync_range'] = config['sync_range']
        p_conf['sync_range_value'] = config['sync_range_value']
        p_conf['sync_is_update_time'] = config['sync_is_update_time']
        # if table_field is null,we need sync all the field!
        if not table_field:
            table_field = self.get_table_filed(table_name, self.lock, self.db, self.dbname)
            if not table_field:
                self.logger.error(
                    '-----Terminate this task,becase of getting the %s table fileds fialed!-----' % table_name)
                return
        p_conf['table_field'] = table_field
        # Get this operation's type
        try:
            sql_operations = eval(config['sync_operation_type'])
        except Exception, e:
            self.logger.error('-----get sync_operation_type error!-----')
            return
        upside_operate = sql_operations['upside']  # if have, this download operation need carry db info to the server!
        p_conf['upside_operate'] = upside_operate
        downside_operate = sql_operations['downside']  # how to handle the downloaded db info!
        p_conf['downside_operate'] = downside_operate
        update_state_operate = sql_operations['update_state']
        p_conf['update_state_operate'] = update_state_operate
        # Get the sync sql of the corresponding operation
        try:
            sqls = eval(config['sync_sql'])
        except Exception, e:
            self.logger.error('-----get sync_sql error!-----')
            raise
        upside_sqls = sqls['upside']  # a tuple or None
        p_conf['upside_sqls'] = upside_sqls
        downside_sqls = sqls['downside']  # a tuple or None
        p_conf['downside_sqls'] = downside_sqls
        update_state_sqls = sqls['update_state']  # a tuple or None
        p_conf['update_state_sqls'] = update_state_sqls
        # Get the sync patch field of the corresponding operation
        try:
            if config['sync_patch_field']:
                patch_fields = eval(config['sync_patch_field'])
            else:
                pass
        except Exception, e:
            self.logger.error('-----get sync_field error!-----')
            raise
        upside_fields = patch_fields['upside']  # a tuple or None
        p_conf['upside_fields'] = upside_fields
        downside_fields = patch_fields['downside']  # a tuple or None
        p_conf['downside_fields'] = downside_fields
        update_state_fields = patch_fields['update_state']  # a tuple or None
        p_conf['update_state_fields'] = update_state_fields
        # Get the sync_field_value of the corresponding operation
        try:
            if config['sync_patch_field_value']:
                patch_field_values = eval(config['sync_patch_field_value'])
            else:
                pass
        except Exception, e:
            self.logger.error('-----get sync_field_value error!-----')
            return
        upside_patch_field_values = patch_field_values['upside']  # a tuple or None
        p_conf['upside_patch_field_values'] = upside_patch_field_values
        downside_patch_field_values = patch_field_values['downside']  # a tuple or None
        p_conf['downside_patch_field_values'] = downside_patch_field_values
        update_state_patch_field_values = patch_field_values['update_state']  # a tuple or None
        p_conf['update_state_patch_field_values'] = update_state_patch_field_values
        is_carry_state = config['sync_is_carry_state']
        p_conf['is_carry_state'] = is_carry_state
        is_update_state = config['sync_is_update_state']
        p_conf['is_update_state'] = is_update_state
        is_use_state_carry_data = config['sync_is_use_state_carry_data']
        p_conf['is_use_state_carry_data'] = is_use_state_carry_data
        return p_conf

    def __proxy(self, FileNo):
        # Start a thread to handle the write event
        newthread = threading.Thread(target=self.__handler_write_event, args=(FileNo,))
        # newthread.daemon = True
        newthread.start()

    def __delevnet(self, FileNo):
        ''' Unregister events we no longer care about and release the related resources '''
        self.logger.info('Start to unregister and close %s socket... ' % FileNo)
        self.epoll.unregister(FileNo)
        self.connections[FileNo].close()
        del self.connections[FileNo]
        del self.requests[FileNo]
        del self.addresses[FileNo]
        del self.errorInfo[FileNo]
        self.logger.info('unregistered and closed the %s socket! ' % FileNo)

    def __read_from_socket(self, FileNo):
        ''' Read data from socket '''
        try:
            while True:
                tmpdata = self.connections[FileNo].recv(4096)
                # Python has no EPOLLRDHUP here. When the client closes the connection,
                # or is interrupted with Ctrl+C before sending anything, the server gets
                # an EPOLLIN event and reads empty data from the socket.
                # No other solution was found.
                if not tmpdata:
                    break
                self.requests[FileNo] += tmpdata
                # self.logger.debug(len(tmpdata))
        except socket.error:
            # In edge-triggered mode we read until recv() would block
            pass
        except Exception, e:
            raise e

    # error_flag = False,error_type = None,error_info = None
    def __deal_business(self, FileNo):
        # Handle the client's business request based on the received data
        try:
            message = self.unpackaging_message(self.requests[FileNo])
            # we need reset the requests info
            self.requests[FileNo] = ''
            # self.logger.debug(message)
        except:
            # Set the error flag and the error information
            self.logger.error('unpackaging_message Error!')
            self.errorInfo[FileNo] = (Error_Info_Flags.Receive_Data_Error, 'Server recieved data Error!')
            return
        else:
            if message:
                business = message['Business']
                client_id = message['userid']
                error_info = message['Error']
                info_states = message['Info_Status']
                verification_result = self.verification_sync_point_no(client_id)
                if verification_result:
                    # Here we handle the business
                    # 1. get config from the db
                    # read_sync_contol_configure(self,table_name,sync_type,sync_point_no,sync_user):
                    try:
                        table_name = business['Table_Name']
                        sync_type = business['Type']
                        is_sync_flag = business['Is_Syncdb']
                    except:
                        self.errorInfo[FileNo] = (
                            Error_Info_Flags.Receive_Data_Error, 'Business information is incomplete!')
                        return
                    if sync_type == Communication_Packet_Flags.DOLOAD_DB:
                        s_type = 'download'
                    else:
                        s_type = 'upload'
                    b_config = self.read_sync_contol_configure(table_name, s_type, client_id)
                    if b_config:
                        p_config = [self.parsing_config(conf) for conf in b_config]
                        self.logger.debug(p_config)
                    else:
                        pass
                    if b_config:
                        self.real_business_processing_functions(FileNo, p_config, business, info_states, error_info)
                    else:
                        # set error info!
                        self.errorInfo[FileNo] = (
                            Error_Info_Flags.Server_config_Error, 'Server config is None, give up this task!')
                else:
                    # User authentication failed
                    self.logger.error('-----User authentication failed! userid: %s-----' % client_id)
                    self.errorInfo[FileNo] = (Error_Info_Flags.User_Certification_Error, 'User authentication failed!')
            else:
                # if no message,it means information authenticationfailed!
                self.logger.error('-----Clinet\'s Information authentication failed!-----')
                self.errorInfo[FileNo] = (
                    Error_Info_Flags.Info_Certification_Error, 'Information authentication failed!')

    def calculate_time(self, time_type, time_value):
        # maybe is str,we need to convert it to int type
        time_value = int(time_value)
        # get current time as the end time
        cur_time = datetime.datetime.now()
        hours = 0
        if time_type == 'hour':
            hours = time_value  # the original multiplied by 24 here, which made an hour equal a day
        elif time_type == 'day':
            hours = time_value * 24
        elif time_type == 'week':
            hours = time_value * 24 * 7
        elif time_type == 'month':
            hours = time_value * 24 * 30
        else:
            self.logger.error('-----time_type Error!-----')
            return None
        # caculate the start time
        start_time = cur_time - datetime.timedelta(hours=hours)
        return (start_time, cur_time)

    # handle the bussiness from the client
    def real_business_processing_functions(self, FileNo, business_config, business, info_states, error_info):
        # according to the config we handle the business
        business_config = business_config[0]
        if info_states['Info_Type'] == Communication_Packet_Flags.REQEST:
            # get bussiness type
            request_bussiness_type = business['Type']
            if request_bussiness_type == Communication_Packet_Flags.UPLOAD_DB:
                request_bussiness_type = 'upload'
            elif request_bussiness_type == Communication_Packet_Flags.DOLOAD_DB:
                request_bussiness_type = 'download'
            else:
                self.errorInfo[FileNo] = (
                    Error_Info_Flags.Client_Data_Pack_Error,
                    'Request business type error %s' % request_bussiness_type)
                return
            loc_config_sync_type = business_config['sync_type']
            if request_bussiness_type == loc_config_sync_type:
                is_carry_state = business_config['is_carry_state']
                is_use_state_carry_data = business_config['is_use_state_carry_data']
                is_update_state = business_config['is_update_state']
                # handle the download request
                if request_bussiness_type == 'download':
                    # parsing the loacal config
                    up_sql_list = []
                    upside_operates = business_config['upside_operate'].split('|')
                    upside_sqls = business_config['upside_sqls']
                    upside_fields = business_config['upside_fields']
                    upside_patch_field_values = business_config['upside_patch_field_values']
                    sync_range = business_config['sync_range']
                    sync_range_value = business_config['sync_range_value']
                    lenth = len(upside_sqls)
                    for i in range(lenth):
                        sql_part = upside_sqls[i]
                        # if sync_range is not None,we will ignore the other
                        if sync_range:
                            if sync_range == 'period':
                                t_type, t_value = sync_range_value.split(':')
                                s_time, e_time = self.calculate_time(t_type, t_value)
                                qeury_sql = sql_part % (str(s_time), str(e_time))
                            else:
                                qeury_sql = sql_part
                            # add it into the list
                            up_sql_list.append(qeury_sql)
                        else:
                            # we need parsing other configurations
                            if is_use_state_carry_data:
                                try:
                                    # [((u'update_time', u'1970-01-01 00:00:00'),)] like this
                                    qeury_sql = sql_part % business['Content'][0][0][1]
                                except:
                                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error,
                                                              'Content Error!')
                            else:
                                qeury_sql = sql_part
                            up_sql_list.append(qeury_sql)
                    query_data = []
                    for u_sql in up_sql_list:
                        BD_ERROR = False
                        with self.lock:
                            res = self.db.fechdb(u_sql)
                            BD_ERROR = self.db.Error
                        if BD_ERROR:
                            self.errorInfo[FileNo] = (
                                Error_Info_Flags.Server_DB_Error, 'Server db Error,SQL: %s' % u_sql)
                            break
                        else:
                            query_data.append(res)
                    self.responseInfo[FileNo] = query_data
                # handle the upload request
                elif request_bussiness_type == 'upload':
                    # parsing the loacal config
                    content = business['Content']
                    try:
                        self.refresh_the_database(business_config, content)
                    except Exception, e:
                        print e
                        self.errorInfo[FileNo] = (Error_Info_Flags.Server_config_Error, 'Server Config Error!')
                else:
                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'bussiness type Error!')
            else:
                self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error,
                                          'server config type is different from client request business type! Error!')
        else:
            self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Communication_Packet_Flags Error!')

    # update the db
    def refresh_the_database(self, handle_config, db_content):
        ''' refresh the database, maybe insert, update, delete... '''
        # parsing the handle config
        table_name = handle_config['table_name']
        table_field = handle_config['table_field']
        downside_operate = handle_config['downside_operate']
        update_state_operate = handle_config['update_state_operate']
        downside_sqls = handle_config['downside_sqls']
        update_state_sqls = handle_config['update_state_sqls']
        downside_fields = handle_config['downside_fields']
        update_state_fields = handle_config['update_state_fields']
        downside_patch_field_values = handle_config['downside_patch_field_values']
        update_state_patch_field_values = handle_config['update_state_patch_field_values']
        is_update_time = handle_config['sync_is_update_time']
        try:
            table_field = eval(table_field)
            if not table_field:
                table_field = self.get_table_filed(table_name, self.lock, self.db, self.dbname)
            first_field = table_field[0]
        except Exception, e:
            self.logger.error('-----eval table_field error,config is error!-----')
            raise e
        if first_field == 'id':
            is_id = True
        else:
            is_id = False
        download_oprations = downside_operate.split('|')
        if 'file' in download_oprations:
            filename = self.createNewBlackListPath()
            handle_flag = self.handle_file_func(db_content, filename)
            return handle_flag
        # table_field = eval(table_field)
        try:
            is_update_time = int(is_update_time)
        except:
            self.logger.error('-----is_update_time config value error!-----')
            raise
        for db_item in db_content:
            if is_update_time:
                time_index = table_field.index('update_time')
                update_time = (str(datetime.datetime.today()).split('.')[0],)
                db_item = db_item[:time_index] + update_time + db_item[time_index + 1:]
            if is_id:
                rowdata = db_item[1:]
            else:
                rowdata = db_item
            # self.logger.debug(rowdata)
            # print dict(zip(self.phishing_log_fields,rowdata))
            lenth = len(download_oprations)
            for oper in download_oprations:
                # here we get all the patched field value
                # '((fixed,true),(carry,None),(tansfer,None))',
                myindex = download_oprations.index(oper)
                fields_value = []
                for i in range(len(downside_patch_field_values[myindex])):
                    val = downside_patch_field_values[myindex][i]
                    if val[0] == 'fixed':
                        pass
                    elif val[0] == 'carry':
                        pass
                    elif val[0] == 'transfer':
                        field_name = downside_fields[myindex][i]
                        v_index = table_field.index(field_name)
                        tf_value = db_item[v_index]
                        fields_value.append(tf_value)
                    else:
                        self.logger.error('-----server downside_patch_field_values Error! valuse: %s------' % str(
                            downside_patch_field_values))
                if fields_value:
                    d_sql, f_val = self.pre_handle_None_value(downside_sqls[myindex],
                                                              self.format_field_value(fields_value))
                    db_sql = self.format_sql(d_sql, f_val)
                else:
                    db_sql = downside_sqls[myindex]
                BD_ERROR = False
                with self.lock:
                    if oper == 'insert':
                        self.db.insertdb(db_sql)
                        BD_ERROR = self.db.Error
                    if oper == 'update':
                        self.db.updatedb(db_sql)
                        BD_ERROR = self.db.Error
                    if oper == 'delete':
                        self.db.deldb(db_sql)
                        BD_ERROR = self.db.Error
                if not BD_ERROR:
                    break
                else:
                    continue
        else:
            return True

    def format_tuple(self, tup):
        '''
        It is None if field in DB is NULL when we get the data from db use mysqldb!
        Format the None to NuLL for inserting data to DB
        '''
        vluelist = ['NULL' if t is None else t for t in tup]
        padlist = ['%s' if t is None else '\'%s\'' for t in tup]
        padstr = ''
        for pl in padlist:
            padstr += pl
            padstr += ','
        else:
            padstr = padstr[:-1]
        return padstr % tuple(vluelist)

    def format_sql(self, patch_sql, patch_field_value):
        if isinstance(patch_sql, str) and isinstance(patch_field_value, tuple):
            try:
                res_sql = patch_sql % patch_field_value
            except:
                res_sql = None
            return res_sql
        else:
            self.logger.error('-----formate_sql args type error-----')
            raise TypeError  # the original raised exceptions.TypeError without importing exceptions

    def format_field_value(self, field_value):
        # we neeed hanle the ' or " in the mysql statement
        res_list = list()
        for val in field_value:
            if isinstance(val, unicode):
                val = val.encode('utf-8')
            if isinstance(val, str):
                f_val = val.replace('\'', '\\\'').replace('\"', '\\\"')
            else:
                f_val = val
            res_list.append(f_val)
        return tuple(res_list)

    def get_all_sub_str_index(self, index_str, sub_str, none_indexs):
        index_list = []
        start_index = 0
        cnt = 0
        while True:
            try:
                tmp_index = index_str.index(sub_str, start_index)
            except:
                break
            else:
                if cnt in none_indexs:
                    index_list.append(tmp_index)
                start_index = tmp_index + len(sub_str)
                cnt += 1
        return tuple(index_list)

    def pre_handle_None_value(self, patch_sql, field_values):
        # get all the None value index
        None_indexs = []
        for i in range(len(field_values)):
            if field_values[i] is None:
                None_indexs.append(i)
        if None_indexs:
            # get '%s' indexs
            s_indexs = self.get_all_sub_str_index(patch_sql, "'%s'", None_indexs)
            str_list = list(patch_sql)
            subtraction_index = 0
            for ix in s_indexs:
                print subtraction_index
                # remove the two quote characters around this %s so that NULL is not quoted
                str_list.pop(ix - subtraction_index)
                str_list.pop(ix - subtraction_index + 2)
                subtraction_index += 2
            replace_str = ''.join(str_list)
            res_field_values = ['NULL' if f_val is None else f_val for f_val in field_values]
            return replace_str, tuple(res_field_values)
        else:
            return patch_sql, field_values

    def __handler_read_event(self, FileNo):
        self.logger.info('Start handle the recieved data...')
        # Run the business logic on the received data
        try:
            self.__deal_business(FileNo)
        except Exception, e:
            self.logger.error('__deal_business Exception: %s' % e)
            # self.logger.debug(datetime.datetime.now())
            try:
                self.epoll.modify(FileNo, select.EPOLLOUT | select.EPOLLET | select.EPOLLONESHOT)
            except:
                pass
            self.logger.error('Deal_business ERRor')
        else:
            self.modify_revent_to_wevent(FileNo)
        self.logger.info('Handle the recieved data End!')
        # content = dict(sync_point=self.detect_piont_name,sync_point_no = self.detect_piont_serial_number)
        # message_info = (Communication_Packet_Flags.DOLOAD_DB,str(content),True,table_name,table_field)
        # error_info = (False,None,None)
        # message_status = (Communication_Packet_Flags.RESPONSE,None,str(datetime.datetime.now()),True)

    def __handler_write_event(self, FileNo):
        # if errorInfo is not null,we send Error to Client else handing write business
        Error_Info = None
        try:
            Error_Info = self.errorInfo[FileNo]
            # reset error info to None
            self.errorInfo[FileNo] = ''
        except:
            # The socket has already been removed from the table, so just return
            self.logger.info('This socket is removed from error info list!')
            return
        error_info = (False, None, None)
        if Error_Info:
            print Error_Info  # using debug
            error_info = (True, Error_Info[0], Error_Info[1])
        response = self.responseInfo[FileNo]
        # need reset the response info
        self.responseInfo[FileNo] = ''
        res_info = (None, response, None, None, False, None)
        info_states = (Communication_Packet_Flags.RESPONSE, None, None, True)
        message = self.packaging_message(res_info, error_info, info_states)
        self.logger.debug(message)
        self.send_message(FileNo, message, True)  # send the message to client
        self.modify_wevent_to_revent(FileNo)  # modify the event

    def modify_wevent_to_revent(self, FileNo):
        ''' If we trigger the read envet,we use this function '''
        try:
            # We need modify event to read event!
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLIN | select.EPOLLONESHOT)
        except:
            pass

    def modify_revent_to_wevent(self, FileNo):
        '''
        If we trigger the write event, we use this function
        '''
        try:
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLOUT | select.EPOLLONESHOT)
        except:
            pass

    def send_message(self, FileNo, message, blocking=True):
        # Sending a big message on a non-blocking socket can raise an error,
        # so we may switch the socket to blocking mode for the send
        if FileNo not in self.connections:
            # Check first; otherwise setblocking() below raises KeyError
            self.logger.debug('This socket not in the connections list!')
            return
        if blocking:
            self.connections[FileNo].setblocking(True)
        try:
            self.connections[FileNo].sendall(message)
        except Exception as e:
            pass
        # Finally set it back to non-blocking, which is the mode the event loop uses
        if blocking:
            self.connections[FileNo].setblocking(False)

    def start_server(self):
        try:
            self.__init_server()  # initialize the server
        except:
            # server initialization failed
            self.logger.critical('Init server Error...')
            raise
        while True:
            # join the thread
            self.__jionthreads()
            # Wait for at least one of the sockets to be ready for processing
            self.logger.info('waiting for the next event')
            events = self.epoll.poll()
            for fileno, event in events:
                # Handle inputs
                if fileno == self.server.fileno():
                    try:
                        # Edge-triggered: accept until the backlog is empty
                        while True:
                            connection, address = self.server.accept()
                            connection.setblocking(0)  # put the connection in non-blocking mode
                            # Here we can not use select.EPOLLONESHOT flag.This flag
                            # Register the new connection in edge-triggered mode as well
                            self.epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLET)
                            self.connections[connection.fileno()] = connection  # record the connection
                            self.requests[connection.fileno()] = ''  # record the business request
                            self.addresses[connection.fileno()] = address  # record the peer address
                            self.errorInfo[connection.fileno()] = ''  # an empty string means no error
                            self.responseInfo[connection.fileno()] = ''
                            self.logger.info('========================================')
                            self.logger.info('Client %s:%s connected server' % (address))
                    except socket.error:
                        pass
                # elif event & (select.EPOLLIN | select.EPOLLONESHOT):
                elif event & select.EPOLLIN:
                    # Read data from the socket until all of it has been received
                    self.logger.debug('EVENT EPOLLIN: %s' % hex(event))
                    try:
                        r_flag = self.__read_from_socket(fileno)
                    except socket.error:
                        pass
                    except Exception as e:
                        # Any other exception means that receiving data from the client failed,
                        # so we need to send an error message to the client
                        self.logger.warning('Caught another exception when receiving data!')
                        self.errorInfo[fileno] = (
                            Error_Info_Flags.Receive_Data_Error, '-----Server received data Error!-----')
                        self.modify_revent_to_wevent(fileno)
                    else:
                        # No exception: once the EOF marker has arrived we treat the
                        # request as complete and start a new thread to handle it
                        if self.requests[fileno]:
                            if self.requests[fileno].endswith(self.EOF):
                                newthread = threading.Thread(target=self.__handler_read_event, args=(fileno,))
                                newthread.daemon = True
                                newthread.start()
                                self.logger.debug('start %s' % newthread.name)
                        else:
                            # Nothing was read from the client, so the client has closed
                            # the connection; hang up on our side
                            # self.logger.info("closing %s %s (HUP)" % self.addresses[fileno])
                            self.__delevnet(fileno)
                # elif event & (select.EPOLLOUT | select.EPOLLONESHOT):
                elif event & select.EPOLLOUT:
                    self.logger.debug('EVENT EPOLLOUT: %s' % bin(event))
                    # Write event happened; handle it through the proxy function.
                    # We need a plain function here, not a thread: with threading it has
                    # bugs (the event is triggered multiple times, I think)
                    self.__proxy(fileno)
                elif event & select.EPOLLHUP:
                    self.logger.debug('EVENT EPOLLHUP: %s' % bin(event))
                    # Client hung up, delete the event!
                    self.logger.info("closing %s %s (HUP)" % self.addresses[fileno])
                    self.__delevnet(fileno)
                elif event & select.EPOLLERR:
                    # getpeername() returns a tuple, so wrap it before formatting
                    self.logger.info(" exception on %s" % (self.connections[fileno].getpeername(),))
                    self.__delevnet(fileno)
                else:
                    # Other event, do not handle
                    pass


if __name__ == '__main__':
    myserver = Server()
    myserver.start_server()
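
The server above registers every connection with `select.EPOLLET`, so a readable event is reported only when the state changes, and the handler has to read until the kernel buffer is empty before going back to `epoll.poll()`. The following is only a minimal sketch of such a drain loop, assuming Python 3 and a non-blocking socket. `read_until_would_block` is a hypothetical helper, not the original `__read_from_socket`, whose body is not shown here.

```python
import socket

EOF = b'\n\r\n'  # assumed to match the end-of-message marker used in this post


def read_until_would_block(conn, bufsize=4096):
    """Drain a non-blocking socket.

    Returns (data, peer_closed). peer_closed is True when recv() returned
    b'', which means the peer has shut the connection down.
    """
    chunks = []
    peer_closed = False
    while True:
        try:
            part = conn.recv(bufsize)
        except BlockingIOError:
            # The kernel buffer is empty: stop and wait for the next event
            break
        if not part:
            peer_closed = True
            break
        chunks.append(part)
    return b''.join(chunks), peer_closed


if __name__ == '__main__':
    # A socket pair stands in for an accepted client connection
    client, server_side = socket.socketpair()
    server_side.setblocking(False)
    client.sendall(b'hello' + EOF)
    data, closed = read_until_would_block(server_side)
    print(data.endswith(EOF), closed)
    client.close()
    server_side.close()
```

If the loop stopped after a single `recv()`, any bytes still in the buffer would stay there without a new edge-triggered notification, which is why the accept branch above also loops until `socket.error`.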

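The client below frames every message as the first 16 hex characters of an MD5 digest, the payload, the last 16 hex characters of the digest, and the `EOF` marker. The following is a minimal sketch of that framing and of the matching check, assuming Python 3 and a bytes payload. `frame` and `unframe` are hypothetical helper names, and the encryption step of the original is left out.

```python
import hashlib

EOF = b'\n\r\n'  # end-of-message marker, assumed to match the client's EOF


def frame(payload):
    """Wrap payload as digest[:16] + payload + digest[-16:] + EOF."""
    digest = hashlib.md5(payload).hexdigest().encode('ascii')  # 32 hex chars
    return digest[:16] + payload + digest[-16:] + EOF


def unframe(packet):
    """Return (ok, payload); ok is False when the digest does not match."""
    if packet.endswith(EOF):
        packet = packet[:-len(EOF)]
    if len(packet) < 32:
        # Too short to carry both digest halves
        return False, b''
    # Slice by position: the payload is whatever lies between the two halves
    payload = packet[16:-16]
    digest = hashlib.md5(payload).hexdigest().encode('ascii')
    ok = packet[:16] == digest[:16] and packet[-16:] == digest[-16:]
    return ok, payload


if __name__ == '__main__':
    packet = frame(b'select 1')
    print(unframe(packet))
```

Slicing by position matters here: `str.rstrip(chars)` strips a set of characters rather than a suffix, so it can remove payload characters that also occur in the digest.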
#!/usr/bin/env python
# coding:utf-8
import socket
import sys, os
import threading
import time
import logging
import multiprocessing
import random
import datetime
import hashlib  # hashlib.md5(open(fileName,'rb').read()).hexdigest()
import pdb

sys.path.append('../')
import exceptions
# Import the task scheduling module
from apscheduler.schedulers.blocking import BlockingScheduler
from sqldb import SQLdb
from mylog import MyLog as Log
from communication_packet import Communication_Packet


class Client(object):
    # Handle message type
    HANDLE_GENERAL = 1  # handle an ordinary response packet
    HANDLE_INSERT = 2  # insert all received data into the database as it arrives
    HANDLE_UPDATE = 3  # update with the received data; an update condition must be supplied
    HANDLE_INERT_UPDATE = 4
    HANDLE_FILE = 5

    def __init__(self, IP='112.33.9.154', Port=11366, blackdir=None, db=None):
        self.tasksched = BlockingScheduler()  # task scheduler
        self.serverIP = IP  # server IP
        self.serverPort = Port  # server port
        if db is not None:
            self.db = db
        else:
            self.db = SQLdb()  # initialize the database used for synchronization
        self.lock = threading.Lock()
        # Use the default mode: debug
        self.log = Log()
        self.log.openConsole()  # enable console output
        self.logger = self.log.getLog()
        self.EOF = '\n\r\n'  # Set EOF flag
        if blackdir is None:
            self.basepath = './blacklist/'
        else:
            self.basepath = blackdir
        self.sysdbFirstFlag = False
        self.key = None  # used to calculate the token
        self.encryption = None
        self.detect_piont_name = 'xxxx'
        self.detect_piont_serial_number = 'xxxxxx'

    def set_DB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)

    def set_first_synclog_flag(self, flag):
        self.synclog_flag = flag

    def setBlacklistDir(self, filedir=None):
        # Set blacklist dir
        if filedir is None:
            self.basepath = './blacklist/'
        else:
            self.basepath = filedir

    def createNewBlackListPath(self):
        # Create the blacklist dir if it does not exist
        if not os.path.exists(self.basepath):
            os.mkdir(self.basepath)
        nowtime = datetime.datetime.now().strftime('%Y_%b_%d_%H_%M_%S')
        filename = 'blacklist_' + nowtime + '.txt'
        # os.path.join also works when basepath has no trailing slash
        filepath = os.path.join(self.basepath, filename)
        return filepath

    def handle_file_func(self, content, filename):
        try:
            # NOTE: eval() executes whatever the peer sent; only use it with a trusted server
            content_data = eval(content)
        except Exception as e:
            self.logger.error('-----handle_file_func: eval business_content error!-----')
            return False
        else:
            # Open the file and write the data
            try:
                with open(filename, 'w') as w_file:
                    for data_item in content_data:
                        w_file.write(str(data_item))
            except Exception as e:
                self.logger.error('-----handle_file_func: write data to file Error!------')
                return False
            else:
                return True

    def reap(self):
        # Reap finished child processes; only needed when multiprocessing is used, may be unused
        while True:
            try:
                result = os.waitpid(-1, os.WNOHANG)
                if not result[0]:
                    break
            except:
                break
            self.logger.info("reaped child process %d" % result[0])

    def __connect_server(self, IP, Port):
        '''
        Connect to the remote server and return the communication socket
        '''
        # Create a TCP/IP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Connect the socket to the port where the server is listening
        server_address = (IP, Port)
        self.logger.info('connecting to %s port %s' % server_address)
        try:
            sock.connect(server_address)
        except:
            self.logger.error('connecting to %s port %s error!' % server_address)
            raise
        return sock

    def __get_tasks(self):
        # Get tasks from the db and return a list (not implemented yet)
        pass

    def scheduler_tasks(self, task_list=None):
        # Start the scheduler to schedule the tasks (not implemented yet)
        pass

    def calculateTime(self, starttime=None, intervalMinutes=None):
        if not starttime:
            nowtime = datetime.datetime.now()
        else:
            nowtime = starttime
        if intervalMinutes:
            interval = datetime.timedelta(minutes=intervalMinutes)
            return nowtime - interval
        else:
            return nowtime

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
        # Packet layout: md5[:16] + message + md5[-16:]
        pre_md5 = receving_data[:16]
        suffix_md5 = receving_data[-16:]
        message_md5 = pre_md5 + suffix_md5
        # Slice by position; rstrip(suffix_md5) would strip a character set, not a suffix
        message = receving_data[16:-16]
        cur_md5 = self.calc_md5(message)
        if message_md5 == cur_md5:
            return True, message
        else:
            return False, message

    def read_sync_state(self):
        query_sql = "select * from sync_state;"
        DB_ERROR = False
        with self.lock:
            sync_status = self.db.fechdb(query_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            raise Exception('Query sync_state Error!')
        if sync_status:
            sync_status_dict = dict()
            for sync_s in sync_status:
                sync_status_dict[sync_s[0]] = sync_s
            return sync_status_dict
        else:
            return None

    def read_sync_contol_configure(self, read_flag=False):
        # Read the control configuration from the local db; if there is none,
        # sync it from the server and then read it again
        query_sql = "select * from sync_control;"
        DB_ERROR = False
        # set table name
        table_name = 'sync_control'
        # get `sync_control` table fields
        table_field = self.get_table_filed(table_name, self.db, self.lock)
        if not table_field:
            self.logger.error('-----Get %s table fields Error!-----' % table_name)
            return None
        with self.lock:
            config_rows = self.db.fechdb(query_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get control configure Error!-----')
            return None
        if config_rows:
            # Got the configuration from the db! Classified by table name and
            # sync type (upload or download), format it into a list of dicts
            control_configure = []
            for iter_conf in config_rows:
                control_configure_item = dict.fromkeys(table_field)
                lenth = len(table_field)
                # set the value for every key
                for i in range(lenth):
                    control_configure_item[table_field[i]] = iter_conf[i]
                control_configure.append(control_configure_item)
            return control_configure
        else:
            # We need to get the configuration from the server and reload it
            if read_flag:
                # This is already the second read and there is still no configuration
                return None
            # Sync the sync_control table from the server
            self.logger.info('=====Start to init the sync control table from Server...=====')
            # Connect to the server (named sock so the socket module is not shadowed)
            sock = self.__connect_server(self.serverIP, self.serverPort)
            # We need to carry the detection point number to the server
            content = dict(sync_point=self.detect_piont_name, sync_point_no=self.detect_piont_serial_number)
            message_info = (Communication_Packet.DOLOAD_DB, str(content), True, table_name, table_field)
            error_info = (False, None, None)
            message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)
            send_message = self.packaging_message(message_info, error_info, message_status)
            try:
                sock.sendall(send_message)
            except Exception as e:
                # If sending fails, return and end this business request
                self.logger.error('-----Error when sending the sync_control request to server!-----')
                return
            self.download_db(sock, self.HANDLE_INSERT)
            self.logger.info('=====End init the sync control table from Server!=====')
            # After getting the control table we need to read it from the local table again!
            return self.read_sync_contol_configure(True)

    def start_tasks(self):
        control_config = self.read_sync_contol_configure()
        thread_list = []
        if control_config:
            for config in control_config:
                # args must be a tuple, hence the trailing comma
                newthread = threading.Thread(target=self.thread_task, args=(config,))
                newthread.daemon = True
                newthread.start()
                thread_list.append(newthread)
            for t in thread_list:
                t.join()
        else:
            self.logger.error('-----init the sync control configuration error!-----')
            raise Exception('init the sync control configuration error!')

    def thread_task(self, task_config):
        # init an instance of scheduler
        my_scheduler = BlockingScheduler()
        # self.tasksched.add_job(self.synchronous_DB, 'interval', minutes = 15,max_instances = 1)
        '''
        --cron-- Parameters:
        year (int|str) - 4-digit year
        month (int|str) - month (1-12)
        day (int|str) - day of month (1-31)
        week (int|str) - ISO week (1-53)
        day_of_week (int|str) - number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
        hour (int|str) - hour (0-23)
        minute (int|str) - minute (0-59)
        second (int|str) - second (0-59)
        start_date (datetime|str) - earliest possible date/time to trigger on (inclusive)
        end_date (datetime|str) - latest possible date/time to trigger on (inclusive)
        timezone (datetime.tzinfo|str) - time zone to use for the date/time calculations
            (defaults to scheduler timezone)
        '''
        if task_config['sync_type'] == 'upload':
            if task_config['sync_time_mode'] == 'period':
                # Assumes sync_time_value holds the period in minutes
                period_minutes = int(task_config['sync_time_value'])
                my_scheduler.add_job(self.syn_upload_db, 'interval', minutes=period_minutes, max_instances=1,
                                     args=[task_config])
                my_scheduler.start()
            elif task_config['sync_time_mode'] == 'fixed_time':
                try:
                    hours, minutes, seconds = task_config['sync_time_value'].split(':')
                except:
                    self.logger.error('-----get sync_time_value Error!-----')
                    raise
                else:
                    my_scheduler.add_job(self.syn_upload_db, 'cron', year='*', month='*', day='*', hour=hours,
                                         minute=minutes, second=seconds, max_instances=1, args=[task_config])
                    my_scheduler.start()
            else:
                self.logger.error('----sync control config error-----')
                raise Exception('sync control config error')
        elif task_config['sync_type'] == 'download':
if task_config['sync_time_mode'] == 'period': period_minutes = task_config['sync_time_value'] my_scheduler.add_job(self.sync_download_db, 'interval', minutes=15, max_instances=1, args=task_config) my_scheduler.start() elif task_config['sync_time_mode'] == 'fixed_time': try: hours, minutes, seconds = task_config['sync_time_value'].split(':') except: self.logger.error('-----get sync_time_value Error!-----') raise e else: my_scheduler.add_job(self.sync_download_db, 'cron', year='*', month='*', day='*', hour=hours, minute=minutes, second=seconds, max_instances=1, args=task_config) my_scheduler.start() else: self.logger.error('----sysnc control config error-----') raise else: self.logger.error('----sync_type error-----') raise def sync_download_db(self, config): # parsing the config to get the sql pass def syn_upload_db(self, config): pass def validating_content_token(self, content): receive_content = content['Business']['Content'] receive_md5 = content['Info_Status']['Security_Token'] receive_time = content['Info_Status']['Time'] cur_md5 = self.calc_md5(receive_content + receive_time + self.key) if cur_md5 == receive_md5: return True else: return False pass # encrypt decrypt def packaging_message(self, Message=None, Error=None, Info_Status=None): # ----Pack send message # Init Communication_Packet cm_packet = Communication_Packet() # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True): cm_packet.set_content_Business(targv=Message) # def set_content_Error(self,error_flag = False,error_type = None,error_info = None): cm_packet.set_content_Error(Error) now_time = str(datetime.datetime.now()) # get current time # Business+time+key calculate token,calculate token security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key) # def set_content_Info_Status(self,info_type,security_token,time,is_end): # Need to replace the security_token Info_Status = list(Info_Status) # 轉化元組到列表 Info_Status[1] = 
security_token Info_Status = tuple(Info_Status) # 重新轉化列表到元組作為參數 cm_packet.set_content_Info_Status(targv=Info_Status) try: send_data = cm_packet.content except Exception, e: raise e else: # we Encryption data encryption_send_data = self.encryption.encrypt(send_data) # caculate md5 encrypt_send_md5 = self.calc_md5(encryption_send_data) complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF return complete_send_data def unpackaging_message(self, unpacking_str): if not instances(unpacking_str, str): raise exceptions.ValueError else: unpacking_str = unpacking_str.strip(self.EOF) flag, message = self.validating_message_token(unpacking_str) if flag: decrypt_str = self.encryption.decrypt(message) try: message_dict = eval(decrypt_str) except, Exception, e: self.logger.error('Eval decrypt_str Error!') raise e else: if validating_content_token(message_dict): return message_dict else: self.logger.error('Message is tampered!') return None pass else: self.logger.error('Message is tampered!') return None pass def recieve_packet(self, communication_socket): # 接收數據從服務器 recieveData = None partdata = None try: while partdata is not None: try: partdata = communication_socket.recv(4096) except Exception, e: self.logger.error(str(e)) break else: recieveData += partdata if partdata.endswith(self.EOF): break recieveData = recieveData.rstrip(self.EOF) return recieveData except Exception, e: raise e pass def insertdb_updatedb(self, handle_type, db_content, table_name, table_field, filter_condition=None): ''' add ''' try: db_data = eval(db_content) except Exception, e: self.logger.error('-----eval business_content error!-----') raise e else: try: first_field = eval(table_field)[0] except Exception, e: self.logger.error('-----eval table_field error!-----') raise e if first_field == 'id': is_id = True else: is_id = False for db_item in db_data: if is_id: rowdata = db_item[1:] else: rowdata = db_item self.logger.debug(rowdata) # print 
dict(zip(self.phishing_log_fields,rowdata)) if handle_type == self.HANDLE_INSERT or handle_type == self.HANDLE_INERT_UPDATE: insert_sql = 'insert into %s %s values(%s);' % ( table_name, table_field.replace('\'', ''), self.format_tuple(rowdata)) # print insert_sql DB_ERROR = False with self.lock: self.db.insertdb(insert_sql) DB_ERROR = self.db.Error if DB_ERROR: if handle_type == self.HANDLE_INERT_UPDATE: pass elif handle_type == self.HANDLE_INSERT: pass else: pass elif handle_type == self.HANDLE_UPDATE: # update the data pass else: pass else: return True pass def set_filter(self, filter, table_field, filter_key, filter_relationship): ''' Func: set the filter for update the db when when handling the message if the handle_flag is set HANDLE_INERT_UPDATE! Note: all the filter condithion are needed to be ordered! table_field: filter condithion field filter_key: = >= <= != if has like,like|>=... And so on filter_relationship: and、or、None ''' if not filter: filter = dict(filter=[]) else: if 'filter' not in filter.keys(): filter['filter'] = [] filter['filter'].append(table_field, filter_key, filter_relationship) return filter def set_update_field(self, filter, update_field): ''' Set the update fileds when update db! update_field:update filed list! 
''' filter['update_field'] = update_field def parsing_filter(self, filter, table_name, table_fields, table_values, filter_field_values=None): ''' Func: return the update sql ''' update_values = [] update_fields = filter['update_field'] for field in update_fields: v_index = table_fields.index[field] up_value = table_values[v_index] update_values.append(up_value) update_sql_part_one = 'update %s set ' % table_name lenth = len(update_values) for i in range(lenth) up_value = update_values[i] up_field = update_fields[i] update_sql_part_one += '%s=\'%s\',' % (up_field, up_value) # strip the last comma result like: update tablea set a = '1',b = '2' update_sql_part_one = update_sql_partone.rstrip(',') update_sql_part_two = 'where ' for tmp_item in filter['filter']: field = tmp_item[0] filter_key = tmp_item[1].splite['|'] relationship = tmp_item[2] # get filter condithion value. For example: where left(ip_loc,2) = 'China' if field in filter_field_values: filter_condition_value = filter_field_values[field] else: v_index = table_fields.index[field] filter_condition_value = table_values[v_index] if not relationship: relationship = '' if '(' in filter_key: update_sql_part_two += "%s%s %s " % (filter_key, filter_condition_value, relationship) else: update_sql_part_two += "%s%s%s %s" % (field, filter_key, filter_condition_value, relationship) # merge the sql statement update_sql = update_sql_part_one + update_sql_part_two.strip() return update_sql def handle_message__packet(self, socket, handle_type=None, filter=None, handle_callback=None, is_recieved=False, message_dict=None): ''' socket: communication socket handle_type: handle message type(HANDLE_GENERAL,HANDLE_INSERT,HANDLE_UPDATE) filter: if handle_type is HANDLE_UPDATE,it is the update filter condithion handle_callback: default is None!if not None,it will use the handle_callback to handle the message is_recieved: default is False,if message is received. 
message_dict: if is_recieved is True,it is the received message_dict Note: if use some args,you need to use 'key = vale' to passing parameters ''' if not is_recieved: # if not recieved message,we receive the message and unpack the message try: message = self.recieve_packet(socket) except Exception, e: self.logger.error(str(e)) else: message_dict = self.unpackaging_message(message) if message_dict: if handle_callback: # if callback is not None,use the handle_callback to hanle the message handle_callback(message_dict) else: # Pasing packet and get information # get bussiness type business = response_dict['Business'] business_type = business['Type'] business_content = business['Content'] business_is_syncdb = business['Is_Syncdb'] business_table_name = business['Table_Name'] business_table_field = business['Table_Field'] # get error info Error_info = response_dict['Error'] error_flag = Error_info['Is_Error'] error_type = Error_info['Error_Type'] error_info = Error_info['Error_Info'] # get packet info status info_status = response_dict['Info_Status'] info_type = info_status['Info_Type'] is_end = info_status['Is_End'] token = info_status['Security_Token'] info_time = info_status['Time'] if handle_type == self.HANDLE_GENERAL: # This packet is GENERAL communication! 
hanling upload_db response message to make sure that server handle the business if communication_type == communication_packet.GENERAL_INFO and business_content == communication_packet.HANDLE_OK and info_type == communication_packet.RESPONSE return True, is_end elif error_flag: self.logger.error( 'Message\'type is \'%s\' and message\'error_info is \'%s\'' % (error_type, error_info)) return False, is_end else: self.logger.error('This message packet is not the general message!') return False, is_end pass elif handle_type == self.HANDLE_INSERT and info_type == communication_packet.RESPONSE: handle_flag = self.insertdb_updatedb(self.HANDLE_INSERT, business_content, business_table_name, business_table_field) return handle_flag, is_end pass elif handle_type == self.HANDLE_UPDATE and info_type == communication_packet.RESPONSE: return False, is_end pass elif handle_type == self.HANDLE_INERT_UPDATE and info_type == communication_packet.RESPONSE: handle_flag = self.insertdb_updatedb(self.HANDLE_INERT_UPDATE, business_content, business_table_name, business_table_field, filter) return handle_flag, is_end elif handle_type == self.HANDLE_UPDATE and info_type == communication_packet.RESPONSE: handle_flag = self.insertdb_updatedb(self.HANDLE_UPDATE, business_content, business_table_name, business_table_field) return handle_flag, is_end elif handle_type == self.HANDLE_FILE and info_type == communication_packet.RESPONSE: handle_flag = self.handle_file_func(business_content) return handle_flag, is_end pass else: # 沒有處理請求包,因為服務器不會主動發起請求 return False, is_end pass else: raise Exception('handle_response_packet\'message_dict is None!') pass def upload_db(self, communication_socket, table_name, query_sql=None, ALL=True, table_field=None, Error_info=None): # Get the dbinfo which is sended # if db_data is Relatively large,it will raise exceptions,so you need contol the data DB_ERROR = False DB_CNT_ERROR = False if query_sql: with self.lock: db_data = self.db.fechdb(query_sql) DB_ERROR = 
self.db.Error # If query db error return None if DB_ERROR: self.logger.error('Query DB Error! Crrent query_sql: %s' % query_sql) return None if not db_data: # 如果沒數據則不需要向服務器傳輸數據,直接返回None self.logger.info('No Data need to be upload! table_name: %s' % table_name) return None # Set packet args # def set_content_Business(self,b_type = Communication_Packet.GENERAL_INFO,b_content = Communication_Packet.HANDLE_OK,table_name = None,table_field = None,b_is_syncdb = False,targv = None): message = (Communication_Packet.UPLOAD_DB, str(db_data), table_name, table_field, True) # def set_content_Error(self,error_flag = False,error_type = None,error_info = None,targv = None): error_info = (False, None, None) # def set_content_Info_Status(self,info_type,security_token,time,is_end,targv = None): info_status = (Communication_Packet.REQEST, None, ALL) complete_send_data = self.packaging_message(message, error_info, info_status) try: communication_socket.sendall(complete_send_data) except, Exception, e: self.logger.error('Send data error when upload_db!') raise e else: if ALL: return True else: # 對應答包進行處理進行處理 try: handle_flag, business_is_end = self.handle_general_response_packet(communication_socket) except Exception, e: raise e else: if handle_flag and business_is_end: return True pass else: return False pass else: raise Exception('query_sql statement is None') def download_db(self, communication_socket, handle_type=None, handle_message_callback=None, filter=None): # We receive data from server try: recieveData = self.recieve_packet(communication_socket) except Exception, e: self.logger.error('Download_db Error when receiving data!') raise e # unpacking message Message_dict = self.unpackaging_message(recieveData) if not message: raise Exception('download_db Message_dict Error!') if handle_message_callback: handle_flag = handle_message_callback(Message_dict) else: is_handle_complete, is_handle_again = self.handle_message__packet(communication_socket, is_recieved=True, 
message_dict=Message_dict, filter=filter) # parsing received data to dict # 這里需要用遞歸處理數據 if is_handle_complete and not is_handle_again: self.download_db(communication_socket, handle_message_callback, filter) else: # 關閉socket連接,退出函數 communication_socket.close() pass def format_tuple(self, tup): ''' It is None if field in DB is NULL when we get the data from db use mysqldb! Format the None to NuLL for inserting data to DB ''' valuelist = ['NULL' if t is None else t for t in tup] padlist = ['%s' if t is None else '\'%s\'' for t in tup] padstr = '' for pl in padlist: padstr += pl padstr += ',' else: padstr = padstr[:-1] return padstr % tuple(valuelist) def sync_log_db(self, Start_Time=None, End_Time=None, Sync_All=False, ID=None): # pdb.set_trace() ''' sync log db to Server! If error when sync log,it will return but not raise an exception! If raise an exception,it will impact tha main program ''' self.logger.info('=====Start to sysnc log=====') try: # connect to server,get and communication socket sock = self.__connect_server(self.serverIP, self.serverPort) except: raise last_update_id = None # updating update_id # query log db to get some info if Start_Time and End_Time: if datetime.datetime.strptime(Start_Time, '%Y-%m-%d %H:%M:%S') > datetime.datetime.strptime(End_Time, '%Y-%m-%d %H:%M:%S'): self.logger.error('-----sync_log_db argv\'Start_Time and End_Time is error!,End this task-----') # raise Exception('Start_Time and End_Time Error!') return query_cnt_sql = "select count(id),min(id),max(id) from phishing_log where time between '%s' and '%s';" % ( Start_Time, End_Time) # query_log_sql = "select * from phishing_log where time between '%s' and '%s';" %(Start_Time,End_Time) elif Sync_All: query_cnt_sql = "select count(1),min(id),max(id) from phishing_log;" # query_log_sql = 'select * from phishing_log;' elif ID: query_cnt_sql = "select count(1),min(id),max(id) from phishing_log where id > %s;" % ID # query_log_sql = "select * from phishing_log where id > %s;" % ID 
else: query_update_id_sql = "select last_update_id,last_update_time from ph_log_sync_id;" DB_ERROR = False with self.lock: ID_Tuple = self.db.fechdb(query_update_id_sql) DB_ERROR = self.db.Error if DB_ERROR: DB_ERROR = False self.logger.error('-----Get id from ph_log_sync_id Error. End this task----') last_update_id = ID_Tuple[0][0] query_cnt_sql = "select count(1),min(id),max(id) from phishing_log where id > %s;" % last_update_id # query_log_sql = "select * from phishing_log where id > %s;" % last_update_id # get table fields! log_fields = self.get_table_filed(table_name, self.lock, self.db) if not table_field: self.logger.error( '-----Terminate this task(sync_log_db),becase of getting the %s table fileds fialed!-----' % table_name) return with self.lock: data_cnt = self.db.fechdb(query_cnt_sql) DB_ERROR = self.db.Error if DB_ERROR: # Record Error into log and end task DB_ERROR = False self.logger.error('-----Get log data count Error when sys log db! End this task-----') return else: # 若果數據超過10000條則分批進行同步處理 cnt, min_id, max_id = data_cnt[0] upload_cnt = 1 if cnt >= 10000: upload_cnt = (max_id - min_id + 1) / 1000 + 1 # def upload_db(self,communication_socket,table_name,query_sql = None,ALL = True,table_field = None,Error_info = None): ALL = False log_table_name = 'phishing_log' for i in range(upload_cnt): start_id = min_id end_id = min_id + 10000 if end_id >= max_id: end_id = max_id ALL = True query_log_sql = "select * from phishing_log where id >%s and id <=end_id" % (start_id, end_id) try: handle_ok = self.upload_db(sock, log_table_name, query_log_sql, ALL, log_fields, None) except Exception, e: self.logger.error('-----' + str(e) + 'when sysnc db!-----') return if handle_ok: continue else: self.logger.error("-----upload_db Error! 
And query_log_sql is '%s'.End this task-----" % query_log_sql) break else: # update update_id if last_update_id: cur_time = datetime.datetime.now() update_id_sql = "update ph_log_sync_id set last_update_id = '%s',last_update_time = '%s' where last_update_id = %s;" % ( max_id, cur_time, last_update_id) BD_ERROR = False with self.lock: self.db.updatedb(update_id_sql) BD_ERROR = self.db.Error if BD_ERROR: inset_id_sql = "insert into ph_log_sync_id valuse(%s,%s)" % (max_id, cur_time) with self.lock: self.db.insertdb(inset_id_sql) BD_ERROR = self.db.Error if BD_ERROR: self.logger.error( '-----This sys log db error when update last_update_id! Old id is %s,new id is %s! End this task' % ( last_update_id, max_id) - ----) else: pass else: pass self.logger.info('=====End to sysnc log=====') def gain_newconfig_from_server(self): ''' Get newest configure which is the fishing website regular expressions from server! ''' self.logger.info('=====Start to gain newcofig from Server...=====') try: socket = self.__connect_server(self.serverIP, self.serverPort) # 連接服務器 except Exception, e: # raise Exception raise e # 查詢數據庫從數據庫獲取當前配置的版本信息 query_version_sql = "select max(version),update_time from detection_version;" BD_ERROR = False with self.lock: version_tuple = self.db.fechdb(query_version_sql) DB_ERROR = self.db.Error if BD_ERROR: self.logger.error('-----Get local configure version Error! End this task-----') return local_version = version_tuple[0][0] local_update_time = version_tuple[0][1] table_name = 'detection_version' # Get detection_version fields # If get fields fialed,it will terminate this task! 
        table_field = self.get_table_filed(table_name, self.db, self.lock)
        if not table_field:
            self.logger.error(
                '-----Terminate this task(gain_newconfig_from_server), because getting the %s table fields failed!-----' % table_name)
            return
        message_info = (Communication_Packet.DOLOAD_DB, str(local_version), True, table_name, table_field)
        error_info = (False, None, None)
        message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)
        send_message = self.packaging_message(message_info, error_info, message_status)
        try:
            # `socket` is the connection opened at the top of this method
            socket.sendall(send_message)
        except Exception as e:
            # If sending fails, return and end this business request
            self.logger.error('-----Error when sending the gain_newconfig_from_server request to server!-----')
            return
        self.download_db(socket, self.HANDLE_INSERT)
        self.logger.info('=====End to gain newconfig from Server!=====')

    def get_detection_point_num(self):
        # Get detection_point_num from file or db and return it!
        # Reading the config file to get the detection_point_num is still to be implemented
        detection_point_num = None
        is_detection_point = None
        return detection_point_num, is_detection_point

    def get_table_filed(self, table_name, db, lock):
        # Get the table fields from the db
        query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s';" % table_name
        with self.lock:
            detection_version_fields = self.db.fechdb(query_detection_version_field_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            # Record the error and end the task
            DB_ERROR = False
            self.logger.error('----Get %s fields Error! End this task-----' % table_name)
            return
        else:
            # The query returns one row per column and the names are unicode,
            # so take the first item of every row and encode it to utf-8
            table_field = [row[0].encode('utf-8') for row in detection_version_fields]
            return table_field

    def sync_info_db(self):
        '''
        1. Get the blacklist from the server and write it to a file for the linkage equipment
        2. Sync the info table to the client point
        Which of the two is used depends on the config file
        '''
        self.logger.info('=====Start to get blacklist from Server=====')
        # Get communication socket
        try:
            sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to server
        except Exception as e:
            self.logger.error(str(e))
            raise
        # Same order as the return value of get_detection_point_num()
        point_num, is_detection_point = self.get_detection_point_num()
        # Need to handle again-----
        table_name = 'phishing_info'
        if is_detection_point:
            table_field = ('phishing_site',)  # trailing comma makes this a tuple
        else:
            # get the table fields from the db
            table_field = self.get_table_filed(table_name, self.db, self.lock)
            if not table_field:
                self.logger.error(
                    '-----Terminate this task(sync_info_db), because getting the %s table fields failed!-----' % table_name)
                return
        # pack the request packet
        message_info = (Communication_Packet.DOLOAD_DB, str(point_num), True, table_name, table_field)
        error_info = (False, None, None)
        message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)
        send_message = self.packaging_message(message_info, error_info, message_status)
        try:
            sock.sendall(send_message)
        except Exception as e:
            # if sending fails, end this task!
            self.logger.error('-----Error when sending the sync_info_db request to server!-----')
            return
        if is_detection_point:
            # If this is a detection point, we update phishing_info
            self.download_db(sock, self.HANDLE_INSERT)
        else:
            # write the data to file!
self.download_db(socket, self.HANDLE_FILE) self.logger.info('=====End to get phishing_info from Server!=====') def sync_white_list(self, sync_flag=0): ''' sync_flag: 0、first upload the white list then download the white list from server 1、only upload the white list 2、only download the white list ''' # Get communication socket try: sock = self.__connect_server(self.serverIP, self.serverPort) # connect to server except Exception, e: self.logger.error(str(e)) raise if sync_flag == 0 or sync_flag == 1: self.sync_client_white_list_to_server(sock) if sync_flag == 0 or sync_flag == 2: self.sync_server_white_list_to_client(sock) # close the network socket,end this task! sock.close() def sync_server_white_list_to_client(self, communication_socket): # pdb.set_trace() ''' Sync server white list to this detection point communication_socket: Network socket ''' self.logger.info('=====Start sync_server_white_list_to_client!=====') # get the last update white list time of this client! qurey_sql = "select max(update_time) from white_list where valid = true;" BD_ERROR = False with self.lock: loacal_last_update_time = self.db.fechdb(qurey_sql) BD_ERROR = self.db.Error if BD_ERROR: self.logger.error('-----Get white list last update time Error! 
End this task!-----') return if loacal_last_update_time: last_update_time = loacal_last_update_time[0][0] else: last_update_time = None table_name = 'white_list' table_field = self.get_table_filed(table_name, self.lock, self.db) if not table_field: self.logger.error( '-----Terminate this task(sync_server_white_list_to_client),becase of getting the %s table fileds fialed!-----' % table_name) return # packing the request packet message_info = (Communication_Packet.DOLOAD_DB, str(last_update_time), True, table_name, table_field) error_info = (False, None, None) message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True) send_message = self.packaging_message(message_info, error_info, message_status) try: socket.sendall(send_message) except, Exception, e: # if send data error,end this task! self.logger.error('-----When send the sync_server_white_list_to_client message request to server Error!-----') return # After send the request,start to download white list db data # update_sql = "update white_list set domain = '%s',valid = '%s',recorded = '%s',update_time = '%s',website_name = '%s',website_link = '%s',icp_code = '%s',organization = '%s'where domain = '%s';" % fielddata myfilter = dict() update_field = ( 'domain', 'valid', 'recorded', 'update_time', 'website_name', 'website_link', 'icp_code', 'organization') self.set_update_field(myfilter, update_field) # def set_filter(self,filter,table_field,filter_key,filter_relationship): self.set_filter('domain', '=', None) self.logger.debug(str(myfilter)) self.download_db(communication_socket, self.HANDLE_INSERT, filter=myfilter) def sync_client_white_list_to_server(self, communication_socket): # pdb.set_trace() ''' upload the local white list to server! 
communication_socket: network socket ''' self.logger.debug('=====Start sync_client_white_list_to_server!=====') # qeury the the lacal white list data which is needed to upload query_sql = "select * from white_list where update_time is NULL and valid = 1;" query_modified_sql = "select * from white_list where update_time is not NULL and valid = 0;" # get white_list table fields table_name = 'white_list' table_field = self.get_table_filed(table_name, self.lock, self.db) if not table_field: self.logger.error( '-----Terminate this task(sync_server_white_list_to_client),becase of getting the %s table fileds fialed!-----' % table_name) return # upload the new local added white list handle_ok = self.upload_db(sock, log_table_name, query_sql, False, log_fields, None) if handle_ok: # upload the loacl new modified white list handle_ok = self.upload_db(sock, log_table_name, query_modified_sql, True, log_fields, None) if handle_ok: self.logger.info('upload the loacl new modified white list OK!') self.logger.info('sync_client_white_list_to_server OK!') else: self.logger.error('-----UPLOAD the loacl new modified white list Error!-----') else: self.logger.error('' - ----UPLOAD the new local added white list Error!-----'') def task_scheduled(self): # muti process scheduling the task pass ) def start_client(self): ''' --interval-- Parameters: weeks (int) – number of weeks to wait days (int) – number of days to wait hours (int) – number of hours to wait minutes (int) – number of minutes to wait seconds (int) – number of seconds to wait start_date (datetime|str) – starting point for the interval calculation end_date (datetime|str) – latest possible date/time to trigger on timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations --cron-- Parameters: year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) 
hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) second (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) add_job: Parameters: func – callable (or a textual reference to one) to run at the given time trigger (str|apscheduler.triggers.base.BaseTrigger) – trigger that determines when func is called args (list|tuple) – list of positional arguments to call func with kwargs (dict) – dict of keyword arguments to call func with id (str|unicode) – explicit identifier for the job (for modifying it later) name (str|unicode) – textual description of the job misfire_grace_time (int) – seconds after the designated run time that the job is still allowed to be run coalesce (bool) – run once instead of many times if the scheduler determines that the job should be run more than once in succession max_instances (int) – maximum number of concurrently running instances allowed for this job next_run_time (datetime) – when to first run the job, regardless of the trigger (pass None to add the job as paused) jobstore (str|unicode) – alias of the job store to store the job in executor (str|unicode) – alias of the executor to run the job with replace_existing (bool) – True to replace an existing job with the same id (but retain the number of runs from the existing one) ''' # 根據情況調度任務 # new_task_sched = BlockingScheduler() # new_task_sched.add_job(self.synchronous_DB, 'interval', minutes = 5,max_instances = 1) # new_task_sched.start() # print '====================' ''' self.tasksched.add_job(self.synchronous_DB, 'interval', minutes = 5,max_instances = 1) self.tasksched.add_job(self.get_newconfig, 'interval', days = 1,max_instances = 1) self.tasksched.add_job(self.get__new_blacklist, 'interval', days = 1,max_instances = 1) 
self.tasksched.add_job(self.upload_white_list, 'interval', days = 1,max_instances = 10) self.tasksched.add_job(self.updatewhitelist, 'interval', days = 1,max_instances = 10) ''' self.tasksched.add_job(self.synchronous_DB, 'interval', minutes=15, max_instances=1) self.tasksched.add_job(self.get_newconfig, 'interval', minutes=30, max_instances=1) self.tasksched.add_job(self.get__new_blacklist, 'interval', minutes=30, max_instances=1) self.tasksched.add_job(self.upload_white_list, 'interval', minutes=30, max_instances=1) self.tasksched.add_job(self.updatewhitelist, 'interval', minutes=30, max_instances=1) self.tasksched.start() def connecttest(self): self.logger.info('Start connect server...') sock = self.__connect_server(self.serverIP, self.serverPort) # 連接服務器 self.logger.info('connect server ok!') time.sleep(10) self.logger.info('close sock') sock.shutdown(socket.SHUT_RDWR) sock.close() def test(self): ''' debug ''' # pdb.set_trace() # self.setsysdbflag(True) # self.updatewhitelist() # self.sysdbFirstFlag = True # self.synchronous_DB() # self.get__new_blacklist() # self.get_newconfig() # self.connecttest() self.upload_white_list() # self.updatewhitelist() def muilti_test(self): jobs = [] get_black_list_thread = threading.Thread(target=self.get__new_blacklist) get_black_list_thread.daemon = True get_config_thread = threading.Thread(target=self.get_newconfig) get_config_thread.daemon = True upload_white_list_thread = threading.Thread(target=self.upload_white_list) upload_white_list_thread.daemon = True update_white_list_thread = threading.Thread(target=self.updatewhitelist) update_white_list_thread.daemon = True get_black_list_thread.start() jobs.append(get_black_list_thread) get_config_thread.start() jobs.append(get_config_thread) upload_white_list_thread.start() jobs.append(upload_white_list_thread) update_white_list_thread.start() jobs.append(update_white_list_thread) for t in jobs: t.join() if __name__ == '__main__': ''' 起進程出問題,所以只能起線程,還沒有找到具體原因 ''' # mydb = 
SQLdb(dbname = 'tmp') # myclient = Client(db = mydb) myclient = Client() if len(sys.argv) >= 2: try: flag = sys.argv[1] except: pass else: try: flag = flag.title() flag = eval(flag) except: raise 'ARGV ERROR!' else: if isinstance(flag, bool): myclient.setsysdbflag(flag) else: raise 'ARGV ERROR!' # myclient.test() # myclient.muilti_test() myclient.start_client() ''' jobs=[] for i in range(50): tmpclient = Client() newprocess = threading.Thread(target = Client.test,args=(tmpclient,)) #newprocess = multiprocessing.Process(target = Client.test,args=(tmpclient,)) #使用進程就會出錯,查了一些資料好像是說是bug newprocess.start() jobs.append(newprocess) for t in jobs: t.join() '''
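Each sync method above repeats the same step: send the packed request with sendall, and on failure log an error and abandon the task. Below is a minimal sketch of how that step could be factored out. It is written for Python 3, and the helper name send_request and its task_name parameter are assumptions made for illustration; they are not part of the original client.

```python
import logging
import socket

logger = logging.getLogger("client_sketch")


def send_request(sock, payload, task_name):
    """Send payload in full; log and return False if the socket raises.

    Hypothetical helper: the name and signature are illustrative only.
    """
    try:
        sock.sendall(payload)
    except OSError as exc:
        logger.error("-----Sending the %s request failed: %s-----", task_name, exc)
        return False
    return True


if __name__ == "__main__":
    # socketpair() returns two connected sockets, standing in for client and server.
    client_end, server_end = socket.socketpair()
    if send_request(client_end, b"request", "sync_white_list"):
        print(server_end.recv(1024))
    client_end.close()
    server_end.close()
```

A caller would then write `if not send_request(sock, send_message, 'sync_info_db'): return`, which keeps the log message and the early return in one place.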