I/O Multiplexing in Practice: epoll


Concepts

I/O multiplexing means that once the kernel finds that one or more of the I/O conditions a process has specified are ready, it notifies that process.

An intuitive explanation (borrowed from an expert online)

These terms are a mouthful; what matters is the idea. An epoll scenario: a bartender (one thread) faces a crowd of slumped drunks. Suddenly one shouts "refill!" (an event); you trot over and pour him a glass, then leave him be. Another one calls for a refill, and you go pour again. One waiter serves many people this way, and when nobody is drinking, the waiter is idle and can do something else, like play with his phone. The difference between epoll and select/poll is that in the latter two scenarios the drunks say nothing: you have to go around asking each one whether he wants a drink, leaving no time for the phone. I/O multiplexing, roughly, means these drunks share a single waiter.

The three functions

1. select

The process tells the kernel which events on which file descriptors to watch (at most 1024 fds). While none of the watched events has occurred, the process blocks; when one or more events occur, the process is woken up.

When we call select():

  1. A context switch into kernel mode occurs.

  2. The fd sets are copied from user space into kernel space.

  3. The kernel walks all the fds, checking whether the corresponding event has occurred on each.

  4. If none has, the process is put to sleep; when a device driver raises an interrupt or the timeout expires, the process is woken up and the scan is repeated.

  5. The scanned fds are returned.

  6. The fd sets are copied from kernel space back to user space.

fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
 
Parameters: up to four (the first three are required)
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an "exceptional condition"
timeout: timeout in seconds

Return value: three lists
 
select monitors the given file descriptors (blocking while none of the conditions is met); once some descriptor's state changes, it returns three lists:
1. fds from the first argument that have become readable are placed in fd_r_list
2. fds from the second argument that have become writable are placed in fd_w_list (an ordinary connected socket is usually writable immediately, which is why wlist often appears to be returned wholesale)
3. fds from the third argument on which an exceptional condition occurred are placed in fd_e_list
4. With no timeout, select blocks until one of the watched descriptors changes state.
   With timeout = n (seconds), select blocks for at most n seconds: if none of the watched descriptors changed, it returns three empty lists; if some did, it returns right away.
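The call above can be exercised end to end with a minimal sketch; socket.socketpair simply stands in for a real client connection and is not part of the original text:

```python
import select
import socket

# A connected socket pair stands in for a real client connection.
a, b = socket.socketpair()

# Nothing written yet: with a zero timeout, select returns three empty lists.
empty_r, _, _ = select.select([a], [], [], 0)

# Once the peer writes, the fd appears in the ready-for-reading list.
b.sendall(b'ping')
ready_r, ready_w, ready_x = select.select([a], [], [], 1.0)

data = a.recv(4)
a.close(); b.close()
```

Here `empty_r` comes back as `[]`, `ready_r` as `[a]`, and `data` as `b'ping'`.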

On the server side we have to call select over and over, which means:

  1. With many file descriptors, copying them between user space and kernel space on every call is expensive.

  2. With many file descriptors, the kernel's linear scan over all of them also wastes time.

  3. select supports at most 1024 file descriptors.

Reference: http://www.cnblogs.com/Anker/archive/2013/08/14/3258674.html

2. poll

Reference: http://www.cnblogs.com/Anker/archive/2013/08/15/3261006.html

3. epoll

Reference: http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.html

epoll is the result of improving on select and poll; by comparison, it has the following advantages:

1. The number of socket descriptors (fds) one process can watch is essentially unlimited (bounded only by the operating system's maximum number of open file handles)

select's biggest flaw is that the fds a single process may open are capped, set by FD_SETSIZE with a default of 1024. epoll has no such limit; its ceiling is the operating system's maximum number of file handles, a number far larger than 1024.

2. I/O efficiency does not degrade linearly as the number of fds grows

epoll's solution lies in the epoll_ctl function. Each fd is copied into the kernel once, when its event is registered on the epoll handle, instead of being copied again on every epoll_wait. epoll guarantees each fd is copied only once over the whole process.

The other fatal weakness of traditional select/poll: with a very large socket set, network latency and idle links mean that at any moment only a small fraction of the sockets are "active", yet every select/poll call scans the whole set linearly, so efficiency falls off linearly. epoll has no such problem; it only acts on the "active" sockets, because in the kernel implementation epoll hangs a callback on each fd, and only sockets with activity invoke their callback, while idle sockets do not. In this respect epoll implements a kind of pseudo-AIO.

3. Cheap message passing between kernel and user space

During epoll_ctl the kernel does walk the given fd once (this pass is unavoidable) and installs a callback for it; when the device becomes ready and the waiters on its wait queue are woken, this callback runs and appends the ready fd to a ready list. All epoll_wait really does is check this ready list for ready fds.

Whether select, poll or epoll, the kernel must hand fd events back to user space, so avoiding needless memory copies matters. (It is often claimed that epoll achieves this by sharing one block of memory between kernel and user space via mmap; that claim is widely repeated but inaccurate. The Linux implementation does not mmap anything. The real saving is that epoll_wait copies out only the small list of ready events rather than the whole fd set.)

4. epoll's API is simpler

epoll is not the only way to overcome select/poll's shortcomings, merely the Linux one. FreeBSD has kqueue, and /dev/poll is the oldest scheme, from Solaris; they are successively harder to use. epoll is the simplest.

epoll in Python, in detail

Python's select module is dedicated to I/O multiplexing and provides three methods, select, poll and epoll (the latter two are only available on Linux; Windows supports only select). It also provides kqueue for FreeBSD systems.

select.epoll(sizehint=-1, flags=0) -- create an epoll object

epoll.close()
Close the control file descriptor of the epoll object.

epoll.closed
True if the epoll object is closed.

epoll.fileno()
Return the file descriptor number of the control fd.

epoll.fromfd(fd)
Create an epoll object from a given file descriptor.

epoll.register(fd[, eventmask])
Register a fd descriptor with the epoll object, together with the events to watch on it.

epoll.modify(fd, eventmask)
Modify the eventmask of a registered file descriptor.

epoll.unregister(fd)
Remove a registered file descriptor from the epoll object.

epoll.poll(timeout=-1, maxevents=-1)
Wait for events; timeout is in seconds (a float). Blocks until an event occurs on a registered fd, then returns a list of (fd, eventmask) pairs: [(fd1, event1), (fd2, event2), ... (fdn, eventn)]
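A minimal round trip through the API above (Linux only; socket.socketpair stands in for a real connection and is an assumption of this sketch, not part of the original text):

```python
import select
import socket

a, b = socket.socketpair()
fd = a.fileno()

ep = select.epoll()              # create the epoll object
ep.register(fd, select.EPOLLIN)  # watch fd for readability

b.sendall(b'x')                  # make fd readable
events = ep.poll(1.0)            # -> list of (fd, eventmask) pairs

ep.unregister(fd)
ep.close()
a.close(); b.close()
```

After the write, `events` holds a single pair whose fd is `fd` and whose mask has the EPOLLIN bit set.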

Events:

    EPOLLERR = 8               ---- an error condition occurred
    EPOLLET = 2147483648       ---- level-triggered by default; set this flag for edge-triggered
    EPOLLHUP = 16              ---- hang up
    EPOLLIN = 1                ---- available for read
    EPOLLMSG = 1024            ---- ignored
    EPOLLONESHOT = 1073741824  ---- one-shot behavior: after one event is pulled out, the fd is internally disabled
    EPOLLOUT = 4               ---- available for write
    EPOLLPRI = 2               ---- urgent data for read
    EPOLLRDBAND = 128          ---- priority data band can be read
    EPOLLRDNORM = 64           ---- equivalent to EPOLLIN
    EPOLLWRBAND = 512          ---- priority data may be written
    EPOLLWRNORM = 256          ---- equivalent to EPOLLOUT

Level-triggered vs. edge-triggered:

Level-triggered (sometimes called condition-triggered): when a watched file descriptor becomes ready for reading or writing, epoll.poll() notifies the handler. If the handler does not consume all the data in one go (say its buffer is too small), the next epoll.poll() call will notify it again about the same, still-unfinished descriptor, and it will keep notifying for as long as you fail to act. If the system holds many ready descriptors you do not care about, they all come back on every call, which badly slows down the handler's search for the descriptors it does care about. The upside is obvious: it is stable and reliable.

Edge-triggered (sometimes called state-triggered): when a watched file descriptor becomes ready, epoll.poll() notifies the handler, but only once: if the data is not fully consumed, the next epoll.poll() will not mention that descriptor again until a second readiness event occurs on it. This mode is more efficient than level triggering, because the system is not flooded with ready descriptors you do not care about. The drawback: under some conditions it is less forgiving.
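The two modes can be observed directly. A minimal sketch (Linux only; socket.socketpair replaces a network client, which is an assumption of this example):

```python
import select
import socket

a, b = socket.socketpair()
fd = a.fileno()
ep = select.epoll()
b.sendall(b'ab')                     # two bytes now pending on a

# Level-triggered (the default): the fd is reported on every poll()
# for as long as unread data remains in the kernel buffer.
ep.register(fd, select.EPOLLIN)
lt_first = ep.poll(1.0)              # readable
a.recv(1)                            # consume one byte; one is still buffered
lt_second = ep.poll(1.0)             # reported again: data remains
ep.unregister(fd)

# Edge-triggered: the fd is reported once per new arrival of data; if the
# buffer is not drained, poll() stays silent until more data arrives.
ep.register(fd, select.EPOLLIN | select.EPOLLET)
et_first = ep.poll(1.0)              # the byte pending at registration -> one event
et_second = ep.poll(0.2)             # no new data -> no event, despite the leftover byte

ep.close()
a.close(); b.close()
```

This is why edge-triggered code must read in a loop until recv raises EAGAIN: any byte left behind will not be reported again.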

#!/usr/bin/python
# coding:utf-8

import select,socket
import time

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

sk = socket.socket()
sk.bind(('192.168.110.100',8080))
sk.listen(5)
sk.setblocking(0)   # non-blocking mode

epoll = select.epoll() # create an epoll object
epoll.register(sk.fileno(),select.EPOLLIN) # watch the listening socket for read events (a new connection arrives as a read event)
try:
    connections = {}; requests = {}; responses = {}
    while True:
        events = epoll.poll()     # block until an event we care about occurs
        for fileno,event in events:  # events is a list of (fileno, eventmask) tuples; fileno is an integer file descriptor
            if fileno == sk.fileno(): # event on the listening socket: a new connection to accept
                connection, address = sk.accept() # accept the new connection
                connection.setblocking(0) # make the new socket non-blocking
                epoll.register(connection.fileno(), select.EPOLLIN) # watch it for read (EPOLLIN) events
                connections[connection.fileno()] = connection # keep the connection, keyed by fd
                requests[connection.fileno()] = b''           # bytes received so far
                responses[connection.fileno()] = response     # bytes still to send

            elif event & select.EPOLLIN:   # a read event occurred
                requests[fileno] += connections[fileno].recv(1024) # read from the client
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]: # a blank line marks the end of the request
                    epoll.modify(fileno, select.EPOLLOUT) # full HTTP request received: swap the read event for a write event (EPOLLOUT), which fires when data can be sent back to the client
                    print('-'*40 + '\n' + requests[fileno].decode()[:-2])
        
            elif event & select.EPOLLOUT: # a write event on a client socket: we can send data to the client
                byteswritten = connections[fileno].send(responses[fileno]) # send the data; returns the number of bytes written
                responses[fileno] = responses[fileno][byteswritten:]   # slice off what was sent; an empty slice means we are done
                if len(responses[fileno]) == 0: # all data sent
                    epoll.modify(fileno, 0)     # response complete: stop watching for read and write events
                    connections[fileno].shutdown(socket.SHUT_RDWR)

            elif event & select.EPOLLHUP:   # the client disconnected
                epoll.unregister(fileno)    # stop watching this fd
                connections[fileno].close() # close the socket
                del connections[fileno]     # drop the reference
                
finally:
    epoll.unregister(sk.fileno())
    epoll.close()
    sk.close()
            
Server
#!/usr/bin/python
# coding:utf-8

import socket
obj = socket.socket()   
obj.connect(('192.168.110.100',8080))
obj.sendall('hellob\n\r\n')
print obj.recv(1024)
obj.close()
Client

 

Real-world code:

#!/usr/bin/python
# coding:utf-8

import select
import socket
import sys
import Queue
import time
import threading
import logging
import datetime
import re, os
import hashlib

sys.path.append('../')

import multiprocessing

from SQLdb import SQLdb
from mylog import MyLog as Log

from communication_packet import Communication_Packet, Communication_Packet_Flags, Error_Info_Flags
from encryption import PrpCrypt
import pdb

'''
Constant Meaning
EPOLLIN Available for read
EPOLLOUT Available for write
EPOLLPRI Urgent data for read
EPOLLERR Error condition happened on the assoc. fd
EPOLLHUP Hang up happened on the assoc. fd
EPOLLET Set Edge Trigger behavior, the default is Level Trigger behavior
EPOLLONESHOT Set one-shot behavior. After one event is pulled out, the fd is internally disabled
EPOLLRDNORM Equivalent to EPOLLIN
EPOLLRDBAND Priority data band can be read.
EPOLLWRNORM Equivalent to EPOLLOUT
EPOLLWRBAND Priority data may be written.
EPOLLMSG Ignored.

'''


class Server(object):
    def __init__(self, server_IP=None, server_port=None):
        # def __init__(self,server_address = ('112.33.9.154',11366)):
        '''
        Initialize the server's global state
        '''
        # pdb.set_trace()

        # Use the default mode: debug
        self.log = Log()
        self.log.openConsole()  # enable console output
        self.logger = self.log.getLog()
        self.dbname = 'sync_test'

        # Default: use the local host IP and port 11366
        if server_IP is None or server_port is None:
            if server_IP is None:
                try:
                    self.server_IP = self.getlocalIP()
                    self.logger.info('Current server_IP:    %s' % self.server_IP)
                except:
                    self.logger.critical('Get server IP Error!')
                    raise

            if server_port is None:
                self.server_port = 11366
        else:
            self.server_IP = server_IP
            self.server_port = server_port

        self.server_address = (self.server_IP, self.server_port)  # server address
        self.ListenNum = 100  # maximum number of monitored socket connections
        self.connections = {}  # live connections, keyed by fd
        self.requests = {}  # request data received on each connection
        self.addresses = {}  # client addresses
        self.errorInfo = {}  # error info; on failure it is sent back to the client
        self.responseInfo = {}
        self.readthreadRecord = {}

        self.lock = threading.Lock()  # lock for synchronizing shared data
        self.db = SQLdb()  # database handle used for synchronization
        self.setDB('localhost', 'root', '123456', 'sync_test')

        self.readthreadlock = threading.Lock()

        self.EOF = '\n\r\n'
        self.servernum = 'serverxxxxx'
        self.key = '91keytest'
        # set communication user id
        Communication_Packet.set_userid(self.servernum)
        self.encryption = PrpCrypt()
        pass

    def setServerAddree(self, server_ip, server_port):
        '''
        Set server address
        '''
        self.server_address = (server_ip, server_port)  # server address

    def setDB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)

    def getlocalIP(self):
        '''
        Use the first NIC's address as the IP to bind
        '''
        try:
            s = os.popen('ifconfig').read()
        except:
            raise
        else:
            ip = re.findall('inet addr:(?<![\.\d])(?:\d{1,3}\.){3}\d{1,3}(?![\.\d])', s)[0].split(':')[1]
        return ip

    def __init_server(self):
        '''
        Initialize the server socket and the epoll monitor object
        '''
        try:
            # pdb.set_trace()
            # Create a TCP/IP socket
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # make the socket non-blocking
            self.server.setblocking(0)

            # Bind the socket to the port
            self.logger.info('starting up on %s port %s' % self.server_address)
            try:
                self.server.bind(self.server_address)
            except:
                echo = os.popen('''lsof -i :11366 |grep "(LISTEN)" | awk '{printf($1)}' ''').read()
                print 'Port %s is occupied by process %s!' % (self.server_port, echo)
                self.logger.error('Bind on %s port %s Error!' % self.server_address)
                raise

            # Listen for incoming connections
            # self.server.listen(self.ListenNum)
            self.server.listen(1)

            # Set up the epoll
            self.epoll = select.epoll()
            self.epoll.register(self.server.fileno(), select.EPOLLIN | select.EPOLLET)  # register the server socket for read events, in edge-triggered mode
        except:
            raise Exception('__init_server Error')

    def __jionthreads(self):
        '''
        join the threading
        '''
        # self.logger.debug('Current threadtast is  %d ' % len(threading.enumerate()))
        main_thread = threading.currentThread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            else:
                t.join(0)  # non-blocking join

                # self.logger.debug('After joined the threads... %d '% len(threading.enumerate()))

    def __format_str(self, businessFlag, data, endFlag=True, errorFlag=False, hasnewconf=False, versioninfo=''):
        '''
        Format the data to send
        '''
        formatstr = {'BUSINESS_TYPE': businessFlag, 'DATA': data, 'ENDFLAG': endFlag, 'ERRORFLAG': errorFlag,
                     'HASNWECONF': hasnewconf, 'VERSIONINFO': versioninfo}
        return str(formatstr) + self.EOF

    def get_table_filed(self, table_name, db, lock, db_name):
        # pdb.set_trace()
        # from the db get the table field!
        query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s' and TABLE_SCHEMA = '%s';" % (
        table_name, db_name)
        with self.lock:
            detection_version_fields = self.db.fechdb(query_detection_version_field_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            # Record Error and end task
            DB_ERROR = False
            self.logger.error('----Get %s fields Error! End this task-----' % table_name)
            return
        else:
            # query result is Unicode,so we need to encode to utf-8
            table_field = [field[0].encode('utf-8') for field in detection_version_fields]
            return table_field

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
        # print receving_data
        # pdb.set_trace()
        # print receving_data
        print len(receving_data)
        pre_md5 = receving_data[:16]
        suffix_md5 = receving_data[-16:]
        message_md5 = pre_md5 + suffix_md5
        message = receving_data[16:-16]
        cur_md5 = self.calc_md5(message)
        print cur_md5, message_md5
        if message_md5 == cur_md5:
            return True, message
        else:
            return False, message
        pass

    def validating_content_token(self, content):
        receive_content = content['Business']['Content']
        if not isinstance(receive_content, str):
            receive_content = str(receive_content)
        receive_md5 = content['Info_Status']['Security_Token']
        receive_time = content['Info_Status']['Time']
        cur_md5 = self.calc_md5(receive_content + receive_time + self.key)
        if cur_md5 == receive_md5:
            return True
        else:
            return False
        pass

    def packaging_message(self, Message=None, Error=None, Info_Status=None):
        # ----Pack send message
        # Init Communication_Packet
        cm_packet = Communication_Packet()

        # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True):
        cm_packet.set_content_Business(targv=Message)

        # def set_content_Error(self,error_flag = False,error_type = None,error_info = None):
        cm_packet.set_content_Error(targv=Error)

        now_time = str(datetime.datetime.now())  # get current time
        # token = md5(Business + time + key)
        security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key)
        # def set_content_Info_Status(self,info_type,security_token,time,is_end):
        # Need to replace the security_token
        Info_Status = list(Info_Status)  # convert the tuple to a list
        Info_Status[1] = security_token
        Info_Status[2] = now_time
        Info_Status = tuple(Info_Status)  # convert back to a tuple for the call

        cm_packet.set_content_Info_Status(targv=Info_Status)
        try:
            send_data = cm_packet.content
        except Exception, e:
            raise e
        else:
            # encrypt the data
            self.logger.debug(type(send_data))
            encryption_send_data = self.encryption.encrypt(str(send_data))
            # calculate its md5; the first and last 16 hex chars frame the message
            encrypt_send_md5 = self.calc_md5(encryption_send_data)
            complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF
            return complete_send_data

    def unpackaging_message(self, unpacking_str):
        # pdb.set_trace()
        if not isinstance(unpacking_str, str):
            raise ValueError
        else:
            unpacking_str = unpacking_str.strip(self.EOF)
            flag, message = self.validating_message_token(unpacking_str)
            if flag:
                decrypt_str = self.encryption.decrypt(message)
                try:
                    message_dict = eval(decrypt_str)
                except Exception, e:
                    self.logger.error('Eval decrypt_str Error!')
                    raise e
                else:
                    if self.validating_content_token(message_dict):
                        return message_dict
                    else:
                        self.logger.error('Message is tampered!')
                        return None
                pass
            else:
                self.logger.error('Message is tampered!')
                return None

    def init_detection_nums(self):
        # get detect_point_nums from server db
        # pdb.set_trace()
        query_sql = "select distinct(sync_point_no) from sync_control"
        BD_ERROR = False
        with self.lock:
            point_nums = self.db.fechdb(query_sql)
            BD_ERROR = self.db.Error
        if BD_ERROR:
            self.logger.error('-----get detect_point_nums error-----')

        if point_nums:
            nums_list = [p[0] for p in point_nums]
            return nums_list
        else:
            return None

    def verification_sync_point_no(self, detection_num):
        detect_point_nums = self.init_detection_nums()
        if not detect_point_nums:
            self.logger.error('-----db config error!-----')
            raise Exception('db config error!')
        if detection_num in detect_point_nums:
            return True
        else:
            return False

    def read_sync_contol_configure(self, table_name, sync_type, sync_point_no, sync_user='server'):
        # Read the control configuration from the local db; if absent, sync it from the server, then read it again
        # pdb.set_trace()
        qeury_sql = "select * from sync_control where sync_table = '%s' and sync_type = '%s' and sync_user = '%s' and sync_point_no = '%s';" % (
        table_name, sync_type, sync_user, sync_point_no)
        DB_ERROR = False

        # set table name
        sync_table_name = 'sync_control'

        # get `sync_control` table fields
        table_field = self.get_table_filed(sync_table_name, self.db, self.lock, self.dbname)
        if not table_field:
            self.logger.error('-----Get %s table fields failed!-----' % table_name)
            return None

        with self.lock:
            control_configure = self.db.fechdb(qeury_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get control configure Error!-----')
            return None
        if control_configure:
            # Got the config from the db; classify by table name and sync type (upload or download)
            # format the configure to a list
            control_configure_list = []
            for iter_conf in control_configure:
                control_configure_item = dict.fromkeys(table_field)
                lenth = len(table_field)
                # set value for everyone key
                for i in range(lenth):
                    val = iter_conf[i]
                    if isinstance(val, unicode):
                        val = val.encode('utf-8')
                    control_configure_item[table_field[i]] = val
                control_configure_list.append(control_configure_item)
            return control_configure_list

    def parsing_config(self, config):
        # parsing the config to get the sql
        # may modify the logic of the code
        # pdb.set_trace()
        p_conf = dict()
        table_name = config['sync_table']  # Get the download table name
        p_conf['table_name'] = table_name
        table_field = config['sync_field']  # Get sync table field!
        p_conf['sync_type'] = config['sync_type']
        p_conf['sync_range'] = config['sync_range']
        p_conf['sync_range_value'] = config['sync_range_value']
        p_conf['sync_is_update_time'] = config['sync_is_update_time']

        # if table_field is null,we need sync all the field!
        if not table_field:
            table_field = self.get_table_filed(table_name, self.db, self.lock, self.dbname)
            if not table_field:
                self.logger.error(
                    '-----Terminate this task, because getting the %s table fields failed!-----' % table_name)
                return
        p_conf['table_field'] = table_field
        # Get this operation's type
        try:
            sql_operations = eval(config['sync_operation_type'])
        except Exception, e:
            self.logger.error('-----get sync_operation_type error!-----')
            return

        upside_operate = sql_operations['upside']  # if have, this download operation need carry db info to the server!
        p_conf['upside_operate'] = upside_operate
        downside_operate = sql_operations['downside']  # how to handle the downloaded db info!
        p_conf['downside_operate'] = downside_operate
        update_state_operate = sql_operations['update_state']
        p_conf['update_state_operate'] = update_state_operate

        # Get the sync sql of the corresponding operation
        try:
            sqls = eval(config['sync_sql'])
        except Exception, e:
            self.logger.error('-----get sync_sql error!-----')
            raise

        upside_sqls = sqls['upside']  # a tuple or None
        p_conf['upside_sqls'] = upside_sqls

        downside_sqls = sqls['downside']  # a tuple or None
        p_conf['downside_sqls'] = downside_sqls

        update_state_sqls = sqls['update_state']  # a tuple or None
        p_conf['update_state_sqls'] = update_state_sqls

        # Get the sync patch field of the corresponding operation
        try:
            if config['sync_patch_field']:
                patch_fields = eval(config['sync_patch_field'])
            else:
                # no patch fields configured; default to empty entries so the lookups below succeed
                patch_fields = {'upside': None, 'downside': None, 'update_state': None}
        except Exception, e:
            self.logger.error('-----get sync_field error!-----')
            raise

        upside_fields = patch_fields['upside']  # a tuple or None
        p_conf['upside_fields'] = upside_fields

        downside_fields = patch_fields['downside']  # a tuple or None
        p_conf['downside_fields'] = downside_fields

        update_state_fields = patch_fields['update_state']  # a tuple or None
        p_conf['update_state_fields'] = update_state_fields

        # Get the sync_field_value of the corresponding operation
        try:
            if config['sync_patch_field_value']:
                patch_field_values = eval(config['sync_patch_field_value'])
            else:
                # no patch field values configured; default to empty entries so the lookups below succeed
                patch_field_values = {'upside': None, 'downside': None, 'update_state': None}
        except Exception, e:
            self.logger.error('-----get sync_field_value error!-----')
            return

        upside_patch_field_values = patch_field_values['upside']  # a tuple or None
        p_conf['upside_patch_field_values'] = upside_patch_field_values

        downside_patch_field_values = patch_field_values['downside']  # a tuple or None
        p_conf['downside_patch_field_values'] = downside_patch_field_values

        update_state_patch_field_values = patch_field_values['update_state']  # a tuple or None
        p_conf['update_state_patch_field_values'] = update_state_patch_field_values

        is_carry_state = config['sync_is_carry_state']
        p_conf['is_carry_state'] = is_carry_state

        is_update_state = config['sync_is_update_state']
        p_conf['is_update_state'] = is_update_state

        is_use_state_carry_data = config['sync_is_use_state_carry_data']
        p_conf['is_use_state_carry_data'] = is_use_state_carry_data

        return p_conf

    def __proxy(self, FileNo):
        # spawn a thread to handle the write event
        newthread = threading.Thread(target=self.__handler_write_event, args=(FileNo,))
        # newthread.daemon = True
        newthread.start()

    def __delevnet(self, FileNo):
        '''
        Unregister events we no longer care about and release the related resources
        '''
        self.logger.info('Start to unregister and close %s socket... ' % FileNo)
        self.epoll.unregister(FileNo)
        self.connections[FileNo].close()
        del self.connections[FileNo]
        del self.requests[FileNo]
        del self.addresses[FileNo]
        del self.errorInfo[FileNo]
        self.logger.info('unregistered and closed the %s socket! ' % FileNo)

    def __read_from_socket(self, FileNo):
        '''
        Read data from socket
        '''
        try:
            while True:
                tmpdata = self.connections[FileNo].recv(4096)
                # print 'tmpdata: %s' % tmpdata
                # Python's select module has no EPOLLRDHUP; when the client closes first (or
                # hits Ctrl+C before sending anything), the server still gets EPOLLIN but the
                # read returns empty data... no other workaround found.
                if not tmpdata:
                    break
                self.requests[FileNo] += tmpdata
                # self.logger.debug(len(tmpdata))
        except socket.error:
            pass
        except Exception, e:
            raise e

    # error_flag = False,error_type = None,error_info = None
    def __deal_business(self, FileNo):
        # handle the client's business according to the received data
        # pdb.set_trace()
        try:
            message = self.unpackaging_message(self.requests[FileNo])
            # we need reset the requests info
            self.requests[FileNo] = ''
        # self.logger.debug(message)
        except:
            # set the error flag and record the error info
            self.logger.error('unpackaging_message Error!')
            self.errorInfo[FileNo] = (Error_Info_Flags.Receive_Data_Error, 'Server received data Error!')
            return
        else:
            if message:
                business = message['Business']
                client_id = message['userid']
                error_info = message['Error']
                info_states = message['Info_Status']
                verification_result = self.verification_sync_point_no(client_id)
                if verification_result:
                    # Here we handle the business
                    # 1、get config from the db
                    # read_sync_contol_configure(self,table_name,sync_type,sync_point_no,sync_user):
                    try:
                        table_name = business['Table_Name']
                        sync_type = business['Type']
                        is_sync_flag = business['Is_Syncdb']
                    except:
                        self.errorInfo[FileNo] = (
                        Error_Info_Flags.Receive_Data_Error, 'Business information is incomplete!')
                        return
                    if sync_type == Communication_Packet_Flags.DOLOAD_DB:
                        s_type = 'download'
                    else:
                        s_type = 'upload'
                    b_config = self.read_sync_contol_configure(table_name, s_type, client_id)
                    # pdb.set_trace()
                    if b_config:
                        p_config = [self.parsing_config(conf) for conf in b_config]
                        self.logger.debug(p_config)
                    else:
                        pass
                    if b_config:
                        self.real_business_processing_functions(FileNo, p_config, business, info_states, error_info)
                    else:
                        # set error info!
                        self.errorInfo[FileNo] = (
                        Error_Info_Flags.Server_config_Error, 'Server config is None, give up this task!')

                else:
                    # user authentication failed
                    self.logger.error('-----User authentication failed! userid: %s-----' % client_id)
                    self.errorInfo[FileNo] = (Error_Info_Flags.User_Certification_Error, 'User authentication failed!')
            else:
                # no message means the client's information failed authentication
                self.logger.error('-----Client\'s information authentication failed!-----')
                self.errorInfo[FileNo] = (
                Error_Info_Flags.Info_Certification_Error, 'Information authentication failed!')

    def calculate_time(self, time_type, time_value):
        # may arrive as a str; convert it to int
        time_value = int(time_value)

        # get current time as the end time
        cur_time = datetime.datetime.now()
        hours = 0
        if time_type == 'hour':
            hours = time_value
        elif time_type == 'day':
            hours = time_value * 24
        elif time_type == 'week':
            hours = time_value * 24 * 7
        elif time_type == 'month':
            hours = time_value * 24 * 30
        else:
            self.logger.error('-----time_type Error!-----')
            return None
        # calculate the start time
        start_time = cur_time - datetime.timedelta(hours=hours)
        return (start_time, cur_time)

    # handle the bussiness from the client
    def real_business_processing_functions(self, FileNo, business_config, business, info_states, error_info):
        # pdb.set_trace()
        # according to the config we handle the business
        business_config = business_config[0]
        if info_states['Info_Type'] == Communication_Packet_Flags.REQEST:
            # get bussiness type
            request_bussiness_type = business['Type']
            if request_bussiness_type == Communication_Packet_Flags.UPLOAD_DB:
                request_bussiness_type = 'upload'
            elif request_bussiness_type == Communication_Packet_Flags.DOLOAD_DB:
                request_bussiness_type = 'download'
            else:
                self.errorInfo[FileNo] = (
                Error_Info_Flags.Client_Data_Pack_Error, 'Request business type error %s' % request_bussiness_type)
                return

            loc_config_sync_type = business_config['sync_type']
            if request_bussiness_type == loc_config_sync_type:

                is_carry_state = business_config['is_carry_state']
                is_use_state_carry_data = business_config['is_use_state_carry_data']
                is_update_state = business_config['is_update_state']
                # handle the download request
                if request_bussiness_type == 'download':
                    # parse the local config
                    up_sql_list = []
                    upside_operates = business_config['upside_operate'].split('|')
                    upside_sqls = business_config['upside_sqls']
                    upside_fields = business_config['upside_fields']
                    upside_patch_field_values = business_config['upside_patch_field_values']
                    sync_range = business_config['sync_range']
                    sync_range_value = business_config['sync_range_value']

                    lenth = len(upside_sqls)
                    for i in range(lenth):
                        sql_part = upside_sqls[i]

                        # if sync_range is set, we ignore the other configurations
                        if sync_range:
                            if sync_range == 'period':
                                t_type, t_value = sync_range_value.split(':')
                                s_time, e_time = self.calculate_time(t_type, t_value)
                                query_sql = sql_part % (str(s_time), str(e_time))
                            else:
                                query_sql = sql_part
                            # add it into the list
                            up_sql_list.append(query_sql)
                        else:
                            # we need to parse the other configurations
                            if is_use_state_carry_data:
                                try:
                                    # [((u'update_time', u'1970-01-01 00:00:00'),)] like this
                                    query_sql = sql_part % business['Content'][0][0][1]
                                except Exception:
                                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Content Error!')
                                    return
                            else:
                                query_sql = sql_part
                            up_sql_list.append(query_sql)
                    query_data = []
                    for u_sql in up_sql_list:
                        DB_ERROR = False
                        with self.lock:
                            res = self.db.fechdb(u_sql)
                            DB_ERROR = self.db.Error
                        if DB_ERROR:
                            self.errorInfo[FileNo] = (
                                Error_Info_Flags.Server_DB_Error, 'Server db Error,SQL: %s' % u_sql)
                            break
                        else:
                            query_data.append(res)
                    self.responseInfo[FileNo] = query_data
                # handle the upload request
                elif request_bussiness_type == 'upload':
                    # pdb.set_trace()
                    # parse the local config
                    content = business['Content']
                    try:
                        self.refresh_the_database(business_config, content)
                    except Exception, e:
                        print e
                        self.errorInfo[FileNo] = (Error_Info_Flags.Server_config_Error, 'Server Config Error!')

                else:
                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'business type Error!')
            else:
                self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error,
                                          'server config type is different from client request business type! Error!')
        else:
            self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Communication_Packet_Flags Error!')

    # update the db
    def refresh_the_database(self, handle_config, db_content):
        '''
        refresh the database: maybe insert, update, delete...
        '''
        # parsing the handle config
        table_name = handle_config['table_name']
        table_field = handle_config['table_field']
        downside_operate = handle_config['downside_operate']
        update_state_operate = handle_config['update_state_operate']
        downside_sqls = handle_config['downside_sqls']
        update_state_sqls = handle_config['update_state_sqls']
        downside_fields = handle_config['downside_fields']
        update_state_fields = handle_config['update_state_fields']
        downside_patch_field_values = handle_config['downside_patch_field_values']
        update_state_patch_field_values = handle_config['update_state_patch_field_values']
        is_update_time = handle_config['sync_is_update_time']
        # pdb.set_trace()

        try:
            table_field = eval(table_field)
            if not table_field:
                table_field = self.get_table_filed(table_name, self.lock, self.db, self.dbname)
            first_field = table_field[0]
        except Exception, e:
            self.logger.error('-----eval table_field error,config is error!-----')
            raise e
        if first_field == 'id':
            is_id = True
        else:
            is_id = False

        download_oprations = downside_operate.split('|')
        if 'file' in download_oprations:
            filename = self.createNewBlackListPath()
            handle_flag = self.handle_file_func(db_content, filename)
            return handle_flag
        # table_field = eval(table_field)
        try:
            is_update_time = int(is_update_time)
        except:
            self.logger.error('-----is_update_time config value error!-----')
            raise
        for db_item in db_content:
            if is_update_time:
                time_index = table_field.index('update_time')
                update_time = (str(datetime.datetime.today()).split('.')[0],)
                db_item = db_item[:time_index] + update_time + db_item[time_index + 1:]
            if is_id:
                rowdata = db_item[1:]
            else:
                rowdata = db_item
            # self.logger.debug(rowdata)
            # print dict(zip(self.phishing_log_fields,rowdata))
            for myindex, oper in enumerate(download_oprations):
                # here we get all the patched field values
                # '((fixed,true),(carry,None),(transfer,None))',
                fields_value = []
                fields_value = []
                # pdb.set_trace()
                for i in range(len(downside_patch_field_values[myindex])):
                    val = downside_patch_field_values[myindex][i]
                    if val[0] == 'fixed':
                        pass
                    elif val[0] == 'carry':
                        pass
                    elif val[0] == 'transfer':
                        field_name = downside_fields[myindex][i]
                        v_index = table_field.index(field_name)
                        tf_value = db_item[v_index]
                        fields_value.append(tf_value)
                        pass
                    else:
                        self.logger.error('-----server downside_patch_field_values Error! values:    %s------' % str(
                            downside_patch_field_values))
                # pdb.set_trace()
                if fields_value:
                    d_sql, f_val = self.pre_handle_None_value(downside_sqls[myindex],
                                                              self.format_field_value(fields_value))
                    db_sql = self.format_sql(d_sql, f_val)
                else:
                    db_sql = downside_sqls[myindex]

                # pdb.set_trace()
                # pdb.set_trace()
                DB_ERROR = False
                with self.lock:
                    if oper == 'insert':
                        self.db.insertdb(db_sql)
                        DB_ERROR = self.db.Error
                    if oper == 'update':
                        self.db.updatedb(db_sql)
                        DB_ERROR = self.db.Error
                    if oper == 'delete':
                        self.db.deldb(db_sql)
                        DB_ERROR = self.db.Error
                if not DB_ERROR:
                    # this operation succeeded, skip the remaining fallback operations
                    break
                else:
                    # try the next configured operation for this row
                    continue
        else:
            # the for-loop completed without break: every row was handled
            return True

    def format_tuple(self, tup):
        '''
        A field that is NULL in the DB comes back as None when fetched via MySQLdb.
        Format None as NULL when writing the data back to the DB.
        '''
        valuelist = ['NULL' if t is None else t for t in tup]
        padstr = ','.join('%s' if t is None else "'%s'" for t in tup)
        return padstr % tuple(valuelist)
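The None-to-NULL formatting above can be sketched as a standalone helper (`format_values` is a hypothetical name, assuming string-or-None inputs):

```python
def format_values(values):
    # Render a row as a comma-separated SQL value list: Python None
    # becomes a bare NULL, everything else is single-quoted.
    parts = []
    for v in values:
        if v is None:
            parts.append('NULL')
        else:
            parts.append("'%s'" % v)
    return ','.join(parts)

print(format_values(('alice', None, '2024-01-01')))  # -> 'alice',NULL,'2024-01-01'
```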

    def format_sql(self, patch_sql, patch_field_value):
        if isinstance(patch_sql, str) and isinstance(patch_field_value, tuple):
            try:
                res_sql = patch_sql % patch_field_value
            except:
                res_sql = None
            return res_sql
        else:
            self.logger.error('-----format_sql args type error-----')
            raise exceptions.TypeError

    def format_field_value(self, field_value):
        # we need to handle the ' or " in the mysql statement
        res_list = list()
        for val in field_value:
            if isinstance(val, unicode):
                val = val.encode('utf-8')
            if isinstance(val, str):
                f_val = val.replace('\'', '\\\'').replace('\"', '\\\"')
            else:
                f_val = val
            res_list.append(f_val)
        return tuple(res_list)

    def get_all_sub_str_index(self, index_str, sub_str, none_indexs):
        # print index_str
        index_list = []
        start_index = 0
        cnt = 0
        while True:
            try:
                tmp_index = index_str.index(sub_str, start_index)
            except ValueError:
                break
            else:
                if cnt in none_indexs:
                    index_list.append(tmp_index)
                start_index = tmp_index + len(sub_str)
                cnt += 1
        return tuple(index_list)

    def pre_handle_None_value(self, patch_sql, field_values):

        # get all the None value index
        None_indexs = []
        for i in range(len(field_values)):
            if field_values[i] is None:
                None_indexs.append(i)

        if None_indexs:
            # get '%s' indexs
            s_indexs = self.get_all_sub_str_index(patch_sql, "'%s'", None_indexs)

            str_list = list(patch_sql)
            # pdb.set_trace()
            subtraction_index = 0
            for ix in s_indexs:
                # pop the opening quote; after that pop, the closing quote
                # sits two positions past the opening one
                str_list.pop(ix - subtraction_index)
                str_list.pop(ix - subtraction_index + 2)
                subtraction_index += 2
            replace_str = ''.join(str_list)
            res_field_values = ['NULL' if f_val is None else f_val for f_val in field_values]

            return replace_str, tuple(res_field_values)
        else:
            return patch_sql, field_values
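The quote-stripping idea of `pre_handle_None_value` (drop the quotes around any `'%s'` slot whose value is None, so NULL lands unquoted) can be sketched with a regex; `patch_nulls` is a hypothetical simplification, not the method used above:

```python
import re

def patch_nulls(sql_template, values):
    # For each '%s' placeholder whose value is None, strip the surrounding
    # quotes, then fill the template with NULL in place of None.
    slots = iter(values)

    def repl(match):
        return '%s' if next(slots) is None else match.group(0)

    template = re.sub(r"'%s'", repl, sql_template)
    filled = tuple('NULL' if v is None else v for v in values)
    return template % filled

sql = "INSERT INTO t (a, b) VALUES ('%s', '%s')"
print(patch_nulls(sql, ('x', None)))  # -> INSERT INTO t (a, b) VALUES ('x', NULL)
```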

    def __handler_read_event(self, FileNo):
        self.logger.info('Start handle the received data...')
        # run business processing on the received data
        try:
            self.__deal_business(FileNo)
        except Exception, e:
            self.logger.error('__deal_business Exception:    %s' % e)
            # self.logger.debug(datetime.datetime.now())
            try:
                self.epoll.modify(FileNo, select.EPOLLOUT | select.EPOLLET | select.EPOLLONESHOT)
            except:
                pass
            self.logger.error('Deal_business Error')
        else:
            self.modify_revent_to_wevent(FileNo)
            self.logger.info('Handle the received data End!')

    # content = dict(sync_point=self.detect_piont_name,sync_point_no = self.detect_piont_serial_number)
    # message_info = (Communication_Packet_Flags.DOLOAD_DB,str(content),True,table_name,table_field)
    # error_info = (False,None,None)
    # message_status = (Communication_Packet_Flags.RESPONSE,None,str(datetime.datetime.now()),True)

    def __handler_write_event(self, FileNo):

        # if errorInfo is not empty we send the error to the client, otherwise handle the write business
        Error_Info = None
        try:
            Error_Info = self.errorInfo[FileNo]
            # reset error info to None
            self.errorInfo[FileNo] = ''
        except:
            # the socket was already unregistered from the list; just return
            self.logger.info('This socket is removed from error info list!')
            return
        error_info = (False, None, None)
        if Error_Info:
            self.logger.debug(Error_Info)  # for debugging
            error_info = (True, Error_Info[0], Error_Info[1])

        response = self.responseInfo[FileNo]
        # need reset the response info
        self.responseInfo[FileNo] = ''

        res_info = (None, response, None, None, False, None)

        info_states = (Communication_Packet_Flags.RESPONSE, None, None, True)

        message = self.packaging_message(res_info, error_info, info_states)
        self.logger.debug(message)
        self.send_message(FileNo, message, True)  # send the message to client

        self.modify_wevent_to_revent(FileNo)  # modify the event

    def modify_wevent_to_revent(self, FileNo):
        '''
        Re-arm the socket for the read event (used after a write completes)
        '''
        try:
            # We need modify event to read event!
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLIN | select.EPOLLONESHOT)
        except:
            pass

    def modify_revent_to_wevent(self, FileNo):
        '''
        Re-arm the socket for the write event (used after a read completes)
        '''
        try:
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLOUT | select.EPOLLONESHOT)
        except:
            pass

    def send_message(self, FileNo, message, blocking=True):
        # if the message is big, a nonblocking send may fail,
        # so we may switch the socket to blocking mode first
        if FileNo not in self.connections:
            self.logger.debug('This socket not in the connections list!')
            return
        if blocking:
            self.connections[FileNo].setblocking(True)
        try:
            self.connections[FileNo].sendall(message)
        except Exception, e:
            pass
        # finally set it back to nonblocking, since the server uses nonblocking sockets
        if blocking:
            self.connections[FileNo].setblocking(False)

    def start_server(self):
        try:
            self.__init_server()  # initialize the server
        except:
            # server initialization failed
            self.logger.critical('Init server Error...')
            raise

        while True:
            # join the thread
            self.__jionthreads()

            # Wait for at least one of the sockets to be ready for processing
            self.logger.info('waiting for the next event')
            events = self.epoll.poll()

            for fileno, event in events:
                # Handle inputs
                if fileno == self.server.fileno():
                    try:
                        while True:
                            connection, address = self.server.accept()
                            connection.setblocking(0)  # set the connection nonblocking
                            # Here we can not use select.EPOLLONESHOT flag.This flag
                            self.epoll.register(connection.fileno(),
                                                select.EPOLLIN | select.EPOLLET)  # register the new connection in edge-triggered mode too
                            self.connections[connection.fileno()] = connection  # record the connection
                            self.requests[connection.fileno()] = ''  # record the business request buffer
                            self.addresses[connection.fileno()] = address  # record the peer address
                            self.errorInfo[connection.fileno()] = ''
                            self.responseInfo[connection.fileno()] = ''
                            # an empty error-info string means no error info
                            self.logger.info('========================================')
                            self.logger.info('Client %s:%s connected server' % (address))
                    except socket.error:
                        pass

                # elif event & (select.EPOLLIN | select.EPOLLONESHOT):
                elif event & select.EPOLLIN:

                    # Read data from the socket until the data is received completely!
                    self.logger.debug('EVENT EPOLLIN: %s' % hex(event))
                    # pdb.set_trace()
                    try:
                        r_flag = self.__read_from_socket(fileno)
                    except socket.error:
                        pass
                    except Exception, e:
                        # any other exception means receiving data from the client failed,
                        # so we need to send error data to the client!
                        self.logger.warning('Catch other exception when receiving data!')

                        self.errorInfo[fileno] = (
                            Error_Info_Flags.Receive_Data_Error, '-----Server received data Error!-----')
                        self.modify_revent_to_wevent(fileno)
                    else:
                        # if eval of the data raised no exception, we assume the client data was received completely,
                        # then start a new thread to deal with the client data
                        # if not r_flag:
                        # print '#################################'
                        # pass
                        # else:
                        if self.requests[fileno]:
                            # Start a new thread to disposal the client requests
                            # self.logger.debug(self.requests[fileno])
                            if self.requests[fileno].endswith(self.EOF):
                                newthread = threading.Thread(target=self.__handler_read_event, args=(fileno,))
                                newthread.daemon = True
                                newthread.start()
                                print 'start %s' % newthread.name
                                # print 'print start new thread'
                        else:
                            # no data was read from the client: the client has closed, so hang up
                            # self.logger.info("closing    %s    %s    (HUP)" % self.addresses[fileno])
                            self.__delevnet(fileno)

                # elif event & (select.EPOLLOUT | select.EPOLLONESHOT):
                elif event & select.EPOLLOUT:
                    self.logger.debug('EVENT EPOLLOUT: %s' % bin(event))
                    # Write event happened,we use proxy to deal
                    # print 'Current file descripter: %d' % fileno
                    self.__proxy(fileno)  # We need a plain function as the proxy, not a thread! With a thread it has bugs (multi-triggered events, I think)

                elif event & select.EPOLLHUP:
                    self.logger.debug('EVENT EPOLLHUP: %s' % bin(event))
                    # Client hung up, del event!
                    self.logger.info("closing    %s    %s    (HUP)" % self.addresses[fileno])
                    self.__delevnet(fileno)

                elif event & select.EPOLLERR:
                    # self.logger.debug('EVENT: %s' % event)
                    self.logger.info("    exception on %s" % self.connections[fileno].getpeername())
                    self.__delevnet(fileno)

                else:
                    # self.logger.debug('EVENT: %s' % bin(event))
                    # Other event,do not handle
                    pass


if __name__ == '__main__':
    # pdb.set_trace()
    myserver = Server()
    myserver.start_server()
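As a minimal, self-contained illustration of the edge-triggered pattern the server loop relies on (`select.epoll` is Linux-only; a `socketpair` stands in for a real client connection):

```python
import select
import socket

# One reader socket registered edge-triggered; its peer plays the client.
a, b = socket.socketpair()
b.setblocking(False)

ep = select.epoll()
ep.register(b.fileno(), select.EPOLLIN | select.EPOLLET)

a.sendall(b'hello epoll')

received = b''
for fileno, event in ep.poll(timeout=1):
    if event & select.EPOLLIN:
        # Edge-triggered mode reports readiness only once per arrival,
        # so drain the socket until it would block.
        while True:
            try:
                chunk = b.recv(4096)
            except socket.error:
                break
            if not chunk:
                break
            received += chunk

print(received.decode())

ep.unregister(b.fileno())
ep.close()
a.close()
b.close()
```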
server
#!/usr/bin/env python
# coding:utf-8

import socket
import sys, os
import threading
import time
import logging
import multiprocessing
import random

import datetime

import hashlib
# hashlib.md5(open(fileName,'rb').read()).hexdigest()

import pdb

sys.path.append('../')
import exceptions

# import the task scheduling module
from apscheduler.schedulers.blocking import BlockingScheduler

from sqldb import SQLdb
from mylog import MyLog as Log

from communication_packet import Communication_Packet


class Client(object):
    # Handle message type
    HANDLE_GENERAL = 1  # handle an ordinary response packet
    HANDLE_INSERT = 2  # insert all the transferred data into the database
    HANDLE_UPDATE = 3  # update with the transferred data; the update condition must be passed along
    HANDLE_INERT_UPDATE = 4
    HANDLE_FILE = 5

    def __init__(self, IP='112.33.9.154', Port=11366, blackdir=None, db=None):
        self.tasksched = BlockingScheduler()  # task scheduler
        self.serverIP = IP  # set the server IP
        self.serverPort = Port  # set the server port
        if db is not None:
            self.db = db
        else:
            self.db = SQLdb()  # init the database for database syncing

        self.lock = threading.Lock()

        # use the default mode: debug
        self.log = Log()
        self.log.openConsole()  # enable console output
        self.logger = self.log.getLog()

        self.EOF = '\n\r\n'  # Set EOF flag

        if blackdir is None:
            self.basepath = './blacklist/'
        else:
            self.basepath = blackdir

        self.sysdbFirstFlag = False
        self.key = None  # using to calculate token
        self.encryption = None
        self.detect_piont_name = 'xxxx'
        self.detect_piont_serial_number = 'xxxxxx'

    def set_DB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)

    def set_first_synclog_flag(self, flag):
        self.synclog_flag = flag

    def setBlacklistDir(self, filedir=None):
        # Set blacklist dir
        if filedir is None:
            self.basepath = './blacklist/'
        else:
            self.basepath = filedir

    def createNewBlackListPath(self):

        # create the blacklist dir if it does not exist
        if not os.path.exists(self.basepath):
            try:
                os.mkdir(self.basepath)
            except:
                raise
        nowtime = datetime.datetime.now().strftime('%Y_%b_%d_%H_%M_%S')
        filename = 'blacklist_' + nowtime + '.txt'  # timestamped filename
        filepath = self.basepath + filename
        return filepath

    def handle_file_func(self, content, filename):
        try:
            content_data = eval(content)
        except Exception, e:
            self.logger.error('-----handle_file_func:    eval business_content error!-----')
            return False
        else:
            # Open the file and write the data
            try:
                w_file = open(filename, 'w')
                for data_item in content_data:
                    w_file.write(str(data_item))
                w_file.close()
            except Exception, e:
                self.logger.error('-----handle_file_func:    write data to file Error!------')
                return False
            else:
                return True

    def reap(self):
        # reap finished child processes; called when using multiprocessing... may be unused
        while True:
            try:
                result = os.waitpid(-1, os.WNOHANG)
                if not result[0]: break
            except:
                break
            self.logger.info("reaped child process %d" % result[0])

    def __connect_server(self, IP, Port):
        '''
        Connect to the remote server and return the communication socket
        '''
        # Create a TCP/IP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Connect the socket to the port where the server is listening
        server_address = (IP, Port)
        self.logger.info('connecting to %s port %s' % server_address)
        try:
            sock.connect(server_address)
        except:
            self.logger.error('connecting to %s port %s error!' % server_address)
            raise
        return sock

    def __get_tasks(self):
        # Get tasks from the db, return a list
        pass

    def scheduler_tasks(self, task_list=None):
        # start scheduler to sched the tasks
        pass

    def calculateTime(self, starttime=None, intervalMinutes=None):
        if not starttime:
            nowtime = datetime.datetime.now()
        else:
            nowtime = starttime
        if intervalMinutes:
            interval = datetime.timedelta(minutes=intervalMinutes)
            return nowtime - interval
        else:
            return nowtime

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
        pre_md5 = receving_data[:16]
        suffix_md5 = receving_data[-16:]
        message_md5 = pre_md5 + suffix_md5
        # slice off the two 16-char digest halves (str.strip would treat
        # them as character sets, not exact prefixes)
        message = receving_data[16:-16]
        cur_md5 = self.calc_md5(message)
        if message_md5 == cur_md5:
            return True, message
        else:
            return False, message
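The framing that `validating_message_token` undoes can be sketched end to end: the md5 hexdigest of the payload is split into its first and last 16 characters and wrapped around the message (`frame_message`/`unframe_message` are hypothetical helpers, not client methods):

```python
import hashlib

def frame_message(payload):
    # Wrap the payload with the two 16-char halves of its md5 hexdigest.
    digest = hashlib.md5(payload).hexdigest()
    return digest[:16].encode() + payload + digest[-16:].encode()

def unframe_message(framed):
    # Slice off the digest halves and verify them against a fresh digest.
    message = framed[16:-16]
    claimed = framed[:16] + framed[-16:]
    actual = hashlib.md5(message).hexdigest().encode()
    return claimed == actual, message

ok, msg = unframe_message(frame_message(b'sync payload'))
print(ok, msg)  # -> True b'sync payload'
```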

    def read_sync_state(self):
        query_sql = "select * from sync_state;"
        DB_ERROR = False
        with self.lock:
            sync_status = self.db.fechdb(query_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----read sync_state Error!-----')
            raise Exception('read sync_state from db Error!')
        if sync_status:
            sync_status_dict = dict()
            for sync_s in sync_status:
                sync_status_dict[sync_s[0]] = sync_s
            return sync_status_dict
        else:
            return None

    def read_sync_contol_configure(self, read_flag=False):
        # Read the control configuration from the local db; if absent,
        # sync it from the server, then read it again

        query_sql = "select * from sync_control;"
        DB_ERROR = False

        # set table name
        table_name = 'sync_control'

        # get `sync_control` table fields
        table_field = self.get_table_filed(table_name, self.lock, self.db)
        if not table_field:
            self.logger.error('-----get table field of %s Error!-----' % table_name)
            return None

        with self.lock:
            control_configure = self.db.fechdb(query_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get control configure Error!-----')
            return None
        if control_configure:
            # Got the configure from the db: classify by table name and sync type
            # (upload or download) and format every row into a dict
            configure_list = []
            for iter_conf in control_configure:
                control_configure_item = dict.fromkeys(table_field)
                lenth = len(table_field)
                # set a value for every key
                for i in range(lenth):
                    control_configure_item[table_field[i]] = iter_conf[i]
                configure_list.append(control_configure_item)
            return configure_list
        else:
            # we need to get the configuration from the server and reload it
            if read_flag:  # if we read it again and still have no configure, return
                return None

            # sync the sync_control table from the server
            self.logger.info('=====Start to init the sync control table from Server...=====')
            try:
                comm_socket = self.__connect_server(self.serverIP, self.serverPort)  # connect the server
            except Exception, e:
                raise e

            # we need to carry the detect point number to the server
            content = dict(sync_point=self.detect_piont_name, sync_point_no=self.detect_piont_serial_number)

            message_info = (Communication_Packet.DOLOAD_DB, str(content), True, table_name, table_field)
            error_info = (False, None, None)
            message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)

            send_message = self.packaging_message(message_info, error_info, message_status)
            try:
                comm_socket.sendall(send_message)
            except Exception, e:
                # on a send error, terminate this business request directly
                self.logger.error('-----When send the gain_newconfig_from_server message request to server Error!-----')
                return
            self.download_db(comm_socket, self.HANDLE_INSERT)
            self.logger.info('=====End init the sync control table from Server!=====')

            # After getting the control table, read it from the local table again
            return self.read_sync_contol_configure(True)


    def start_tasks(self):
        control_config = self.read_sync_contol_configure()
        thread_list = []
        if control_config:
            for config in control_config:
                newthread = threading.Thread(target=self.thread_task, args=(config,))
                newthread.daemon = True
                newthread.start()
                thread_list.append(newthread)
            for t in thread_list:
                t.join()
        else:
            self.logger.error('-----init the sync control configuration error!-----')
            raise Exception('init the sync control configuration error!')


    def thread_task(self, task_config):
        # init an instance of the scheduler
        my_scheduler = BlockingScheduler()
        # self.tasksched.add_job(self.synchronous_DB, 'interval', minutes = 15,max_instances = 1)
        '''
        --cron--
            Parameters:
                year (int|str) – 4-digit year
                month (int|str) – month (1-12)
                day (int|str) – day of the (1-31)
                week (int|str) – ISO week (1-53)
                day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
                hour (int|str) – hour (0-23)
                minute (int|str) – minute (0-59)
                second (int|str) – second (0-59)
                start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
                end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
                timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
        '''
        if task_config['sync_type'] == 'upload':
            if task_config['sync_time_mode'] == 'period':
                period_minutes = int(task_config['sync_time_value'])
                my_scheduler.add_job(self.syn_upload_db, 'interval', minutes=period_minutes, max_instances=1,
                                     args=(task_config,))
                my_scheduler.start()
            elif task_config['sync_time_mode'] == 'fixed_time':
                try:
                    hours, minutes, seconds = task_config['sync_time_value'].split(':')
                except:
                    self.logger.error('-----get sync_time_value Error!-----')
                    raise
                else:
                    my_scheduler.add_job(self.syn_upload_db, 'cron', year='*', month='*', day='*', hour=hours,
                                         minute=minutes, second=seconds, max_instances=1, args=(task_config,))
                    my_scheduler.start()
            else:
                self.logger.error('----sync control config error-----')
                raise Exception('sync control config error')
        elif task_config['sync_type'] == 'download':
            if task_config['sync_time_mode'] == 'period':
                period_minutes = int(task_config['sync_time_value'])
                my_scheduler.add_job(self.sync_download_db, 'interval', minutes=period_minutes, max_instances=1,
                                     args=(task_config,))
                my_scheduler.start()
            elif task_config['sync_time_mode'] == 'fixed_time':
                try:
                    hours, minutes, seconds = task_config['sync_time_value'].split(':')
                except:
                    self.logger.error('-----get sync_time_value Error!-----')
                    raise
                else:
                    my_scheduler.add_job(self.sync_download_db, 'cron', year='*', month='*', day='*', hour=hours,
                                         minute=minutes, second=seconds, max_instances=1, args=(task_config,))
                    my_scheduler.start()
            else:
                self.logger.error('----sync control config error-----')
                raise Exception('sync control config error')
        else:
            self.logger.error('----sync_type error-----')
            raise Exception('sync_type error')
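For the `fixed_time` mode, the effect of the daily cron trigger can be reproduced with the stdlib alone; `seconds_until` is a hypothetical helper, not part of the client:

```python
import datetime

def seconds_until(fixed_time, now=None):
    # Seconds until the next daily occurrence of 'HH:MM:SS' -- the same
    # moment a cron trigger with hour/minute/second fields would target.
    hours, minutes, seconds = (int(p) for p in fixed_time.split(':'))
    if now is None:
        now = datetime.datetime.now()
    target = now.replace(hour=hours, minute=minutes, second=seconds, microsecond=0)
    if target <= now:
        target += datetime.timedelta(days=1)
    return (target - now).total_seconds()

base = datetime.datetime(2024, 1, 1, 12, 0, 0)
print(seconds_until('13:30:00', now=base))  # -> 5400.0
```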


    def sync_download_db(self, config):
        # parse the config to get the sql
        pass

    def syn_upload_db(self, config):
        pass


    def validating_content_token(self, content):
        receive_content = content['Business']['Content']
        receive_md5 = content['Info_Status']['Security_Token']
        receive_time = content['Info_Status']['Time']
        cur_md5 = self.calc_md5(receive_content + receive_time + self.key)
        return cur_md5 == receive_md5


# encrypt decrypt
def packaging_message(self, Message=None, Error=None, Info_Status=None):
    # ----Pack send message
    # Init Communication_Packet
    cm_packet = Communication_Packet()

    # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True):
    cm_packet.set_content_Business(targv=Message)

    # def set_content_Error(self,error_flag = False,error_type = None,error_info = None):
    cm_packet.set_content_Error(Error)

    now_time = str(datetime.datetime.now())  # get current time
    # calculate the token from Business content + time + key
    security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key)
    # def set_content_Info_Status(self,info_type,security_token,time,is_end):
    # Need to replace the security_token
    Info_Status = list(Info_Status)  # convert the tuple to a list so the token can be replaced
    Info_Status[1] = security_token
    Info_Status = tuple(Info_Status)  # convert back to a tuple to pass as the argument

    cm_packet.set_content_Info_Status(targv=Info_Status)
    try:
        send_data = cm_packet.content
    except Exception, e:
        raise e
    else:
        # we Encryption data
        encryption_send_data = self.encryption.encrypt(send_data)
        # caculate md5
        encrypt_send_md5 = self.calc_md5(encryption_send_data)
        complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF
        return complete_send_data
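
# Standalone sketch (hypothetical helper and EOF value) of the frame layout built
# above: first 16 hex chars of the payload MD5 + payload + last 16 hex chars + EOF.
def _frame_sketch(payload, eof='#EOF#'):
    import hashlib
    digest = hashlib.md5(payload.encode('utf-8')).hexdigest()
    return digest[:16] + payload + digest[-16:] + eof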


def unpackaging_message(self, unpacking_str):
    if not isinstance(unpacking_str, str):
        raise ValueError('unpacking_str must be a str')
    unpacking_str = unpacking_str.strip(self.EOF)
    flag, message = self.validating_message_token(unpacking_str)
    if flag:
        decrypt_str = self.encryption.decrypt(message)
        try:
            message_dict = eval(decrypt_str)
        except Exception, e:
            self.logger.error('Eval decrypt_str Error!')
            raise e
        else:
            if self.validating_content_token(message_dict):
                return message_dict
            else:
                self.logger.error('Message is tampered!')
                return None
    else:
        self.logger.error('Message is tampered!')
        return None
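
# Standalone sketch (hypothetical helper) of the check that
# validating_message_token presumably performs on a received frame: split off
# the two 16-char MD5 halves and compare them against the digest of the payload.
def _verify_frame_sketch(data):
    import hashlib
    head, payload, tail = data[:16], data[16:-16], data[-16:]
    digest = hashlib.md5(payload.encode('utf-8')).hexdigest()
    return head + tail == digest, payload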


def recieve_packet(self, communication_socket):
    # Receive data from the server until the EOF marker is seen
    recieveData = ''
    try:
        while True:
            try:
                partdata = communication_socket.recv(4096)
            except Exception, e:
                self.logger.error(str(e))
                break
            else:
                if not partdata:  # peer closed the connection
                    break
                recieveData += partdata
                if partdata.endswith(self.EOF):
                    break
        recieveData = recieveData.rstrip(self.EOF)
        return recieveData
    except Exception, e:
        raise e


def insertdb_updatedb(self, handle_type, db_content, table_name, table_field, filter_condition=None):
    '''
    Insert or update the rows parsed from db_content into table_name.
    '''
    try:
        db_data = eval(db_content)
    except Exception, e:
        self.logger.error('-----eval business_content error!-----')
        raise e
    else:
        try:
            first_field = eval(table_field)[0]
        except Exception, e:
            self.logger.error('-----eval table_field error!-----')
            raise e
        if first_field == 'id':
            is_id = True
        else:
            is_id = False
        for db_item in db_data:
            if is_id:
                rowdata = db_item[1:]
            else:
                rowdata = db_item
            self.logger.debug(rowdata)
            # print dict(zip(self.phishing_log_fields,rowdata))
            if handle_type == self.HANDLE_INSERT or handle_type == self.HANDLE_INERT_UPDATE:
                insert_sql = 'insert into %s %s values(%s);' % (
                table_name, table_field.replace('\'', ''), self.format_tuple(rowdata))

                # print insert_sql
                DB_ERROR = False
                with self.lock:
                    self.db.insertdb(insert_sql)
                    DB_ERROR = self.db.Error
                if DB_ERROR:
                    if handle_type == self.HANDLE_INERT_UPDATE:

                        pass
                    elif handle_type == self.HANDLE_INSERT:
                        pass
                    else:
                        pass
            elif handle_type == self.HANDLE_UPDATE:
                # update the data
                pass
            else:
                pass

        else:
            return True
    pass


def set_filter(self, filter, table_field, filter_key, filter_relationship):
    '''
    Func: set the filter used to update the db when handling the message and the handle_flag is set to HANDLE_INERT_UPDATE!
    Note: all the filter conditions need to be ordered!
    table_field: filter condition field
    filter_key: = >= <= !=  if it has like: like|>=... And so on
    filter_relationship: and, or, None
    '''
    if not filter:
        filter = dict(filter=[])
    else:
        if 'filter' not in filter.keys():
            filter['filter'] = []
    filter['filter'].append((table_field, filter_key, filter_relationship))

    return filter


def set_update_field(self, filter, update_field):
    '''
    Set the update fields when updating the db!
    update_field: list of fields to update!
    '''
    filter['update_field'] = update_field


def parsing_filter(self, filter, table_name, table_fields, table_values, filter_field_values=None):
    '''
    Func: return the update sql
    '''
    update_values = []
    update_fields = filter['update_field']
    for field in update_fields:
        v_index = table_fields.index(field)
        up_value = table_values[v_index]
        update_values.append(up_value)

    update_sql_part_one = 'update %s set ' % table_name
    length = len(update_values)
    for i in range(length):
        up_value = update_values[i]
        up_field = update_fields[i]
        update_sql_part_one += '%s=\'%s\',' % (up_field, up_value)

    # strip the last comma    result like: update tablea set a='1',b='2'
    update_sql_part_one = update_sql_part_one.rstrip(',')

    update_sql_part_two = ' where '
    for tmp_item in filter['filter']:
        field = tmp_item[0]
        filter_key = tmp_item[1]  # may hold several keys separated by '|', e.g. like|>=
        relationship = tmp_item[2]

        # get filter condition value. For example: where left(ip_loc,2) = 'China'
        if filter_field_values and field in filter_field_values:
            filter_condition_value = filter_field_values[field]
        else:
            v_index = table_fields.index(field)
            filter_condition_value = table_values[v_index]
        if not relationship:
            relationship = ''
        if '(' in filter_key:
            update_sql_part_two += "%s%s %s " % (filter_key, filter_condition_value, relationship)
        else:
            update_sql_part_two += "%s%s%s %s" % (field, filter_key, filter_condition_value, relationship)

    # merge the sql statement
    update_sql = update_sql_part_one + update_sql_part_two.rstrip()
    return update_sql


def handle_message__packet(self, socket, handle_type=None, filter=None, handle_callback=None, is_recieved=False,
                           message_dict=None):
    '''
    socket: communication socket
    handle_type: handle message type(HANDLE_GENERAL,HANDLE_INSERT,HANDLE_UPDATE)
    filter: if handle_type is HANDLE_INERT_UPDATE,it is the update filter condition
    handle_callback: default is None! if not None,it will use the handle_callback to handle the message
    is_recieved: default is False; True if the message has already been received.
    message_dict: if is_recieved is True,it is the received message_dict
    Note: for optional args,you need to use 'key=value' to pass parameters
    '''
    if not is_recieved:
        # if the message has not been received yet,receive it and unpack it
        try:
            message = self.recieve_packet(socket)
        except Exception, e:
            self.logger.error(str(e))
        else:
            message_dict = self.unpackaging_message(message)

    if message_dict:
        if handle_callback:
            # if the callback is not None,use handle_callback to handle the message
            handle_callback(message_dict)
        else:
            # Parsing the packet to get its information
            # get business info
            business = message_dict['Business']
            business_type = business['Type']
            business_content = business['Content']
            business_is_syncdb = business['Is_Syncdb']
            business_table_name = business['Table_Name']
            business_table_field = business['Table_Field']

            # get error info
            Error_info = message_dict['Error']
            error_flag = Error_info['Is_Error']
            error_type = Error_info['Error_Type']
            error_info = Error_info['Error_Info']

            # get packet info status
            info_status = message_dict['Info_Status']
            info_type = info_status['Info_Type']
            is_end = info_status['Is_End']
            token = info_status['Security_Token']
            info_time = info_status['Time']

            if handle_type == self.HANDLE_GENERAL:
                # This packet is GENERAL communication! handling the upload_db response message to make sure that the server handled the business
                if business_type == Communication_Packet.GENERAL_INFO and business_content == Communication_Packet.HANDLE_OK and info_type == Communication_Packet.RESPONSE:
                    return True, is_end
                elif error_flag:
                    self.logger.error(
                        'Message\'type is \'%s\' and message\'error_info is \'%s\'' % (error_type, error_info))
                    return False, is_end
                else:
                    self.logger.error('This message packet is not the general message!')
                    return False, is_end
            elif handle_type == self.HANDLE_INSERT and info_type == Communication_Packet.RESPONSE:
                handle_flag = self.insertdb_updatedb(self.HANDLE_INSERT, business_content, business_table_name,
                                                     business_table_field)
                return handle_flag, is_end
            elif handle_type == self.HANDLE_INERT_UPDATE and info_type == Communication_Packet.RESPONSE:
                handle_flag = self.insertdb_updatedb(self.HANDLE_INERT_UPDATE, business_content, business_table_name,
                                                     business_table_field, filter)
                return handle_flag, is_end
            elif handle_type == self.HANDLE_UPDATE and info_type == Communication_Packet.RESPONSE:
                handle_flag = self.insertdb_updatedb(self.HANDLE_UPDATE, business_content, business_table_name,
                                                     business_table_field)
                return handle_flag, is_end
            elif handle_type == self.HANDLE_FILE and info_type == Communication_Packet.RESPONSE:
                handle_flag = self.handle_file_func(business_content)
                return handle_flag, is_end
            else:
                # request packets are not handled here,because the server never initiates a request
                return False, is_end

    else:
        raise Exception('handle_message__packet\'s message_dict is None!')


def upload_db(self, communication_socket, table_name, query_sql=None, ALL=True, table_field=None, Error_info=None):
    # Get the db info which will be sent
    # if db_data is relatively large,it will raise exceptions,so you need to control the data size
    DB_ERROR = False
    DB_CNT_ERROR = False
    if query_sql:
        with self.lock:
            db_data = self.db.fechdb(query_sql)
            DB_ERROR = self.db.Error

        # If querying the db failed,return None
        if DB_ERROR:
            self.logger.error('Query DB Error! Current query_sql: %s' % query_sql)
            return None

        if not db_data:
            # if there is no data,nothing needs to be uploaded to the server,so return None directly
            self.logger.info('No Data need to be upload! table_name:    %s' % table_name)
            return None

        # Set packet args
        # def set_content_Business(self,b_type = Communication_Packet.GENERAL_INFO,b_content = Communication_Packet.HANDLE_OK,table_name = None,table_field = None,b_is_syncdb = False,targv = None):
        message = (Communication_Packet.UPLOAD_DB, str(db_data), table_name, table_field, True)
        # def set_content_Error(self,error_flag = False,error_type = None,error_info = None,targv = None):
        error_info = (False, None, None)
        # def set_content_Info_Status(self,info_type,security_token,time,is_end,targv = None):
        info_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), ALL)
        complete_send_data = self.packaging_message(message, error_info, info_status)
        try:
            communication_socket.sendall(complete_send_data)
        except Exception, e:
            self.logger.error('Send data error when upload_db!')
            raise e
        else:
            if ALL:
                return True
            else:
                # handle the response packet
                try:
                    handle_flag, business_is_end = self.handle_general_response_packet(communication_socket)
                except Exception, e:
                    raise e
                else:
                    if handle_flag and business_is_end:
                        return True
                    else:
                        return False
    else:
        raise Exception('query_sql statement is None')


def download_db(self, communication_socket, handle_type=None, handle_message_callback=None, filter=None):
    # We receive data from the server
    try:
        recieveData = self.recieve_packet(communication_socket)
    except Exception, e:
        self.logger.error('Download_db Error when receiving data!')
        raise e
    # unpacking the message
    Message_dict = self.unpackaging_message(recieveData)

    if not Message_dict:
        raise Exception('download_db Message_dict Error!')
    if handle_message_callback:
        # the callback only handles one packet here;treat it as the last one
        is_handle_complete = handle_message_callback(Message_dict)
        is_handle_again = True
    else:
        is_handle_complete, is_handle_again = self.handle_message__packet(communication_socket, handle_type,
                                                                          is_recieved=True,
                                                                          message_dict=Message_dict, filter=filter)

    # is_handle_again carries the packet's Is_End flag
    # recurse here until the last packet has been handled
    if is_handle_complete and not is_handle_again:
        self.download_db(communication_socket, handle_type, handle_message_callback, filter)
    else:
        # close the socket connection and leave the function
        communication_socket.close()


def format_tuple(self, tup):
    '''
    A field is None if the DB field is NULL when we fetch data via MySQLdb!
    Format None to NULL when inserting the data back into the DB
    '''
    valuelist = ['NULL' if t is None else t for t in tup]
    padlist = ['%s' if t is None else '\'%s\'' for t in tup]
    padstr = ','.join(padlist)
    return padstr % tuple(valuelist)
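
# Standalone sketch (hypothetical helper) of the NULL formatting above:
# None becomes an unquoted SQL NULL,everything else is quoted.
def _format_row_sketch(row):
    return ','.join('NULL' if v is None else "'%s'" % v for v in row)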


def sync_log_db(self, Start_Time=None, End_Time=None, Sync_All=False, ID=None):
    # pdb.set_trace()
    '''
    sync the log db to the Server!
    If an error occurs when syncing the log,it will return but not raise an exception!
    If it raised an exception,it would impact the main program
    '''
    self.logger.info('=====Start to sync log=====')
    try:
        # connect to server,get and communication socket
        sock = self.__connect_server(self.serverIP, self.serverPort)
    except:
        raise
    last_update_id = None  # updating update_id
    # query log db to get some info
    if Start_Time and End_Time:
        if datetime.datetime.strptime(Start_Time, '%Y-%m-%d %H:%M:%S') > datetime.datetime.strptime(End_Time,
                                                                                                    '%Y-%m-%d %H:%M:%S'):
            self.logger.error('-----sync_log_db argv\'Start_Time and End_Time is error!,End this task-----')
            # raise Exception('Start_Time and End_Time Error!')
            return
        query_cnt_sql = "select count(id),min(id),max(id) from phishing_log where time between '%s' and '%s';" % (
        Start_Time, End_Time)
    # query_log_sql = "select * from phishing_log where time between '%s' and '%s';" %(Start_Time,End_Time)
    elif Sync_All:
        query_cnt_sql = "select count(1),min(id),max(id) from phishing_log;"
    # query_log_sql = 'select * from phishing_log;'
    elif ID:
        query_cnt_sql = "select count(1),min(id),max(id) from phishing_log where id > %s;" % ID
    # query_log_sql = "select * from phishing_log where id > %s;" % ID
    else:
        query_update_id_sql = "select last_update_id,last_update_time from ph_log_sync_id;"
        DB_ERROR = False
        with self.lock:
            ID_Tuple = self.db.fechdb(query_update_id_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            DB_ERROR = False
            self.logger.error('-----Get id from ph_log_sync_id Error. End this task----')
            return
        last_update_id = ID_Tuple[0][0]
        query_cnt_sql = "select count(1),min(id),max(id) from phishing_log where id > %s;" % last_update_id
    # query_log_sql = "select * from phishing_log where id > %s;" % last_update_id

    # get table fields!
    log_table_name = 'phishing_log'
    log_fields = self.get_table_filed(log_table_name, self.db, self.lock)
    if not log_fields:
        self.logger.error(
            '-----Terminate this task(sync_log_db),because getting the %s table fields failed!-----' % log_table_name)
        return

    with self.lock:
        data_cnt = self.db.fechdb(query_cnt_sql)
        DB_ERROR = self.db.Error
    if DB_ERROR:
        # Record the Error into the log and end the task
        DB_ERROR = False
        self.logger.error('-----Get log data count Error when syncing log db! End this task-----')
        return
    else:
        # if there are more than 10000 rows,sync them in batches
        cnt, min_id, max_id = data_cnt[0]
        upload_cnt = 1
        if cnt >= 10000:
            upload_cnt = (max_id - min_id + 1) / 10000 + 1
        # def upload_db(self,communication_socket,table_name,query_sql = None,ALL = True,table_field = None,Error_info = None):
        ALL = False
        log_table_name = 'phishing_log'
        for i in range(upload_cnt):
            start_id = min_id + i * 10000
            end_id = start_id + 10000
            if end_id >= max_id:
                end_id = max_id
                ALL = True
            query_log_sql = "select * from phishing_log where id > %s and id <= %s;" % (start_id, end_id)
            try:
                handle_ok = self.upload_db(sock, log_table_name, query_log_sql, ALL, log_fields, None)
            except Exception, e:
                self.logger.error('-----' + str(e) + 'when sysnc db!-----')
                return
            if handle_ok:
                continue
            else:
                self.logger.error("-----upload_db Error! And query_log_sql is '%s'.End this task-----" % query_log_sql)
                break
        else:
            # update the recorded last_update_id after all batches were uploaded
            if last_update_id:
                cur_time = datetime.datetime.now()
                update_id_sql = "update ph_log_sync_id set last_update_id = '%s',last_update_time = '%s' where last_update_id = %s;" % (
                max_id, cur_time, last_update_id)
                DB_ERROR = False
                with self.lock:
                    self.db.updatedb(update_id_sql)
                    DB_ERROR = self.db.Error
                if DB_ERROR:
                    insert_id_sql = "insert into ph_log_sync_id values(%s,%s);" % (max_id, cur_time)
                    with self.lock:
                        self.db.insertdb(insert_id_sql)
                        DB_ERROR = self.db.Error
                    if DB_ERROR:
                        self.logger.error(
                            '-----Sync log db error when updating last_update_id! Old id is %s,new id is %s! End this task-----' % (
                            last_update_id, max_id))

    self.logger.info('=====End to sync log=====')
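
# Standalone sketch (hypothetical helper) of the id-range batching used in
# sync_log_db: split (min_id, max_id] into half-open chunks of at most `batch` ids.
def _batch_ranges_sketch(min_id, max_id, batch=10000):
    ranges = []
    start = min_id
    while start < max_id:
        end = min(start + batch, max_id)
        ranges.append((start, end))
        start = end
    return ranges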


def gain_newconfig_from_server(self):
    '''
    Get newest configure which is the fishing website regular expressions from server!
    '''

    self.logger.info('=====Start to gain newconfig from Server...=====')
    try:
        socket = self.__connect_server(self.serverIP, self.serverPort)  # connect to server
    except Exception, e:
        # raise Exception
        raise e

    # query the db to get the version info of the current configuration
    query_version_sql = "select max(version),update_time from detection_version;"
    DB_ERROR = False
    with self.lock:
        version_tuple = self.db.fechdb(query_version_sql)
        DB_ERROR = self.db.Error
    if DB_ERROR:
        self.logger.error('-----Get local configure version Error! End this task-----')
        return
    local_version = version_tuple[0][0]
    local_update_time = version_tuple[0][1]
    table_name = 'detection_version'

    # Get detection_version fields
    # If getting the fields failed,terminate this task!
    table_field = self.get_table_filed(table_name, self.db, self.lock)
    if not table_field:
        self.logger.error(
            '-----Terminate this task(gain_newconfig_from_server),because getting the %s table fields failed!-----' % table_name)
        return

    message_info = (Communication_Packet.DOLOAD_DB, str(local_version), True, table_name, table_field)
    error_info = (False, None, None)
    message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)

    send_message = self.packaging_message(message_info, error_info, message_status)
    try:
        socket.sendall(send_message)
    except, Exception, e:
        # 發送出現錯誤則直接返回終止此次業務請求
        self.logger.error('-----When send the gain_newconfig_from_server message request to server Error!-----')
        return
    self.download_db(socket, self.HANDLE_INSERT)
    self.logger.info('=====End to gain newcofig from Server!=====')


def get_detection_point_num(self):
    # Get detection_point_num from a file or the db and return it!
    # Reading the config file to get the detection_point_num!
    # wait to handle...
    detection_point_num = None
    is_detection_point = None

    return is_detection_point, detection_point_num


def get_table_filed(self, table_name, db, lock):
    # get the table fields from the db!
    query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s';" % table_name
    with self.lock:
        detection_version_fields = self.db.fechdb(query_detection_version_field_sql)
        DB_ERROR = self.db.Error
    if DB_ERROR:
        # Record the Error and end the task
        DB_ERROR = False
        self.logger.error('----Get %s fields Error! End this task-----' % table_name)
        return
    else:
        # the query result is Unicode,so we need to encode it to utf-8
        # each row is a 1-tuple like (u'id',),so take the first element of every row
        table_field = [row[0].encode('utf-8') for row in detection_version_fields]
        return table_field
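
# Standalone sketch (hypothetical input) of the row flattening above: a db
# cursor returns the column names as 1-tuples,one per row.
def _flatten_fields_sketch(rows):
    return [row[0] for row in rows]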


def sync_info_db(self):
    '''
    1、Get blacklist from server and write it to file for linkage equipment
    2、Sync info table to client point
    Use a way that depends on the config file
    '''
    self.logger.info('=====Start to get blacklist from Server=====')
    # Get communication socket
    try:
        sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to server
    except Exception, e:
        self.logger.error(str(e))
        raise
    is_detection_point, point_num = self.get_detection_point_num()  # Need to handle again-----
    table_name = 'phishing_info'
    if is_detection_point:
        table_field = ('phishing_site',)
    else:
        # get the table fields from the db
        table_field = self.get_table_filed(table_name, self.db, self.lock)
        if not table_field:
            self.logger.error(
                '-----Terminate this task(sync_info_db),because getting the %s table fields failed!-----' % table_name)
            return

    # packing the request packet
    message_info = (Communication_Packet.DOLOAD_DB, str(point_num), True, table_name, table_field)
    error_info = (False, None, None)
    message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)

    send_message = self.packaging_message(message_info, error_info, message_status)
    try:
        sock.sendall(send_message)
    except Exception, e:
        # if sending the data fails,end this task!
        self.logger.error('-----Send the sync_info_db message request to server Error!-----')
        return

    if is_detection_point:
        # If this is a detection point,update the phishing_info table
        self.download_db(sock, self.HANDLE_INSERT)
    else:
        # write the data to a file!
        self.download_db(sock, self.HANDLE_FILE)
    self.logger.info('=====End to get phishing_info from Server!=====')


def sync_white_list(self, sync_flag=0):
    '''
    sync_flag:
        0、first upload the white list then download the white list from server
        1、only upload the white list
        2、only download the white list
    '''
    # Get communication socket
    try:
        sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to server
    except Exception, e:
        self.logger.error(str(e))
        raise
    if sync_flag == 0 or sync_flag == 1:
        self.sync_client_white_list_to_server(sock)
    if sync_flag == 0 or sync_flag == 2:
        self.sync_server_white_list_to_client(sock)

    # close the network socket,end this task!
    sock.close()


def sync_server_white_list_to_client(self, communication_socket):
    # pdb.set_trace()
    '''
    Sync  server white list to this detection point
    communication_socket: Network socket
    '''
    self.logger.info('=====Start sync_server_white_list_to_client!=====')

    # get the last white list update time of this client!
    query_sql = "select max(update_time) from white_list where valid = true;"
    DB_ERROR = False
    with self.lock:
        local_last_update_time = self.db.fechdb(query_sql)
        DB_ERROR = self.db.Error
    if DB_ERROR:
        self.logger.error('-----Get white list last update time Error! End this task!-----')
        return
    if local_last_update_time:
        last_update_time = local_last_update_time[0][0]
    else:
        last_update_time = None

    table_name = 'white_list'
    table_field = self.get_table_filed(table_name, self.db, self.lock)
    if not table_field:
        self.logger.error(
            '-----Terminate this task(sync_server_white_list_to_client),because getting the %s table fields failed!-----' % table_name)
        return

    # packing the request packet
    message_info = (Communication_Packet.DOLOAD_DB, str(last_update_time), True, table_name, table_field)
    error_info = (False, None, None)
    message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)
    send_message = self.packaging_message(message_info, error_info, message_status)
    try:
        communication_socket.sendall(send_message)
    except Exception, e:
        # if sending the data fails,end this task!
        self.logger.error('-----Send the sync_server_white_list_to_client message request to server Error!-----')
        return

    # After send the request,start to download white list db data
    # update_sql = "update white_list set domain = '%s',valid = '%s',recorded = '%s',update_time = '%s',website_name = '%s',website_link = '%s',icp_code = '%s',organization = '%s'where domain = '%s';" % fielddata
    myfilter = dict()
    update_field = (
    'domain', 'valid', 'recorded', 'update_time', 'website_name', 'website_link', 'icp_code', 'organization')
    self.set_update_field(myfilter, update_field)

    # def set_filter(self,filter,table_field,filter_key,filter_relationship):
    myfilter = self.set_filter(myfilter, 'domain', '=', None)
    self.logger.debug(str(myfilter))
    self.download_db(communication_socket, self.HANDLE_INSERT, filter=myfilter)
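
# Standalone sketch (hypothetical helper) of the update statement that
# parsing_filter assembles from the filter dict set above.
def _update_sql_sketch(table, field_values, where):
    sets = ','.join("%s='%s'" % (f, v) for f, v in field_values)
    return "update %s set %s where %s;" % (table, sets, where)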


def sync_client_white_list_to_server(self, communication_socket):
    # pdb.set_trace()
    '''
    upload the local white list to server!
    communication_socket: network socket
    '''
    self.logger.debug('=====Start sync_client_white_list_to_server!=====')
    # query the local white list data which needs to be uploaded
    query_sql = "select * from white_list where update_time is NULL and valid = 1;"
    query_modified_sql = "select * from white_list where update_time is not NULL and valid = 0;"

    # get white_list table fields
    table_name = 'white_list'
    table_field = self.get_table_filed(table_name, self.db, self.lock)
    if not table_field:
        self.logger.error(
            '-----Terminate this task(sync_client_white_list_to_server),because getting the %s table fields failed!-----' % table_name)
        return

    # upload the newly added local white list entries
    handle_ok = self.upload_db(communication_socket, table_name, query_sql, False, table_field, None)
    if handle_ok:
        # upload the locally modified white list entries
        handle_ok = self.upload_db(communication_socket, table_name, query_modified_sql, True, table_field, None)
        if handle_ok:
            self.logger.info('upload the locally modified white list OK!')
            self.logger.info('sync_client_white_list_to_server OK!')
        else:
            self.logger.error('-----UPLOAD the locally modified white list Error!-----')
    else:
        self.logger.error('-----UPLOAD the newly added local white list Error!-----')


def task_scheduled(self):
    # multi-process scheduling of the tasks
    pass

    def start_client(self):
        '''
    --interval--
        Parameters:
            weeks (int) – number of weeks to wait
            days (int) – number of days to wait
            hours (int) – number of hours to wait
            minutes (int) – number of minutes to wait
            seconds (int) – number of seconds to wait
            start_date (datetime|str) – starting point for the interval calculation
            end_date (datetime|str) – latest possible date/time to trigger on
            timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
    --cron--
        Parameters:
            year (int|str) – 4-digit year
            month (int|str) – month (1-12)
            day (int|str) – day of the month (1-31)
            week (int|str) – ISO week (1-53)
            day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
            hour (int|str) – hour (0-23)
            minute (int|str) – minute (0-59)
            second (int|str) – second (0-59)
            start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
            end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
            timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)

    add_job:
        Parameters:
            func – callable (or a textual reference to one) to run at the given time
            trigger (str|apscheduler.triggers.base.BaseTrigger) – trigger that determines when func is called
            args (list|tuple) – list of positional arguments to call func with
            kwargs (dict) – dict of keyword arguments to call func with
            id (str|unicode) – explicit identifier for the job (for modifying it later)
            name (str|unicode) – textual description of the job
            misfire_grace_time (int) – seconds after the designated run time that the job is still allowed to be run
            coalesce (bool) – run once instead of many times if the scheduler determines that the job should be run more than once in succession
            max_instances (int) – maximum number of concurrently running instances allowed for this job
            next_run_time (datetime) – when to first run the job, regardless of the trigger (pass None to add the job as paused)
            jobstore (str|unicode) – alias of the job store to store the job in
            executor (str|unicode) – alias of the executor to run the job with
            replace_existing (bool) – True to replace an existing job with the same id (but retain the number of runs from the existing one)
    '''

        # schedule the tasks as appropriate
        # new_task_sched = BlockingScheduler()
        # new_task_sched.add_job(self.synchronous_DB, 'interval', minutes = 5,max_instances = 1)
        # new_task_sched.start()
        # print '===================='
        '''
        self.tasksched.add_job(self.synchronous_DB, 'interval', minutes=5, max_instances=1)
        self.tasksched.add_job(self.get_newconfig, 'interval', days=1, max_instances=1)
        self.tasksched.add_job(self.get__new_blacklist, 'interval', days=1, max_instances=1)
        self.tasksched.add_job(self.upload_white_list, 'interval', days=1, max_instances=10)
        self.tasksched.add_job(self.updatewhitelist, 'interval', days=1, max_instances=10)
        '''
        self.tasksched.add_job(self.synchronous_DB, 'interval', minutes=15, max_instances=1)
        self.tasksched.add_job(self.get_newconfig, 'interval', minutes=30, max_instances=1)
        self.tasksched.add_job(self.get__new_blacklist, 'interval', minutes=30, max_instances=1)
        self.tasksched.add_job(self.upload_white_list, 'interval', minutes=30, max_instances=1)
        self.tasksched.add_job(self.updatewhitelist, 'interval', minutes=30, max_instances=1)
        self.tasksched.start()

    def connecttest(self):
        self.logger.info('Start connect server...')
        sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to the server
        self.logger.info('connect server ok!')
        time.sleep(10)

        self.logger.info('close sock')
        sock.shutdown(socket.SHUT_RDWR)
        sock.close()

    def test(self):
        '''
        debug
        '''
        # pdb.set_trace()
        # self.setsysdbflag(True)
        # self.updatewhitelist()
        # self.sysdbFirstFlag = True
        # self.synchronous_DB()
        # self.get__new_blacklist()
        # self.get_newconfig()
        # self.connecttest()
        self.upload_white_list()
        # self.updatewhitelist()

    def multi_test(self):
        jobs = []
        get_black_list_thread = threading.Thread(target=self.get__new_blacklist)
        get_black_list_thread.daemon = True

        get_config_thread = threading.Thread(target=self.get_newconfig)
        get_config_thread.daemon = True

        upload_white_list_thread = threading.Thread(target=self.upload_white_list)
        upload_white_list_thread.daemon = True

        update_white_list_thread = threading.Thread(target=self.updatewhitelist)
        update_white_list_thread.daemon = True

        get_black_list_thread.start()
        jobs.append(get_black_list_thread)

        get_config_thread.start()
        jobs.append(get_config_thread)

        upload_white_list_thread.start()
        jobs.append(upload_white_list_thread)

        update_white_list_thread.start()
        jobs.append(update_white_list_thread)

        for t in jobs:
            t.join()

if __name__ == '__main__':
    '''
    Starting child processes fails for an as-yet-unknown reason, so only threads are used.
    '''
    # mydb = SQLdb(dbname='tmp')
    # myclient = Client(db=mydb)
    myclient = Client()
    if len(sys.argv) >= 2:
        flag = sys.argv[1].title()
        if flag in ('True', 'False'):
            # avoid eval() on command-line input; only the two valid values are accepted
            myclient.setsysdbflag(flag == 'True')
        else:
            raise ValueError('ARGV ERROR!')
    # myclient.test()
    # myclient.multi_test()
    myclient.start_client()

    '''
    jobs = []
    for i in range(50):
        tmpclient = Client()
        newprocess = threading.Thread(target=Client.test, args=(tmpclient,))
        # newprocess = multiprocessing.Process(target=Client.test, args=(tmpclient,))  # using processes fails here; some sources suggest it is a bug
        newprocess.start()
        jobs.append(newprocess)

    for t in jobs:
        t.join()
    '''
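The thread fan-out used in the test helper above (start several daemon threads, then `join()` them all) can be sketched more compactly with the stdlib `concurrent.futures` pool. `run_all` is a hypothetical helper name, and the callables stand in for the client's upload/update methods:

```python
from concurrent.futures import ThreadPoolExecutor

def run_all(tasks):
    # submit every callable to a thread pool and wait for all results,
    # mirroring the start()/join() loop over daemon threads above
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        futures = [pool.submit(task) for task in tasks]
        return [f.result() for f in futures]
```

The `with` block joins the pool on exit, so no explicit `join()` loop is needed, and an exception raised inside a task surfaces from `f.result()` instead of dying silently inside a thread.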
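If APScheduler is not available, the interval-style jobs registered above can be approximated with the stdlib `sched` module. `run_interval_job` is a hypothetical helper, and a real client would keep rescheduling forever rather than run a fixed number of iterations:

```python
import sched
import time

def run_interval_job(task, interval, iterations):
    # schedule `task` to fire every `interval` seconds, `iterations` times,
    # then block until all scheduled runs have completed
    s = sched.scheduler(time.monotonic, time.sleep)
    for i in range(iterations):
        s.enter(interval * (i + 1), 1, task)
    s.run()
```

Unlike APScheduler's `max_instances`, this sketch is single-threaded, so overlapping runs of the same job cannot occur by construction.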

 

 

