0. 描述
這個demo實現了利用opencv從攝像頭抓取視頻,並使用netty框架通過websocket將視頻傳到前端,前端不斷將圖像寫入canvas實現播放,但只有圖像沒有聲音。
1. 基本思路
攝像頭:筆記本的攝像頭
抽幀:opencv
隊列:線程安全的java.util.concurrent.ConcurrentLinkedQueue
消息分發:自己實現。用來支持多個連接,主要邏輯就是將生產者生產的數據拷貝多份分發給各個websocket連接。
websocket服務:netty
幀數據的傳輸路徑如下圖所示:
2. 使用opencv抽幀
利用opencv的videoCapture從攝像頭抽幀,並將幀數據存入隊列。
import org.opencv.core.Mat;
import org.opencv.videoio.VideoCapture;
import java.util.concurrent.ConcurrentLinkedQueue;
public class FrameProducer extends Thread {
private static final int MAX_QUEUE_SIZE = 30;
private final VideoCapture videoCapture = new VideoCapture(0);
private final ConcurrentLinkedQueue<Mat> queue = new ConcurrentLinkedQueue<>();
public void close() {
this.videoCapture.release();
}
public ConcurrentLinkedQueue<Mat> queue() {
return queue;
}
@Override
public void run() {
videoCapture.open(0);
while (videoCapture.isOpened()) {
if (queue.size() < MAX_QUEUE_SIZE) {
Mat mat = new Mat();
videoCapture.read(mat);
queue.offer(mat);
}
}
}
}
2. 消息分發
import org.opencv.core.Mat;
public class FrameDistributor extends Thread {
private static final int MAX_QUEUE_SIZE = 30;
FrameProducer producer;
ConcurrentLinkedQueue<Mat> queue;
Map<String, Queue<Mat>> matQueueMap = new ConcurrentHashMap<>();
public boolean running = true;
public FrameDistributor(FrameProducer producer) {
this.producer = producer;
this.queue = producer.queue();
}
synchronized public void register(String key, Queue<Mat> matQueue) {
matQueueMap.put(key, matQueue);
System.out.println("alive matQueue num: " + matQueueMap.entrySet().size());
}
synchronized public void unregister(String key) {
matQueueMap.remove(key);
System.out.println("alive matQueue num: " + matQueueMap.entrySet().size());
}
@Override
public void run() {
while (running) {
Mat mat = queue.poll();
if (mat != null) {
matQueueMap.forEach((key, value) -> {
if (value.size() < MAX_QUEUE_SIZE) {
// 取到的mat復制一份
value.offer(mat.clone());
} else {
mat.release();
}
});
}
}
}
}
3. 使用netty實現websocket服務器
Websocket服務器:
import org.opencv.core.Mat;
public class WebsocketServer {
private int port = 9999;
private String address = "127.0.0.1";
ConcurrentLinkedQueue<Mat> queue;
public void run(ConcurrentLinkedQueue<Mat> queue) {
this.queue = queue;
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerProtocolHandler("/", null, true))//websocket協議處理器
.addLast(new WebSocketServerHandler(queue));//自定義websocket處理器,在此實現消息推送
}
});
bootstrap.bind(address, port).sync().channel().closeFuture().sync();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
WebSocket消息處理器:
import org.opencv.core.Mat;
public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private final ConcurrentLinkedQueue<Mat> queue = new ConcurrentLinkedQueue<>();
private final FrameDistributor distributor;
private ScheduledFuture<?> future;
public WebSocketServerHandler(FrameDistributor distributor) {
super();
this.distributor = distributor;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String id = ctx.channel().id().asLongText();
System.out.println("channelActive: " + id);
distributor.register(id, queue);
// 周期任務
ctx.executor().scheduleAtFixedRate(() -> {
Mat mat = queue.poll();
if (mat != null) {
// 這里用到了一個工具函數,將opencv的Mat對象轉為BufferedImage
BufferedImage image = Utils.matToBufferedImage(mat);
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024);
ByteBufOutputStream stream = new ByteBufOutputStream(buffer);
ImageIO.write(image, "JPEG", stream);
ctx.channel().writeAndFlush(new BinaryWebSocketFrame(buffer));
// 由於消息分發的時候我沒有將數據拷貝,而是直接將mat對象的引用分發出去,因此這里的mat用完還不能釋放,我索性將
// 其交給JVM的GC來釋放了。如果這里直接釋放,那么第一個連接將數據用完之后直接釋放,其他連接拿到的同樣的數據就
// 不可用了。
// mat.release();
}
// 第三個參數period不能填0,否則直接出錯,而且還沒報錯信息。
// period盡量調整到合適的大小。(也許可以動態調整適應來環境?
// 我的電腦攝像頭幀率30fps,最慢每33ms就得處理完一幀,不然就會丟幀。這里平均每12ms就能處理完一幀送出去。
// 因此每20ms調用一次差不多剛好。
}, 100, 20, TimeUnit.MILLISECONDS);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String id = ctx.channel().id().asLongText();
if (future != null) {
future.cancel(true);
}
distributor.unregister(id);
System.out.println("channelInactive: " + id);
}
}
工具類
public class Utils {
// Mat轉BufferedImage
public static BufferedImage matToBufferedImage(Mat mat) {
int dataSize = mat.cols() * mat.rows() * (int) mat.elemSize();
byte[] data = new byte[dataSize];
mat.get(0, 0, data);
int type = mat.channels() == 1 ? BufferedImage.TYPE_BYTE_GRAY : BufferedImage.TYPE_3BYTE_BGR;
if (type == BufferedImage.TYPE_3BYTE_BGR) {
for (int i = 0; i < dataSize; i += 3) {
byte blue = data[i];
data[i] = data[i + 2];
data[i + 2] = blue;
}
}
BufferedImage image = new BufferedImage(mat.cols(), mat.rows(), type);
image.getRaster().setDataElements(0, 0, mat.cols(), mat.rows(), data);
return image;
}
}
4. 啟動類
import org.opencv.core.Core;
import org.opencv.core.Mat;
public class Application {
static {
// 這里加載opencv的動態庫
System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
}
public static void main(String[] args) {
FrameProducer producer = new FrameProducer();
producer.setDaemon(true);
producer.start();
FrameDistributor distributor = new FrameDistributor(producer);
distributor.setDaemon(true);
distributor.start();
new WebsocketServer().run(distributor);
}
}
5. 前端頁面
頁面需要代理一下。可以用python的http server做代理,切到需要代理的目錄,執行下面的腳本即可。
python -m http.server 8080 --bind 127.0.0.1
ws-index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ws</title>
</head>
<body>
<div style="text-align: center; margin-top: 100px;">
<button onclick='stop()'>stop</button>
<h4>服務端消息</h4>
<div id="msg">...</div>
<div>
<canvas id="canvas" width="640" height="480"></canvas>
</div>
</div>
<script>
const ws = new WebSocket("ws://127.0.0.1:9999/");
const canvas = document.getElementById("canvas");
const context = canvas.getContext("2d");
const msg = document.getElementById("msg");
let count = 0;
ws.onopen = function (evt) {
console.log("Connection open ...");
};
const image = new Image();
ws.onmessage = function (evt) {
if (evt?.data != undefined && evt.data instanceof Blob) {
msg.innerHTML = 'Got a Frame '+ (count++);
let frame = evt.data;
frame.type = 'image/jpeg';
blobToDataURI(frame, (url) => {
image.src = url;
image.onload = function (e) {
context.drawImage(image, 0, 0, 640, 480);
}
});
}
};
ws.onclose = function (evt) {
console.log("Connection closed.");
};
function stop() {
ws.close();
}
function blobToDataURI(blob, callback) {
let reader = new FileReader();
reader.onload = function (e) {
callback(e.target.result);
}
reader.readAsDataURL(blob);
}
</script>
</body>
</html>
6. 總結
我發現Java確實不太適合做圖像這方面的處理。因為圖像數據本身就大,處理過程中難免產生中間數據,而且中間數據都跟原數據一個量級,累積起來太吃內存了。如果使用C++的話,可以在對象使用過后就及時手動釋放,Java里面得等GC去釋放。
我遇到的幾個疑點和坑:
-
為什么要加個消息分發?
我一開始是沒有加消息分發的,直到我打開了三個頁面,發現事情不太對勁。打開的連接越多,視頻就越卡。
原來,抽幀線程拿到的幀數據都存放在那一個隊列里面,websocket的連接是被並發處理的,多個連接同時從一個隊列里面搶數據,結果就是你一個我一個,連續的幀序列被瓜分,每個連接拿到的都是原序列的子序列,播放出來當然就卡成PPT了。
-
mat什么時候釋放?
其實mat可以不釋放,雖然它是個native對象,但由於其實現了JNI,它也在GC的管理范圍內。
-
為什么消息分發的時候非得復制一份?復用同一份對象可以嗎?
消息分發的時候,可以不用復制(事實上連接多的時候,這么做會更好),直接將對象的引用分發給各個channel,由於消息分發之后沒有再對這個mat對象進行修改,因此也不存在線程安全問題,這個對象用完之后會被GC回收掉。
我之前在WebSocketServerHandler中的周期任務里面,將用完的mat釋放掉了,后面matToBufferedImage這個方法瘋狂報錯。冷靜下來思考之后發現,這是個線程安全問題,同一份mat被多個channel引用,先來的線程用完mat跑完任務之后就把mat釋放了,后到的線程用到mat的時候發現數據沒了 (😂),然后給我拋異常:
java.lang.IllegalArgumentException: Width (0) and height (0) must be > 0 at java.awt.image.SampleModel.<init>(SampleModel.java:126) at java.awt.image.ComponentSampleModel.<init>(ComponentSampleModel.java:146) at java.awt.image.PixelInterleavedSampleModel.<init>(PixelInterleavedSampleModel.java:87) at java.awt.image.Raster.createInterleavedRaster(Raster.java:641) at java.awt.image.Raster.createInterleavedRaster(Raster.java:278) at java.awt.image.BufferedImage.<init>(BufferedImage.java:376) at cn.kui.app.opencv.Utils.matToBufferedImage(Utils.java:27) at cn.kui.app.websocket.WebSocketServerHandler.lambda$channelActive$0(WebSocketServerHandler.java:66)
實際的視頻傳輸傳的都是視頻片段,圖像、聲音、字幕等數據封裝成一個segment進行傳輸。下次嘗試一波。
