MySQL集群讀寫分離的自定義實現


基於MySQL Router可以實現高可用,讀寫分離,負載均衡之類的,MySQL Router可以說是非常輕量級的一個中間件了。
看了一下MySQL Router的原理,其實並不復雜,原理也並不難理解,其實就是一個類似於VIP的代理功能,其中一個MySQL Router有兩個端口號,分別是對讀和寫的轉發。
至於選擇哪個端口號,需要在申請連接的時候自定義選擇,換句話說就是在生成連接字符串的時候,要指明是讀操作還是寫操作,然后由MySQL Router轉發到具體的服務器上。

引用這里的話說就是
一般來說,通過不同端口實現讀/寫分離,並非好方法,最大的原因是需要在應用程序代碼中指定這些連接端口。
但是,MySQL Router只能通過這種方式實現讀寫分離,所以MySQL Router拿來當玩具玩玩就好。其原理參考下圖,相關安裝配置等非常簡單

 

其實暫不論“MySQL Router拿來當玩具玩玩就好”,類似需要自己指定端口(或者說指定讀寫)來實現讀寫分離這種方式,自己完全可以實現,又何必用一個中間件呢?
對於MySQL Router來說,它自己本身又是單點的,還要考慮Router自身的高可用(解決了一個問題的同時又引入一個問題)。
很早之前就在想,可不可以嘗試不借助中間件,也就無需關注中間件自身的高可用,自己實現讀寫分離呢?


對於最簡單的master-salve復制的集群方式的讀寫分離,
可以集群中的不同節點指定不同的優先級,把master服務器的優先級指定到最高,其余兩個指定成一個較低的優先級
對於應用程序發起的請求,需要指明是讀還是寫,如果是寫操作,就指定到master上執行,如果是讀操作,就隨機地指向slave操作,完全可以在連接層就實現類似於MySQL Router的功能。
其實非常簡單,花不了多久就可以實現類似這么一個功能,在連接層實現讀寫分離,高可用,負載均衡,demo一個代碼實現。

 

如下簡單從數據庫連接層實現了讀寫分離以及負載均衡。
1,寫請求指向連接字符串中最高優先級的master,如果指定的最高優先級實例不可用,這里假如是實現了故障轉移,依次尋找次優先級的實例
2,slave復制master的數據,讀請求隨機指向不同的slave,一旦某個slave不可用,繼續尋找其他的slave
3,維護一個連接池,連接一律從連接池中獲取。

故障轉移可以獨立實現,不需要在連接層做,連接層也不是做故障轉移的。這樣一旦發生故障,只要實現了故障轉移,應用程序端可以不用做任何修改。

# -*- coding: utf-8 -*-
import pymysql
import random
from DBUtils.PooledDB import PooledDB
import socket


class MySQLRouter:

    operation = None
    conn_list = []

    def __init__(self, *args, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)

    # 探測實例端口號
    @staticmethod
    def get_mysqlservice_status(host,port):
        mysql_stat = None
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = s.connect_ex((host, int(port)))
        # port os open
        if (result == 0):
            mysql_stat = 1
        return mysql_stat

    def get_connection(self):
        if not conn_list:
            raise("no config error")

        conn = None
        current_conn = None
        # 依據節點優先級排序
        self.conn_list.sort(key=lambda k: (k.get('priority', 0)))
        #寫或者未定義請求,一律指向高優先級的服務器,可讀寫
        if(self.operation.lower() == "write") or not self.operation:
            for conn in conn_list:
                # 如果最高優先級的主節點不可達,這里假設成功實現了故障轉移,繼續找次優先級的實例。
                if self.get_mysqlservice_status(conn["host"], conn["port"]):
                    current_conn = conn
                    break
                else:
                    continue
        #讀請求隨機指向不同的slave
        elif(self.operation.lower() == "read"):
            #隨機獲取除了最該優先級節點之外的節點
            conn_read_list = conn_list[1:len(conn_list)]
            random.shuffle(conn_read_list)
            for conn in conn_read_list:
                #如果不可達,繼續尋找其他除了主節點之外的節點
                if self.get_mysqlservice_status(conn["host"], conn["port"]):
                    current_conn = conn
                    break
                else:
                    continue
        try:
            #從連接池中獲取當前連接
            if (current_conn):
                pool = PooledDB(pymysql,20, host=current_conn["host"], port=current_conn["port"], user=current_conn["user"], password=current_conn["password"],db=current_conn["database"])
                conn = pool.connection()
        except:
            raise

        if not conn:
            raise("create connection error")

        return conn;


if __name__ == '__main__':

    #定義三個實例
    conn_1 = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'password': 'root',"database":"db01","priority":100}
    conn_2 = {'host': '127.0.0.1', 'port': 3307, 'user': 'root', 'password': 'root',"database":"db01","priority":200}
    conn_3 = {'host': '127.0.0.1', 'port': 3308, 'user': 'root', 'password': 'root',"database":"db01","priority":300}
    
    conn_list = []
    conn_list.append(conn_1)
    conn_list.append(conn_2)
    conn_list.append(conn_3)

    print("####execute update on master####")
    myrouter = MySQLRouter(conn_list=conn_list, operation="write")
    conn = myrouter.get_connection()
    cursor = conn.cursor()
    cursor.execute("update t01 set update_date = now() where id = 1")
    conn.commit()
    cursor.close()
    conn.close()

    print("####loop execute read on slave,query result####")
    #循環讀,判斷讀指向哪個節點。
    for loop in range(10):
        myrouter = MySQLRouter(conn_list = conn_list,operation = "read")
        conn = myrouter.get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT id,cast(update_date as char), CONCAT('instance port is: ', CAST( @@PORT AS CHAR)) AS port FROM t01;")
        result = cursor.fetchone()
        print(result)
        cursor.close()
        conn.close()

這里用過服務器的一個優先級,將寫請求指向最高優先級的master服務器,讀請求隨機指向非最高優先級的slave,
對於更新請求,都在master上執行,slave復制了master的數據,每次讀到的數據都不一樣,並且每次都請求的執行,基本上都隨機地指向了兩台slave服務器
通過查詢返回一個端口號,來判斷讀請求是否平均分散到了不通的slave端。

與“MySQL Router拿來當玩具玩玩就好”相比,這里的實現一樣low,因為對數據的請求需要請求明確指定是讀還是寫。

不過,對於自動讀寫分離,無非是一個SQL語句執行的是的讀或寫判斷問題,並非難事,這個需要解析請求的SQL是讀的還是寫的問題。
某些數據庫中間件可以實現自動的讀寫分離,但是要明白,對於那些支持自動讀寫分離的中間件,往往是要受到一定的約束的,比如不能用存儲過程什么的,為什么呢?
還是上面提到的SQL解析的問題,因為一旦使用了存儲過程,無法解析出來這個SQL到底是執行的是讀還是寫,最起碼不是太直接。
對於SQL讀寫的判斷,也就是維護一個讀或者寫的枚舉正則表達式,非讀即寫,只是要格外關注這個讀寫的判斷的效率問題。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM