zmq模塊的理解和使用


  最近項目中接觸到ZeroMQ, 內部實現挺復雜的,沒時間深入了解,簡單記錄下使用方法吧,有時間會來填坑。 官方指導文檔http://zguide.zeromq.org/page:all

  項目主要用ZeroMQ在多個ip主機上的服務間進行項目通信,直接用scoket也可以實現,但比較費時費力,ZeroMQ建立在socket的基礎上,提供了一套更加簡單強大的API,可以快速搭建起跨進程,跨ip等的通信網絡。很多文章中都提到了socket只能實現一對一的通信,ZeroMQ可以實現多對多的連接,而且有三種模式供選擇,可以根據業務需要,進行選擇和使用。

  ZeroMQ的三種通信模式分別是:Request-Reply,  Publisher-subscriber,  Parallel Pipeline

  python安裝zmq模塊:pip install pyzmq

  pyzmq官方文檔:https://pyzmq.readthedocs.io/en/latest/

1. Request-Reply(應答模式)

  應答模式特點:

    1. 客戶端提出請求,服務端必須回答請求,每個請求只回答一次

    2.  客戶端沒有收到答復前,不能再次進行請求

    3. 可以有多個客戶端提出請求,服務端能保證各個客戶端只接收到自己的答復

       4. 如果服務端斷掉或者客戶端斷掉會產生怎樣的影響?

      如果是客戶端斷掉,對服務端沒有任何影響,如果客戶端隨后又重新啟動,那么兩方繼續一問一答,但是如果是服務端斷掉了,就可能會產生一些問題,這要看服務端是在什么情況下斷掉的,如果服務端收是在回答完問題后斷掉的,那么沒影響,重啟服務端后,雙發繼續一問一答,但如果服務端是在收到問題后斷掉了,還沒來得及回答問題,這就有問題了,那個提問的客戶端遲遲得不到答案,就會一直等待答案,因此不會再發送新的提問,服務端重啟后,客戶端遲遲不發問題,所以也就一直等待提問。

 

 

 python 實現客戶端和服務端代碼如下:

zmq_server.py

import zmq


context = zmq.Context()            #創建上下文
socket = context.socket(zmq.REP)   #創建Response服務端socket
socket.bind("tcp://*:5555")        #socket綁定,*表示本機ip,端口號為5555,采用tcp協議通信

while True:
    message = socket.recv()
    print(type(message))          #接收到的消息也會bytes類型(字節)
    print("收到消息:{}".format(message))
    socket.send(b"new message")   #發送消息,字節碼消息

zmq_client.py

#coding:utf-8

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

socket.send(b"A message")
response = socket.recv()
print(response)

常用數據發送API如下:

#發送數據
socket.send_json(data)   #data 會被json序列化后進行傳輸 (json.dumps)
socket.send_string(data, encoding="utf-8")   #data為unicode字符串,會進行編碼成子節再傳輸
socket.send_pyobj(obj)    #obj為python對象,采用pickle進行序列化后傳輸
socket.send_multipart(msg_parts)   # msg_parts, 發送多條消息組成的迭代器序列,每條消息是子節類型,
                                    # 如[b"message1", b"message2", b"message2"]

#接收數據
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()

 

2. Publisher-Subscriber (發布-訂閱模式)

  publiser廣播消息到所有客戶端,客戶端根據訂閱主題過濾消息

 

 

 python實現代碼如下, 其中publisher發布兩條消息,第一條消息的topic為client1, 被第一個subscriber接收到;第二條消息的topic為client2, 被第二個subscriber接收到。

注意的是subscriber在匹配時,並不是完全匹配的,消息的topic為client1開頭的字符串都會被匹配到,如果topic為"client1cient2", 也會被第一個subscriber接收到

zmq_server.py

#coding:utf-8
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
topic = ["client1", "client2"]
while True:
    for t in topic:
        data = "message for {}".format(t)
        msg = [t.encode("utf-8"), data.encode("utf-8")]     #列表中的第一項作為消息的topic,sub根據topic過濾消息
        print(msg)
        socket.send_multipart(msg)

zmq_client1.py

#coding:utf-8

import zmq


context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.subscribe("client1")          #訂閱主題topic為:client1
socket.connect("tcp://localhost:5555")
msg = socket.recv_multipart()
print(msg)

結果:

 

 

zmq_client2.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.subscribe("client2") #訂閱主題topic為:client2
socket.connect("tcp://localhost:5555")
msg = socket.recv_multipart()
print(msg) 

結果:

3. Parallel Pipeline(並行管道模式)

    管道模式有三部分組成,如下圖所示,最左邊的producer通過push產生任務, 中間的consumer接收任務處理后轉發,最后result collector接收所有任務的結果。 相比於publisher-subscriber,多了一個數據緩存和處理負載的部分,當連接斷開,數據不會丟失,重連后數據繼續發送到客戶端。

 

 

 python實現producer, consumer, resultcollector

producer.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5577")
for num in range(2000):
    work_message = {"num": num}
    socket.send_json(work_message)

consumer.py

import random
import zmq
context = zmq.Context()
consumer_id = random.randint(1, 1000)
#接收工作
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5577")
#轉發結果
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5578")
while True:
    msg = consumer_receiver.recv_json()
    data = msg["num"]
    result = {"consumer_id":consumer_id, "num":data}
    consumer_sender.send_json(result)

resultcollector.py

#coding:utf-8

import zmq

context = zmq.Context()
result_receiver = context.socket(zmq.PULL)
result_receiver.connect("tcp://localhost:5578")
result = result_receiver.recv_json()
collecter_data = {}
for x in range(1000):
    if result['consumer_id'] in collecter_data:
        collecter_data[result['consumer_id']] = collecter_data[result['consumer_id']] + 1
    else:
        collecter_data[result['consumer_id']] = 1
    if x == 999:
        print(collecter_data)

執行順序:

python producer.py
python consumer.py
python resultcollector.py

 

 參考文章:

https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/pushpull.html 

https://segmentfault.com/a/1190000012010573

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM