Rabbitmq Connect與Channel
RabbitMQ官方提供了Connection對象,本質就是一個TCP連接對象。
Channels對象,虛擬連接。虛擬連接建立在上面Connection對象的TCP連接中。數據流動都是在Channel中進行的。每個Connection對象的虛擬連接也是有限的,如果單個Connnection的Channel對象超出指定范圍了,也會有性能問題,另外一個TCP連接上的多個虛擬連接,實際在傳輸數據時,傳輸數據的虛擬連接還是獨占了TCP連接,其它虛擬連接在排隊等待。
Connection
對象創建多個
Channel
來實現數據傳輸,在
channel
信息比較大的情況下,
Connection
帶寬會限制消息的傳輸。那么需要設計
Connection
池,將流量分攤到不同的
connection
上。
官網對於Connection的解讀:
AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.
大概意思就是:AMQP 0-9-1
一般是一個TCP
的長鏈接,當應用程序不再需要連接到服務器時,應該正常關閉AMQP 0-9-1
連接而不是關閉TCP
連接。
官網對於Channel的解讀:
Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed withchannels that can be thought of as "lightweight connections that share a single TCP connection".
Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.
For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.
大概的意思就是:一些應用需要同時創建多個連接到broker
也就是RabbitMQ
服務器上。然而因為防火牆的存在,很難同時創建多個連接。 AMQP 0-9-1
連接使用多個channel
連接實現對單一Connection
的復用。
客戶端的每一個協議操作都發送在channel
上。每個協議方法攜帶者channel ID
。broker
和client
使用channel ID
來確定方法對應的channel
。因此實現channel
之間的數據隔離。
channel
不能單獨存在,僅存在connection
上下文中。當connection
關閉時,channel
也會關閉。
多線程/進程之間打開一個channel
但不共享channels
是很普遍的。
通道和並發注意事項(線程安全)
As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
線程之間共享channel
是無法避免的,應用程序跟喜歡每個線程使用一個channel
而不是跨線程共享相同的channel
。
A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient.
要避免一個反例,為每一個發布的消息新建一個channel
,開辟一個新的channel
需要一個網絡的往返,這種模式是很低效的。channel
保持合理的存活時間。
It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.
可以使用channel pool
來避免共享channel
上並發發布:一旦一個線程使用完了channel
,那么它將返回到pool
中。其他線程便可使用這個Channel
。線程池是一個解決方案,可以使用 Spring AMQP線程池而不是自己開發。
總結:頻繁建立TCP
連接和channel
連接是消耗性能的,於是我們希望可以共享connection
或者channel
。達到連接的復用
Python實現rabbitmq connection連接池
import pika import threading import random import uuid """ Class: Parameters: Connectionsize:int類型,Rabbitmqpool池連接的最大數 Channelsize:int類型,Rabbitmqpool池Channel的最大數 return:None """ # 單例保證唯一 class Rabbitmqpool: # 定義類屬性 __instance = None __lock = threading.Lock() def __init__(self, Connectionsize, Channelsize): self.maxConnectionsize = Connectionsize self.maxChannelsize = Channelsize self.nowConnectionsize = 0 self.nowChannelsize = 0 self.connectpool = {} self.channelpool = {} self.certdic = {} def __new__(cls, Connectionsize, Channelsize): if not cls.__instance: cls.__instance = object.__new__(cls) return cls.__instance """ function: 獲取一個空閑Channel或者新建一個Channel Parameters: return: channel:channel cname:連接名 """ def get_channel(self): try: self.__lock.acquire() cname = "" channel = None # 在已存在鍵中查找空閑Channel for connectionname in self.connectpool: if len(self.channelpool[connectionname]) != 0: channel = self.channelpool[connectionname][-1] cname = connectionname self.channelpool[connectionname] = self.channelpool[connectionname][0:-1] print("取出一個Channel") break # 如果沒有找到空閑Channel,canme為"",則新建一個Channel if cname == "": if self.nowChannelsize < self.maxChannelsize: # 從連接池返回一個連接的名字 if len(self.connectpool) != 0: cname = random.choice(list(self.connectpool)) # 根據名字拿到此連接,傳入連接和Pool池創建Channel CreateChannel(self.connectpool[cname], self) # 得到一個新Channel channel = self.channelpool[cname][-1] self.channelpool[cname] = self.channelpool[cname][0:-1] print("創建一個Channel") # 如果沒有連接,則新建連接與channel else: if len(self.certdic) != 0: cert = random.choice(list(self.certdic)) cname = str(uuid.uuid4().int) print("創建一個連接") CreateConnection(str(self.certdic[cert]["rabbitmq_host"]), str(self.certdic[cert]["rabbitmq_port"]), str(self.certdic[cert]["rabbitmq_virtual_host"]), str(self.certdic[cert]["rabbitmq_user"]), str(self.certdic[cert]["rabbitmq_password"]), self, cname) CreateChannel(self.connectpool[cname], self) # 得到一個新Channel channel = self.channelpool[cname][-1] self.channelpool[cname] = self.channelpool[cname][0:-1] print("創建一個Channel") else: print("無法創建Channel,無連接憑證,不能創建連接!") else: print("無法創建Channel,超過限制") finally: self.__lock.release() return channel, cname def create_channel(self): try: self.__lock.acquire() if len(self.certdic) != 0: cert = random.choice(list(self.certdic)) cname = str(uuid.uuid4().int) print("創建一個連接") CreateConnection(str(self.certdic[cert]["rabbitmq_host"]), str(self.certdic[cert]["rabbitmq_port"]), str(self.certdic[cert]["rabbitmq_virtual_host"]), str(self.certdic[cert]["rabbitmq_user"]), str(self.certdic[cert]["rabbitmq_password"]), self, cname) CreateChannel(self.connectpool[cname], self) # 得到一個新Channel channel = self.channelpool[cname][-1] self.channelpool[cname] = self.channelpool[cname][0:-1] print("創建一個Channel") return channel, cname else: print("無法創建Channel,無連接憑證,不能創建連接!") return None,"" finally: self.__lock.release() def return_channel(self, channel, connectionname): try: self.__lock.acquire() self.channelpool[connectionname].append(channel) finally: self.__lock.release() def closepool(self): pass def delconnection(self, connectionname): try: self.__lock.acquire() if connectionname in self.connectpool: del self.connectpool[connectionname] self.nowConnectionsize = self.nowConnectionsize -1 self.nowChannelsize = self.nowChannelsize - len(self.channelpool[connectionname]) del self.channelpool[connectionname] finally: self.__lock.release() def get_certtemplate(self): return {"rabbitmq_host": "", "rabbitmq_port": 5672, "rabbitmq_virtual_host": "", "rabbitmq_user": "", "rabbitmq_password": ""} def addcert(self,cert): self.certdic[cert["rabbitmq_host"]] = cert # 連接可以自己創建 class CreateConnection: def __init__(self, rabbitmq_host, rabbitmq_port, rabbitmq_virtual_host, rabbitmq_user, rabbitmq_password, Rabbitmqpool, Connectionname = str(uuid.uuid4().int), heartbeat=60): if Rabbitmqpool.nowConnectionsize < Rabbitmqpool.maxConnectionsize: if Connectionname not in Rabbitmqpool.connectpool: self.rabbitmq_user = str(rabbitmq_user) self.rabbitmq_password = str(rabbitmq_password) self.rabbitmq_host = rabbitmq_host self.rabbitmq_port = rabbitmq_port self.rabbitmq_virtual_host = rabbitmq_virtual_host self.connectionname = Connectionname print(self.rabbitmq_user,self.rabbitmq_password,self.rabbitmq_host,self.rabbitmq_port,self.rabbitmq_virtual_host,self.connectionname) credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password) try: self.connection = pika.BlockingConnection( pika.ConnectionParameters( host=rabbitmq_host, port=rabbitmq_port, virtual_host=rabbitmq_virtual_host, heartbeat=heartbeat, credentials=credentials)) Rabbitmqpool.connectpool[Connectionname] = self Rabbitmqpool.nowConnectionsize += 1 if self.connectionname not in Rabbitmqpool.channelpool: Rabbitmqpool.channelpool[self.connectionname] = [] print("創建連接:", Connectionname) except Exception as e: print("創建連接失敗:", e) else: print("創建連接失敗,此連接名已存在:", Connectionname) else: print("創建連接失敗,連接池已滿,無法創建連接池") def get_connection(self): return self.connection class CreateChannel: def __init__(self, Connection, Rabbitmqpool): Rabbitmqpool.channelpool[Connection.connectionname].append(Connection.get_connection().channel()) Rabbitmqpool.nowChannelsize += 1
我這里並沒有增加過期時間:
初始化
rabbitmq_host = "" rabbitmq_port = "" rabbitmq_user = "" rabbitmq_password = "" rabbitmq_virtual_host = "" credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password) Pool = Rabbitmqpool(3, 20) cert = Pool.get_certtemplate() cert['rabbitmq_host'] = rabbitmq_host cert['rabbitmq_virtual_host'] = rabbitmq_virtual_host cert['rabbitmq_user'] = rabbitmq_user cert['rabbitmq_password'] = rabbitmq_password cert['rabbitmq_port'] = rabbitmq_port Pool.addcert(cert)
發送消息代碼
try: c, cname = Pool.get_channel() c.basic_publish(exchange='', routing_key='隊列名', body=str(data), ) Pool.return_channel(c, cname) except Exception as e: print("發送錯誤:",e) #鏈接過期 Pool.delconnection(cname) #channel過期時,刪除此鏈接和此鏈接下的所有channel c, cname = Pool.create_channel() #創建一個新的鏈接和channel c.basic_publish(exchange='', routing_key='隊列名', body=str(senddata), ) Pool.return_channel(c, cname)
refer: