最近項目中接觸到ZeroMQ, 內部實現挺復雜的,沒時間深入了解,簡單記錄下使用方法吧,有時間會來填坑。 官方指導文檔http://zguide.zeromq.org/page:all
項目主要用ZeroMQ在多個ip主機上的服務間進行項目通信,直接用scoket也可以實現,但比較費時費力,ZeroMQ建立在socket的基礎上,提供了一套更加簡單強大的API,可以快速搭建起跨進程,跨ip等的通信網絡。很多文章中都提到了socket只能實現一對一的通信,ZeroMQ可以實現多對多的連接,而且有三種模式供選擇,可以根據業務需要,進行選擇和使用。
ZeroMQ的三種通信模式分別是:Request-Reply, Publisher-subscriber, Parallel Pipeline
python安裝zmq模塊:pip install pyzmq
pyzmq官方文檔:https://pyzmq.readthedocs.io/en/latest/
1. Request-Reply(應答模式)
應答模式特點:
1. 客戶端提出請求,服務端必須回答請求,每個請求只回答一次
2. 客戶端沒有收到答復前,不能再次進行請求
3. 可以有多個客戶端提出請求,服務端能保證各個客戶端只接收到自己的答復
4. 如果服務端斷掉或者客戶端斷掉會產生怎樣的影響?
如果是客戶端斷掉,對服務端沒有任何影響,如果客戶端隨后又重新啟動,那么兩方繼續一問一答,但是如果是服務端斷掉了,就可能會產生一些問題,這要看服務端是在什么情況下斷掉的,如果服務端收是在回答完問題后斷掉的,那么沒影響,重啟服務端后,雙發繼續一問一答,但如果服務端是在收到問題后斷掉了,還沒來得及回答問題,這就有問題了,那個提問的客戶端遲遲得不到答案,就會一直等待答案,因此不會再發送新的提問,服務端重啟后,客戶端遲遲不發問題,所以也就一直等待提問。
python 實現客戶端和服務端代碼如下:
zmq_server.py
import zmq context = zmq.Context() #創建上下文 socket = context.socket(zmq.REP) #創建Response服務端socket socket.bind("tcp://*:5555") #socket綁定,*表示本機ip,端口號為5555,采用tcp協議通信 while True: message = socket.recv() print(type(message)) #接收到的消息也會bytes類型(字節) print("收到消息:{}".format(message)) socket.send(b"new message") #發送消息,字節碼消息
zmq_client.py
#coding:utf-8 import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") socket.send(b"A message") response = socket.recv() print(response)
常用數據發送API如下:
#發送數據 socket.send_json(data) #data 會被json序列化后進行傳輸 (json.dumps) socket.send_string(data, encoding="utf-8") #data為unicode字符串,會進行編碼成子節再傳輸 socket.send_pyobj(obj) #obj為python對象,采用pickle進行序列化后傳輸 socket.send_multipart(msg_parts) # msg_parts, 發送多條消息組成的迭代器序列,每條消息是子節類型, # 如[b"message1", b"message2", b"message2"] #接收數據 socket.recv_json() socket.recv_string() socket.recv_pyobj() socket.recv_multipart()
2. Publisher-Subscriber (發布-訂閱模式)
publiser廣播消息到所有客戶端,客戶端根據訂閱主題過濾消息
python實現代碼如下, 其中publisher發布兩條消息,第一條消息的topic為client1, 被第一個subscriber接收到;第二條消息的topic為client2, 被第二個subscriber接收到。
注意的是subscriber在匹配時,並不是完全匹配的,消息的topic為client1開頭的字符串都會被匹配到,如果topic為"client1cient2", 也會被第一個subscriber接收到
zmq_server.py
#coding:utf-8 import zmq context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5555") topic = ["client1", "client2"] while True: for t in topic: data = "message for {}".format(t) msg = [t.encode("utf-8"), data.encode("utf-8")] #列表中的第一項作為消息的topic,sub根據topic過濾消息 print(msg) socket.send_multipart(msg)
zmq_client1.py
#coding:utf-8 import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.subscribe("client1") #訂閱主題topic為:client1 socket.connect("tcp://localhost:5555") msg = socket.recv_multipart() print(msg)
結果:
zmq_client2.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.subscribe("client2") #訂閱主題topic為:client2
socket.connect("tcp://localhost:5555")
msg = socket.recv_multipart()
print(msg)
結果:
3. Parallel Pipeline(並行管道模式)
管道模式有三部分組成,如下圖所示,最左邊的producer通過push產生任務, 中間的consumer接收任務處理后轉發,最后result collector接收所有任務的結果。 相比於publisher-subscriber,多了一個數據緩存和處理負載的部分,當連接斷開,數據不會丟失,重連后數據繼續發送到客戶端。
python實現producer, consumer, resultcollector
producer.py
import zmq context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5577") for num in range(2000): work_message = {"num": num} socket.send_json(work_message)
consumer.py
import random import zmq context = zmq.Context() consumer_id = random.randint(1, 1000) #接收工作 consumer_receiver = context.socket(zmq.PULL) consumer_receiver.connect("tcp://localhost:5577") #轉發結果 consumer_sender = context.socket(zmq.PUSH) consumer_sender.bind("tcp://*:5578") while True: msg = consumer_receiver.recv_json() data = msg["num"] result = {"consumer_id":consumer_id, "num":data} consumer_sender.send_json(result)
resultcollector.py
#coding:utf-8 import zmq context = zmq.Context() result_receiver = context.socket(zmq.PULL) result_receiver.connect("tcp://localhost:5578") result = result_receiver.recv_json() collecter_data = {} for x in range(1000): if result['consumer_id'] in collecter_data: collecter_data[result['consumer_id']] = collecter_data[result['consumer_id']] + 1 else: collecter_data[result['consumer_id']] = 1 if x == 999: print(collecter_data)
執行順序:
python producer.py
python consumer.py
python resultcollector.py
參考文章:
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/pushpull.html