Python3 實現簡易局域網視頻聊天工具


一、課程介紹

1. 來源

課程使用的操作系統為 Ubuntu 16.04 ,OpenCV 版本為 opencv-python 3.4.1.15 。

你可以在我的Github 上找到 Windows 系統和 Linux 系統對應的源代碼,此教程對應的版本是 v0.2。目前我正在開發的版本是 v0.3,新版本將允許使用不同IP協議的主機通信,並且范圍不再局限於局域網內。這個工具最初是為了通過IPv6節省聊天工具使用的流量而開發的。

2. 內容簡介

  • 本實驗實現簡易的視頻通信工具
  • 在視頻通信的基礎上加入語音
  • 用戶可以選擇通信的質量,即畫質、停頓等參數
  • 支持IPv6

3. 實驗知識點

本課程項目完成過程中將學習:

  • Python 基於 OpenCV 對攝像頭信息的捕獲和壓縮
  • Python 關於 線程 和 socket 通信的一些基礎技巧
  • Python 基於 PyAudio 對語音信息的捕獲和壓縮

其中將重點介紹 socket 傳輸過程中對數據的壓縮和處理。

4.實驗環境

  • python 3.5
  • opencv-python 3.4.1.15
  • numpy 1.14.5
  • PyAudio 0.2.11

二、環境搭建

通過以下命令可下載項目源碼,作為參照對比完成下面詳細步驟的學習。

$ cd Code
$ wget https://labfile.oss.aliyuncs.com/courses/672/ichat.zip
$ unzip ichat.zip

現在開始下載環境依賴的包,確保在剛在解壓文件下的目錄里運行。

$ cd ichat
$ sudo pip3 install numpy
$ sudo pip3 install opencv_python

這一步下載了我們需要的opencv-python和numpy兩個包。

剩下的PyAudio,由於本虛擬環境的部分問題,我們單獨分開下載。

$ sudo apt-get install portaudio19-dev python-all-dev python3-all-dev
$ sudo pip3 install pyaudio==0.2.11

現在,我們的實驗環境就搭好了。

三、實驗原理

實驗實現了簡易的視頻通信工具,基於 OpenCV 和 PyAudio,使用 TCP 協議通信,通信雙方建立雙向 CS 連接,雙方均維護一個客戶端和一個服務器端。在捕獲視頻信息后,根據用戶指定的參數對畫面做壓縮並傳輸。

四、實驗步驟

接下來我們分步驟講解本實驗。

4.1 實現雙向 C/S 連接

先為雙方的通信設計 Server 類和 Client類,兩個類均繼承 threading.Thread ,只需要分別實現 __init__ 、 __del__ 和 run 方法,之后對象調用 .start() 方法即可在獨立線程中執行 run 方法中的內容。首先 Client 類需要存儲遠端的IP地址和端口,而 Server 類需要存儲本地服務器監聽的端口號。用戶還應當可以指定通信雙方使用的協議版本,即基於IPv4 還是IPv6 的TCP連接。因此 Server 類的初始化需要傳入兩個參數(端口、版本), Client 類的初始化需要三個參數(遠端IP、端口、版本)。新建文件 vchat.py ,在其中定義基礎的兩個類如下。

from socket import *
import threading
class Video_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
    def __del__(self):
        self.sock.close()
        # TODO
    def run(self):
        print("server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote client success connected...")
        # TODO

class Video_Client(threading.Thread):
    def __init__(self ,ip, port, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
    def __del__(self) :
        self.sock.close()
        # TODO
    def run(self):
        print("client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("client connected...")
        # TODO

4.2 實現攝像頭數據流捕獲

OpenCV 為 Python 提供的接口非常簡單並且易於理解。捕獲視頻流的任務應當由 Client 類完成,下面完善 Client 的 run 函數。在下面的代碼中,我們為類添加了一個成員變量 cap ,它用來捕獲默認攝像頭的輸出。

class Video_Client(threading.Thread):
    def __init__(self ,ip, port, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
        self.cap = cv2.VideoCapture(0)
    def __del__(self) :
        self.sock.close()
        self.cap.release()
    def run(self):
        print("client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
        # TODO

4.3 發送捕獲到的數據到服務器

已經捕獲到數據,接下來要發送字節流。首先我們繼續編寫 Client ,為其添加發送數據功能的實現。這里只改動了 run 方法。在捕獲到幀后,我們使用 pickle.dumps 方法對其打包,並用 sock.sendall 方法發送。注意發送過程中我們用 struct.pack 方法為每批數據加了一個頭,用於接收方確認接受數據的長度。

def run(self):
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            data = pickle.dumps(frame)
            try:
                self.sock.sendall(struct.pack("L", len(data)) + data)
            except:
                break

下面編寫 Server ,在服務器端連接成功后,應當創建一個窗口用於顯示接收到的視頻。因為連接不一定創建成功,因此 cv.destroyAllWindows() 被放在一個 try..catch 塊中防止出現錯誤。在接收數據過程中,我們使用 payload_size 記錄當前從緩沖區讀入的數據長度,這個長度通過 struct.calcsize('L') 來讀取。使用該變量的意義在於緩沖區中讀出的數據可能不足一個幀,也可能由多個幀構成。為了准確提取每一幀,我們用 payload_size 區分幀的邊界。在從緩沖區讀出的數據流長度超過 payload_size 時,剩余部分和下一次讀出的數據流合並,不足 payload_size 時將合並下一次讀取的數據流到當前幀中。在接收完完整的一幀后,顯示在創建的窗口中。同時我們為窗口創建一個鍵盤響應,當按下 Esc 或 q 鍵時退出程序。

class Video_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
    def __del__(self):
        self.sock.close()
        try:
            cv2.destroyAllWindows()
        except:
            pass
    def run(self):
        print("server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote client success connected...")
        data = "".encode("utf-8")
        payload_size = struct.calcsize("L")
        cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
        while True:
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            while len(data) < msg_size:
                data += conn.recv(81920)
            zframe_data = data[:msg_size]
            data = data[msg_size:]
            frame_data = zlib.decompress(zframe_data)
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break

4.4 視頻縮放和數據壓縮

現在的服務器和客戶端已經可以運行,你可以在代碼中創建一個 Client 類實例和一個 Server 類實例,並將IP地址設為 127.0.0.1 ,端口設為任意合法的(0-65535)且不沖突的值,版本設為IPv4。執行代碼等同於自己和自己通信。如果網絡狀況不好,你也許會發現自己和自己的通信也有卡頓現象。為了使畫面質量、延遲能夠和現實網絡狀況相匹配,我們需要允許用戶指定通信中畫面的質量,同時我們的代碼應當本身具有壓縮數據的能力,以盡可能利用帶寬。

當用戶指定使用低畫質通信,我們應當對原始數據做變換,最簡單的方式即將捕獲的每一幀按比例縮放,同時降低傳輸的幀速,在代碼中體現為 resize ,該函數的第二個參數為縮放中心,后兩個參數為縮放比例,並且根據用戶指定的等級,不再傳輸捕獲的每一幀,而是間隔幾幀傳輸一幀。為了防止用戶指定的畫質過差,代碼中限制了最壞情況下的縮放比例為0.3,最大幀間隔為3。此外,我們在發送每一幀的數據前使用 zlib.compress 對其壓縮,盡量降低帶寬負擔。

class Video_Client(threading.Thread):
    def __init__(self ,ip, port, level, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if level <= 3:
            self.interval = level
        else:
            self.interval = 3
        self.fx = 1 / (self.interval + 1)
        if self.fx < 0.3:
            self.fx = 0.3
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
        self.cap = cv2.VideoCapture(0)
    def __del__(self) :
        self.sock.close()
        self.cap.release()
    def run(self):
        print("VEDIO client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("VEDIO client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx)
            data = pickle.dumps(sframe)
            zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION)
            try:
                self.sock.sendall(struct.pack("L", len(zdata)) + zdata)
            except:
                break
            for i in range(self.interval):
                self.cap.read()

服務器端最終代碼如下,增加了對接收到數據的解壓縮處理。

class Video_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
    def __del__(self):
        self.sock.close()
        try:
            cv2.destroyAllWindows()
        except:
            pass
    def run(self):
        print("VEDIO server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote VEDIO client success connected...")
        data = "".encode("utf-8")
        payload_size = struct.calcsize("L")
        cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
        while True:
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            while len(data) < msg_size:
                data += conn.recv(81920)
            zframe_data = data[:msg_size]
            data = data[msg_size:]
            frame_data = zlib.decompress(zframe_data)
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break

4.5 加入音頻的捕獲和傳輸

在完成視頻通信的基礎上,整體框架對於音頻通信可以直接挪用,只需要修改其中捕獲視頻/音頻的代碼和服務器解碼播放的部分。這里我們使用 PyAudio 庫處理音頻,在 Linux 下你也可以選擇 sounddevice。關於 sounddevice 這里不做過多介紹,你可以在這里看到它最新版本的文檔。將 vchat.py 復制一份,重命名為 achat.py ,簡單修改幾處,最終音頻捕獲、傳輸的完整代碼如下。我將上面代碼中的 Server 和 Client 分別加上 Video 和 Audio 前綴以區分,同時顯示給用戶的 print 輸出語句也做了一定修改,對於視頻加上 VIDEO 前綴,音頻加上 AUDIO 前綴。如果你對代碼中使用到的 PyAudio 提供的庫函數有所疑問,可以在這里找到相關的入門文檔及示例。

class Audio_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
        self.p = pyaudio.PyAudio()
        self.stream = None
    def __del__(self):
        self.sock.close()
        if self.stream is not None:
            self.stream.stop_stream()
            self.stream.close()
        self.p.terminate()
    def run(self):
        print("AUDIO server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote AUDIO client success connected...")
        data = "".encode("utf-8")
        payload_size = struct.calcsize("L")
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  output=True,
                                  frames_per_buffer = CHUNK
                                  )
        while True:
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            while len(data) < msg_size:
                data += conn.recv(81920)
            frame_data = data[:msg_size]
            data = data[msg_size:]
            frames = pickle.loads(frame_data)
            for frame in frames:
                self.stream.write(frame, CHUNK)

class Audio_Client(threading.Thread):
    def __init__(self ,ip, port, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
        self.p = pyaudio.PyAudio()
        self.stream = None
    def __del__(self) :
        self.sock.close()
        if self.stream is not None:
            self.stream.stop_stream()
            self.stream.close()
        self.p.terminate()
    def run(self):
        print("AUDIO client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("AUDIO client connected...")
        self.stream = self.p.open(format=FORMAT, 
                             channels=CHANNELS,
                             rate=RATE,
                             input=True,
                             frames_per_buffer=CHUNK)
        while self.stream.is_active():
            frames = []
            for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                data = self.stream.read(CHUNK)
                frames.append(data)
            senddata = pickle.dumps(frames)
            try:
                self.sock.sendall(struct.pack("L", len(senddata)) + senddata)
            except:
                break

至此我們完成了 vchat.py 的編寫。

4.6 編寫程序入口 main.py

為了提供用戶參數解析,代碼使用了 argparse 。你可能對此前幾個類中初始化方法的 self.setDaemon(True) 有疑惑。這個方法的調用使每個線程在主線程結束之后自動退出,保證程序不會出現崩潰且無法銷毀的情況。在 main.py 中,我們通過每隔1s做一次線程的保活檢查,如果視頻/音頻中出現阻塞/故障,主線程會終止。

import sys
import time
import argparse
from vchat import Video_Server, Video_Client
from achat import Audio_Server, Audio_Client

parser = argparse.ArgumentParser()

parser.add_argument('--host', type=str, default='127.0.0.1')
parser.add_argument('--port', type=int, default=10087)
parser.add_argument('--level', type=int, default=1)
parser.add_argument('-v', '--version', type=int, default=4)

args = parser.parse_args()

IP = args.host
PORT = args.port
VERSION = args.version
LEVEL = args.level

if __name__ == '__main__':
    vclient = Video_Client(IP, PORT, LEVEL, VERSION)
    vserver = Video_Server(PORT, VERSION)
    aclient = Audio_Client(IP, PORT+1, VERSION)
    aserver = Audio_Server(PORT+1, VERSION)
    vclient.start()
    aclient.start()
    time.sleep(1)    # make delay to start server
    vserver.start()
    aserver.start()
    while True:
        time.sleep(1)
        if not vserver.isAlive() or not vclient.isAlive():
            print("Video connection lost...")
            sys.exit(0)
        if not aserver.isAlive() or not aclient.isAlive():
            print("Audio connection lost...")
            sys.exit(0)

4.7 運行情況

因為實驗樓的環境沒有提供攝像頭,因此我們需要修改一下代碼,讓程序從一個本地視頻文件讀取,模擬攝像頭的訪問。將 Video_Client 中 self.cap = cv2.VideoCapture(0) 改為 self.cap = cv2.VideoCapture('test.mp4') ,即從本地視頻 test.mp4 中讀取。在修改完你的代碼后,你可以通過以下命令下載 test.mp4 (該視頻文件是周傑倫《浪漫手機》的MV),並檢驗代碼。(請確保在ichat文件夾下!)

$ wget http://labfile.oss.aliyuncs.com/courses/671/test.mp4
$ python3 main.py

和上面命令一樣,在本機可以通過 python3 main.py 來實驗本機和本機的視頻聊天,如果你有條件在同一局域網內的兩台機器上實驗,則可以將程序部署在兩台機器上,並相互連接觀察效果。下面兩張圖為本機上實驗截圖,有些情況下 PyAudio 可能會提示一些警告,你可以忽視它的提示。用戶也可以指定 level 參數, level 越高,畫質越差, level 為 0 為原始畫面,在我們的 main.py 中默認 level 為 1。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM