多線程批量插入數據小結


       在測試的過程中,無法避免的需要做一些性能壓測,造數據的時長在此時就會備受關注。比如,造數據的時候用多線程還是多進程,用直接插入DB方式還是用先寫文件后導入mysql的方式,寫文件是寫批量sql后面source導入,還是寫文本格式后面load ito file導入,使用不同的方法耗時結果肯定是不一樣的。除此之外,還有mysql的版本,DB的引擎,表的結構設計這些都會影響大量數據插入的時間。

       這次導入數據做了一個小試驗:導入2000w筆數據到DB內。使用多線程的線程池技術,首先寫2000w筆數據分成M個文件,然后使用N個線程去並發處理寫好M個文件,並把M個文件導入到MYSQL中。之前同事寫的文件后面都是用load data from file命令導入的,官網也有說明這個命令字跟insert比,性能會高出20倍,於是把這個命令字跟我常用的source命令字做了下對比,結論如下:

       

 

一、Insert命令字詳解

       基於這些試驗和現象,確定load ito file加載數據的性能確實要比source導入大批量數據的性能更穩定,更快。但這又是為什么呢?查閱了下MYSQL的官網,insert命令字的插入過程如下,括號內的數字幾乎表示可能占用的耗時比:

      A: 打開連接:(3)

      B: 向服務器發送查詢:(2)

      C:解析查詢:(2)

      D:插入行:(1×行大小)

      E:插入索引:(1×數索引)

      F:關閉連接(1)

       以上步驟還不包含連接時,打開表的開銷。 當大量的insert批量文件被多線程執行插入時,每一個線程都需要經過6步才能完成數據的插入,表的索引結構,表當前數據的行數對insert的每次插入都會影響。如果想提升寫入大數據的性能,可以嘗試批量insert(即insert后的值有多個values),這在一般情況下會單個insert要快,但是要注意設置mysql的bulk_insert_buffer_size參數的大小,之前開發有一些經驗值,一般情況下是設置300-500一批插入性能最佳。但是想要性能更快更穩定,可以使用“LOAD DATA INFILE”,這個命令比單insert要快近20倍。

 

二、提升導入數據性能----mysql服務器端

     1、當將數據導入到INNODB中時,關閉自動提交模式,因為在自動提交模式下,每一次插入都會刷新一次日志到磁盤。可以使用如下語句:

SET autocommit=0;
... SQL import statements ...
COMMIT;

     2、關閉唯一索引。減少索引的插入和唯一性的校驗。

SET unique_checks=0;
... SQL import statements ...
SET unique_checks=1;

     3、關閉外鍵檢查來加速表導入。

SET foreign_key_checks=0;
... SQL import statements ...
SET foreign_key_checks=1;

     4、將innodb_autoinc_lock_mode設置為2,而不是默認值1。

     5、在執行批量插入時,以主鍵順序插入行更快。

     6、字符串的拼接用 .join > a += b ,因為 +=方式每次要重新計算內存/分配。

     7、python的多線程在IO密集的應用場景下,可以寫多個文件,讓多線程的優勢得到更充分的發揮。

     8、批量插入時,多利用字段的默認值,字段值如果使用默認值,會縮端插入過程中對數據解析的時間。

 

 三、過程遇到的問題以及解決辦法

     1、怎么快速刪除2000w筆數據?

        使用truncate 命令字,幾秒內就能刪除數據。如 :truncate table t_dc;

     2、測試機器上網速很慢,無法安裝python的第三方庫怎么辦?

        在已經安裝好的python的site-packages下復制使用的py腳本,然后放到對應機器上。如threadpool.py無法安裝時,最快速的方法就是手工復制文件Python27\Lib\site-packages\threadpool.py使用。

     3、mysqldb執行報錯:Lock wait timeout exceeded; try restarting transaction

        設置全局等待事務鎖超時時間 :SET GLOBAL innodb_lock_wait_timeout=100;

        查詢全局等待事務鎖超時時間 :SHOW GLOBAL VARIABLES LIKE 'innodb_lock_wait_timeout';

     4、mysql事務鎖如何查看:

       在information_schema下面有三張表:INNODB_TRX、INNODB_LOCKS、INNODB_LOCK_WAITS(解決問題方法),通過這三張表,可以更簡單地監控當前的事務並分析可能存在的問題。

       當前運行的所有事務 :select * from information_schema.innodb_trx;

       當前出現的鎖 :select * from information_schema.innodb_locks;

       鎖等待的對應關系:select * from information_schema.innodb_lock_waits;

 

 

 四:批量腳本(線程池+lLOAD DATA LOCAL INFILE)

#!/usr/bin/env python
#coding=GBK
import threadpool
import time,sys
from subprocess import call,Popen,PIPE

COUNT = 1000
process_num = 5
seperate = 100
dbHost='100.92.174.16'
dbUser='root'
dbPasswd='root1234'
dbOperater=None
baseData = {}
thread_arr=[]
detail_data={}
listid = 16080802011100100001
Ftde_id = 1
Fbank_list = 20171217761623447701
tablename = "epcc_check.t_dc_list"
resultDir = "/data/home/loleinaliao/loleinatext/"


#columns
order_columns="Fcheck_bank, Facc_day, Fbankaccno, Fbankusername, Famount, Fdc_type_id, Fori_accno, Fbank_status,Fid,Fbank_listid,Fbatchno"

#data
order_base="'4251','2018022721','6225425177777777800004','wltest','1','16','6225425177777777800004','00'"
tablename ="epcc_check_201810.t_dc_list_06"


def writeDownSqlData(fileName,content):
    fo = open(fileName,"w")
    fo.write(content)
    fo.close()

def make_t_tcpay_list(deal_num,Flistid,Ftde_id,Fbank_list):

 
    filename ="data_order_"+Fbank_list+".text"

    Fbatchno = 'B201810070011'
    orderDataList = []

    for i in range(int(deal_num)):
        orderData=""
        orderData = "%s,'%s','%s','%s'" % (order_base, Ftde_id, Fbank_list, Fbatchno)
        orderDataList.append(orderData)
        Ftde_id = int(Ftde_id) + 1
        Flistid = int(Flistid) + 1
        Fbank_list = int(Fbank_list) + 1

    writeDownSqlData(resultDir+filename, "\n".join(orderDataList) + "\n")

    loadDataIntoDB(resultDir+filename,tablename,order_columns)

def loadDataIntoDB(filename,tableName,order_columns):
    mysqlCmd = r"LOAD DATA LOCAL INFILE '%s' into table %s FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\''  LINES TERMINATED BY '\n' (%s"%(filename,tableName,order_columns)+")"

    mysqlConCmd = r"mysql -u%s -p%s -h%s"%(dbUser,dbPasswd,dbHost)
    runCmd = mysqlConCmd+' -e "'+mysqlCmd+'"'
    result = call(runCmd,shell=True)
    if result != 0:
        print "load local data into database failed,exit "
        sys.exit()


if __name__ == "__main__":
    start = time.time()

    begin_Flistid ='110180809100012153304210311120'
    begin_Ftde_id ="1"
    begin_Fbank_seq ="2018100800000110734321790770100"
    total_num  =20000000
    threadpool_num = 20
    func_var=[]
    seperate =2000

    pool = threadpool.ThreadPool(threadpool_num)

    for i  in range(seperate):
        list_temp =[]
        list_temp =[str(total_num/seperate),begin_Flistid,begin_Ftde_id,begin_Fbank_seq]
        func_var.append((list_temp,None))
        begin_Flistid = str(int(begin_Flistid)+ total_num/threadpool_num)
        begin_Ftde_id = str(int(begin_Ftde_id)+ total_num/threadpool_num)
        begin_Fbank_seq = str(int(begin_Fbank_seq)+ total_num/threadpool_num)

    pool = threadpool.ThreadPool(threadpool_num)
    requests = threadpool.makeRequests(make_t_tcpay_list, func_var)
    for req in requests:
        pool.putRequest(req)
    pool.wait()

    end = time.time()
    print end - start

 

 

       參考文檔:

      https://dev.mysql.com/doc/refman/5.7/en/insert-optimization.html

      http://mysql.taobao.org/monthly/2017/09/10/

      https://dbahire.com/testing-the-fastest-way-to-import-a-table-into-mysql-and-some-interesting-5-7-performance-results/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM