ZMQ的三種消息模式


一. ZMQ是什么?

普通的socket是端對端(1:1)的關系,ZMQ是N:M的關系,socket的連接需要顯式地建立連接,銷毀連接,選擇協議(TCP/UDP)和

錯誤處理,ZQM屏蔽了這些細節,像是一個封裝了的socket庫,讓網絡編程變得更簡單。ZMQ不關用於主機與主機之間的socket通信,

還可以是線程和進程之間的通信。

ZMQ提供的套接字可以在多種協議中傳輸消息,線程間,進程間,TCP等。可以使用套接字創建多種消息模式,如‘請求-應答模式’,‘發布-訂閱模式’,‘分布式模式’等。

 

二. ZMQ特點

1. 組件來去自如,ZQM會負責自動重連,服務端和客戶端可以隨意的退出網絡。tcp的話,必須現有服務端啟動,在啟動客戶端,否則會報錯。

2. ZMQ會在必要的情況下將消息放入隊列中保存,一旦建立了連接就開始發送。

3. ZMQ有閾值機制,當隊列滿的時候,可以自動阻塞發送者,或者丟棄部分消息。

4. ZMQ可以使用不同的通信協議進行連接,TCP,進程間,線程間。

5. ZMQ提供了多種模式進行消息路由。如請求-應答模式,發布-訂閱模式等,這些模式可以用來搭建網絡拓撲結構。

6. ZMQ會在后台線程異步的處理I/O操作,他使用一種不會死鎖的數據結構來存儲消息。

 

三. ZMQ的三種消息模式

1. Reuqest-Reply(請求-應答模式)

(1). 使用Request-Reply模式,需要遵循一定的規律。

(2).客戶端必要先發送消息,在接收消息;服務端必須先進行接收客戶端發送過來的消息,在發送應答給客戶端,如此循環

(3). 服務端和客戶端誰先啟動,效果都是一樣的。

(4). 服務端在收到消息之前,會一直阻塞,等待客戶端連上來。

 

 

 創建一個客戶端和服務端,客戶端發送消息給服務端,服務端返回消息給客戶端,客戶端和服務器誰先啟動都可以

client.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

#客戶端必須要先發送消息,然后在接收消息
if __name__ == '__main__':
    print('zmq client start....')
    for i in range(1, 10):
        socket.send_string("hello")
        message = socket.recv()
        print('received reply message:{}'.format(message))

server.py

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
count = 0

#必須要先接收消息,然后在應答
if __name__ == '__main__':
    print('zmq server start....')
    while True:
        message = socket.recv()
        count += 1
        print('received request. message:{} count:{}'.format(message, count))
        time.sleep(1)
        socket.send_string("World!")

 

常見數據請求發送API:

#發送數據
socket.send_json(data)   #data 會被json序列化后進行傳輸 (json.dumps)
socket.send_string(data, encoding="utf-8")   #data為unicode字符串,會進行編碼成子節再傳輸
socket.send_pyobj(obj)    #obj為python對象,采用pickle進行序列化后傳輸
socket.send_multipart(msg_parts)   # msg_parts, 發送多條消息組成的迭代器序列,每條消息是子節類型,
                                    # 如[b"message1", b"message2", b"message2"]

#接收數據
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()

 

2. Publisher-Subscriber(發布-訂閱模式)

Publisher-Subscriber模式,消息是單向流動的,發布者只能發布消息,不能接受消息;訂閱者只能接受消息,不能發送消息。

服務端發布消息的過程中,如果有訂閱者退出,不影響發布者繼續發布消息,當訂閱者再次連接上來,收到的消息是后來發布的消息

比較晚加入的訂閱者,或者中途離開的訂閱者,必然會丟掉一部分信息

如果發布者停止,所有的訂閱者會阻塞,等發布者再次上線的時候回繼續接受消息。

"慢連接": 我們不知道訂閱者是何時開始接受消息的,就算啟動"訂閱者",在啟動"發布者", "訂閱者"還是會缺失一部分的消息,因為建立連接是需要時間的,雖然時間很短,但不是零。ZMQ在后台是進行異步的IO傳輸,在建立TCP連接的短短的時間段內,ZMQ就可以發送很多消息了。

有種簡單的方法來同步"發布者" 和"訂閱者", 通過sleep讓發布者延遲發布消息,等連接建立完成后再進行發送.

 

 

 Publisher.py

import zmq
import time
import random

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")

if __name__ == '__main__':
    print("發布者啟動.....")
    time.sleep(2)
    for i in range(1000):
        tempterature = random.randint(-10, 40)
        message = "我是publisher, 這是我發布給你們的第{}個消息!今日溫度{}".format(i+1, tempterature)
        socket.send_string(message)

Subscriber.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")

# 客戶端需要設定一個過濾,否則收不到任何信息
socket.setsockopt_string(zmq.SUBSCRIBE, '')

if __name__ == '__main__':
    print('訂閱者一號啟動....')
    while True:
        message = socket.recv_string()
        print("(訂閱者一號)接收到'發布者'發送的消息:{}".format(message))

 

3. Push-Pull(平行管道模式/分布式處理)

Ventilator:任務發布器會生成大量可以並行運算的任務。

Worker:有一組worker會處理這些任務

Sink:結果接收器會在末端接收所有的Worker的處理結果,進行匯總

Worker上游和"任務發布器"相連,下游和"結果接收器"相連

"任務發布器" 和 "結果接收器"是這個網路結構中比較穩定的部分,由他們綁定至端點

Worker只是連接兩個端點

需要等Worker全部啟動后,在進行任務分發。Socket的連接會消耗一定時間(慢連接), 如果不盡興同步的話,第一個Worker啟動

會一下子接收很多任務。

"任務分發器" 會向Worker均勻的分發任務(負載均衡機制)

"結果接收器" 會均勻地從Worker處收集消息(公平隊列機制)

 

 

 

Ventilator.py 任務分發

import zmq
import random

raw_input = input
context = zmq.Context()

sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

if __name__ == '__main__':

    # 同步操作
    print("Press Enter when the workers are ready: ")
    _ = raw_input()
    print("Sending tasks to workers…")

    sink.send_string('0')

    # 發送十個任務
    total_msec = 0
    for task_nbr in range(10):

        # 每個任務耗時為N
        workload = random.randint(1, 5)
        total_msec += workload

        sender.send_string(u'%i' % workload)

    print("10個任務的總工作量: %s 秒" % total_msec)

 

Worker.py (中間處理)

import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

if __name__ == '__main__':

    while True:
        s = receiver.recv()
        print('work1 接收到一個任務... 需要{}秒'.format(s))

        # Do the work
        time.sleep(int(s))

        # Send results to sink
        sender.send_string('work1 完成一個任務,耗時{}秒'.format(s))

 

Sink.py 結果接收器

import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

if __name__ == '__main__':

    s = receiver.recv()
    print('開始接收處理結果.....')

    # 計時,所有任務處理完一共需要多久
    tstart = time.time()

    # 接受十個任務的處理結果
    for task_nbr in range(10):
        s = receiver.recv_string()
        print(s)

    tend = time.time()
    print("三個worker同時工作,耗時: %d 秒" % (tend-tstart))

 

zmq.Poller(ZMQ超時判斷)

socket_req_url = config.zmq_rep_host.format(config.zmq_rep_port)
socket_req = zmq.Context().socket(zmq.REQ)
socket_req.connect(socket_req_url)
poller = zmq.Poller()
poller.register(socket_req, zmq.POLLIN)    

 

超時重連實例:

Server服務端:

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
count = 0

#必須要先接收消息,然后在應答
if __name__ == '__main__':
    print('zmq server start....')
    while True:
        message = socket.recv()
        count += 1
        print('received request. message:{} count:{}'.format(message, count))
        time.sleep(1)
        socket.send_string("ping test suc")

Client客戶端:

import zmq

#超時重連
class PingPort():
    def __init__(self):
        self.port = '5555'
        self.socket_req_url = 'tcp://localhost:{}'.format(self.port)
        self.socket_req = zmq.Context().socket(zmq.REQ)
        self.socket_req.connect(self.socket_req_url)
        self.poller = zmq.Poller()
        self.poller.register(self.socket_req, zmq.POLLIN)

    def ping(self):
        self.socket_req.send_string('ping test')
        if self.poller.poll(3000):
            resp = self.socket_req.recv()
            print(resp)
       return True
else: print('ping {} port fail, no response.'.format(self.port)) self.socket_req.setsockopt(zmq.LINGER, 0) self.socket_req.close() self.poller.unregister(self.socket_req) print('-------------開始重連--------------------') self.socket_req = zmq.Context().socket(zmq.REQ) self.socket_req.connect(self.socket_req_url) self.poller = zmq.Poller() self.poller.register(self.socket_req, zmq.POLLIN) self.ping() if __name__ == '__main__': obj = PingPort() print(obj.ping)

如果服務端未開啟,則顯示效果如下所示:

 服務端開啟則返回True


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM