SQLite數據庫(www.sqlite.org)是一個輕型的數據庫引擎,應用非常廣泛。我們使用SQLite來存儲采集到的通訊數據,一幀通訊原始數據加上時間戳后作為一條記錄存入數據庫中。在這個項目中,只需要用到數據庫的存儲功能,不需要查詢之類的(到本地上位機上解析數據時才會用到),所以下面也只介紹存儲相關的部分,其他功能可以參考這個網址:
https://www.runoob.com/sqlite/sqlite-python.html
程序說明
用下面這些函數就可以插入記錄完成存儲,還需要解決的是SQL語句:sql_create_table、sql_insert_record,以及記錄集合:records_list。
1 import sqlite3 2 3 # 連接到SQLite數據庫:DB_NAME,如果不存在就創建 4 conn = sqlite3.connect(DB_NAME) 5 # 創建一個游標Cursor,可以通過Cursor來執行SQL語句 6 cursor = conn.cursor() 7 # 創建記錄表:records 8 cursor.execute(sql_create_table) 9 # 插入多條記錄 10 cursor.executemany(sql_insert_record, records_list) 11 # 關閉Cursor 12 cursor.close() 13 # 提交事務 14 conn.commit() 15 # 關閉數據庫連接 16 conn.close()
>> SQL語句:sql_create_table、sql_insert_record
sql_create_table
在數據庫中建一個表records,表中記錄的結構為:單條記錄結構:id(自增),時間戳(text),從數據庫創建開始的累計時間(real),通訊原始數據(400個,int)
sql_insert_record
插入記錄時只需要:時間戳、累計時間、通訊數據*400
1 def init_sql_sentences(param_num): 2 global sql_create_table 3 global sql_insert_record 4 5 # sql_create_table 6 sql_create_table = 'create table if not exists records(' 7 sql_create_table += 'id integer primary key autoincrement, ' 8 sql_create_table += 'datetime text, ' 9 sql_create_table += 'runningtime real, ' 10 for i in range(0, param_num - 1): 11 sql_create_table += 'param' + str(i) + ' int, ' 12 sql_create_table += 'param' + str(param_num - 1) + ' int)' 13 14 # sql_insert_record 15 sql_insert_record = 'insert into records values (null, ?, ?, ' 16 for i in range(0, param_num - 1): 17 sql_insert_record += '?, ' 18 sql_insert_record += '?)'
>> 記錄集合:records_list
按照記錄的格式,將時間戳、通訊數據等存入單條記錄record_buff[],然后再把record_buff[]添加到到記錄集records_list[],這里使用了tuple,把record_buff[]轉換成元組,這樣就負荷cursor.executemany()函數要求的格式了。
1 records_list = [] 2 record_buff = [] 3 record_buff.append('') 4 record_buff.append(0.0) 5 for i in range(0, DB_PARAMS_LENGTH): 6 record_buff.append(0) 7 8 #### 9 record_buff[0] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) 10 record_buff[1] = round((time.time() - start_time), 1) 11 for i in range(0, DB_PARAMS_LENGTH): 12 record_buff[i + 2] = comn_data[i] 13 comn_data[i] = 0 14 records_list.append(tuple(record_buff[:]))
用模擬通訊數據,生成100條記錄,插入數據庫,等待0.4s。循環10次以后退出。
pi@raspberrypi:~ $ sudo nano test_sqlite.py
1 # -*- coding:utf-8 -*- 2 import time 3 import sqlite3 4 import os 5 from datetime import datetime 6 7 DEFAULT_DIR = '/home/pi/project/' 8 DB_DIR = '/home/pi/project/data/' 9 DB_TMP_DIR = '/home/pi/project/data/tmp/' 10 DB_UPDATING_DIR = '/home/pi/project/data/updating/' 11 DB_UPDATED_DIR = '/home/pi/project/data/updated/' 12 13 DB_NAME = '' 14 15 PRODUCT_TYPE = 'NULL' 16 DEVICE_ID = 999 17 DB_PARAMS_LENGTH = 400 18 19 sql_create_table = '' 20 sql_insert_record = '' 21 22 def init_sql_sentences(param_num): 23 #### 24 25 def main(): 26 global PRODUCT_TYPE 27 global DEVICE_ID 28 global DB_PARAMS_LENGTH 29 30 record_count = 0 31 insert_count = 0 32 33 record_buff = [] 34 records_list = [] 35 comn_data = [0 for i in range(DB_PARAMS_LENGTH)] 36 37 record_buff.append('') 38 record_buff.append(0.0) 39 for i in range(0, DB_PARAMS_LENGTH): 40 record_buff.append(0) 41 42 if not os.path.exists(DEFAULT_DIR): 43 os.mkdir(DEFAULT_DIR) 44 if not os.path.exists(DB_DIR): 45 os.mkdir(DB_DIR) 46 if not os.path.exists(DB_TMP_DIR): 47 os.mkdir(DB_TMP_DIR) 48 if not os.path.exists(DB_UPDATING_DIR): 49 os.mkdir(DB_UPDATING_DIR) 50 if not os.path.exists(DB_UPDATED_DIR): 51 os.mkdir(DB_UPDATED_DIR) 52 53 init_sql_sentences(DB_PARAMS_LENGTH) 54 55 DB_NAME = DB_TMP_DIR+PRODUCT_TYPE + '_' + str(DEVICE_ID).rjust(3,'0') + '_' \ 56 + time.strftime('%Y-%m-%d_%H%M%S', time.localtime()) + '.db3' 57 58 print('>> sqlite test start...') 59 print(DB_NAME) 60 61 start_time = time.time() 62 while insert_count < 10: 63 # set fake data 64 for i in range(0, DB_PARAMS_LENGTH): 65 comn_data[i] = i 66 comn_data[0] = record_count 67 68 record_buff[0] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) 69 record_buff[1] = round((time.time() - start_time), 1) 70 for i in range(0, DB_PARAMS_LENGTH): 71 record_buff[i + 2] = comn_data[i] 72 comn_data[i] = 0 73 74 records_list.append(tuple(record_buff[:])) 75 record_count = record_count + 1 76 77 if((record_count % 100) == 0): 78 conn = sqlite3.connect(DB_NAME) 79 cursor = conn.cursor() 80 cursor.execute(sql_create_table) 81 cursor.executemany(sql_insert_record, records_list) 82 cursor.close() 83 conn.commit() 84 conn.close() 85 86 records_list = [] 87 records_need_save = False 88 89 print('>> sqlite insert records: ' + \ 90 str(insert_count * 100) + ' - ' + str(insert_count * 100 + 99)) 91 insert_count = insert_count + 1 92 time.sleep(0.4) 93 94 print('>> sqlite test finish...') 95 96 if __name__ == '__main__': 97 main()
運行結果
運行test_sqlite.py,顯示總共插入10次(每次有100條記錄)
pi@raspberrypi:~ $ python test_sqlite.py
查看數據庫是否存在,顯示新生成一個數據庫:NULL_999_2020-04-23_160903.db3
pi@raspberrypi:~ $ cd /home/pi/project/data/tmp/
pi@raspberrypi:~/project/data/tmp $ ls -lh
在同一無線網絡系啊,使用FileZilla軟件登陸樹莓派,把數據庫文件從樹莓派中拷貝到電腦上。
主機:raspberrypi.local,用戶名:pi,密碼:raspberry,端口:22,點擊 快速連接
遠程站點中,進入/home/pi/project/data/tmp目錄
將下面的數據庫文件復制到本地站點的一個目錄下(雙擊或者拖拽都可以):
用SQLite Expert打開這個文件,點擊records表,可以看到里面的數據:
擴展內容:存儲邏輯
>> 存儲位置
創建3個文件夾:<臨時>文件夾、<待上傳>文件夾、<已上傳>文件夾
臨時 | 新收到通訊數據時,在該文件夾下建立數據庫,持續寫入數據; 持續寫入1小時/持續1分鍾未收到新數據,關閉該數據庫,移動至<待上傳>; 上電后首先將該目錄下的數據庫文件移動至<待上傳> |
待上傳 | 數據上傳服務程序每隔30s掃描一次該目錄 如果有文件,則開始上傳第1個文件,上傳完成后將該文件移動至<已上傳> 如果采集裝置離線,則該目錄為實際存儲目錄 |
已上傳 | 如果采集裝置在線,則該目錄為實際存儲目錄 |
>> 批量寫入
為了提高寫入效率,可以把記錄先放在內存里,攢夠N條記錄(100條)后再一次性寫入到數據庫中。當然如果較長的一段時間(10s)沒有收到新的數據幀,就會認為被監聽的設備掉線,這時把已經有的記錄寫入到數據庫中。
>> 定時打包
連續采集較長時間(1個小時),或者連續一段時間(≥1分鍾)沒有收到新的數據幀,就結束往當前數據庫寫入記錄,相當於把這段時間的數據打包成一個數據庫,便於后續使用。把這個數據庫移動到待上傳的文件夾中,然后再新建一個數據庫繼續采集。