ZMQ簡單使用


ZMQ簡單使用

一 什么是ZMQ

引用官方的說法: “ZMQ (以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ 的明確目標是“成為標准網絡協議棧的一部分,之后進入 Linux 內核”。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的“傳統”BSD 套接字之上的一層封裝。ZMQ 讓編寫高性能網絡應用程序極為簡單和有趣。”

近幾年有關”Message Queue”的項目層出不窮,知名的就有十幾種,這主要是因為后摩爾定律時代,分布式處理逐漸成為主流,業界需要一套標准來解決分布式計算環境中節點之間的消息通信。幾年的競爭下來,Apache 基金會旗下的符合 AMQP/1.0標准的 RabbitMQ 已經得到了廣泛的認可,成為領先的 MQ 項目。

與 RabbitMQ 相比,ZMQ 並不像是一個傳統意義上的消息隊列服務器,事實上,它也根本不是一個服務器,它更像是一個底層的網絡通訊庫,在 Socket API 之上做了一層封裝,將網絡通訊、進程通訊和線程通訊抽象為統一的 API 接口。

這是個類似於 Socket 的一系列接口,他跟 Socket 的區別是:普通的 socket 是端到端的(1:1的關系),而 ZMQ 卻是可以N:M 的關系,人們對 BSD 套接字的了解較多的是點對點的連接,點對點連接需要顯式地建立連接、銷毀連接、選擇協議(TCP/UDP)和處理錯誤等,而 ZMQ 屏蔽了這些細節,讓你的網絡編程更為簡單。ZMQ 用於 node 與 node 間的通信,node 可以是主機或者是進程。

二 C/S模式

多個客戶端,一個服務端,多個客戶端可以連接一個服務端發送消息

# -*- coding: utf-8 -*-
# server端
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://*:8888')

while True:
    message = socket.recv()
    print(message)
    response = 'server response!'
    socket.send(response.encode())
# -*- coding: utf-8 -*-
# client端
import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:8888')

while True:
    data = input('input your data:')
    socket.send(data.encode())
    response = socket.recv()
    print(response)

三 發布訂閱模式

一個發布者,多個訂閱者,廣播所有client,沒有隊列緩存,斷開連接數據將永遠丟失。client可以進行數據過濾。

# -*- coding: utf-8 -*-
import zmq

contest = zmq.Context()
socket = contest.socket(zmq.PUB)
socket.bind('tcp://*:8888')

while True:
    data = input('input your data:')
    socket.send(data.encode())
# -*- coding: utf-8 -*-
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:8888')
socket.setsockopt(zmq.SUBSCRIBE, ''.encode())

while True:
    data = socket.recv()
    print(data)

四 平行管道模式

消息單向的,也是有去無回的。push的任何一個消息,始終只會有一個pull端收到消息。由三部分組成,push進行數據推送,work進行數據緩存,pull進行數據競爭獲取處理。區別於Publish-Subscribe存在一個數據緩存和處理負載。當連接被斷開,數據不會丟失,重連后數據繼續發送到對端。

# -*- coding: utf-8 -*-
import zmq

contest = zmq.Context()
socket = contest.socket(zmq.PULL)
socket.bind('tcp://*:8888')

while True:
    print(socket.recv())
# -*- coding: utf-8 -*-
import zmq

context = zmq.Context()
recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:8887')

sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:8888')

while True:
    data = recive.recv()
    sender.send(data)
# -*- coding: utf-8 -*-
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind('tcp://*:8887')

while True:
    data = input('input your data:')
    socket.send(data.encode())


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM