一. ZMQ是什么?
普通的socket是端對端(1:1)的關系,ZMQ是N:M的關系,socket的連接需要顯式地建立連接,銷毀連接,選擇協議(TCP/UDP)和
錯誤處理,ZQM屏蔽了這些細節,像是一個封裝了的socket庫,讓網絡編程變得更簡單。ZMQ不關用於主機與主機之間的socket通信,
還可以是線程和進程之間的通信。
ZMQ提供的套接字可以在多種協議中傳輸消息,線程間,進程間,TCP等。可以使用套接字創建多種消息模式,如‘請求-應答模式’,‘發布-訂閱模式’,‘分布式模式’等。
二. ZMQ特點
1. 組件來去自如,ZQM會負責自動重連,服務端和客戶端可以隨意的退出網絡。tcp的話,必須現有服務端啟動,在啟動客戶端,否則會報錯。
2. ZMQ會在必要的情況下將消息放入隊列中保存,一旦建立了連接就開始發送。
3. ZMQ有閾值機制,當隊列滿的時候,可以自動阻塞發送者,或者丟棄部分消息。
4. ZMQ可以使用不同的通信協議進行連接,TCP,進程間,線程間。
5. ZMQ提供了多種模式進行消息路由。如請求-應答模式,發布-訂閱模式等,這些模式可以用來搭建網絡拓撲結構。
6. ZMQ會在后台線程異步的處理I/O操作,他使用一種不會死鎖的數據結構來存儲消息。
三. ZMQ的三種消息模式
1. Reuqest-Reply(請求-應答模式)
(1). 使用Request-Reply模式,需要遵循一定的規律。
(2).客戶端必要先發送消息,在接收消息;服務端必須先進行接收客戶端發送過來的消息,在發送應答給客戶端,如此循環
(3). 服務端和客戶端誰先啟動,效果都是一樣的。
(4). 服務端在收到消息之前,會一直阻塞,等待客戶端連上來。
創建一個客戶端和服務端,客戶端發送消息給服務端,服務端返回消息給客戶端,客戶端和服務器誰先啟動都可以
client.py
import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") #客戶端必須要先發送消息,然后在接收消息 if __name__ == '__main__': print('zmq client start....') for i in range(1, 10): socket.send_string("hello") message = socket.recv() print('received reply message:{}'.format(message))
server.py
import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") count = 0 #必須要先接收消息,然后在應答 if __name__ == '__main__': print('zmq server start....') while True: message = socket.recv() count += 1 print('received request. message:{} count:{}'.format(message, count)) time.sleep(1) socket.send_string("World!")
常見數據請求發送API:
#發送數據 socket.send_json(data) #data 會被json序列化后進行傳輸 (json.dumps) socket.send_string(data, encoding="utf-8") #data為unicode字符串,會進行編碼成子節再傳輸 socket.send_pyobj(obj) #obj為python對象,采用pickle進行序列化后傳輸 socket.send_multipart(msg_parts) # msg_parts, 發送多條消息組成的迭代器序列,每條消息是子節類型, # 如[b"message1", b"message2", b"message2"] #接收數據 socket.recv_json() socket.recv_string() socket.recv_pyobj() socket.recv_multipart()
2. Publisher-Subscriber(發布-訂閱模式)
Publisher-Subscriber模式,消息是單向流動的,發布者只能發布消息,不能接受消息;訂閱者只能接受消息,不能發送消息。
服務端發布消息的過程中,如果有訂閱者退出,不影響發布者繼續發布消息,當訂閱者再次連接上來,收到的消息是后來發布的消息
比較晚加入的訂閱者,或者中途離開的訂閱者,必然會丟掉一部分信息
如果發布者停止,所有的訂閱者會阻塞,等發布者再次上線的時候回繼續接受消息。
"慢連接": 我們不知道訂閱者是何時開始接受消息的,就算啟動"訂閱者",在啟動"發布者", "訂閱者"還是會缺失一部分的消息,因為建立連接是需要時間的,雖然時間很短,但不是零。ZMQ在后台是進行異步的IO傳輸,在建立TCP連接的短短的時間段內,ZMQ就可以發送很多消息了。
有種簡單的方法來同步"發布者" 和"訂閱者", 通過sleep讓發布者延遲發布消息,等連接建立完成后再進行發送.
Publisher.py
import zmq import time import random context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5555") if __name__ == '__main__': print("發布者啟動.....") time.sleep(2) for i in range(1000): tempterature = random.randint(-10, 40) message = "我是publisher, 這是我發布給你們的第{}個消息!今日溫度{}".format(i+1, tempterature) socket.send_string(message)
Subscriber.py
import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:5555") # 客戶端需要設定一個過濾,否則收不到任何信息 socket.setsockopt_string(zmq.SUBSCRIBE, '') if __name__ == '__main__': print('訂閱者一號啟動....') while True: message = socket.recv_string() print("(訂閱者一號)接收到'發布者'發送的消息:{}".format(message))
3. Push-Pull(平行管道模式/分布式處理)
Ventilator:任務發布器會生成大量可以並行運算的任務。
Worker:有一組worker會處理這些任務
Sink:結果接收器會在末端接收所有的Worker的處理結果,進行匯總
Worker上游和"任務發布器"相連,下游和"結果接收器"相連
"任務發布器" 和 "結果接收器"是這個網路結構中比較穩定的部分,由他們綁定至端點
Worker只是連接兩個端點
需要等Worker全部啟動后,在進行任務分發。Socket的連接會消耗一定時間(慢連接), 如果不盡興同步的話,第一個Worker啟動
會一下子接收很多任務。
"任務分發器" 會向Worker均勻的分發任務(負載均衡機制)
"結果接收器" 會均勻地從Worker處收集消息(公平隊列機制)
Ventilator.py 任務分發
import zmq import random raw_input = input context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sink = context.socket(zmq.PUSH) sink.connect("tcp://localhost:5558") if __name__ == '__main__': # 同步操作 print("Press Enter when the workers are ready: ") _ = raw_input() print("Sending tasks to workers…") sink.send_string('0') # 發送十個任務 total_msec = 0 for task_nbr in range(10): # 每個任務耗時為N workload = random.randint(1, 5) total_msec += workload sender.send_string(u'%i' % workload) print("10個任務的總工作量: %s 秒" % total_msec)
Worker.py (中間處理)
import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") if __name__ == '__main__': while True: s = receiver.recv() print('work1 接收到一個任務... 需要{}秒'.format(s)) # Do the work time.sleep(int(s)) # Send results to sink sender.send_string('work1 完成一個任務,耗時{}秒'.format(s))
Sink.py 結果接收器
import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") if __name__ == '__main__': s = receiver.recv() print('開始接收處理結果.....') # 計時,所有任務處理完一共需要多久 tstart = time.time() # 接受十個任務的處理結果 for task_nbr in range(10): s = receiver.recv_string() print(s) tend = time.time() print("三個worker同時工作,耗時: %d 秒" % (tend-tstart))
zmq.Poller(ZMQ超時判斷)
socket_req_url = config.zmq_rep_host.format(config.zmq_rep_port) socket_req = zmq.Context().socket(zmq.REQ) socket_req.connect(socket_req_url) poller = zmq.Poller() poller.register(socket_req, zmq.POLLIN)
超時重連實例:
Server服務端:
import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") count = 0 #必須要先接收消息,然后在應答 if __name__ == '__main__': print('zmq server start....') while True: message = socket.recv() count += 1 print('received request. message:{} count:{}'.format(message, count)) time.sleep(1) socket.send_string("ping test suc")
Client客戶端:
import zmq #超時重連 class PingPort(): def __init__(self): self.port = '5555' self.socket_req_url = 'tcp://localhost:{}'.format(self.port) self.socket_req = zmq.Context().socket(zmq.REQ) self.socket_req.connect(self.socket_req_url) self.poller = zmq.Poller() self.poller.register(self.socket_req, zmq.POLLIN) def ping(self): self.socket_req.send_string('ping test') if self.poller.poll(3000): resp = self.socket_req.recv() print(resp)
return True else: print('ping {} port fail, no response.'.format(self.port)) self.socket_req.setsockopt(zmq.LINGER, 0) self.socket_req.close() self.poller.unregister(self.socket_req) print('-------------開始重連--------------------') self.socket_req = zmq.Context().socket(zmq.REQ) self.socket_req.connect(self.socket_req_url) self.poller = zmq.Poller() self.poller.register(self.socket_req, zmq.POLLIN) self.ping() if __name__ == '__main__': obj = PingPort() print(obj.ping)
如果服務端未開啟,則顯示效果如下所示:
服務端開啟則返回True