zmq訂閱發布模式
server端代碼:
#coding=utf-8 ''''' 服務端,發布模式 ''' import zmq from random import randrange context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://127.0.0.1:8000") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send("%i %i %i" % (zipcode,temperature , relhumidity))
客戶端代碼:
#coding=utf-8 ''''' 訂閱模式,如果設置了過濾條件,那么只會接收到以過濾條件開頭的消息 ''' import sys import zmq # Socket to talk to server context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://127.0.0.1:8000") # Subscribe to zipcode, default is NYC, 10001 zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10002" #此處設置過濾條件,只有以 zip_filter 開頭的消息才會被接收 socket.setsockopt(zmq.SUBSCRIBE, zip_filter) # Process 5 updates total_temp = 0 for update_nbr in range(5): string = socket.recv() print string zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print("Average temperature for zipcode '%s' was %dF" % ( zip_filter, total_temp / update_nbr) )
總結
1、 zmq的程序,也是要分清服務端和客戶端的,服務端也是要綁定ip和端口的
2、 如果我們先啟動客戶端,后啟動服務端,那么程序是可以正常運行的,換成socket,就不行,socket只能先啟動服務端,后啟動客戶端
3、 學習zmq的過程,千萬別總想着socket,你能用socket傳輸文件,但是如果用zmq做同樣的事情,那你就錯誤的使用了zmq,記住,這是一個消息通信庫,它自己實現了一些協議,使得我們可以非常輕松的在節點間,進程間,線程間傳遞消息,如果你對我剛才說的節點間,進程間,線程間傳遞消息沒什么興趣,說明,你平日里寫的程序都是單進程,單線程的,只管順序執行就好了,其他的不用考慮。
4、廣播,這種模式沒有隊列緩存,斷開之后數據將丟失
下面,分析這兩段程序。
1、 不論是服務端還是客戶端,都需要獲得zmq上下文
context = zmq.Context()
2、獲得socket,這個socket不是我們平日里以為的那個socket。zmq里叫socket,我猜可能是為了方便大家學習才這么命名。它的表現,已經遠遠的超出了我們對以前的那個socket的了解。每一個socket都是有自己的類型的,示例中,服務端的socket的類型是zmq.PUB,客戶端的socket的類型是zmq.SUB,pub是發布,sub是訂閱。說的通俗點,就是有一個pub節點,可以有多個sub節點,pub節點發出去的消息,如果sub節點沒有設置過濾條件,那么就會接收所有的消息,如果有過濾條件,就只接收滿足過濾條件的消息。想想看,有沒有那么一個時刻,你希望你的程序等待一個命令,收到命令后,你讓程序去做一些事情?那么pub與sub模式非常適合這種應用場景。
3、設置過濾條件很簡單
socket.setsockopt(zmq.SUBSCRIBE, zip_filter)
第二個參數就是你期望的過濾條件,只有那些以這個過濾條件開頭的消息才會被接收
問答環節
問題1: 如果想創建多個socket怎么寫?
答: 一個上下文可以創建任意多個socket,完全不受限制
問題2: 明明先啟動了客戶端,后啟動的服務端,為啥有些消息卻沒有收到呢?
答: 就算你先啟動了客戶端,服務端pub出去的一些消息也還是可能沒有被收到,因為你啟動服務端時,服務端與客戶端要建立連接,而這個時候,消息其實已經發出去了,所以你沒收到。
問題3: 在訂閱發布模型中,如果客戶端斷開連接,或是服務端斷開連接會產生什么樣的影響
答: 如果是客戶端斷開連接,沒什么的,就好比一堆人在聽收音機,現在離開一個人,收音機繼續播放嘍。如果是服務端斷開了呢,比如程序死掉了,那么請放心,客戶端不會發生崩潰,只是阻塞在socket.recv() 這條語句上,更神奇的是,如果你恢復了服務端
現在,我們修改一下客戶端程序
#coding=utf-8 ''''' 訂閱模式,如果設置了過濾條件,那么只會接收到以過濾條件開頭的消息 ''' import sys import zmq import time # Socket to talk to server context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://localhost:8000") # Subscribe to zipcode, default is NYC, 10001 zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10002" #此處設置過濾條件,只有以 zip_filter 開頭的消息才會被接收 socket.setsockopt(zmq.SUBSCRIBE, zip_filter) # Process 5 updates total_temp = 0 for update_nbr in range(50): print 'wait recv' string = socket.recv() print 'has recv' time.sleep(1) print string zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print("Average temperature for zipcode '%s' was %dF" % ( zip_filter, total_temp / update_nbr) )
服務端
#coding=utf-8 ''''' 服務端,發布模式 ''' import zmq import time from random import randrange context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:8000") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send("%i %i %i" % (10002,temperature , relhumidity))
服務端和客戶端都啟動,這時候,客戶端收到一條消息后會睡一秒鍾,但是服務端卻是一刻不停的在發送消息,那么問題來了,一個發的快,一個收的慢,那么這時候把服務端停掉會怎樣呢?
實際的效果是,服務端停下來了,客戶端依然在接收消息,因為有一些消息被緩存起來了,雖然服務端不再發送了,客戶端卻依然可以接收得到,但這種接收,只是從之前接收的緩沖區里取數據。
現在,我們在服務端最后加上一條語句,time.sleep(2),這樣,服務端發送一條消息后,睡兩秒鍾,發的慢,收的快了,我們再次啟動服務端和客戶端,當客戶端收到一些消息后,關掉服務端,這次,客戶端很快就停止接收了,因為發的慢,所以緩沖區里沒有數據,現在,我們再次啟動服務端,你會發現,客戶端又開始接收數據了,哈哈,神奇吧!