Rabbitmq的connection連接池(Python版)


 

 

Rabbitmq Connect與Channel

RabbitMQ官方提供了Connection對象,本質就是一個TCP連接對象。
Channels對象,虛擬連接。虛擬連接建立在上面Connection對象的TCP連接中。數據流動都是在Channel中進行的。每個Connection對象的虛擬連接也是有限的,如果單個Connnection的Channel對象超出指定范圍了,也會有性能問題,另外一個TCP連接上的多個虛擬連接,實際在傳輸數據時,傳輸數據的虛擬連接還是獨占了TCP連接,其它虛擬連接在排隊等待。

在單個的 Connection對象創建多個 Channel來實現數據傳輸,在 channel信息比較大的情況下, Connection帶寬會限制消息的傳輸。那么需要設計 Connection池,將流量分攤到不同的 connection上。

官網對於Connection的解讀:

AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.

大概意思就是:AMQP 0-9-1一般是一個TCP的長鏈接,當應用程序不再需要連接到服務器時,應該正常關閉AMQP 0-9-1連接而不是關閉TCP連接。

官網對於Channel的解讀:

Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed withchannels that can be thought of as "lightweight connections that share a single TCP connection".
Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.

For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

大概的意思就是:一些應用需要同時創建多個連接到broker也就是RabbitMQ服務器上。然而因為防火牆的存在,很難同時創建多個連接。 AMQP 0-9-1連接使用多個channel連接實現對單一Connection的復用。
客戶端的每一個協議操作都發送在channel上。每個協議方法攜帶者channel IDbrokerclient使用channel ID來確定方法對應的channel。因此實現channel之間的數據隔離。
channel不能單獨存在,僅存在connection上下文中。當connection關閉時,channel也會關閉。
多線程/進程之間打開一個channel但不共享channels是很普遍的。

 

通道和並發注意事項(線程安全)

As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

線程之間共享channel是無法避免的,應用程序跟喜歡每個線程使用一個channel而不是跨線程共享相同的channel

A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient.

要避免一個反例,為每一個發布的消息新建一個channel,開辟一個新的channel需要一個網絡的往返,這種模式是很低效的。channel保持合理的存活時間。

It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.

可以使用channel pool來避免共享channel上並發發布:一旦一個線程使用完了channel,那么它將返回到pool中。其他線程便可使用這個Channel。線程池是一個解決方案,可以使用 Spring AMQP線程池而不是自己開發。

總結:頻繁建立TCP連接和channel連接是消耗性能的,於是我們希望可以共享connection或者channel。達到連接的復用

 

Python實現rabbitmq connection連接池

import pika
import threading
import random
import uuid



"""
Class:  
Parameters:
    Connectionsize:int類型,Rabbitmqpool池連接的最大數
    Channelsize:int類型,Rabbitmqpool池Channel的最大數
return:None
"""
# 單例保證唯一
class Rabbitmqpool:
    # 定義類屬性
    __instance = None
    __lock = threading.Lock()

    def __init__(self, Connectionsize, Channelsize):
        self.maxConnectionsize = Connectionsize
        self.maxChannelsize = Channelsize
        self.nowConnectionsize = 0
        self.nowChannelsize = 0
        self.connectpool = {}
        self.channelpool = {}
        self.certdic = {}
    def __new__(cls, Connectionsize, Channelsize):
        if not cls.__instance:
            cls.__instance = object.__new__(cls)
        return cls.__instance
    """
    function:  獲取一個空閑Channel或者新建一個Channel
    Parameters:

    return:
        channel:channel
        cname:連接名
    """

    def get_channel(self):
        try:
            self.__lock.acquire()
            cname = ""
            channel = None
            # 在已存在鍵中查找空閑Channel
            for connectionname in self.connectpool:
                if len(self.channelpool[connectionname]) != 0:
                    channel = self.channelpool[connectionname][-1]
                    cname = connectionname
                    self.channelpool[connectionname] = self.channelpool[connectionname][0:-1]
                    print("取出一個Channel")
                    break
            # 如果沒有找到空閑Channel,canme為"",則新建一個Channel
            if cname == "":
                if self.nowChannelsize < self.maxChannelsize:
                    # 從連接池返回一個連接的名字
                    if len(self.connectpool) != 0:
                        cname = random.choice(list(self.connectpool))
                        # 根據名字拿到此連接,傳入連接和Pool池創建Channel
                        CreateChannel(self.connectpool[cname], self)
                        # 得到一個新Channel
                        channel = self.channelpool[cname][-1]
                        self.channelpool[cname] = self.channelpool[cname][0:-1]
                        print("創建一個Channel")
                    # 如果沒有連接,則新建連接與channel
                    else:
                        if len(self.certdic) != 0:
                            cert = random.choice(list(self.certdic))
                            cname = str(uuid.uuid4().int)
                            print("創建一個連接")
                            CreateConnection(str(self.certdic[cert]["rabbitmq_host"]), str(self.certdic[cert]["rabbitmq_port"]),
                                             str(self.certdic[cert]["rabbitmq_virtual_host"]),
                                             str(self.certdic[cert]["rabbitmq_user"]),
                                             str(self.certdic[cert]["rabbitmq_password"]), self, cname)
                            CreateChannel(self.connectpool[cname], self)
                            # 得到一個新Channel
                            channel = self.channelpool[cname][-1]
                            self.channelpool[cname] = self.channelpool[cname][0:-1]
                            print("創建一個Channel")
                        else:
                            print("無法創建Channel,無連接憑證,不能創建連接!")
                else:
                    print("無法創建Channel,超過限制")

        finally:
            self.__lock.release()
        return channel, cname

    def create_channel(self):
        try:
            self.__lock.acquire()
            if len(self.certdic) != 0:
                cert = random.choice(list(self.certdic))
                cname = str(uuid.uuid4().int)
                print("創建一個連接")
                CreateConnection(str(self.certdic[cert]["rabbitmq_host"]), str(self.certdic[cert]["rabbitmq_port"]),
                                 str(self.certdic[cert]["rabbitmq_virtual_host"]),
                                 str(self.certdic[cert]["rabbitmq_user"]),
                                 str(self.certdic[cert]["rabbitmq_password"]), self, cname)
                CreateChannel(self.connectpool[cname], self)
                # 得到一個新Channel
                channel = self.channelpool[cname][-1]
                self.channelpool[cname] = self.channelpool[cname][0:-1]
                print("創建一個Channel")
                return channel, cname
            else:
                print("無法創建Channel,無連接憑證,不能創建連接!")
            return None,""
        finally:
            self.__lock.release()

    def return_channel(self, channel, connectionname):
        try:
            self.__lock.acquire()
            self.channelpool[connectionname].append(channel)
        finally:
            self.__lock.release()

    def closepool(self):
        pass

    def delconnection(self, connectionname):
        try:
            self.__lock.acquire()
            if connectionname in self.connectpool:
                del self.connectpool[connectionname]
                self.nowConnectionsize = self.nowConnectionsize -1
                self.nowChannelsize  = self.nowChannelsize - len(self.channelpool[connectionname])
                del self.channelpool[connectionname]

        finally:
            self.__lock.release()

    def get_certtemplate(self):
        return {"rabbitmq_host": "", "rabbitmq_port": 5672, "rabbitmq_virtual_host": "", "rabbitmq_user": "",
                "rabbitmq_password": ""}

    def addcert(self,cert):
        self.certdic[cert["rabbitmq_host"]] = cert


# 連接可以自己創建
class CreateConnection:
    def __init__(self, rabbitmq_host, rabbitmq_port, rabbitmq_virtual_host, rabbitmq_user, rabbitmq_password,
                 Rabbitmqpool, Connectionname = str(uuid.uuid4().int), heartbeat=60):
        if Rabbitmqpool.nowConnectionsize < Rabbitmqpool.maxConnectionsize:
            if Connectionname not in Rabbitmqpool.connectpool:
                self.rabbitmq_user = str(rabbitmq_user)
                self.rabbitmq_password = str(rabbitmq_password)
                self.rabbitmq_host = rabbitmq_host
                self.rabbitmq_port = rabbitmq_port
                self.rabbitmq_virtual_host = rabbitmq_virtual_host
                self.connectionname = Connectionname
                print(self.rabbitmq_user,self.rabbitmq_password,self.rabbitmq_host,self.rabbitmq_port,self.rabbitmq_virtual_host,self.connectionname)
                credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
                try:
                    self.connection = pika.BlockingConnection(
                        pika.ConnectionParameters(
                            host=rabbitmq_host,
                            port=rabbitmq_port,
                            virtual_host=rabbitmq_virtual_host,
                            heartbeat=heartbeat,
                            credentials=credentials))
                    Rabbitmqpool.connectpool[Connectionname] = self
                    Rabbitmqpool.nowConnectionsize += 1
                    if self.connectionname not in Rabbitmqpool.channelpool:
                        Rabbitmqpool.channelpool[self.connectionname] = []
                    print("創建連接:", Connectionname)
                except Exception as e:
                    print("創建連接失敗:", e)
            else:
                print("創建連接失敗,此連接名已存在:", Connectionname)
        else:
            print("創建連接失敗,連接池已滿,無法創建連接池")

    def get_connection(self):
        return self.connection


class CreateChannel:
    def __init__(self, Connection, Rabbitmqpool):
        Rabbitmqpool.channelpool[Connection.connectionname].append(Connection.get_connection().channel())
        Rabbitmqpool.nowChannelsize += 1

我這里並沒有增加過期時間:

 

 

初始化

rabbitmq_host = ""
rabbitmq_port = ""
rabbitmq_user = ""
rabbitmq_password = ""
rabbitmq_virtual_host = ""
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
Pool = Rabbitmqpool(3, 20)
cert = Pool.get_certtemplate()
cert['rabbitmq_host'] = rabbitmq_host
cert['rabbitmq_virtual_host'] = rabbitmq_virtual_host
cert['rabbitmq_user'] = rabbitmq_user
cert['rabbitmq_password'] = rabbitmq_password
cert['rabbitmq_port'] = rabbitmq_port
Pool.addcert(cert)

發送消息代碼

 try:
        c, cname = Pool.get_channel()
        c.basic_publish(exchange='',
                        routing_key='隊列名',
                        body=str(data),
                        )
        Pool.return_channel(c, cname)
    except Exception as e:
        print("發送錯誤:",e) #鏈接過期
        Pool.delconnection(cname) #channel過期時,刪除此鏈接和此鏈接下的所有channel
        c, cname = Pool.create_channel() #創建一個新的鏈接和channel
        c.basic_publish(exchange='',
                        routing_key='隊列名',
                        body=str(senddata),
                        )
        Pool.return_channel(c, cname)

refer:

https://www.jianshu.com/p/24e541170ace


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM