使用連接池的好處暫且不說,本文主要探討Redis連接池的原理方式
要使用Redis,首先應該建立一個Redis客戶端,比如:
rds = redis.Redis(host='localhost', port=6379, db=0, max_connections=100)
在建立Redis客戶端的時候,可以指定max_connections參數,這個參數指定了連接池中連接的數量
當讀寫Redis的時候,首先將傳入的參數命令做一些處理,然后進入 execute_command 函數
這個函數顯示了處理連接池的過程:
# COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response"
# pool是ConnectionPool類,用於管理連接池 pool = self.connection_pool command_name = args[0]
connection = pool.get_connection(command_name, **options) try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection)
connection = pool.get_connection(command_name, **options)
這里,展開get_connection函數:
def get_connection(self, command_name, *keys, **options): "Get a connection from the pool" self._checkpid() try: connection = self._available_connections.pop() except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) return connection
這個方法也在ConnectionPool類里,可見,如果有可用的連接(connection),就把它放在正在使用的列表里返回,否則就調用
make_connection方法
這個方法如下:
def make_connection(self): "Create a new connection" if self._created_connections >= self.max_connections: raise ConnectionError("Too many connections") self._created_connections += 1 return self.connection_class(**self.connection_kwargs)
可見,簡單判斷了一下創建的連接池是否超過了規定的數目,就調用了connection_class
connection_class也就是Connection類,也就是創建連接的地方
Connection這個類的構造函數中定義了一系列的和socket,TCP有關的參數
在make_connection方法里只是初始化了一下Connection類,還並沒有創建socket連接
所以如果第一次連接的時候,在get_connection方法里add進的Connection的實例還沒有連接過
連接是在execute_command方法里調用send_command方法的時候執行的:
def send_packed_command(self, command): "Send an already packed command to the Redis server" if not self._sock: self.connect() try: if isinstance(command, str): command = [command] for item in command: self._sock.sendall(item) except socket.timeout: self.disconnect() raise TimeoutError("Timeout writing to socket") except socket.error: e = sys.exc_info()[1] self.disconnect() if len(e.args) == 1: errno, errmsg = 'UNKNOWN', e.args[0] else: errno = e.args[0] errmsg = e.args[1] raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg)) except: self.disconnect() raise
當調用self.connect()的時候,在這個方法里繼續調用 _connect方法,在這個方法里創建了socket
所以,在調用完connect方法之后,Connection類的_sock對象就已經初始化完畢了
以后當再使用這個Connection實例的時候,由於socket已經初始化完成,就不必再繼續創建socket了
通過socket的sendall方法,將命令發往redis服務器
在不出錯的情況下,執行release方法:
def release(self, connection): "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) self._available_connections.append(connection)
這個方法將Connection實例從_in_use_connection列表轉移到_available_connection列表
然后在parse_reponse方法里,將服務器返回的結果返回
