基於tornado---異步並發接口


1、目的

  由於有多個程序和腳本需要對mysql進行讀寫數據庫,每次在腳本中進行數據庫的連接、用cursor進行操作過於麻煩,因此希望可以有一個腳本開放接口,只需要傳入sql語句,就可以返回結果回來。因此有需要一個可以支持並發量較大的腳本來進行數據庫操作。以上就要求我的接口具有異步非阻塞、在結果返回前保持長連接、並發大。因此單純的多線程和協程沒辦法滿足要求,就需要用到tornado框架。

2、程序

    1)客戶端代碼(通過requests 調用接口)

import reuqests
POST = requests.post # post請求方式
def db_query(sql, method='query'):
    db_api = 'http://127.0.0.1:8000/db/api'
    db_base = "dbbase"
    db_ret = POST(db_api, data=json_encode({'method': method, 'sql': sql, 'dbbase': db_base, 'pwd': 'password123'}))
    if db_ret.status_code == 200:
        if json_decode(db_ret.text)['status'] == 'True':
            db_ret_data = json_decode(db_ret.text)['data']
            return {'status': 'ok', 'data': db_ret_data, 'err': ''}
        else:
            return {'status': 'False', 'data': [], 'err': json_decode(db_ret.text)['data']}
    else:
        return {'status': 'False', 'data': [], 'err': 'connect error'}

 2)服務端代碼(基於tornado框架)

# coding=utf8
import MySQLdb
import MySQLdb.cursors
import tornado
import tornado.ioloop
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor


class DB_CONFIG:
    config = {
        'dbbase': ('authentic', 'password'), # 數據庫user、password
    }


class DB:
    def __init__(self, dbbase):
        user, password = DB_CONFIG.config.get(dbbase, (None, None))
        if user == None or password == None:
            raise Exception('KeyError', dbbase)
        db_host = '192.168.xx.xx'
        db_port = 1234

        self.db = MySQLdb.connect(db_host, user, password, dbbase, port=db_port, cursorclass=MySQLdb.cursors.DictCursor)

        self.cursor = self.db.cursor()

    def query(self, sql):
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    def commit(self, sql):
        try:
            self.cursor.execute(sql)
            self.db.commit()
            return {'status': True, 'data': ''}
        except Exception as e:
            self.db.rollback()
            return {'status': False, 'data': e}

    def close(self):
        self.db.close()


class ServiceHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(900)  # 必須定義一個executor的屬性,然后run_on_executor裝飾器才會游泳。

    @run_on_executor  # 線程內運行;query函數被run_on_executor包裹(語法糖),將該函數的執行傳遞給線程池executor的線程執行,優化了處理耗時性任務,以致達到不阻塞主線程的效果。
    def query(self, dbname, method, sql):
        db = DB(dbname)
        ret = ''
        if method == 'query':
            ret = db.query(sql)
        elif method == 'commit':
            ret = db.commit(sql)
        db.close()
        return ret

    @tornado.web.asynchronous  # 保持長連接,直到處理后返回
    @tornado.gen.coroutine  # 異步、協程處理;增加並發量
    def post(self):
        data = tornado.escape.json_decode(self.request.body)  # 獲取參數,json.loads()解碼
        r = {'status': '', 'data': ''}
        if not data.get('pwd', None):
            r['status'], r['data'] = ('False', 'not password')
        elif not data.get('dbbase', None):
            r['status'], r['data'] = ('False', 'not DB select')
        else:
            if data['pwd'] != 'password123)': # 接口的密碼認證
                r = {'status': 'False', 'data': 'password error'}
            elif data['method'] == 'query':

                d = yield self.query(data['dbbase'], 'query', data['sql'])
                r = {'status': 'True', 'data': d}
            elif data['method'] == 'commit':

                db_r = yield self.query(data['dbbase'], 'commit', data['sql'])
                if db_r['status']:
                    r = {'status': 'True', 'data': 'commit sucessful'}
                else:
                    r = {'status': 'False', 'data': db_r['data']}
            else:
                r = {'status': 'False', 'data': 'method Invaild'}
        self.write(tornado.escape.json_encode(r))  # 寫入返回信息寫入response
        self.finish()  # 結束服務

    def get(self):
        return self.post()


if __name__ == "__main__":
    application = tornado.web.Application([
        (r"/db/api", ServiceHandler),  # 路由映射
    ])
    application.listen(8000)  # 監聽端口
    tornado.ioloop.IOLoop.instance().start()  # 啟動服務

 

3、請求舉例(post請求)

  1) sql語句

 sql = 'select vid from video where status=1 group by vid ORDER BY num ASC limit 100'

   2)客戶端調用db_query函數

db_ret = db_query(sql, 'query') # 其中query是數據庫操作的方法 query為查詢/commit 為insert/update/delete等數據庫修改操作時使用

  通過調用db_query傳入參數sql語句和操作方式,返回結果或錯誤內容


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM