用netty和opencv從攝像頭抓取視頻在前端播放


0. 描述

這個demo實現了利用opencv從攝像頭抓取視頻,並使用netty框架通過websocket將視頻傳到前端,前端不斷將圖像寫入canvas實現播放,但只有圖像沒有聲音。

1. 基本思路

攝像頭:筆記本的攝像頭

抽幀:opencv

隊列:線程安全的java.util.concurrent.ConcurrentLinkedQueue

消息分發:自己實現。用來支持多個連接,主要邏輯就是將生產者生產的數據拷貝多份分發給各個websocket連接。

websocket服務:netty

幀數據的傳輸路徑如下圖所示:

netty+websocket視頻傳輸demo
2. 使用opencv抽幀

利用opencv的videoCapture從攝像頭抽幀,並將幀數據存入隊列。

import org.opencv.core.Mat;
import org.opencv.videoio.VideoCapture;

import java.util.concurrent.ConcurrentLinkedQueue;

public class FrameProducer extends Thread {
    private static final int MAX_QUEUE_SIZE = 30;

    private final VideoCapture videoCapture = new VideoCapture(0);
    private final ConcurrentLinkedQueue<Mat> queue = new ConcurrentLinkedQueue<>();

    public void close() {
        this.videoCapture.release();
    }

    public ConcurrentLinkedQueue<Mat> queue() {
        return queue;
    }

    @Override
    public void run() {
        videoCapture.open(0);
        while (videoCapture.isOpened()) {
            if (queue.size() < MAX_QUEUE_SIZE) {
                Mat mat = new Mat();
                videoCapture.read(mat);
                queue.offer(mat);
            }
        }
    }
}
2. 消息分發
import org.opencv.core.Mat;

public class FrameDistributor extends Thread {
    private static final int MAX_QUEUE_SIZE = 30;

    FrameProducer producer;
    ConcurrentLinkedQueue<Mat> queue;
    Map<String, Queue<Mat>> matQueueMap = new ConcurrentHashMap<>();
    public boolean running = true;

    public FrameDistributor(FrameProducer producer) {
        this.producer = producer;
        this.queue = producer.queue();
    }

    synchronized public void register(String key, Queue<Mat> matQueue) {
        matQueueMap.put(key, matQueue);
        System.out.println("alive matQueue num: " + matQueueMap.entrySet().size());
    }

    synchronized public void unregister(String key) {
        matQueueMap.remove(key);
        System.out.println("alive matQueue num: " + matQueueMap.entrySet().size());
    }

    @Override
    public void run() {
        while (running) {
            Mat mat = queue.poll();
            if (mat != null) {
                matQueueMap.forEach((key, value) -> {
                    if (value.size() < MAX_QUEUE_SIZE) {
                        // 取到的mat復制一份
                        value.offer(mat.clone());
                    } else {
                        mat.release();
                    }
                });
            }
        }
    }
}
3. 使用netty實現websocket服務器

Websocket服務器:

import org.opencv.core.Mat;

public class WebsocketServer {
    private int port = 9999;
    private String address = "127.0.0.1";

    ConcurrentLinkedQueue<Mat> queue;

    public void run(ConcurrentLinkedQueue<Mat> queue) {
        this.queue = queue;

        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            bootstrap.group(boss, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new HttpServerCodec())
                            .addLast(new ChunkedWriteHandler())
                            .addLast(new HttpObjectAggregator(65536))
                            .addLast(new WebSocketServerProtocolHandler("/", null, true))//websocket協議處理器
                            .addLast(new WebSocketServerHandler(queue));//自定義websocket處理器,在此實現消息推送
                }
            });
            bootstrap.bind(address, port).sync().channel().closeFuture().sync();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

WebSocket消息處理器:

import org.opencv.core.Mat;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final ConcurrentLinkedQueue<Mat> queue = new ConcurrentLinkedQueue<>();
    private final FrameDistributor distributor;
    private ScheduledFuture<?> future;

    public WebSocketServerHandler(FrameDistributor distributor) {
        super();
        this.distributor = distributor;
    }
  
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String id = ctx.channel().id().asLongText();
        System.out.println("channelActive: " + id);
        distributor.register(id, queue);
        // 周期任務
        ctx.executor().scheduleAtFixedRate(() -> {
            Mat mat = queue.poll();
            if (mat != null) {
                // 這里用到了一個工具函數,將opencv的Mat對象轉為BufferedImage
                BufferedImage image = Utils.matToBufferedImage(mat);
                ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024);
                ByteBufOutputStream stream = new ByteBufOutputStream(buffer);

                ImageIO.write(image, "JPEG", stream);

                ctx.channel().writeAndFlush(new BinaryWebSocketFrame(buffer));
                // 由於消息分發的時候我沒有將數據拷貝,而是直接將mat對象的引用分發出去,因此這里的mat用完還不能釋放,我索性將
                // 其交給JVM的GC來釋放了。如果這里直接釋放,那么第一個連接將數據用完之后直接釋放,其他連接拿到的同樣的數據就
                // 不可用了。
                // mat.release();
            }
            // 第三個參數period不能填0,否則直接出錯,而且還沒報錯信息。
            // period盡量調整到合適的大小。(也許可以動態調整適應來環境?
            // 我的電腦攝像頭幀率30fps,最慢每33ms就得處理完一幀,不然就會丟幀。這里平均每12ms就能處理完一幀送出去。
            // 因此每20ms調用一次差不多剛好。
        }, 100, 20, TimeUnit.MILLISECONDS);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String id = ctx.channel().id().asLongText();
        if (future != null) {
            future.cancel(true);
        }
        distributor.unregister(id);
        System.out.println("channelInactive: " + id);
    }
}

工具類

public class Utils {
    // Mat轉BufferedImage
    public static BufferedImage matToBufferedImage(Mat mat) {
        int dataSize = mat.cols() * mat.rows() * (int) mat.elemSize();
        byte[] data = new byte[dataSize];
        mat.get(0, 0, data);
        int type = mat.channels() == 1 ? BufferedImage.TYPE_BYTE_GRAY : BufferedImage.TYPE_3BYTE_BGR;

        if (type == BufferedImage.TYPE_3BYTE_BGR) {
            for (int i = 0; i < dataSize; i += 3) {
                byte blue = data[i];
                data[i] = data[i + 2];
                data[i + 2] = blue;
            }
        }
        BufferedImage image = new BufferedImage(mat.cols(), mat.rows(), type);
        image.getRaster().setDataElements(0, 0, mat.cols(), mat.rows(), data);

        return image;
    }
}
4. 啟動類
import org.opencv.core.Core;
import org.opencv.core.Mat;

public class Application {
    static {
        // 這里加載opencv的動態庫
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
    }

    public static void main(String[] args) {
        FrameProducer producer = new FrameProducer();
        producer.setDaemon(true);
        producer.start();

        FrameDistributor distributor = new FrameDistributor(producer);
        distributor.setDaemon(true);
        distributor.start();

        new WebsocketServer().run(distributor);
    }
}
5. 前端頁面

頁面需要代理一下。可以用python的http server做代理,切到需要代理的目錄,執行下面的腳本即可。

python -m http.server 8080 --bind 127.0.0.1

ws-index.html

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <title>ws</title>
</head>

<body>
<div style="text-align: center; margin-top: 100px;">
    <button onclick='stop()'>stop</button>
    <h4>服務端消息</h4>
    <div id="msg">...</div>
    <div>
        <canvas id="canvas" width="640" height="480"></canvas>
    </div>
</div>

<script>
    const ws = new WebSocket("ws://127.0.0.1:9999/");
    const canvas = document.getElementById("canvas");
    const context = canvas.getContext("2d");
    const msg = document.getElementById("msg");
    let count = 0;

    ws.onopen = function (evt) {
        console.log("Connection open ...");
    };
    const image = new Image();

    ws.onmessage = function (evt) {
        if (evt?.data != undefined && evt.data instanceof Blob) {
            msg.innerHTML = 'Got a Frame '+ (count++);
            let frame = evt.data;
            frame.type = 'image/jpeg';
            blobToDataURI(frame, (url) => {
                image.src = url;
                image.onload = function (e) {
                    context.drawImage(image, 0, 0, 640, 480);
                }
            });
        }
    };

    ws.onclose = function (evt) {
        console.log("Connection closed.");
    };

    function stop() {
        ws.close();
    }

    function blobToDataURI(blob, callback) {
        let reader = new FileReader();
        reader.onload = function (e) {
            callback(e.target.result);
        }

        reader.readAsDataURL(blob);
    }
</script>
</body>
</html>
6. 總結

我發現Java確實不太適合做圖像這方面的處理。因為圖像數據本身就大,處理過程中難免產生中間數據,而且中間數據都跟原數據一個量級,累積起來太吃內存了。如果使用C++的話,可以在對象使用過后就及時手動釋放,Java里面得等GC去釋放。

我遇到的幾個疑點和坑:

  1. 為什么要加個消息分發?

    我一開始是沒有加消息分發的,直到我打開了三個頁面,發現事情不太對勁。打開的連接越多,視頻就越卡。

    原來,抽幀線程拿到的幀數據都存放在那一個隊列里面,websocket的連接是被並發處理的,多個連接同時從一個隊列里面搶數據,結果就是你一個我一個,連續的幀序列被瓜分,每個連接拿到的都是原序列的子序列,播放出來當然就卡成PPT了。

  2. mat什么時候釋放?

    其實mat可以不釋放,雖然它是個native對象,但由於其實現了JNI,它也在GC的管理范圍內。

  3. 為什么消息分發的時候非得復制一份?復用同一份對象可以嗎?

    消息分發的時候,可以不用復制(事實上連接多的時候,這么做會更好),直接將對象的引用分發給各個channel,由於消息分發之后沒有再對這個mat對象進行修改,因此也不存在線程安全問題,這個對象用完之后會被GC回收掉。

    我之前在WebSocketServerHandler中的周期任務里面,將用完的mat釋放掉了,后面matToBufferedImage這個方法瘋狂報錯。冷靜下來思考之后發現,這是個線程安全問題,同一份mat被多個channel引用,先來的線程用完mat跑完任務之后就把mat釋放了,后到的線程用到mat的時候發現數據沒了 (😂),然后給我拋異常:

    java.lang.IllegalArgumentException: Width (0) and height (0) must be > 0
    	at java.awt.image.SampleModel.<init>(SampleModel.java:126)
    	at java.awt.image.ComponentSampleModel.<init>(ComponentSampleModel.java:146)
    	at java.awt.image.PixelInterleavedSampleModel.<init>(PixelInterleavedSampleModel.java:87)
    	at java.awt.image.Raster.createInterleavedRaster(Raster.java:641)
    	at java.awt.image.Raster.createInterleavedRaster(Raster.java:278)
    	at java.awt.image.BufferedImage.<init>(BufferedImage.java:376)
    	at cn.kui.app.opencv.Utils.matToBufferedImage(Utils.java:27)
    	at cn.kui.app.websocket.WebSocketServerHandler.lambda$channelActive$0(WebSocketServerHandler.java:66)
    

實際的視頻傳輸傳的都是視頻片段,圖像、聲音、字幕等數據封裝成一個segment進行傳輸。下次嘗試一波。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM