我的第一個python web開發框架(25)——定制ORM(一)


  在開始編寫ORM模塊之前,我們需要先對db_helper進行重構,因為ORM最終生成的sql是需要轉給db_helper來執行的,所以擁有一個功能完善、健壯的數據庫操作類是非常必要的。

  這是項目原db_helper.py代碼

#!/usr/bin/env python
# coding=utf-8

import psycopg2
from common import log_helper
from config import const

# 初始化數據庫參數
db_name = const.DB_NAME
db_host = const.DB_HOST
db_port = const.DB_PORT
db_user = const.DB_USER
db_pass = const.DB_PASS


def read(sql):
    """
    連接pg數據庫並進行數據查詢
    如果連接失敗,會把錯誤寫入日志中,並返回false,如果sql執行失敗,也會把錯誤寫入日志中,並返回false
    如果所有執行正常,則返回查詢到的數據,這個數據是經過轉換的,轉成字典格式,方便模板調用,其中字典的key是數據表里的字段名
    """
    try:
        # 連接數據庫
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 獲取游標
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('連接數據庫失敗:' + str(e.args))
        return False
    try:
        # 執行查詢操作
        cursor.execute(sql)
        # 將返回的結果轉換成字典格式
        data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()]
    except Exception as e:
        print(e.args)
        log_helper.error('sql執行失敗:' + str(e.args) + ' sql:' + str(sql))
        return False
    finally:
        # 關閉游標和數據庫鏈接
        cursor.close()
        conn.close()
    # 返回結果(字典格式)
    return data


def write(sql, vars):
    """
    連接pg數據庫並進行寫的操作
    如果連接失敗,會把錯誤寫入日志中,並返回false,如果sql執行失敗,也會把錯誤寫入日志中,並返回false,如果所有執行正常,則返回true
    """
    try:
        # 連接數據庫
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 獲取游標
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('連接數據庫失敗:' + str(e.args))
        return False
    try:
        # 執行sql語句
        cursor.execute(sql, vars)
        # 提交事務
        conn.commit()
    except Exception as e:
        print(e.args)
        # 如果出錯,則事務回滾
        conn.rollback()
        log_helper.error('sql執行失敗:' + str(e.args) + ' sql:' + str(sql))
        return False
    else:
        # 獲取數據
        try:
            data = [dict((cursor.description[i][0], value) for i, value in enumerate(row))
                         for row in cursor.fetchall()]
        except Exception as e:
            # 沒有設置returning或執行修改或刪除語句時,記錄不存在
            data = None
    finally:
        # 關閉游標和數據庫鏈接
        cursor.close()
        conn.close()

    # 如果寫入數據后,將數據庫返回的數據返回給調用者
    return data
View Code

  通過對代碼的簡單分析,可以看到整個模塊在初化時,載入數據庫鏈接配置,對數據庫的操作也只有簡單讀與寫操作。這樣的功能對於一般的數據庫增刪改查操作已經足夠了,但如果業務復雜,有多個庫、需要用到事務或者需要訪問不同類型數據庫時,它就不夠用了。所以首先要做的就是對它進行重構,功能進行完善。

  首先我們需要將配置獨立出來,當有需要鏈接多個數據庫時,可以讀取不同的配置文件,讓程序更加方便靈活。

  在config目錄下創建db_config.py文件(有多個庫時,可以配置多個不同的參數來引用)

#!/usr/bin/env python
# coding=utf-8


### 數據庫鏈接參數 ###
DB = {
    'db_host': '127.0.0.1',
    'db_port': 5432,
    'db_name': 'simple_db',
    'db_user': 'postgres',
    'db_pass': '123456'
}
# 是否將所有要執行的Sql語句輸出到日志里
IS_OUTPUT_SQL = False

  在配置中,我們同樣定義了數據庫連接地址、端口、數據庫名稱、用戶名與密碼。

  另外,為了方便我們進行排錯,檢查sql的生成情況,添加了IS_OUTPUT_SQL是否輸出執行的sql語句到日志中這一個開關項,設置為True時,所有被執行的sql語句都會被寫到日志中,方便下載日志下來進行分析。

 

  對於數據庫操作模塊,我們需要封裝成一個類,在有需要調用時,就可以通過with語句進行初始化操作,設置對應的數據庫鏈接配置,靈活的連接不同的數據庫。

  在設計操作類時,我們需要思考幾個問題:

  1.它可以支持多數據庫操作,即讀取不同的配置能連接操作不同的數據庫(可以通過類初始化時進行注入配置信息)

  2.它需要支持with語句,當我們忘記關閉數據庫游標和連接時,自動幫我們關閉(需要實現__enter__()與__exit__()方法)

  3.它需要支持數據庫事務,當執行失敗時,可以回滾數據,當所有sql執行都成功時,可以統一提交事務(需要創建rollback()與commit()方法)

  4.它需要支持查詢、添加、修改、刪除等操作,方便我們操作關系型數據庫記錄(需要創建sql執行方法)

  5.它需要支持sql執行優化,將超出指定執行時間的sql語句記錄到日志中,方便開發人員進行分析(需要記錄sql執行起始時間與結束時間,並進行計算,當這個時間大於指定值時執行日志寫入程序)

  根據這些要求,我們初步設計出數據庫操作類的基本模型

class PgHelper(object):
    """postgresql數據庫操作類"""

    def __init__(self, db, is_output_sql):
        """初始化數據庫操作類配置信息"""

    def open_conn(self):
        """連接數據庫,並建立游標"""

    def close_conn(self):
        """關閉postgresql數據庫鏈接"""

    def __enter__(self):
        """初始化數據庫鏈接"""

    def __exit__(self, type, value, trace):
        """關閉postgresql數據庫鏈接"""

    def rollback(self):
        """回滾操作"""

    def commit(self):
        """提交事務"""

    def execute(self, query, vars=None):
        """執行sql語句查詢,返回結果集或影響行數"""

    def write_log(self, start_time, end_time, sql):
        """記錄Sql執行超時日志"""

 

 

  接下來,我們來一一實現上面的各個方法

  首先是完成初始化方法,我們可以通過注入的方法,將db_config配置信息里的參數注入進來初始化。連接不同的數據庫時,可以注入不同的配置信息。

class PgHelper(object):
    """postgresql數據庫操作類"""

    def __init__(self, db, is_output_sql):
        self.connect = None
        self.cursor = None
        # 初始化數據庫參數
        self.db_name = db.get('db_name')
        self.db_user = db.get('db_user')
        self.db_pass = db.get('db_pass')
        self.db_host = db.get('db_host')
        self.db_port = db.get('db_port')
        # 是否將所有要執行的Sql語句輸出到日志里
        self.is_output_sql = is_output_sql

 

  然后我們來創建數據庫打開連接方法與關閉連接的方法,當數據庫連接失敗時會拋出異常,程序會自動調用log_helper.error()方法,將異常寫入日志當中,並第一時間發送郵件通知開發人員,方便開發人員即時排錯。

    def open_conn(self):
        """連接數據庫,並建立游標"""
        try:
            if not self.connect:
                self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, host=self.db_host, port=self.db_port)
            return self.connect
        except Exception as e:
            log_helper.error('連接數據庫失敗:' + str(e.args))
            return False

    def close_conn(self):
        """關閉postgresql數據庫鏈接"""
        # 關閉游標
        try:
            if self.cursor:
                self.cursor.close()
        except Exception:
            pass
        # 關閉數據庫鏈接
        try:
            if self.connect:
                self.connect.close()
        except Exception:
            pass

 

  通過重寫內置__enter__()與__exit__()方法,來實現with語句調用本類時,會自動對類進行初始化操作,自動創建數據庫連接。當代碼執行完畢后(程序退出with語句時),程序會自動調用對應的方法,將游標與數據庫連接的關閉,避免手動操作時,忘記關閉連接出現異常。

    def __enter__(self):
        """初始化數據庫鏈接"""
        self.open_conn()
        return self

    def __exit__(self, type, value, trace):
        """關閉postgresql數據庫鏈接"""
        self.close_conn()

 

  為了方便事務處理,增加回滾方法。用於事務中執行操作失敗時,調用回滾方法執行回滾操作。

    def rollback(self):
        """回滾操作"""
        try:
            # 異常時,進行回滾操作
            if self.connect:
                self.connect.rollback()
                self.close_conn()
        except Exception as e:
            log_helper.error('回滾操作失敗:' + str(e.args))

 

  還需要增加事務提交方法,方便sql執行增刪改成功以后,提交事務更新數據。在開發中很多朋友經常會忘記執行提交事務操作,一直以為代碼有問題沒有執行成功。

    def commit(self):
        """提交事務"""
        try:
            if self.connect:
                self.connect.commit()
                self.close_conn()
        except Exception as e:
            log_helper.error('提交事務失敗:' + str(e.args))

 

  為了方便查看sql語句轉換效果,我們還可以增加獲取sql語句生成方法,當然這個方法並沒有太大的用途。

    def get_sql(self, query, vars=None):
        """獲取編譯后的sql語句"""
        # 記錄程序執行開始時間
        start_time = time.clock()
        try:
            # 判斷是否記錄sql執行語句
            if self.is_output_sql:
                log_helper.info('sql:' + str(query))
            # 建立游標
            self.cursor = self.connect.cursor()
            # 執行SQL
            self.data = self.cursor.mogrify(query, vars)
        except Exception as e:
            # 將異常寫入到日志中
            log_helper.error('sql生成失敗:' + str(e.args) + ' query:' + str(query))
            self.data = '獲取編譯sql失敗'
        finally:
            # 關閉游標
            self.cursor.close()
        # 記錄程序執行結束時間
        end_time = time.clock()
        # 寫入日志
        self.write_log(start_time, end_time, query)

        return self.data

  因為,當你直接使用完整的sql語句執行時,並不需要這個方法。但是,你使用的是下面方式,執行后就會生成組合好的sql語句,幫助我們分析sql語句生成情況

# 使用with方法,初始化數據庫連接
with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
    # 設置sql執行語句
    sql = """insert into product (name, code) values (%s, %s) returning id"""
    # 設置提交參數
    vars = ('zhangsan', '201807251234568')
    # 生成sql語句,並打印到控制台
    print(db.get_sql(sql, vars))

  輸出結果:

b"insert into product (name, code) values ('zhangsan', '201807251234568') returning id"

 

  數據庫最常見的操作就是增刪改查操作,由於postgresql有個非常好用的特殊參數:returning,它可以在sql執行增改刪結束后,返回我們想要的字段值,方便我們進行相應的判斷與操作,所以增改刪操作我們不需要將它與查詢操作分離成兩個方法,統一使用這個方法來獲取數據庫中返回的記錄值。

  在實現這個方法之前,我們設計時要思考這幾個問題:

  1.需要記錄程序執行的起始與結束時間,計算sql語句執行時長,用來判斷是否記錄到日志中,方便開發人員進行分析優化sql語句

  2.需要根據參數判斷,是否需要將所有執行的sql語句記錄到日志中,方便開發人員有需要時,查看執行了哪些sql語句,進行數據與功能分析

  3.由於類在加載時就已經自動連接數據庫了,所以在方法中不需要進行打開數據庫鏈接操作

  5.在執行sql語句時,需要創建游標,然后執行sql語句

  6.為了讓用戶更好的體驗,減少異常的直接拋出,需要進行異常捕捉,並將捕捉到的異常進行處理,記錄到日志中方便開發人員分析錯誤,同時同步發送推送給相關人員,即時提醒錯誤

  7.sql執行成功以后,需要對返回的數據進行處理,組合成字典類型,方便前端使用

  8.完成數據處理后,需要及時關閉游標

  9.對返回的數據需要進行處理后,返回給上一級程序

 1     def execute(self, query, vars=None):
 2         """執行sql語句查詢,返回結果集或影響行數"""
 3         if not query:
 4             return None
 5         # 記錄程序執行開始時間
 6         start_time = time.clock()
 7         try:
 8             # 判斷是否記錄sql執行語句
 9             if self.is_output_sql:
10                 log_helper.info('sql:' + str(query))
11             # 建立游標
12             self.cursor = self.connect.cursor()
13             # 執行SQL
14             result = self.cursor.execute(query, vars)
15             print(str(result))
16         except Exception as e:
17             # 將異常寫入到日志中
18             log_helper.error('sql執行失敗:' + str(e.args) + ' query:' + str(query))
19             self.data = None
20         else:
21             # 獲取數據
22             try:
23                 if self.cursor.description:
24                     # 在執行insert/update/delete等更新操作時,如果添加了returning,則讀取返回數據組合成字典返回
25                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
26                 else:
27                     # 如果執行insert/update/delete等更新操作時沒有添加returning,則返回影響行數,值為0時表時沒有數據被更新
28                     self.data = self.cursor.rowcount
29             except Exception as e:
30                 # 將異常寫入到日志中
31                 log_helper.error('數據獲取失敗:' + str(e.args) + ' query:' + str(query))
32                 self.data = None
33         finally:
34             # 關閉游標
35             self.cursor.close()
36         # 記錄程序執行結束時間
37         end_time = time.clock()
38         # 寫入日志
39         self.write_log(start_time, end_time, query)
40 
41         # 如果有返回數據,則把該數據返回給調用者
42         return self.data

 

   最后一個是記錄超時sql語句到日志方法,這里我將大於0.1秒的sql語句都記錄下來

    def write_log(self, start_time, end_time, sql):
        """記錄Sql執行超時日志"""
        t = end_time - start_time
        if (t) > 0.1:
            content = ' '.join(('run time:', str(t), 's sql:', sql))
            log_helper.info(content)

 

 

  完成的db_helper.py代碼

  1 #!/usr/bin/env python
  2 # coding=utf-8
  3 
  4 import psycopg2
  5 import time
  6 from io import StringIO
  7 from common import log_helper, file_helper
  8 
  9 
 10 class PgHelper(object):
 11     """postgresql數據庫操作類"""
 12 
 13     def __init__(self, db, is_output_sql):
 14         self.connect = None
 15         self.cursor = None
 16         # 初始化數據庫參數
 17         self.db_name = db.get('db_name', '')
 18         self.db_user = db.get('db_user', '')
 19         self.db_pass = db.get('db_pass', '')
 20         self.db_host = db.get('db_host', '')
 21         self.db_port = db.get('db_port', '')
 22         # 是否將所有要執行的Sql語句輸出到日志里
 23         self.is_output_sql = is_output_sql
 24 
 25     def open_conn(self):
 26         """連接數據庫,並建立游標"""
 27         try:
 28             if not self.connect:
 29                 self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass,
 30                                                 host=self.db_host, port=self.db_port)
 31             return self.connect
 32         except Exception as e:
 33             log_helper.error('連接數據庫失敗:' + str(e.args))
 34             return False
 35 
 36     def close_conn(self):
 37         """關閉postgresql數據庫鏈接"""
 38         # 關閉游標
 39         try:
 40             if self.cursor:
 41                 self.cursor.close()
 42         except Exception:
 43             pass
 44         # 關閉數據庫鏈接
 45         try:
 46             if self.connect:
 47                 self.connect.close()
 48         except Exception:
 49             pass
 50 
 51     def __enter__(self):
 52         """初始化數據庫鏈接"""
 53         self.open_conn()
 54         return self
 55 
 56     def __exit__(self, type, value, trace):
 57         """關閉postgresql數據庫鏈接"""
 58         self.close_conn()
 59 
 60     def rollback(self):
 61         """回滾操作"""
 62         try:
 63             # 異常時,進行回滾操作
 64             if self.connect:
 65                 self.connect.rollback()
 66         except Exception as e:
 67             log_helper.error('回滾操作失敗:' + str(e.args))
 68 
 69     def commit(self):
 70         """提交事務"""
 71         try:
 72             if self.connect:
 73                 self.connect.commit()
 74                 self.close_conn()
 75         except Exception as e:
 76             log_helper.error('提交事務失敗:' + str(e.args))
 77 
 78     def get_sql(self, query, vars=None):
 79         """獲取編譯后的sql語句"""
 80         # 記錄程序執行開始時間
 81         start_time = time.clock()
 82         try:
 83             # 判斷是否記錄sql執行語句
 84             if self.is_output_sql:
 85                 log_helper.info('sql:' + str(query))
 86             # 建立游標
 87             self.cursor = self.connect.cursor()
 88             # 執行SQL
 89             self.data = self.cursor.mogrify(query, vars)
 90         except Exception as e:
 91             # 將異常寫入到日志中
 92             log_helper.error('sql生成失敗:' + str(e.args) + ' query:' + str(query))
 93             self.data = '獲取編譯sql失敗'
 94         finally:
 95             # 關閉游標
 96             self.cursor.close()
 97         # 記錄程序執行結束時間
 98         end_time = time.clock()
 99         # 寫入日志
100         self.write_log(start_time, end_time, query)
101 
102         return self.data
103 
104     def copy(self, values, table_name, columns):
105         """
106         百萬級數據更新函數
107         :param values: 更新內容,字段之間用\t分隔,記錄之間用\n分隔 "1\taaa\tabc\n2\bbb\abc\n"
108         :param table_name: 要更新的表名稱
109         :param columns: 需要更新的字段名稱:例:('id','userame','passwd')
110         :return:
111         """
112         try:
113             # 建立游標
114             self.cursor = self.connect.cursor()
115             self.cursor.copy_from(StringIO(values), table_name, columns=columns)
116             self.connect.commit()
117             return True
118         except Exception as e:
119             # 將異常寫入到日志中
120             log_helper.error('批量更新失敗:' + str(e.args) + ' table:' + table_name)
121         finally:
122             # 關閉游標
123             self.cursor.close()
124 
125     def execute(self, query, vars=None):
126         """執行sql語句查詢,返回結果集或影響行數"""
127         if not query:
128             return None
129         # 記錄程序執行開始時間
130         start_time = time.clock()
131         try:
132             # 判斷是否記錄sql執行語句
133             if self.is_output_sql:
134                 log_helper.info('sql:' + str(query))
135             # 建立游標
136             self.cursor = self.connect.cursor()
137             # 執行SQL
138             result = self.cursor.execute(query, vars)
139             print(str(result))
140         except Exception as e:
141             # 將異常寫入到日志中
142             log_helper.error('sql執行失敗:' + str(e.args) + ' query:' + str(query))
143             self.data = None
144         else:
145             # 獲取數據
146             try:
147                 if self.cursor.description:
148                     # 在執行insert/update/delete等更新操作時,如果添加了returning,則讀取返回數據組合成字典返回
149                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
150                 else:
151                     # 如果執行insert/update/delete等更新操作時沒有添加returning,則返回影響行數,值為0時表時沒有數據被更新
152                     self.data = self.cursor.rowcount
153             except Exception as e:
154                 # 將異常寫入到日志中
155                 log_helper.error('數據獲取失敗:' + str(e.args) + ' query:' + str(query))
156                 self.data = None
157         finally:
158             # 關閉游標
159             self.cursor.close()
160         # 記錄程序執行結束時間
161         end_time = time.clock()
162         # 寫入日志
163         self.write_log(start_time, end_time, query)
164 
165         # 如果有返回數據,則把該數據返回給調用者
166         return self.data
167 
168 
169     def write_log(self, start_time, end_time, sql):
170         """記錄Sql執行超時日志"""
171         t = end_time - start_time
172         if (t) > 0.1:
173             content = ' '.join(('run time:', str(t), 's sql:', sql))
174             log_helper.info(content)
View Code

  測試代碼

 1 #!/usr/bin/evn python
 2 # coding=utf-8
 3 
 4 import unittest
 5 from common import db_helper
 6 from config import db_config
 7 
 8 class DbHelperTest(unittest.TestCase):
 9     """數據庫操作包測試類"""
10 
11     def setUp(self):
12         """初始化測試環境"""
13         print('------ini------')
14 
15     def tearDown(self):
16         """清理測試環境"""
17         print('------clear------')
18 
19     def test(self):
20         # 使用with方法,初始化數據庫連接
21         with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
22             # 設置sql執行語句
23             sql = """insert into product (name, code) values (%s, %s) returning id"""
24             # 設置提交參數
25             vars = ('張三', '201807251234568')
26             # 生成sql語句,並打印到控制台
27             print(db.get_sql(sql, vars))
28 
29             db.execute('select * from product where id=1000')
30             db.execute('insert into product (name, code) values (%s, %s) returning id', ('張三', '201807251234568'))
31             db.commit()
32 
33 if __name__ == '__main__':
34     unittest.main()
View Code

 

 

 

版權聲明:本文原創發表於 博客園,作者為 AllEmpty 本文歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則視為侵權。

python開發QQ群:669058475(本群已滿)、733466321(可以加2群)    作者博客:http://www.cnblogs.com/EmptyFS/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM