背景
平台目前的分析任務主要以Hive為主,分析后的結果存儲在HDFS,用戶通過REST API或者Rsync的方式獲取分析結果,這樣的方式帶來以下幾個問題:
(1)任務執行結束時間未知,用戶必須自行編寫代碼不斷地通過REST API請求分析結果,直至獲取到分析結果為止,其中還需要處理分析結果過大,轉而通過Rsync方式獲取;
(2)受限於Hive SQL的表達能力,用戶的計算邏輯無法完全表述,獲取分析結果后需要再計算,然后入庫;
(3)基於(1)、(2)的原因,用戶編寫大量復雜且冗余的代碼處理上述邏輯。
為了改善上述情況,平台設計的解決方案如下:
(1)使用Spark替換Hive(MapReduce)分析任務,結合Hive SQL和Spark API兩種方式,使用戶的計算邏輯可以得到很好的表述;
(2)用戶無需使用“Pull”的方式獲取結果,轉而使用“Post”的方式將分析結果存入自己的MySQL數據庫中;
(3)提供基於Python的MySQL通用批量寫入工具方便用戶使用。
方案
MySQL批量寫入工具需要解決以下幾個問題:
(1)通用:不受限於具體數據格式的束縛;
用戶需要將不同格式的數據寫入不同的數據庫/表,意味着數據庫的主機名、端口、用戶名、密碼、數據庫實例可以通過參數定制,不同的數據格式使用SQL語句表達,如“insert into students (c_number, c_name) values (%s, %s)”。
(2)多線程:使用多線程的方式,提高寫入的吞吐量;
典型的“生產者——消費者”問題,用戶需要將數據首先寫入一個線程安全的共享隊列中;“消費者”線程不斷的從這個共享隊列中獲取數據並緩存,待緩存數據達到一定數目時,將這批數據一次性寫出;然后繼續上述過程。
除此之外,“消費者”線程需要能夠“正常”結束。
(3)數據庫連接控制:需要避免連接過多或者頻繁連接帶來的性能開銷;
“消費者”線程需要能夠重復利用一定數目的數據庫連接,數據庫連接由專門的連接池提供,工作流程如下:
a. 從連接池中獲取數據庫連接;
b. 通過a中的連接將數據批量寫出並commit;
c. 將數據庫連接歸還給連接池。
(4)API:簡單易用
我們將批量寫入工具定義為一個“存儲引擎”,考慮到MySQL的吞吐量可能在大數據量的寫入下會成為一個瓶頸,后期會考慮擴展其它工具。因此定義一個基類表示抽象的“存儲引擎”,並擴展出本文具體討論的MySQL“存儲引擎”:MySQLStorageEngine。


MySQLStorageEngine的初始化過程涉及以下六個方面:
(1)begin、end用於統計一次批量寫入的耗時;
(2)接收用戶定制的參數;
host:數據庫主機名;
user:數據庫登錄用戶名;
passwd:數據庫登錄密碼;
db:數據庫實例
port:數據庫端口
charset:數據庫字符編碼
sql:寫入數據時使用的SQL語句,如“insert into students (c_number, c_name) values (%s, %s)”
threads:寫入線程數目;
bufferSize:寫入線程內部的緩存區大小,亦即每一次“batch”的大小;
mincached:數據庫連接池內部最小緩存連接數;
maxcached:數據庫連接池內部最大緩存連接數;
maxconnections:數據庫連接池所允許同時建立的最大連接數:目前與寫入線程數目相同。
(3)構建共享隊列queue;
(4)saveNum、storeNum用於統計用戶寫入總數及實際(成功)寫入總數,考慮到多線程使用環境,分別構建相應的鎖對象saveLock、storeLock;
(5)構建數據庫連接池,這里使用的是DBUtils PooledDB;
(6)構建寫入線程(多個)並啟動。
至此,MySQLStorageEngine實例創建完畢,並且啟動內部多個寫入線程用於消費隊列queue中數據。用戶可通過實例方法save寫入數據:

可見,save支持兩種類型的數據,一種是Tuple,另一種是Tuple數組,它們都被保存至隊列queue中,由寫入線程負責處理。
用戶寫入完成之后,可以通過方法close關閉“存儲引擎”,

需要注意的是,用戶寫入的數據實際是保存在隊列queue中的,“用戶寫入完成”並不代表隊列queue中的數據已全部被寫入線程消費且完成入庫,因此必須首先通知寫入線程用戶數據已全部寫入完畢(requestStop),然后等待寫入線程運行完畢(join),最后關閉數據庫連接池。
寫入線程由MySQLSaver實現,它的初始化過程特別簡單:
(1)接收“存儲引擎”實例engine;
(2)定義實例變量stop,初始值為False,用於表示用戶尚有數據寫入;True表示用戶寫入結束。

MySQLSaver的工作流程如下:

(1)初始化緩存區buffer,用於保存從隊列queue消費而來的數據;
(2)循環從隊列queue獲取數據,如果沒有獲取到數據,則執行(3);如果獲取到數據,則執行(4);
(3)如果用戶寫入結束(stop值為True)而且隊列中已經沒有剩余數據,將緩沖區buffer中的數據一次性寫入(__save),結束線程(break);
(4)將(2)中獲取到的數據存入緩存區,如果緩存區的大小達到數目限制,則將緩沖區buffer中的數據一次性寫入(__save),繼續(2)。
__save的工作流程如下:

(1)從連接池pool中獲取數據庫連接db並構建實例cursor;
(2)寫入buffer中的數據(executemany)並提交(commit);
(3)清空buffer、關閉實例cursor、將數據庫連接db“歸還”給連接池(close)。
使用示例如下:
