前言
前段时间由于业务需要,做了一个接入摄像头的视频流然后做人脸识别的项目!!我们选用的是Caffe(卷积神经网络框架)+SSD算法+OpenCV!!开始我们的处理是逻辑是一帧一帧的处理,但是测试的时候发现图像处理的速度很慢,而且准确度也不高!后来我们就换了一套方案,利用多线程来实现,然而效果也并不是很理想!所以我们才想探索一下能不能实现多张图像一起处理!今天就记录一下我在实现过程中的一些问题和一些经验!
首先来简单的说明一下所选的工具!
caffe的全称是Convolutional Architecture for Fast Feature Embedding(译为:快速特征嵌入的卷积体系结构),核心语言是C++。caffe的基本工作流程是设计建立在神经网络的一个简单假设,所有的计算都是层的形式表示的,网络层所做的事情就是输入数据,然后输出计算结果。比如卷积就是输入一幅图像,然后和这一层的参数(filter)做卷积,最终输出卷积结果。每层需要两种函数计算,一种是forward,从输入计算到输出;另一种是backward,从上层给的gradient来计算相对于输入层的gradient。这两个函数实现之后,我们就可以把许多层连接成一个网络,这个网络输入数据(图像,语音或其他原始数据),然后计算需要的输出(比如识别的标签)。在训练的时候,可以根据已有的标签计算loss和gradient,然后用gradient来更新网络中的参数。
目标检测近年来已经取得了很重要的进展,主流的算法主要分为两个类型:(1)two-stage方法,如R-CNN系算法,其主要思路是先通过启发式方法(selective search)或者CNN网络(RPN)产生一系列稀疏的候选框,然后对这些候选框进行分类与回归,two-stage方法的优势是准确度高;(2)one-stage方法,如Yolo和SSD,其主要思路是均匀地在图片的不同位置进行密集抽样,抽样时可以采用不同尺度和长宽比,然后利用CNN提取特征后直接进行分类与回归,整个过程只需要一步,所以其优势是速度快,但是均匀的密集采样的一个重要缺点是训练比较困难,这主要是因为正样本与负样本(背景)极其不均衡(参见Focal Loss),导致模型准确度稍低。不同算法的性能如图1所示,可以看到两类方法在准确度和速度上的差异。
OpenCV是一个基于BSD许可(开源)发行的跨平台计算机视觉和机器学习软件库,可以运行在Linux、Windows、Android和Mac OS操作系统上。 [1] 它轻量级而且高效——由一系列 C 函数和少量 C++ 类构成,同时提供了Python、Ruby、MATLAB等语言的接口,实现了图像处理和计算机视觉方面的很多通用算法
首先来介绍一下blobFromImages函数的参数
image
:这个就是我们将要输入神经网络进行处理或者分类的图片。
mean
:需要将图片整体减去的平均值,如果我们需要对RGB图片的三个通道分别减去不同的值,那么可以使用3组平均值,如果只使用一组,那么就默认对三个通道减去一样的值。减去平均值(mean
):为了消除同一场景下不同光照的图片,对我们最终的分类或者神经网络的影响,我们常常对图片的R、G、B通道的像素求一个平均值,然后将每个像素值减去我们的平均值,这样就可以得到像素之间的相对值,就可以排除光照的影响。
scalefactor
:当我们将图片减去平均值之后,还可以对剩下的像素值进行一定的尺度缩放,它的默认值是1,如果希望减去平均像素之后的值,全部缩小一半,那么可以将scalefactor
设为1/2。
size
:这个参数是我们神经网络在训练的时候要求输入的图片尺寸。
swapRB
:OpenCV中认为我们的图片通道顺序是BGR,但是我平均值假设的顺序是RGB,所以如果需要交换R和G,那么就要使swapRB=true
实现代码
from imutils.video import VideoStream from imutils.video import FPS import numpy as np import argparse import imutils import time import cv2 import threading import queue from flask import Flask, render_template, Response
#获取输入参数 ap = argparse.ArgumentParser() ap.add_argument("-p", "--prototxt", required=True, help="path to Caffe 'deploy' prototxt file") ap.add_argument("-m", "--model", required=True, help="path to Caffe pre-trained model") ap.add_argument("-v", "--video", required=True, help="path to Caffe video file") ap.add_argument("-c", "--confidence", type=float, default=0.2, help="minimum probability to filter weak detections") ap.add_argument("-t", "--threading", type=int, default=1, help="threading number") args = vars(ap.parse_args())
#SSD算法能够识别的物体列表 CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"] #为识别出来的物体画框着色 COLORS = np.random.uniform(0, 255, size=(len(CLASSES), 3))
print("[INFO] starting video stream...") # 通过opencv获取实时视频流 class VideoCamera(object): def __init__(self):
#从视频流抽帧放在列表中 self.Frame = []
#处理图片的列表 self.Images = []
#每次处理图像数 self.Nu = 10 # 控制图片的结果赛选 self.NuDNN =50 self.done_Images = queue.Queue(maxsize=500) self.FrameQueue = queue.Queue(maxsize=self.Nu) self.ImageQueue = queue.Queue(maxsize=500) self.dataQueue = queue.Queue(maxsize=500) self.status = False self.isstop = False self.video = cv2.VideoCapture(args["video"]) def __del__(self): self.video.release() def start(self): # 把程序放进子线程,daemon=True 表示该线程会随着主线程关闭而关闭。 print('ipcam started!')
#视频流的图像线程 threading.Thread(target=self.queryframe, daemon=True, args=()).start() print('dnn_frame started')
#图像处理线程 threading.Thread(target=self.dnn_frame, daemon=True, args=()).start() print('get_frame started')
#图像返回视频流线程 threading.Thread(target=self.get_frame, daemon=True, args=()).start() def stop(self): # 记得要设计停止无限循环的开关。 self.isstop = True print('ipcam stopped!') #从视频流读取帧的线程 def queryframe(self): while (not self.isstop):
#读取视频流的帧并放在列表中 self.status, self.Frame = self.video.read() self.Frame = imutils.resize(self.Frame, width=1080) if self.status: self.video.grab() if self.FrameQueue.full(): self.FrameQueue.get() self.FrameQueue.put(self.Frame) else: self.FrameQueue.put(self.Frame) self.video.release() #图像处理线程 def dnn_frame(self): time.sleep(1) print("[INFO] loading model...") net = cv2.dnn.Net_readFromModelOptimizer(args["prototxt"], args["model"]) # 设置运算设备 测试选用的CPU net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) while (not self.isstop): start_time = time.time() Images=[] while (len(Images)) < self.Nu: image2 = self.FrameQueue.get() Images.append(image2)
#将图像列表封装成一个blob
blob = cv2.dnn.blobFromImages(Images, 0.007843, (384,672), 127.5,True)
#将blob送进模型进行识别 net.setInput(blob)
#拿到识别后的结果 detections = net.forward()
#这里新建一个列表是为了保证 识别结果和待识别图像进行--对应 data=[] data.append(Images) data.append(detections) if self.dataQueue.full(): self.dataQueue.get() self.dataQueue.put(data) else: self.dataQueue.put(data) #将识别结果在原图像上画框的线程 def get_frame(self): time.sleep(4) n=0 while (not self.isstop): dataList=self.dataQueue.get() detections = dataList[1]#结果 image_list = dataList[0]#图像 # 因为opencv读取的图片并非jpeg格式,因此要用motion JPEG模式需要先将图片转码成jpg格式图片 (h, w) = image_list[0].shape[:2]
#所有预测结果种取 self.NuDNN*self.Nu种 for i in range(0, self.NuDNN*self.Nu):
#拿到预测结果的置信度 confidence = detections[0, 0, i, 2]
#只有大于我输入的置信度的结果才保留 if confidence > args["confidence"]:
#预测结果的下标 idx = int(detections[0, 0, i, 1])
#原图片的id imageIndex = int(detections[0, 0, i, 0])
#在原图上画框 box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) (startX, startY, endX, endY) = box.astype("int") label = "{}: {:.2f}%".format(CLASSES[idx], confidence * 100) cv2.rectangle(image_list[imageIndex], (startX, startY), (endX, endY), COLORS[idx], 2) y = startY - 15 if startY - 15 > 15 else startY + 15 cv2.putText(image_list[imageIndex], label, (startX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLORS[idx], 2) for x in range(0,self.Nu): ret, jpeg = cv2.imencode('.jpg', image_list[x]) if self.done_Images.full(): self.done_Images.get() self.done_Images.put(jpeg.tobytes()) else: self.done_Images.put(jpeg.tobytes()) app = Flask(__name__) @app.route('/') # 主页 def index(): # jinja2模板,具体格式保存在index.html文件中 return render_template('index.html') def gen(camera): while True: if not camera.done_Images.empty(): frame = camera.done_Images.get() time.sleep(0.2) # 使用generator函数输出视频流, 每次请求输出的content类型是image/jpeg yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n') @app.route('/video_feed') # 这个地址返回视频流响应 def video_feed(): global ipcam return Response(gen(ipcam), mimetype='multipart/x-mixed-replace; boundary=frame') if __name__ == '__main__': # 连接摄影机 ipcam = VideoCamera() # 启动子线程 print("init") ipcam.start() time.sleep(4) app.run(host='0.0.0.0', debug=False, port=5000)
这里需要特别说明一下图像识别后的结果detections,网上对blobFromImage函数讲解的很详细,但是对于输出的结果几乎没有介绍!小编也是摸索了好久才把结果和图片一一对应起来!
detections其实是一个四维数组,形式为(0, 0, n, s),如果直接循环取出里面的图片进行计算识别,是不行的!这样只会降低形式为detections的维度,先解释一下单张图片与多张图片的blob的区别:
单张图片blob:(1,3,224,224)
多张图片blob:(10,3,224,224)
结果元组的格式如下(num_images, num_channels, width, height)
然后一张图片默认情况下会预测100个结果,那么我们一次处理10张图片就有1000个结果,并且会根据置信度(预测结果的可能性的大小)排序!
n表示第几种结果(最大为100*图片数),s表示这个结果的多个属性值,最大为6
0 表示image_id 图片的id
1 表示label 预测结果的classes下标值
2 表示conf 置信度即预测结果的可能性
3-6表示这个图片的的最大最小长宽
总结
至此,python的多线程进行人脸识别的功能就开发完成!!!但是,这仅仅是实现了功能而已!效果并不一定是最优的!后续可能还需要调整视频显示的帧数,一次处理多少张图片比较合理,预测结果取多少种合适。。。。。个人认为最重要的是选取合适的模型和算法!我这里选取的SSD和Caffe并不一定是最优的!!!!
原创文章,转载请标注出处!!!!!!