Pika 連接 rabbitmq 集群


原文:https://blog.csdn.net/Tech_Salon/article/details/82890431

使用 Pika 連接 rabbitmq 集群
使用 python 編程經常會用到 pika 來向 rabbitmq 發送消息,單個 rabbitmq 節點連接比較簡單,本文介紹使用 rabbitmq 集群情況下的連接方式。

vip 連接方式
在 client 與 rabbitmq server 之間通過 haproxy 等負載均衡來提供 vip,我使用的環境就是采用這種方式,但是遇到某一節點掛掉時再訪問 vip 連接 rabbitmq 集群會連接失敗,常見 log 如下:

<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>
DEBUG:pika.adapters.select_connection:Using EPollPoller
DEBUG:pika.callback:Added: {'callback': <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Added: {'callback': <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 1}
DEBUG:pika.callback:Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987a0>>, 'only': None, 'one_shot': False, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db98758>>, 'only': None, 'one_shot': False, 'arguments': None}
DEBUG:pika.callback:Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987e8>>, 'only': None, 'one_shot': False, 'arguments': None}
DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f869dc0b6d0> with deadline=1538140088.706256 and callback=<bound method SelectConnection._on_connect_timer of <SelectConnection INIT socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>; now=1538140088.71; delay=0
INFO:pika.adapters.base_connection:Pika version 0.12.0 connecting to 10.10.11.1:5672
ERROR:pika.adapters.base_connection:Read empty data, calling disconnect
INFO:pika.connection:Disconnected from RabbitMQ at 10.10.11.1:5672 (-1): EOF
ERROR:pika.connection:Incompatible Protocol Versions
ERROR:pika.connection:Connection setup failed due to The protocol returned by the server is not supported: (-1, 'EOF')
DEBUG:pika.callback:Processing 0:_on_connection_error
DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987a0>> for "0:_on_connection_error"
DEBUG:pika.callback:Processing 0:_on_connection_closed
DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987e8>> for "0:_on_connection_closed"
DEBUG:pika.callback:Incremented callback reference counter: {'callback': <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 2}
DEBUG:pika.callback:Incremented callback reference counter: {'callback': <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 2}
ERROR:pika.adapters.blocking_connection:Connection open failed - The protocol returned by the server is not supported: (-1

這個報錯通常是由於網絡問題導致,嘗試過 Telnet 連接 vip 和端口,都正常返回,目前還未找到 pika 訪問 vip 連接 rabbitmq 失敗的原因,所以采用類似於 openstack 中連接 rabbitmq 的方式,配置多主機列表,建立連接池。

配置 multiple hosts
openstack 配置transport_url 采用 rabbitmq 集群 host 列表方式,然后在 oslo.message 中建立連接池,通過 kombu 來使用 rabbitmq。參考這種方式,用 pika 實現。
pika 的官方文檔中有示例參考 blocking_consume_recover_multiple_hosts

實際實現的時候會拋異常,原因是傳遞給 pika 需要是個實例而不是列表,官網上提供的方式把 host url 參數化后直接放到列表里傳給 pika 進行連接:

node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]
connection = pika.BlockingConnection(all_endpoints)

實際執行后報錯如下:

Expected instance of Parameters, not [.........]

github 上提交的 issue: parameters error

目前不支持直接傳入多 host url 來池化 rabbitmq 集群的連接,所以要在應用程序中單獨實現。

用一個簡單 for 循環來做:

random.shuffle(all_endpoints)
for url in all_endpoints:
    try:
        logging.basicConfig(level=logging.DEBUG)
        connection = pika.BlockingConnection(url)
    except Exception as ex:
        print str(ex)
    else:
        break

用 shuffle 來改變列表中的 host 順序,可以起到負載均衡的作用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM