Python zmq的三種簡單模式


  ZMQ (以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。

是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ 的明確目標是“成為標准網絡協議棧的一部分,之后進入 Linux 內核”。

ZMQ 讓編寫高性能網絡應用程序極為簡單和有趣。

ZeroMQ並不是一個對socket的封裝,不能用它去實現已有的網絡協議。

 

它有自己的模式,不同於更底層的點對點通訊模式。

 

它有比tcp協議更高一級的協議。(當然ZeroMQ不一定基於TCP協議,它也可以用於進程間和進程內通訊)

 

zeromq 並不是類似rabbitmq消息列隊,它實際上只一個消息列隊組件,一個庫。

 

zeromq的幾種模式

Request-Reply模式(請求響應模型):

客戶端在請求后,服務端必須回響應

由客戶端發起請求,並等待服務端響應請求。從客戶端端來看,一定是一對對發收配對的;

反之,在服務端一定是收發對。服務端和客戶端都可以是1:N的模型。通常把1認為是server,N認為是Client。

 

ZMQ可以很好的支持路由功能(實現路由功能的組件叫做Device),把1:N擴展為N:M(只需要加入若干路由節點)。

從這個模型看,更底層的端點地址是對上層隱藏的。每個請求都隱含回應地址,而應用則不關心它

 

 

 

服務端:

# sever.py

import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
    try:
        print("wait for client ...")
        message = socket.recv()
        print("message from client:", message.decode('utf-8'))
        socket.send(message)
    except Exception as e:
        print('異常:',e)
        sys.exit()

 

 

客戶端:

#client.py

import zmq
import sys
context = zmq.Context()
print("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
while True:

    input1 = input("請輸入內容:").strip()
    if input1 == 'b':
        sys.exit()
    socket.send(input1.encode('utf-8'))

    message = socket.recv()
    print("Received reply: ", message.decode('utf-8'))

 

 

 

Publish-Subscribe模式(發布訂閱模型):

廣播所有client,沒有隊列緩存,斷開連接數據將永遠丟失。client可以進行數據過濾。

 

服務端

server.py

 

 
         
import zmq
import time
import sys
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")

while True:
msg = input("請輸入要發布的信息:").strip()
if msg == 'b':
sys.exit()
socket.send(msg.encode('utf-8'))
time.sleep(
1)
 
        

 

 

 

客戶端1

client1.py

 
         
import zmq


context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8')) # 接收所有消息
while True:
response = socket.recv().decode('utf-8');
print("response: %s" % response)
 

 

 

 

 客戶端2

client2.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8')) # 消息過濾 只接受123開頭的信息
while True:
response = socket.recv().decode('utf-8');
print("response: %s" % response)

 

 

 運行結果:

發布端發布以下信息(注意:b是關閉發布端的指令):

請輸入要發布的信息:hello python
請輸入要發布的信息:大唐不夜城
請輸入要發布的信息:123435678
請輸入要發布的信息:123我愛你
請輸入要發布的信息:廣播模式,發布端只關心發布信息,不關心訂閱端是否接收
請輸入要發布的信息:b

 

 

 客戶端1接收的信息:

response: hello python
response: 大唐不夜城
response: 123435678
response: 123我愛你
response: 廣播模式,發布端只關心發布信息,不關心訂閱端是否接收

 

 

 客戶端2接收的信息:

response: 123435678
response: 123我愛你

 

 

Parallel Pipeline模式(管道模型): 

   由三部分組成,push進行數據推送,work進行數據緩存,pull進行數據競爭獲取處理。區別於Publish-Subscribe存在一個數據緩存和處理負載。

當連接被斷開,數據不會丟失,重連后數據繼續發送到對端。

server.py

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    msg = input("請輸入要發布的信息:").strip()
    socket.send(msg.encode('utf-8'))
    print("已發送")
    time.sleep(1)

 

 

worker.py

import zmq
context = zmq.Context()
receive = context.socket(zmq.PULL)
receive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
    data = receive.recv()
    print("正在轉發...")
    sender.send(data)

 

 

client.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")

while True:
    response = socket.recv().decode('utf-8')
    print("response: %s" % response)

 

結果:

server端:

請輸入要發布的信息:hello  python
已發送
請輸入要發布的信息:王者不可阻擋
已發送
請輸入要發布的信息:123abc
已發送
請輸入要發布的信息:

 

 

 work端

正在轉發...
正在轉發...
正在轉發...

 

 

 client端:(接收第二條信息后斷開,斷開后重新收到的信息)

response: 123abc

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM