python 操作 sqlite 的示例代碼:
import time
import threading
import sqlite3
def nomal_producer(conn):
    '''
    @summary: producer definition. Endlessly inserts one row into the
              `datas` table roughly every 0.1 seconds; never returns.
    @param conn: sqlite3 connection to write through. Its isolation_level
                 and row_factory are reconfigured here.
    '''
    # isolation_level=None switches the sqlite3 module to autocommit mode,
    # which is why no explicit commit follows the INSERT below.
    conn.isolation_level = None
    conn.row_factory = sqlite3.Row
    seq = 0
    while True:
        # insert to db, using a fresh cursor for every row
        payload = "content %s" % seq
        conn.cursor().execute(
            "INSERT INTO datas(content, flag) VALUES (?, ?);",
            (payload, False),
        )
        seq += 1
        time.sleep(0.1)
def nomal_consumer(conn):
    '''
    @summary: consumer definition. Endlessly polls the `datas` table about
              every 0.5 seconds, prints the (up to 10) oldest rows by id and
              deletes them one by one; never returns.
    @param conn: sqlite3 connection to read and delete through. Its
                 isolation_level and row_factory are reconfigured here.
    '''
    # isolation_level=None switches the sqlite3 module to autocommit mode,
    # so each DELETE takes effect without an explicit commit.
    conn.isolation_level = None
    # sqlite3.Row allows the by-name access r["id"] used below.
    conn.row_factory = sqlite3.Row
    while True:
        # select data
        cur = conn.cursor()
        cur.execute("SELECT * FROM datas ORDER BY id LIMIT 10;")
        records = cur.fetchall()
        if records:
            # Single-argument print(...) produces the same output on
            # Python 2 and is valid syntax on Python 3.
            print("begin to delete: ")
            print(records)
        # delete records (a no-op when nothing was fetched)
        for r in records:
            conn.execute("DELETE FROM datas WHERE id = ?;", (r["id"], ))
        time.sleep(0.5)
if __name__ == "__main__":
    # init db: one connection is shared by both worker threads, so the
    # same-thread check has to be disabled. With the default
    # (check_same_thread=True) sqlite3 raises ProgrammingError as soon as a
    # thread other than the creating one uses the connection.
    conn = sqlite3.connect('./db.sqlite', check_same_thread=False)
    # Create the table the workers use when the database file is new;
    # otherwise the first INSERT/SELECT fails with "no such table: datas".
    # NOTE(review): column types are inferred from how the workers use
    # id/content/flag -- confirm against the intended schema.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS datas ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT, flag BOOLEAN);"
    )
    conn.commit()
    # init threads
    producer = threading.Thread(target=nomal_producer, args=(conn,))
    consumer = threading.Thread(target=nomal_consumer, args=(conn,))
    # start threads
    producer.start()
    consumer.start()
在多線程操作 sqlite 的示例代碼中,採用 producer 和 consumer 的模式來處理,沒有特殊之處。但需要注意的是:在建立 sqlite3 的 connection 時,需要設置 check_same_thread = False,否則在建立該 connection 以外的線程中使用它時,sqlite3 會拋出 ProgrammingError。
另外,為了達到真正的 thread-safe,可以對 python 的 sqlite3 做進一步封裝,使得僅有一個 thread 在操作 sqlite。原理很簡單:使用一個 queue 來接收所有操作請求,由該 thread 依次執行,並將查詢結果放到另外一個 queue 中返回給調用方。示例代碼如下:
import sqlite3
from Queue import Queue
from threading import Thread
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.

    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived). The sqlite connection is
    created and used only inside that worker thread.

    A statement that fails does not stop the worker: the exception is re-raised
    in the caller for `select`/`select_one`, and logged for fire-and-forget
    `execute`/`commit` requests.
    """

    def __init__(self, filename, autocommit, journal_mode):
        """
        Store the settings and start the worker thread immediately.

        filename     -- database path handed to `sqlite3.connect`.
        autocommit   -- if true, connect with isolation_level=None and commit
                        after every executed statement.
        journal_mode -- value interpolated into `PRAGMA journal_mode = ...`.
        """
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        # Daemon thread: a forgotten close() must not keep the interpreter alive.
        self.daemon = True
        self.start()

    def run(self):
        """
        Worker loop: open the connection, then serve queued requests until
        a '--close--' request arrives.
        """
        # Local import so this class does not depend on edits to the
        # module-level import block.
        import logging
        log = logging.getLogger(__name__)

        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        try:
            # PRAGMA values cannot be bound as parameters, hence the string
            # formatting; journal_mode must come from trusted code.
            conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
            conn.text_factory = str
            cursor = conn.cursor()
            cursor.execute('PRAGMA synchronous=OFF')
            while True:
                req, arg, res = self.reqs.get()
                if req == '--close--':
                    break
                try:
                    if req == '--commit--':
                        conn.commit()
                    else:
                        cursor.execute(req, arg)
                        if res is not None:
                            for rec in cursor:
                                res.put(rec)
                        if self.autocommit:
                            conn.commit()
                except Exception as exc:
                    # Keep serving requests: letting the exception escape
                    # would kill this thread and leave every later select()
                    # blocked forever on its result queue.
                    if res is not None:
                        res.put(exc)  # re-raised in the caller by select()
                    else:
                        log.exception('sqlite request failed: %r', req)
                finally:
                    # Always terminate the result stream, even after an error.
                    if res is not None:
                        res.put('--no more--')
        finally:
            conn.close()

    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.

        req -- SQL text (or one of the internal '--commit--'/'--close--' markers).
        arg -- parameters for the statement; a falsy value becomes an empty tuple.
        res -- optional queue that receives the result rows followed by the
               '--no more--' marker; if the statement fails, the exception
               object is put on the queue just before the marker.
        """
        self.reqs.put((req, arg or tuple(), res))

    def executemany(self, req, items):
        """Queue `req` once for every parameter set in `items`."""
        for item in items:
            self.execute(req, item)

    def select(self, req, arg=None):
        """
        Unlike sqlite's native select, this select doesn't handle iteration efficiently.

        This is a generator, so the request is only queued once iteration
        starts. The result then starts filling up with values as soon as the
        request is dequeued, and although you can iterate over the result
        normally (`for res in self.select(): ...`), the entire result will
        be in memory.

        Raises whatever exception the statement raised in the worker thread.
        """
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        # iter(callable, sentinel) stops when res.get() returns the marker.
        for rec in iter(res.get, '--no more--'):
            if isinstance(rec, Exception):
                raise rec
            yield rec

    def select_one(self, req, arg=None):
        """Return only the first row of the SELECT, or None if there are no matching rows."""
        # The next() builtin works on Python 2.6+ and 3; the `.next()` method
        # exists only on Python 2.
        return next(self.select(req, arg), None)

    def commit(self):
        """Queue a commit; returns without waiting for it to happen."""
        self.execute('--commit--')

    def close(self):
        """
        Queue a shutdown request; returns without waiting. Requests queued
        earlier are still processed first. Call `join()` afterwards to wait
        for the worker to finish.
        """
        self.execute('--close--')
# endclass SqliteMultithread
