1、目的
由於有多個程序和腳本需要對mysql進行讀寫數據庫,每次在腳本中進行數據庫的連接、用cursor進行操作過於麻煩,因此希望可以有一個腳本開放接口,只需要傳入sql語句,就可以返回結果回來。因此有需要一個可以支持並發量較大的腳本來進行數據庫操作。以上就要求我的接口具有異步非阻塞、在結果返回前保持長連接、並發大。因此單純的多線程和協程沒辦法滿足要求,就需要用到tornado框架。
2、程序
1)客戶端代碼(通過requests 調用接口)
import reuqests
POST = requests.post # post請求方式
def db_query(sql, method='query'): db_api = 'http://127.0.0.1:8000/db/api' db_base = "dbbase" db_ret = POST(db_api, data=json_encode({'method': method, 'sql': sql, 'dbbase': db_base, 'pwd': 'password123'})) if db_ret.status_code == 200: if json_decode(db_ret.text)['status'] == 'True': db_ret_data = json_decode(db_ret.text)['data'] return {'status': 'ok', 'data': db_ret_data, 'err': ''} else: return {'status': 'False', 'data': [], 'err': json_decode(db_ret.text)['data']} else: return {'status': 'False', 'data': [], 'err': 'connect error'}
2)服務端代碼(基於tornado框架)
# coding=utf8 import MySQLdb import MySQLdb.cursors import tornado import tornado.ioloop import tornado.web import tornado.gen from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor class DB_CONFIG: config = { 'dbbase': ('authentic', 'password'), # 數據庫user、password } class DB: def __init__(self, dbbase): user, password = DB_CONFIG.config.get(dbbase, (None, None)) if user == None or password == None: raise Exception('KeyError', dbbase) db_host = '192.168.xx.xx' db_port = 1234 self.db = MySQLdb.connect(db_host, user, password, dbbase, port=db_port, cursorclass=MySQLdb.cursors.DictCursor) self.cursor = self.db.cursor() def query(self, sql): self.cursor.execute(sql) return self.cursor.fetchall() def commit(self, sql): try: self.cursor.execute(sql) self.db.commit() return {'status': True, 'data': ''} except Exception as e: self.db.rollback() return {'status': False, 'data': e} def close(self): self.db.close() class ServiceHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(900) # 必須定義一個executor的屬性,然后run_on_executor裝飾器才會游泳。 @run_on_executor # 線程內運行;query函數被run_on_executor包裹(語法糖),將該函數的執行傳遞給線程池executor的線程執行,優化了處理耗時性任務,以致達到不阻塞主線程的效果。 def query(self, dbname, method, sql): db = DB(dbname) ret = '' if method == 'query': ret = db.query(sql) elif method == 'commit': ret = db.commit(sql) db.close() return ret @tornado.web.asynchronous # 保持長連接,直到處理后返回 @tornado.gen.coroutine # 異步、協程處理;增加並發量 def post(self): data = tornado.escape.json_decode(self.request.body) # 獲取參數,json.loads()解碼 r = {'status': '', 'data': ''} if not data.get('pwd', None): r['status'], r['data'] = ('False', 'not password') elif not data.get('dbbase', None): r['status'], r['data'] = ('False', 'not DB select') else: if data['pwd'] != 'password123)': # 接口的密碼認證 r = {'status': 'False', 'data': 'password error'} elif data['method'] == 'query': d = yield self.query(data['dbbase'], 'query', data['sql']) r = {'status': 'True', 'data': d} elif data['method'] == 'commit': db_r = yield self.query(data['dbbase'], 'commit', data['sql']) if db_r['status']: r = {'status': 'True', 'data': 'commit sucessful'} else: r = {'status': 'False', 'data': db_r['data']} else: r = {'status': 'False', 'data': 'method Invaild'} self.write(tornado.escape.json_encode(r)) # 寫入返回信息寫入response self.finish() # 結束服務 def get(self): return self.post() if __name__ == "__main__": application = tornado.web.Application([ (r"/db/api", ServiceHandler), # 路由映射 ]) application.listen(8000) # 監聽端口 tornado.ioloop.IOLoop.instance().start() # 啟動服務
3、請求舉例(post請求)
1) sql語句
sql = 'select vid from video where status=1 group by vid ORDER BY num ASC limit 100'
2)客戶端調用db_query函數
db_ret = db_query(sql, 'query') # 其中query是數據庫操作的方法 query為查詢/commit 為insert/update/delete等數據庫修改操作時使用
通過調用db_query傳入參數sql語句和操作方式,返回結果或錯誤內容