上一篇文章簡單的實現了ORM(對象關系模型),這一篇文章主要實現簡單的MySQL數據庫操作。
想要操作數據庫,首先要建立一個數據庫連接。下面定義一個創建數據庫連接的函數,得到一個連接叫做engine。
def create_engine(user,password,database,host='127.0.0.1',port=3306,**kw):
import mysql.connector
global engine
if engine is not None:
raise DBError('Engine is already initialized.')
params = dict(user=user,password=password,database=database,host=host,port=port)
defaults = dict(use_unicode=True,charset='utf8',collation='utf8_general_ci',autocommit=False)
#print ('%s %s %s %s %s') % (user,password,database,host,port)
for k,v in defaults.iteritems():
params[k] = kw.pop(k,v)
params.update(kw)
params['buffered'] = True
engine = mysql.connector.connect(**params)
cursor = engine.cursor()
有了連接就可以對數據庫進行操作了。下面寫了幾個函數,可以對數據庫進行查詢和插入操作。
def _select(sql,first,*args):
cursor = None
sql = sql.replace('?','%s')
global engine
try:
cursor = engine.cursor()
cursor.execute(sql,args)
if cursor.description:
names = [x[0] for x in cursor.description]
if first:
values = cursor.fetchone()
if not values:
return None
return Dict(names,values)
return [Dict(names,x) for x in cursor.fetchall()]
finally:
if cursor:
cursor.close()
def select_one(sql,*args):
return _select(sql,True,*args)
def select(sql,*args):
return _select(sql,False,*args)
def _update(sql,*args):
cursor = None
global engine
sql = sql.replace('?','%s')
print sql
try:
cursor = engine.cursor()
cursor.execute(sql,args)
r = cursor.rowcount
engine.commit()
return r
finally:
if cursor:
cursor.close()
def insert(table,**kw):
cols, args = zip(*kw.iteritems())
sql = 'insert into %s (%s) values(%s)' % (table,','.join(['%s' % col for col in cols]),','.join(['?' for i in range(len(cols))]))
print ('sql %s args %s' % (sql, str(args)))
return _update(sql,*args)
到這里,基本的數據庫操作已經完成了。但是,根據廖雪峰的教程,這還遠遠不夠。
- 如果要在一個數據庫連接中實現多個操作,上面的代碼效率很低,沒次執行玩一條語句,就需要重新分配一個連接。
- 在一次事務中執行多條操作也是一樣效率低下。
- 如果服務器為不同用戶數據庫請求都分配一個線程來建立連接,但是在進程中,連接是可供享使用的。這樣問題就來了,導致數據庫操作可能異常。
針對第三個問題,應該使每個連接是每個線程擁有的,其它線程不能訪問,使用threading.local。首先定義一個類,來保存數據庫的上下文:
class _DbCtx(threading.local):
def __init__(self):
self.connection = None
self.transactions = 0
def is_init(self):
return not self.connection is None
def init(self):
self.connection = engine # 創建數據庫連接
self.transactions = 0
def cleanup(self):
self.connection.cleanup()
self.connection = None
def cursor(self):
return self.connection.cursor()
上面的代碼有一個錯誤。因為Python的賦值語句只是將一個對象的引用傳給一個變量,就如上面代碼中 init函數中 self.connection = engine。表明self.connection和engine都指向一個數據庫連接的對象。如果將self.connection給cleanup了,那么engine指向的對象也被cleanup了。下圖是一個例子:
a是類foo實例的一個引用,執行b=a后,在執行b.clean(),此時應該只是b的v值被更改為0,但是執行a.v卻發現v的值也變為0了。
下面是最后的代碼,只是封裝了最底層的數據庫操作,代碼也寫的很漲,雖然是模仿廖雪峰的代碼。
# -*- coding: utf-8 -*-
import time, uuid, functools, threading, logging
class Dict(dict):
'''
Simple dict but support access as x.y style.
'''
def __init__(self, names=(), values=(), **kw):
super(Dict, self).__init__(**kw)
for k, v in zip(names, values):
self[k] = v
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Dict' object has no attribute '%s'" % key)
def __setattr__(self, key, value):
self[key] = value
class DBError(Exception):
pass
class MultiColumnsError(Exception):
pass
engine = None
class _DbCtx(threading.local):
def __init__(self):
self.connection = None
self.transactions = 0
def is_init(self):
return not self.connection is None
def init(self):
self.connection = engine
self.transactions = 0
def cleanup(self):
self.connection = None
def cursor(self):
return self.connection.cursor()
def create_engine(user,password,database,host='127.0.0.1',port=3306,**kw):
import mysql.connector
global engine
if engine is not None:
raise DBError('Engine is already initialized.')
params = dict(user=user,password=password,database=database,host=host,port=port)
defaults = dict(use_unicode=True,charset='utf8',collation='utf8_general_ci',autocommit=False)
#print ('%s %s %s %s %s') % (user,password,database,host,port)
for k,v in defaults.iteritems():
params[k] = kw.pop(k,v)
params.update(kw)
params['buffered'] = True
engine = mysql.connector.connect(**params)
print type(engine)
_db_ctx = _DbCtx()
class _ConnectionCtx(object):
def __enter__(self):
self.should_cleanuo = False
if not _db_ctx.is_init():
cursor = engine.cursor()
_db_ctx.init()
self.should_cleanup = True
return self
def __exit__(self,exctype,excvalue,traceback):
if self.should_cleanup:
_db_ctx.cleanup()
def with_connection(func):
@functools.wraps(func)
def _wrapper(*args,**kw):
with _ConnectionCtx():
return func(*args, **kw)
return _wrapper
def _select(sql,first,*args):
cursor = None
sql = sql.replace('?','%s')
global _db_ctx
try:
cursor = _db_ctx.cursor()
cursor.execute(sql,args)
if cursor.description:
names = [x[0] for x in cursor.description]
if first:
values = cursor.fetchone()
if not values:
return None
return Dict(names,values)
return [Dict(names,x) for x in cursor.fetchall()]
finally:
if cursor:
cursor.close()
@with_connection
def select_one(sql,*args):
return _select(sql,True,*args)
@with_connection
def select_int(sql,*args):
d = _select(sql,True,*args)
if len(d) != 1:
raise MultoColumnsError('Except only one column.')
return d.values()[0]
@with_connection
def select(sql,*args):
global engine
print type(engine)
return _select(sql,False,*args)
@with_connection
def _update(sql,*args):
cursor = None
global _db_ctx
sql = sql.replace('?','%s')
print sql
try:
cursor = _db_ctx.cursor()
cursor.execute(sql,args)
r = cursor.rowcount
engine.commit()
return r
finally:
if cursor:
cursor.close()
def insert(table,**kw):
cols, args = zip(*kw.iteritems())
sql = 'insert into %s (%s) values(%s)' % (table,','.join(['%s' % col for col in cols]),','.join(['?' for i in range(len(cols))]))
print ('sql %s args %s' % (sql, str(args)))
return _update(sql,*args)
create_engine(user='root',password='z5201314',database='test')
u1 = select_one('select * from user where id=?',1)
print 'u1'
print u1
print 'start selet()...'
u2 = select('select * from user')
for item in u2:
print ('%s %s' % (item.name,item.id))
print 'name:%s id: %s' % (u1.name,u1.id)
