一、課程介紹
1. 來源
課程使用的操作系統為 Ubuntu 16.04
,OpenCV 版本為 opencv-python 3.4.1.15
。
你可以在我的Github 上找到 Windows 系統和 Linux 系統對應的源代碼,此教程對應的版本是 v0.2。目前我正在開發的版本是 v0.3,新版本將允許使用不同IP協議的主機通信,並且范圍不再局限於局域網內。這個工具最初是為了通過IPv6節省聊天工具使用的流量而開發的。
2. 內容簡介
- 本實驗實現簡易的視頻通信工具
- 在視頻通信的基礎上加入語音
- 用戶可以選擇通信的質量,即畫質、停頓等參數
- 支持IPv6
3. 實驗知識點
本課程項目完成過程中將學習:
- Python 基於 OpenCV 對攝像頭信息的捕獲和壓縮
- Python 關於 線程 和 socket 通信的一些基礎技巧
- Python 基於 PyAudio 對語音信息的捕獲和壓縮
其中將重點介紹 socket 傳輸過程中對數據的壓縮和處理。
4.實驗環境
- python 3.5
- opencv-python 3.4.1.15
- numpy 1.14.5
- PyAudio 0.2.11
二、環境搭建
通過以下命令可下載項目源碼,作為參照對比完成下面詳細步驟的學習。
$ cd Code $ wget https://labfile.oss.aliyuncs.com/courses/672/ichat.zip $ unzip ichat.zip
現在開始下載環境依賴的包,確保在剛在解壓文件下的目錄里運行。
$ cd ichat $ sudo pip3 install numpy $ sudo pip3 install opencv_python
這一步下載了我們需要的opencv-python和numpy兩個包。
剩下的PyAudio,由於本虛擬環境的部分問題,我們單獨分開下載。
$ sudo apt-get install portaudio19-dev python-all-dev python3-all-dev $ sudo pip3 install pyaudio==0.2.11
現在,我們的實驗環境就搭好了。
三、實驗原理
實驗實現了簡易的視頻通信工具,基於 OpenCV 和 PyAudio,使用 TCP 協議通信,通信雙方建立雙向 CS 連接,雙方均維護一個客戶端和一個服務器端。在捕獲視頻信息后,根據用戶指定的參數對畫面做壓縮並傳輸。
四、實驗步驟
接下來我們分步驟講解本實驗。
4.1 實現雙向 C/S 連接
先為雙方的通信設計 Server 類和 Client類,兩個類均繼承 threading.Thread
,只需要分別實現 __init__
、 __del__
和 run
方法,之后對象調用 .start()
方法即可在獨立線程中執行 run
方法中的內容。首先 Client
類需要存儲遠端的IP地址和端口,而 Server
類需要存儲本地服務器監聽的端口號。用戶還應當可以指定通信雙方使用的協議版本,即基於IPv4 還是IPv6 的TCP連接。因此 Server
類的初始化需要傳入兩個參數(端口、版本), Client
類的初始化需要三個參數(遠端IP、端口、版本)。新建文件 vchat.py
,在其中定義基礎的兩個類如下。
from socket import * import threading class Video_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) def __del__(self): self.sock.close() # TODO def run(self): print("server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote client success connected...") # TODO class Video_Client(threading.Thread): def __init__(self ,ip, port, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) def __del__(self) : self.sock.close() # TODO def run(self): print("client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("client connected...") # TODO
4.2 實現攝像頭數據流捕獲
OpenCV 為 Python 提供的接口非常簡單並且易於理解。捕獲視頻流的任務應當由 Client
類完成,下面完善 Client
的 run
函數。在下面的代碼中,我們為類添加了一個成員變量 cap
,它用來捕獲默認攝像頭的輸出。
class Video_Client(threading.Thread): def __init__(self ,ip, port, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) self.cap = cv2.VideoCapture(0) def __del__(self) : self.sock.close() self.cap.release() def run(self): print("client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("client connected...") while self.cap.isOpened(): ret, frame = self.cap.read() # TODO
4.3 發送捕獲到的數據到服務器
已經捕獲到數據,接下來要發送字節流。首先我們繼續編寫 Client
,為其添加發送數據功能的實現。這里只改動了 run
方法。在捕獲到幀后,我們使用 pickle.dumps
方法對其打包,並用 sock.sendall
方法發送。注意發送過程中我們用 struct.pack
方法為每批數據加了一個頭,用於接收方確認接受數據的長度。
def run(self): while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("client connected...") while self.cap.isOpened(): ret, frame = self.cap.read() data = pickle.dumps(frame) try: self.sock.sendall(struct.pack("L", len(data)) + data) except: break
下面編寫 Server
,在服務器端連接成功后,應當創建一個窗口用於顯示接收到的視頻。因為連接不一定創建成功,因此 cv.destroyAllWindows()
被放在一個 try..catch
塊中防止出現錯誤。在接收數據過程中,我們使用 payload_size
記錄當前從緩沖區讀入的數據長度,這個長度通過 struct.calcsize('L')
來讀取。使用該變量的意義在於緩沖區中讀出的數據可能不足一個幀,也可能由多個幀構成。為了准確提取每一幀,我們用 payload_size
區分幀的邊界。在從緩沖區讀出的數據流長度超過 payload_size
時,剩余部分和下一次讀出的數據流合並,不足 payload_size
時將合並下一次讀取的數據流到當前幀中。在接收完完整的一幀后,顯示在創建的窗口中。同時我們為窗口創建一個鍵盤響應,當按下 Esc
或 q
鍵時退出程序。
class Video_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) def __del__(self): self.sock.close() try: cv2.destroyAllWindows() except: pass def run(self): print("server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") cv2.namedWindow('Remote', cv2.WINDOW_NORMAL) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) zframe_data = data[:msg_size] data = data[msg_size:] frame_data = zlib.decompress(zframe_data) frame = pickle.loads(frame_data) cv2.imshow('Remote', frame) if cv2.waitKey(1) & 0xFF == 27: break
4.4 視頻縮放和數據壓縮
現在的服務器和客戶端已經可以運行,你可以在代碼中創建一個 Client
類實例和一個 Server
類實例,並將IP地址設為 127.0.0.1
,端口設為任意合法的(0-65535)且不沖突的值,版本設為IPv4。執行代碼等同於自己和自己通信。如果網絡狀況不好,你也許會發現自己和自己的通信也有卡頓現象。為了使畫面質量、延遲能夠和現實網絡狀況相匹配,我們需要允許用戶指定通信中畫面的質量,同時我們的代碼應當本身具有壓縮數據的能力,以盡可能利用帶寬。
當用戶指定使用低畫質通信,我們應當對原始數據做變換,最簡單的方式即將捕獲的每一幀按比例縮放,同時降低傳輸的幀速,在代碼中體現為 resize
,該函數的第二個參數為縮放中心,后兩個參數為縮放比例,並且根據用戶指定的等級,不再傳輸捕獲的每一幀,而是間隔幾幀傳輸一幀。為了防止用戶指定的畫質過差,代碼中限制了最壞情況下的縮放比例為0.3,最大幀間隔為3。此外,我們在發送每一幀的數據前使用 zlib.compress
對其壓縮,盡量降低帶寬負擔。
class Video_Client(threading.Thread): def __init__(self ,ip, port, level, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if level <= 3: self.interval = level else: self.interval = 3 self.fx = 1 / (self.interval + 1) if self.fx < 0.3: self.fx = 0.3 if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) self.cap = cv2.VideoCapture(0) def __del__(self) : self.sock.close() self.cap.release() def run(self): print("VEDIO client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("VEDIO client connected...") while self.cap.isOpened(): ret, frame = self.cap.read() sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx) data = pickle.dumps(sframe) zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION) try: self.sock.sendall(struct.pack("L", len(zdata)) + zdata) except: break for i in range(self.interval): self.cap.read()
服務器端最終代碼如下,增加了對接收到數據的解壓縮處理。
class Video_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) def __del__(self): self.sock.close() try: cv2.destroyAllWindows() except: pass def run(self): print("VEDIO server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote VEDIO client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") cv2.namedWindow('Remote', cv2.WINDOW_NORMAL) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) zframe_data = data[:msg_size] data = data[msg_size:] frame_data = zlib.decompress(zframe_data) frame = pickle.loads(frame_data) cv2.imshow('Remote', frame) if cv2.waitKey(1) & 0xFF == 27: break
4.5 加入音頻的捕獲和傳輸
在完成視頻通信的基礎上,整體框架對於音頻通信可以直接挪用,只需要修改其中捕獲視頻/音頻的代碼和服務器解碼播放的部分。這里我們使用 PyAudio 庫處理音頻,在 Linux 下你也可以選擇 sounddevice
。關於 sounddevice
這里不做過多介紹,你可以在這里看到它最新版本的文檔。將 vchat.py
復制一份,重命名為 achat.py
,簡單修改幾處,最終音頻捕獲、傳輸的完整代碼如下。我將上面代碼中的 Server
和 Client
分別加上 Video
和 Audio
前綴以區分,同時顯示給用戶的 print
輸出語句也做了一定修改,對於視頻加上 VIDEO
前綴,音頻加上 AUDIO
前綴。如果你對代碼中使用到的 PyAudio 提供的庫函數有所疑問,可以在這里找到相關的入門文檔及示例。
class Audio_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) self.p = pyaudio.PyAudio() self.stream = None def __del__(self): self.sock.close() if self.stream is not None: self.stream.stop_stream() self.stream.close() self.p.terminate() def run(self): print("AUDIO server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote AUDIO client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") self.stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer = CHUNK ) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) frame_data = data[:msg_size] data = data[msg_size:] frames = pickle.loads(frame_data) for frame in frames: self.stream.write(frame, CHUNK) class Audio_Client(threading.Thread): def __init__(self ,ip, port, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) self.p = pyaudio.PyAudio() self.stream = None def __del__(self) : self.sock.close() if self.stream is not None: self.stream.stop_stream() self.stream.close() self.p.terminate() def run(self): print("AUDIO client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("AUDIO client connected...") self.stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) while self.stream.is_active(): frames = [] for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): data = self.stream.read(CHUNK) frames.append(data) senddata = pickle.dumps(frames) try: self.sock.sendall(struct.pack("L", len(senddata)) + senddata) except: break
至此我們完成了 vchat.py 的編寫。
4.6 編寫程序入口 main.py
為了提供用戶參數解析,代碼使用了 argparse
。你可能對此前幾個類中初始化方法的 self.setDaemon(True)
有疑惑。這個方法的調用使每個線程在主線程結束之后自動退出,保證程序不會出現崩潰且無法銷毀的情況。在 main.py
中,我們通過每隔1s做一次線程的保活檢查,如果視頻/音頻中出現阻塞/故障,主線程會終止。
import sys import time import argparse from vchat import Video_Server, Video_Client from achat import Audio_Server, Audio_Client parser = argparse.ArgumentParser() parser.add_argument('--host', type=str, default='127.0.0.1') parser.add_argument('--port', type=int, default=10087) parser.add_argument('--level', type=int, default=1) parser.add_argument('-v', '--version', type=int, default=4) args = parser.parse_args() IP = args.host PORT = args.port VERSION = args.version LEVEL = args.level if __name__ == '__main__': vclient = Video_Client(IP, PORT, LEVEL, VERSION) vserver = Video_Server(PORT, VERSION) aclient = Audio_Client(IP, PORT+1, VERSION) aserver = Audio_Server(PORT+1, VERSION) vclient.start() aclient.start() time.sleep(1) # make delay to start server vserver.start() aserver.start() while True: time.sleep(1) if not vserver.isAlive() or not vclient.isAlive(): print("Video connection lost...") sys.exit(0) if not aserver.isAlive() or not aclient.isAlive(): print("Audio connection lost...") sys.exit(0)
4.7 運行情況
因為實驗樓的環境沒有提供攝像頭,因此我們需要修改一下代碼,讓程序從一個本地視頻文件讀取,模擬攝像頭的訪問。將 Video_Client
中 self.cap = cv2.VideoCapture(0)
改為 self.cap = cv2.VideoCapture('test.mp4')
,即從本地視頻 test.mp4
中讀取。在修改完你的代碼后,你可以通過以下命令下載 test.mp4
(該視頻文件是周傑倫《浪漫手機》的MV),並檢驗代碼。(請確保在ichat文件夾下!)
$ wget http://labfile.oss.aliyuncs.com/courses/671/test.mp4 $ python3 main.py
和上面命令一樣,在本機可以通過 python3 main.py
來實驗本機和本機的視頻聊天,如果你有條件在同一局域網內的兩台機器上實驗,則可以將程序部署在兩台機器上,並相互連接觀察效果。下面兩張圖為本機上實驗截圖,有些情況下 PyAudio 可能會提示一些警告,你可以忽視它的提示。用戶也可以指定 level
參數, level
越高,畫質越差, level
為 0 為原始畫面,在我們的 main.py
中默認 level
為 1。