python實現mysql的讀寫分離及負載均衡


        Oracle數據庫有其公司開發的配套rac來實現負載均衡,目前已知的最大節點數能到128個,但是其帶來的維護成本無疑是很高的,並且rac的穩定性也並不是特別理想,尤其是節點很多的時候。

       但是,相對mysql來說,rac的實用性要比mysql的配套集群軟件mysql-cluster要高很多。因為從網上了解到情況來看,很少公司在使用mysql-cluster,大多數企業都會選擇第三方代理軟件,例如MySQL Proxy、Mycat、haproxy等,但是這會引起另外一個問題:單點故障(包括mysql-cluster:管理節點)。如果要解決這個問題,就需要給代理軟件搭建集群,在訪問量很大的情況下,代理軟件的雙機或三機集群會成為訪問瓶頸,繼續增加其節點數,無疑會帶來各方面的成本。

那么,如何可以解決這個問題呢?

          解決上述問題,最好的方式個人認為應該是在程序中實現。通過和其他mysql DBA的溝通,也證實了這個想法。但是由此帶來的疑問也就產生了:會不會增加開發成本?對現有的應用系統做修改會不會改動很大?會不會增加后期版本升級的難度?等等。

        對於一個架構設計良好的應用系統可以很肯定的回答:不會。

        那么怎么算一個架構設計良好的應用系統呢?

       簡單來說,就是分層合理、功能模塊之間耦合性底。以本人的經驗來說,系統設計基本上可以划分為以下四層:

       1.  實體層:主要定義一些實體類

       2.  數據層:也可以叫SQL處理層。主要負責跟數據庫交互取得數據

       3.  業務處:主要是根據業務流程及功能區分模塊(或者說定義不同的業務類)

       4.  表現層:呈現最終結果給用戶

       實現上述功能(mysql的讀寫分離及負載均衡),在這四個層次中,僅僅涉及到數據層。

嚴格來說,對於設計良好的系統,只涉及到一個類的一個函數:在數據層中,一般都會單獨划分出一個連接類,並且這個連接類中會有一個連接函數,需要改動的就是這個函數:在讀取連接字符串之前加一個功能函數返回需要的主機、ip、端口號等信息(沒有開發經歷的同學可能理解這段話有點費勁)。

       流程圖如下:

        

           代碼如下:

           

import mmap
import json
import random
import mysql.connector
import time

##公有變量
#dbinfos={
#         "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1},
#         "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1}
#         }

dbinfos={}
mmap_file = None
mmap_time=None

##這個函數返回json格式的字符串,也是實現初始化數據庫信息的地方
##使用json格式是為了方便數據轉換,從字符串---》二進制--》字符串---》字典
##如果采用其它方式共享dbinfos的方法,可以不用此方式
##配置庫的地址
def get_json_str1():
    return json.dumps(dbinfos)

##讀取配置庫中的內容
def get_json_str():
    try:
        global dbinfos
        cnx = mysql.connector.connect(user='root', password='Abcd.1234',
                              host='192.168.42.60',
                              database='rwlb')
        cursor = cnx.cursor()
        cmdString="select * from rwlb"
        cnt=-1
        cursor.execute(cmdString)
        for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor:
            cnt=cnt+1
            dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status}
            dbinfos["db"+str(cnt)]=dict_db
        cursor.close()
        cnx.close()
        return json.dumps(dbinfos)
    except:
        cursor.close()
        cnx.close()
        return ""

##判斷是否能正常連接到數據庫
def check_conn_host():
    try:
        cnx = mysql.connector.connect(user='root', password='Abcd.1234',
                              host='192.168.42.60',
                              database='rwlb')
        cursor = cnx.cursor()
        cmdString="select user()"
        cnt=-1
        cursor.execute(cmdString)
        for user in cursor:
            cnt=len(user)
        cursor.close()
        cnx.close()
        return cnt
    except :
        return -1;


##select 屬於讀操作,其他屬於寫操作-----這里可以划分的更詳細,比如執行存儲過程等
def analyze_sql_state(sql):
    if "select" in sql:
        return "R"
    else:
        return "W"

##讀取時間信息
def read_mmap_time():
    global mmap_time,mmap_file
    mmap_time.seek(0)
    ##初始時間
    inittime=int(mmap_time.read().translate(None, b'\x00').decode())
    ##當前時間
    endtime=int(time.time())
    ##時間差
    dis_time=endtime-inittime
    print("dis_time:"+str(dis_time))
    #重新讀取數據
    if dis_time>10:
        ##當配置庫正常的情況下才重新讀取數據
        print(str(check_conn_host()))
        if check_conn_host()>0:           
            print("read data again")
            mmap_time.seek(0)
            mmap_file.seek(0)
            mmap_time.write(b'\x00')
            mmap_file.write(b'\x00')
            get_mmap_time()
            get_mmap_info()
        else:
            print("can not connect to host")            
    #不重新讀取數據
    else:
        print("do not read data again")
        

##從內存中讀取信息,
def read_mmap_info(sql):
    read_mmap_time()
    print("The data is in memory")
    global mmap_file,dict_db
    mmap_file.seek(0)
    ##把二進制轉換為字符串
    info_str=mmap_file.read().translate(None, b'\x00').decode()
    #3把字符串轉成json格式,方便后面轉換為字典使用
    infos=json.loads(info_str)   
    host_count=len(infos)
    ##權重列表
    listw=[]
    ##總的權重數量
    wtotal=0
    ##數據庫角色
    dbrole=analyze_sql_state(sql)
    ##根據權重初始化一個列表。這個是比較簡單的算法,所以權重和控制在100以內比較好----這里可以選擇其他比較好的算法
    for i in range(host_count):
        db="db"+str(i)
        if dbrole in infos[db]["role"]:
            if int(infos[db]["status"])==1:
                w=infos[db]["weight"]
                wtotal=wtotal+w
                for j in range(w):
                    listw.append(i)
    if wtotal >0:
        ##產生一個隨機數
        rad=random.randint(0,wtotal-1)
        ##讀取隨機數所在的列表位置的數據
        dbindex=listw[rad]
        ##確定選擇的是哪個db
        db="db"+str(dbindex)
        ##為dict_db賦值,即選取的db的信息
        dict_db=infos[db]
        return dict_db
    else :
        return {}


##如果內存中沒有時間信息,則向內存紅寫入時間信息
def get_mmap_time():
    global mmap_time
    ##第二個參數1024是設定的內存大小,單位:字節。如果內容較多,可以調大一點
    mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
    ##讀取有效比特數,不包括空比特
    cnt=mmap_time.read_byte()
    if cnt==0:
        print("Load time to memory")
        mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
        inittime=str(int(time.time()))
        mmap_time.write(inittime.encode())


##如果內存中沒有對應信息,則向內存中寫信息以供下次調用使用
def get_mmap_info():
    global mmap_file
    ##第二個參數1024是設定的內存大小,單位:字節。如果內容較多,可以調大一點
    mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
    ##讀取有效比特數,不包括空比特
    cnt=mmap_file.read_byte()
    if cnt==0:
        print("Load data to memory")
        mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
        mmap_file.write(get_json_str().encode())

##測試函數
def test1():
    get_mmap_time()
    get_mmap_info()
    for i in range(10):
        sql="select * from db"
        #sql="update t set col1=a where b=2"
        dbrole=analyze_sql_state(sql)
        dict_db=read_mmap_info(sql)
        print(dict_db["host"])

def test2():
    sql="select * from db"
    res=analyze_sql_state(sql)
    print("select:"+res)
    sql="update t set col1=a where b=2"
    res=analyze_sql_state(sql)
    print("update:"+res)
    sql="insert into t values(1,2)"
    res=analyze_sql_state(sql)
    print("insert:"+res)
    sql="delete from t where b=2"
    res=analyze_sql_state(sql)
    print("delete:"+res)


##類似主函數
if __name__=="__main__":
    test2()

 

測試結果:

 

 

從結果可以看出,只有第一次向內存加載數據,並且按照權重實現了負載均衡。

因為測試函數test1()寫的是固定語句,所以讀寫分離的結果沒有顯示出來。

 

另外:測試使用的數據庫表結構及數據:

 

 desc rwlb;
+---------+-------------+------+-----+---------+-------+
| Field   | Type        | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| host    | varchar(50) | YES  |     | NULL    |       |
| user    | varchar(50) | YES  |     | NULL    |       |
| pwd     | varchar(50) | YES  |     | NULL    |       |
| my_user | varchar(50) | YES  |     | NULL    |       |
| my_pwd  | varchar(50) | YES  |     | NULL    |       |
| role    | varchar(10) | YES  |     | NULL    |       |
| weight  | int(11)     | YES  |     | NULL    |       |
| status  | int(11)     | YES  |     | NULL    |       |
| port    | int(11)     | YES  |     | NULL    |       |
| db      | varchar(50) | YES  |     | NULL    |       |
+---------+-------------+------+-----+---------+-------+
select * from rwlb;
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+
| host          | user | pwd      | my_user | my_pwd    | role | weight | status | port | db   |
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+
| 192.168.42.60 | root | Abcd1234 | root    | Abcd.1234 | RW   |     10 |      1 | NULL | NULL |
| 192.168.42.61 | root | Abcd1234 | root    | Abcd.1234 | R    |     20 |      1 | NULL | NULL |
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM