Paper: Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Networks
Paper link: https://arxiv.org/abs/1604.02878v1
1. Overall Framework
MTCNN performs face detection and facial landmark detection through a cascade of convolutional neural networks. The overall framework is as follows:
Figure 1: Pipeline
Figure 1 shows the overall framework of MTCNN (the detection pipeline at test time).
Given an image, it is resized to different scales to build an image pyramid; these differently sized images are the input to the following three stages.
Stage 1: a fully convolutional network (P-Net) first produces candidate windows and their bounding box regression vectors. The estimated regression vectors are used to calibrate the candidates, and non-maximum suppression (NMS) then merges highly overlapped candidates;
Stage 2: this stage uses a refine network (R-Net). All candidates from stage 1 are fed to R-Net, and bounding box regression plus NMS are applied so that far more false candidates are rejected;
Stage 3: this stage uses an output network (O-Net) and is similar to stage 2, but here the goal is to describe the face in more detail. In particular, the network outputs the positions of the five facial landmarks.
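To make the three-stage flow concrete, here is a minimal pseudocode-style sketch of the cascade at test time. All helper names (build_pyramid, pnet, rnet, onet, calibrate, nms, crop, resize) are placeholders for illustration, not the project's actual API, and the NMS thresholds are illustrative:

def detect(img):
    candidates = []
    # stage 1: run the fully convolutional P-Net on every pyramid scale
    for scaled in build_pyramid(img):            # hypothetical helper
        boxes = pnet(scaled)                     # boxes + regression offsets
        candidates += nms(calibrate(boxes), 0.5)
    candidates = nms(candidates, 0.7)
    # stage 2: R-Net rejects most of the false candidates
    crops = [resize(crop(img, b), (24, 24)) for b in candidates]
    candidates = nms(calibrate(rnet(crops)), 0.6)
    # stage 3: O-Net refines the boxes and predicts the 5 landmarks
    crops = [resize(crop(img, b), (48, 48)) for b in candidates]
    boxes, landmarks = onet(crops)
    return nms(calibrate(boxes), 0.6), landmarks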
2. CNN Architecture
Many papers have designed CNNs for face detection, but their performance is limited by the following factors:
1) Some filters lack diversity of weights, which limits their ability to produce discriminative descriptions.
2) Compared with other multi-class object detection and classification tasks, face detection is a challenging binary classification task, so it may need fewer filters, but the filters need to be more discriminative for faces. To this end, the number of filters is reduced and the 5×5 filters are changed to 3×3 filters to cut computation, while the depth is increased to get better performance. With these improvements, better performance is obtained with less runtime. The CNN architectures are shown in Figure 2.
Figure 2: CNN architectures (MP: max pooling, Conv: convolution; the step sizes of convolution and pooling are 1 and 2, respectively)
3. Training
The CNN detectors are trained on three tasks: face/non-face classification, bounding box regression, and facial landmark localization.
1) Face classification
The learning objective is formulated as a binary classification problem. For each sample $x_{i}$, we use the cross-entropy loss:
$L_{i}^{det}=-(y_{i}^{det}\log(p_{i})+(1-y_{i}^{det})\log(1-p_{i}))$ (1)
where $p_{i}$ is the probability produced by the network that sample $x_{i}$ is a face, and $y_{i}^{det}\in \left \{0, 1\right \}$ is the ground-truth label.
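As a quick sanity check on Eq. (1), a minimal standalone numpy sketch (not taken from the project code; det_loss is a hypothetical helper):

import numpy as np

def det_loss(p, y, eps=1e-10):
    """Cross-entropy loss of Eq. (1) for face/non-face classification."""
    return -(y * np.log(p + eps) + (1 - y) * np.log(1 - p + eps))

print(det_loss(p=0.9, y=1))  # confident and correct: small loss (~0.105)
print(det_loss(p=0.9, y=0))  # confident but wrong: large loss (~2.303)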
2) Bounding box regression
For each candidate window, we predict the offset between it and the nearest ground-truth box (i.e., its left-top coordinates, height, and width). The learning objective is formulated as a regression problem, and for each sample $x_{i}$ we use the Euclidean loss:
$L_{i}^{box}=\left \| \hat{y}_{i}^{box} - y_{i}^{box}\right \|_{2}^{2}$ (2)
where $\hat{y}_{i}^{box}$ is the regression target obtained from the network (i.e., the network's output) and $y_{i}^{box}$ is the ground-truth coordinate. There are four coordinates (left top, height, and width), so $y_{i}^{box}\in \mathbb{R}^{4}$.
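In this project's data generation, the regression target is the offset of the ground-truth box relative to the crop, normalized by the crop size (mirroring gen_12net_data.py further below); a minimal sketch, where bbox_offsets is a hypothetical helper:

import numpy as np

def bbox_offsets(gt, crop):
    """Normalized offsets of a ground-truth box (x1, y1, x2, y2) relative to
    a square crop (nx1, ny1, nx2, ny2), as used for pos/part samples."""
    x1, y1, x2, y2 = gt
    nx1, ny1, nx2, ny2 = crop
    size = float(nx2 - nx1)  # side length of the square crop
    return np.array([(x1 - nx1) / size, (y1 - ny1) / size,
                     (x2 - nx2) / size, (y2 - ny2) / size])

print(bbox_offsets(gt=(15, 18, 95, 102), crop=(10, 10, 100, 100)))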
3) Facial landmark localization
Similar to the bounding box regression task, facial landmark localization is formulated as a regression problem, and we minimize the Euclidean loss:
$L_{i}^{landmark}=\left \| \hat{y}_{i}^{landmark} - y_{i}^{landmark} \right \|_{2}^{2}$ (3)
where $\hat{y}_{i}^{landmark}$ is the facial landmark coordinate obtained from the network output and $y_{i}^{landmark}$ is the ground truth. There are five facial landmarks (left eye, right eye, nose, left mouth corner, and right mouth corner), so $y_{i}^{landmark}\in \mathbb{R}^{10}$.
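The landmark targets in this project are likewise normalized to the crop, so each of the 10 values lies in [0, 1] (mirroring gen_landmark_aug.py further below); a minimal sketch, where landmark_targets is a hypothetical helper:

import numpy as np

def landmark_targets(landmarks, crop):
    """Normalize 5 (x, y) landmarks to a square crop (nx1, ny1, nx2, ny2)."""
    nx1, ny1, nx2, ny2 = crop
    size = float(nx2 - nx1)
    pts = np.asarray(landmarks, dtype=np.float32)
    return ((pts - [nx1, ny1]) / size).reshape(10)

crop = (10, 10, 110, 110)
pts = [(40, 45), (80, 44), (60, 65), (45, 85), (78, 84)]
print(landmark_targets(pts, crop))  # 10 values, all in [0, 1]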
4) Multi-source training
Since each CNN performs different tasks, different types of training images are used in the training process (e.g., faces, non-faces, and partially aligned faces), so for a given sample some of the loss functions in Eqs. (1)-(3) are not used. For example, for a background region only $L_{i}^{det}$ is computed and the other two losses are set to 0; this is implemented with a sample-type indicator. The overall learning target can be formulated as:
$\min\sum_{i=1}^{N}\sum_{j\in \{det,box,landmark\}}\alpha_{j}\beta_{i}^{j}L_{i}^{j}$ (4)
where $N$ is the number of training samples and $\alpha_{j}$ denotes the importance of the task (in P-Net and R-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=0.5$; in O-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=1$, so that O-Net obtains more accurate facial landmark localization). $\beta_{i}^{j}\in \left \{ 0,1 \right \}$ is the sample-type indicator. The CNNs are trained with stochastic gradient descent (SGD).
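A minimal numpy sketch of how Eq. (4) combines the per-task losses for one mini-batch; the β masks zero out tasks that do not apply to a sample, and all loss values here are made up for illustration:

import numpy as np

# per-sample losses for a mini-batch of 4 samples: [pos, neg, part, landmark]
L_det      = np.array([0.2, 1.1, 0.4, 0.7])
L_box      = np.array([0.3, 0.0, 0.1, 0.0])
L_landmark = np.array([0.0, 0.0, 0.0, 0.5])

# sample-type indicators beta for the same 4 samples
beta_det      = np.array([1, 1, 0, 0])  # pos/neg contribute to detection
beta_box      = np.array([1, 0, 1, 0])  # pos/part contribute to bbox regression
beta_landmark = np.array([0, 0, 0, 1])  # landmark faces only

# task weights alpha for P-Net / R-Net
alpha = {'det': 1.0, 'box': 0.5, 'landmark': 0.5}

total = (alpha['det'] * (beta_det * L_det)
         + alpha['box'] * (beta_box * L_box)
         + alpha['landmark'] * (beta_landmark * L_landmark)).sum()
print(total)  # 1.75 for these illustrative values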
5) Online hard sample mining
Instead of performing traditional hard sample mining after the original classifier has been trained, hard sample mining is done online, adapting to the training process.
Specifically, in each mini-batch the losses computed in the forward pass are sorted, and only the top 70% of samples are taken as hard samples. In the backward pass, gradients are then computed from the hard samples only; that is, the easy samples, which help little in strengthening the detector during training, are ignored.
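A minimal numpy sketch of this top-70% selection (the project's cls_ohem in model.py below does the same thing with tf.nn.top_k; the ohem helper here is hypothetical):

import numpy as np

def ohem(losses, keep_ratio=0.7):
    """Keep only the largest `keep_ratio` fraction of per-sample losses."""
    keep_num = int(len(losses) * keep_ratio)
    hard = np.sort(losses)[::-1][:keep_num]  # sort descending, keep the top
    return hard.mean()

losses = np.array([0.05, 2.3, 0.9, 0.01, 1.2, 0.4, 0.7, 0.1, 1.8, 0.3])
print(ohem(losses))  # mean over the 7 hardest of these 10 samples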
6) Training data
Since face detection and alignment are performed jointly, four different kinds of annotated data are used during training:
6.1 Negatives: regions whose IoU with every ground-truth box in the image is below 0.3;
6.2 Positives: regions whose IoU with a ground-truth box in the image is above 0.65;
6.3 Part faces: regions whose IoU is between 0.4 and 0.65;
6.4 Landmark faces: images annotated with the positions of the five facial landmarks.
Negatives and positives are used for the face classification task (face vs. non-face), positives and part faces are used for bounding box regression, and landmark faces are used for facial landmark localization (a sketch of the IoU computation behind these thresholds follows the per-network list below). The training data for each network is prepared as follows:
① P-Net: positives, negatives, and part faces are randomly cropped from the WIDER FACE dataset, and landmark faces are cropped from the CelebA data; all crops are resized to 12×12;
② R-Net: the proposals output by the first stage of the framework are the input to R-Net, resized to 24×24;
③ O-Net: the input consists of the face boxes filtered and refined by the second stage, likewise cropped from the original images, resized to 48×48, and fed to O-Net in batches.
Each later stage thus adjusts the training results on the basis of the stage before it.
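The IOU helper imported throughout the scripts below (from utils import IOU) is not shown in this post. A minimal sketch of what such a function computes, assuming boxes in (x1, y1, x2, y2) form and the one-box-vs-many calling convention used in the scripts (the exact signature in utils.py is an assumption):

import numpy as np

def IOU(box, boxes):
    """IoU between one crop `box` and an (n, 4) array of ground-truth `boxes`."""
    box_area = (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
    areas = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)
    # intersection rectangle
    xx1 = np.maximum(box[0], boxes[:, 0])
    yy1 = np.maximum(box[1], boxes[:, 1])
    xx2 = np.minimum(box[2], boxes[:, 2])
    yy2 = np.minimum(box[3], boxes[:, 3])
    w = np.maximum(0, xx2 - xx1 + 1)
    h = np.maximum(0, yy2 - yy1 + 1)
    inter = w * h
    return inter / (box_area + areas - inter)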
4. Testing Stage
As in the overall framework of Section 1, an image pyramid of multi-scale images is generated first and fed into P-Net (P-Net is fully convolutional, so every point on its output feature map corresponds to a 12×12 region of the input image). Because its input is small, P-Net selects candidate regions very quickly, but with low accuracy; the face boxes detected at the different scales are merged with NMS. The image patches extracted from the surviving candidate boxes are resized to 24×24 and fed to R-Net, which selects boxes much more precisely, usually leaving only a few. These are resized to 48×48 and passed to O-Net, which judges whether each remaining candidate is a face. O-Net is the slowest of the three networks, but since the first two have already filtered down to high-probability boxes, it only sees a few images; it outputs the final refined boxes together with the keypoint information. Facial landmark localization is produced only in this third stage; the first two stages perform classification and box regression without landmark output.
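As an illustration of how the pyramid scales come about (mirroring processed_image and the scale_factor of 0.79 in MtcnnDetector.py below; pyramid_scales is a hypothetical helper):

def pyramid_scales(h, w, min_face_size=20, net_size=12, factor=0.79):
    """Scales at which the image is resized before being fed to P-Net."""
    scale = net_size / float(min_face_size)  # map the smallest face to 12 px
    scales = []
    while min(h, w) * scale > net_size:
        scales.append(scale)
        scale *= factor  # keep shrinking until the image is below 12x12
    return scales

print(pyramid_scales(480, 640))  # e.g. [0.6, 0.474, 0.374, ...]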
Reference: https://blog.csdn.net/wfei101/article/details/79935037
5. Project Practice
Reference project: GitHub
The model is implemented following the reference project, with some adjustments.
Dataset downloads:
The datasets used here are WIDER FACE and CelebA.
The code is explained as follows:
Reference: https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html
The main pieces of code are described below.
Generating the P-Net data:
gen_12net_data.py

# coding: utf-8 """ 截取pos,neg,part三种类型图片并resize成12x12大小作为PNet的输入 """ import os import cv2 import numpy as np npr = np.random from tqdm import tqdm from utils import IOU # face的id对应label的txt anno_file = '../data/wider_face_train.txt' # 图片地址 im_dir = '../data/WIDER_train/images' # pos,part,neg裁剪图片放置位置 pos_save_dir = '../data/12/positive' part_save_dir = '../data/12/part' neg_save_dir = '../data/12/negative' # PNet数据地址 save_dir = '../data/12' if not os.path.exists(save_dir): os.mkdir(save_dir) if not os.path.exists(pos_save_dir): os.mkdir(pos_save_dir) if not os.path.exists(part_save_dir): os.mkdir(part_save_dir) if not os.path.exists(neg_save_dir): os.mkdir(neg_save_dir) f1 = open(os.path.join(save_dir, 'pos_12.txt'), 'w') f2 = open(os.path.join(save_dir, 'neg_12.txt'), 'w') f3 = open(os.path.join(save_dir, 'part_12.txt'), 'w') with open(anno_file, 'r') as f: annotations = f.readlines() num = len(annotations) print('总共的图片数: %d' % num) # 记录pos, neg, part三类生成数 p_idx = 0 n_idx = 0 d_idx = 0 # 记录读取图片数 idx = 0 for annotation in tqdm(annotations): # 进度条显示 annotation = annotation.strip().split(' ') im_path = annotation[0] box = list(map(float, annotation[1:])) boxes = np.array(box, dtype=np.float32).reshape(-1, 4) # numpy.array.reshape -> 4列, 每一行是box img = cv2.imread(os.path.join(im_dir, im_path+'.jpg')) idx += 1 height, width, channel = img.shape neg_num = 0 # 先采样一定数量neg图片 while neg_num < 50: # 随机选取截取图像大小 size = npr.randint(12, min(width, height)/2) # 随机选取左上坐标 nx = npr.randint(0, width-size) ny = npr.randint(0, height-size) # 截取box crop_box = np.array([nx, ny, nx+size, ny+size]) # 计算iou值 Iou = IOU(crop_box, boxes) # 截取图片并resize成12x12大小 cropped_im = img[ny:ny+size, nx:nx+size, :] # cv2.imread读取的图片第一维度是y resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR) # P-Net的训练输入图像大小为12 × 12 # iou值小于0.3判定为neg图像 if np.max(Iou) < 0.3: save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx) # neg的图片的绝对路径 f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n') # neg_12.txt文件保存neg的图片的绝对路径 cv2.imwrite(save_file, resized_im) # 将截取的图片保存 n_idx += 1 neg_num += 1 for box in boxes: # 以每个box为基础选取截图 # 左上右下坐标 x1, y1, x2, y2 = box w = x2 - x1 + 1 h = y2 - y1 + 1 # 舍去图像过小和box在图片外的图像 if max(w, h) < 20 or x1 < 0 or y1 < 0: continue for i in range(5): # 每个box附近截取5个截图用于判断是否为negative训练样本 size = npr.randint(12, min(width, height)/2) # 随机生成的关于x1, y1的偏移量,并且保证x1+delta_x>0,y1+delta_y>0 delta_x = npr.randint(max(-size, -x1), w) delta_y = npr.randint(max(-size, -y1), h) # 截取后的左上角坐标 # 这里面是获取negative的截图, 所以可以(最好是)随意选取, 因此左上角坐标和偏移量都是随意选取的. nx1 = int(max(0, x1+delta_x)) ny1 = int(max(0, y1+delta_y)) # 排除大于图片尺度的 if nx1 + size > width or ny1 + size > height: continue crop_box = np.array([nx1, ny1, nx1+size, ny1+size]) Iou = IOU(crop_box, boxes) cropped_im = img[ny1:ny1+size, nx1:nx1+size, :] resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR) if np.max(Iou) < 0.3: save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx) f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n') cv2.imwrite(save_file, resized_im) n_idx += 1 for i in range(20): # 每个box附近截取20个截图用于判断是否为positive或者是part训练样本 # 这里是截取positive和part图片, 目的是需要截取box附近的图片, 因此下面size的大小也需要接近w, h. 不然取不到positive、part的几率大. size = npr.randint(int(min(w, h)*0.8), np.ceil(1.25*max(w, h))) # 除去尺度小的box # 注意:w, h是box的尺寸. width、height是整个训练图片的尺寸. if w < 5: continue # 在box附近截取图片, 偏移量取值, 稍微小一点好. 
delta_x = npr.randint(-w*0.2, w*0.2) delta_y = npr.randint(-h*0.2, h*0.2) # 截取图像左上坐标计算是先计算x1+w/2表示的中心坐标,再+delta_x偏移量,再-size/2, nx1 = int(max(x1+w/2+delta_x-size/2, 0)) ny1 = int(max(y1+h/2+delta_y-size/2, 0)) nx2 = nx1 + size ny2 = ny1 + size # 排除超出的图像 if nx2 > width or ny2 > height: continue crop_box = np.array([nx1, ny1, nx2, ny2]) # 人脸框相对于截取图片的偏移量并做归一化处理 # 这里训练数据使用相对于人脸框归一化处理的offset, 实际测试时得到的也是归一化的offset. 因此训练就是获取归一化的offset. offset_x1 = (x1-nx1)/float(size) offset_y1 = (y1-ny1)/float(size) offset_x2 = (x2-nx2)/float(size) offset_y2 = (y2-ny2)/float(size) cropped_im = img[ny1:ny2, nx1:nx2, :] resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR) # box扩充一个维度作为iou输入 box_ = box.reshape(1, -1) # 这里是每一个box, 对每一个box和截取的图像进行IOU计算 iou = IOU(crop_box, box_) if iou >= 0.65: save_file = os.path.join(pos_save_dir, '%s.jpg'%p_idx) f1.write(pos_save_dir+'/%s.jpg'%p_idx+' 1 %.2f %.2f %.2f %.2f\n'%(offset_x1, offset_y1, offset_x2, offset_y2)) cv2.imwrite(save_file, resized_im) p_idx += 1 elif iou >= 0.4: save_file = os.path.join(part_save_dir, '%s.jpg'%d_idx) f3.write(part_save_dir+'/%s.jpg'%d_idx+' -1 %.2f %.2f %.2f %.2f\n'%(offset_x1, offset_y1, offset_x2, offset_y2)) cv2.imwrite(save_file, resized_im) d_idx += 1 print('%s 个图片已处理,pos:%s part: %s neg:%s' %(idx, p_idx, d_idx, n_idx)) f1.close() f2.close() f3.close()
Generating the landmark data:
gen_landmark_aug.py

# coding: utf-8
import os
import random
import sys
import cv2
import numpy as np
npr = np.random
import argparse
from tqdm import tqdm
from utils import IOU
from BBox_utils import getDataFromTxt, BBox

data_dir = '../data'


def main(args):
    """Process the data that carries landmark annotations."""
    size = args.input_size
    # whether to augment the images
    argument = True
    if size == 12:
        net = 'PNet'
    elif size == 24:
        net = 'RNet'
    elif size == 48:
        net = 'ONet'
    image_id = 0
    # data output path
    OUTPUT = os.path.join(data_dir, str(size))
    if not os.path.exists(OUTPUT):
        os.mkdir(OUTPUT)
    # output path for the processed images
    dstdir = os.path.join(OUTPUT, 'train_%s_landmark_aug' % (net))
    if not os.path.exists(dstdir):
        os.mkdir(dstdir)
    # label txt; trainImageList.txt records the CelebA image paths and landmarks
    ftxt = os.path.join(data_dir, 'trainImageList.txt')
    # txt recording the generated labels
    f = open(os.path.join(OUTPUT, 'landmark_%d_aug.txt' % (size)), 'w')
    # get image paths, boxes, and landmarks
    data = getDataFromTxt(ftxt, data_dir)
    idx = 0
    for (imgPath, box, landmarkGt) in tqdm(data):
        # holds the face crops and landmarks for this image
        F_imgs = []
        F_landmarks = []
        img = cv2.imread(imgPath)
        img_h, img_w, img_c = img.shape
        gt_box = np.array([box.left, box.top, box.right, box.bottom])
        # the face crop
        f_face = img[box.top:box.bottom + 1, box.left:box.right + 1]
        # resize to the network input size
        f_face = cv2.resize(f_face, (size, size))
        landmark = np.zeros((5, 2))
        for index, one in enumerate(landmarkGt):
            # landmark offsets relative to the top-left corner, normalized
            rv = ((one[0] - gt_box[0]) / (gt_box[2] - gt_box[0]),
                  (one[1] - gt_box[1]) / (gt_box[3] - gt_box[1]))
            landmark[index] = rv
        F_imgs.append(f_face)
        F_landmarks.append(landmark.reshape(10))
        landmark = np.zeros((5, 2))
        if argument:
            # augment the image
            idx = idx + 1
            x1, y1, x2, y2 = gt_box
            gt_w = x2 - x1 + 1
            gt_h = y2 - y1 + 1
            # drop faces that are too small or out of bounds
            if max(gt_w, gt_h) < 40 or x1 < 0 or y1 < 0:
                continue
            for i in range(10):
                # random crop size; 10 crops per image, computed much like
                # the positive/part crops earlier
                box_size = npr.randint(int(min(gt_w, gt_h) * 0.8), np.ceil(1.25 * max(gt_w, gt_h)))
                # random top-left offsets
                delta_x = npr.randint(-gt_w * 0.2, gt_w * 0.2)
                delta_y = npr.randint(-gt_h * 0.2, gt_h * 0.2)
                # top-left corner of the crop
                nx1 = int(max(x1 + gt_w / 2 - box_size / 2 + delta_x, 0))
                ny1 = int(max(y1 + gt_h / 2 - box_size / 2 + delta_y, 0))
                nx2 = nx1 + box_size
                ny2 = ny1 + box_size
                # drop crops beyond the boundary
                if nx2 > img_w or ny2 > img_h:
                    continue
                # the crop box and image
                crop_box = np.array([nx1, ny1, nx2, ny2])
                cropped_im = img[ny1:ny2 + 1, nx1:nx2 + 1, :]
                resized_im = cv2.resize(cropped_im, (size, size))
                # expand the array shape for the IoU input
                iou = IOU(crop_box, np.expand_dims(gt_box, 0))
                # keep pos crops only
                if iou > 0.65:
                    F_imgs.append(resized_im)
                    # landmark offsets relative to the crop
                    for index, one in enumerate(landmarkGt):
                        rv = ((one[0] - nx1) / box_size, (one[1] - ny1) / box_size)
                        landmark[index] = rv
                    F_landmarks.append(landmark.reshape(10))
                    landmark = np.zeros((5, 2))
                    landmark_ = F_landmarks[-1].reshape(-1, 2)
                    box = BBox([nx1, ny1, nx2, ny2])
                    # mirror
                    if random.choice([0, 1]) > 0:
                        face_flipped, landmark_flipped = flip(resized_im, landmark_)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))
                    # counterclockwise rotation
                    if random.choice([0, 1]) > 0:
                        face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), 5)
                        # landmark offsets
                        landmark_rorated = box.projectLandmark(landmark_rorated)
                        face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                        F_imgs.append(face_rotated_by_alpha)
                        F_landmarks.append(landmark_rorated.reshape(10))
                        # flip left-right as well
                        face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))
                    # clockwise rotation
                    if random.choice([0, 1]) > 0:
                        face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), -5)
                        # landmark offsets
                        landmark_rorated = box.projectLandmark(landmark_rorated)
                        face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                        F_imgs.append(face_rotated_by_alpha)
                        F_landmarks.append(landmark_rorated.reshape(10))
                        # flip left-right as well
                        face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))
        F_imgs, F_landmarks = np.asarray(F_imgs), np.asarray(F_landmarks)
        for i in range(len(F_imgs)):
            # keep only samples whose landmark offsets lie strictly within (0, 1)
            if np.sum(np.where(F_landmarks[i] <= 0, 1, 0)) > 0:
                continue
            if np.sum(np.where(F_landmarks[i] >= 1, 1, 0)) > 0:
                continue
            cv2.imwrite(os.path.join(dstdir, '%d.jpg' % (image_id)), F_imgs[i])
            landmarks = list(map(str, list(F_landmarks[i])))
            f.write(os.path.join(dstdir, '%d.jpg' % (image_id)) + ' -2 ' + ' '.join(landmarks) + '\n')
            image_id += 1
    f.close()
    return F_imgs, F_landmarks


def flip(face, landmark):
    """Mirror the face and its landmarks."""
    face_flipped_by_x = cv2.flip(face, 1)
    landmark_ = np.asarray([(1 - x, y) for (x, y) in landmark])
    landmark_[[0, 1]] = landmark_[[1, 0]]  # swap left and right eye
    landmark_[[3, 4]] = landmark_[[4, 3]]  # swap left and right mouth corner
    return (face_flipped_by_x, landmark_)


def rotate(img, box, landmark, alpha):
    """Rotate the face and its landmarks by alpha degrees."""
    center = ((box.left + box.right) / 2, (box.top + box.bottom) / 2)
    rot_mat = cv2.getRotationMatrix2D(center, alpha, 1)
    img_rotated_by_alpha = cv2.warpAffine(img, rot_mat, (img.shape[1], img.shape[0]))
    landmark_ = np.asarray([(rot_mat[0][0] * x + rot_mat[0][1] * y + rot_mat[0][2],
                             rot_mat[1][0] * x + rot_mat[1][1] * y + rot_mat[1][2]) for (x, y) in landmark])
    face = img_rotated_by_alpha[box.top:box.bottom + 1, box.left:box.right + 1]
    return (face, landmark_)


def parse_arguments(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('input_size', type=int, help='The input size for specific net')
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
Merging into the training list used by P-Net:
gen_imglist_pnet.py

# coding: utf-8
import numpy as np
npr = np.random
import os

data_dir = '../data/'
"""
Mix the pos, part, neg, and landmark samples together.
"""
size = 12
with open(os.path.join(data_dir, '12/pos_12.txt'), 'r') as f:
    pos = f.readlines()
with open(os.path.join(data_dir, '12/neg_12.txt'), 'r') as f:
    neg = f.readlines()
with open(os.path.join(data_dir, '12/part_12.txt'), 'r') as f:
    part = f.readlines()
with open(os.path.join(data_dir, '12/landmark_12_aug.txt'), 'r') as f:
    landmark = f.readlines()
dir_path = os.path.join(data_dir, '12')
if not os.path.exists(dir_path):
    os.makedirs(dir_path)
with open(os.path.join(dir_path, 'train_pnet_landmark.txt'), 'w') as f:
    nums = [len(neg), len(pos), len(part)]
    base_num = 250000
    print('neg: {} pos: {} part: {} base: {}'.format(len(neg), len(pos), len(part), base_num))
    if len(neg) > base_num * 3:
        neg_keep = npr.choice(len(neg), size=base_num * 3, replace=True)
    else:
        neg_keep = npr.choice(len(neg), size=len(neg), replace=True)
    # pos : part : neg = 1 : 1 : 3
    sum_p = len(neg_keep) // 3
    pos_keep = npr.choice(len(pos), sum_p, replace=True)
    part_keep = npr.choice(len(part), sum_p, replace=True)
    print('neg: {} pos: {} part: {}'.format(len(neg_keep), len(pos_keep), len(part_keep)))
    for i in pos_keep:
        f.write(pos[i])
    for i in neg_keep:
        f.write(neg[i])
    for i in part_keep:
        f.write(part[i])
    for item in landmark:
        f.write(item)
Converting the training data into TFRecords files:
gen_tfrecords.py

# coding: utf-8
import os
import random
import sys
import time
import tensorflow as tf
import cv2
from tqdm import tqdm
import argparse


def main(args):
    """Generate the tfrecords files."""
    size = args.input_size
    # data directory
    dataset_dir = '../data/'
    # tfrecord output directory
    output_dir = os.path.join(dataset_dir, str(size) + '/tfrecord')
    if not os.path.exists(output_dir):
        os.mkdir(output_dir)
    # P-Net uses a single mixed tfrecord; R-Net and O-Net each use four
    if size == 12:
        net = 'PNet'
        tf_filenames = [os.path.join(output_dir, 'train_%s_landmark.tfrecord' % net)]
        items = ['12/train_pnet_landmark.txt']
    elif size == 24:
        net = 'RNet'
        tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
        item1 = '%d/pos_%d.txt' % (size, size)
        tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
        item2 = '%d/part_%d.txt' % (size, size)
        tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
        item3 = '%d/neg_%d.txt' % (size, size)
        tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
        item4 = '%d/landmark_%d_aug.txt' % (size, size)
        tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
        items = [item1, item2, item3, item4]
    elif size == 48:
        net = 'ONet'
        tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
        item1 = '%d/pos_%d.txt' % (size, size)
        tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
        item2 = '%d/part_%d.txt' % (size, size)
        tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
        item3 = '%d/neg_%d.txt' % (size, size)
        tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
        item4 = '%d/landmark_%d_aug.txt' % (size, size)
        tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
        items = [item1, item2, item3, item4]
    if tf.gfile.Exists(tf_filenames[0]):
        print('tfrecords files already exist; nothing to do')
        return
    # read and convert the data
    for tf_filename, item in zip(tf_filenames, items):
        print('Reading data')
        dataset = get_dataset(dataset_dir, item)
        tf_filename = tf_filename + '_shuffle'
        random.shuffle(dataset)  # shuffle the data
        print('Converting to tfrecords')
        with tf.python_io.TFRecordWriter(tf_filename) as tfrecord_writer:
            for image_example in tqdm(dataset):
                filename = image_example['filename']
                try:
                    _add_to_tfrecord(filename, image_example, tfrecord_writer)
                except:
                    print(filename)
    print('Conversion finished')


def get_dataset(dir, item):
    """
    Read the data from txt.
    Args:
        dir: the data directory
        item: the txt path
    Returns:
        data containing label, box, and landmarks
    """
    dataset_dir = os.path.join(dir, item)
    imagelist = open(dataset_dir, 'r')
    dataset = []
    for line in tqdm(imagelist.readlines()):  # progress bar
        info = line.strip().split(' ')
        data_example = dict()
        bbox = dict()
        data_example['filename'] = info[0]
        data_example['label'] = int(info[1])
        # neg boxes default to 0; part/pos lines carry only the face box,
        # landmark lines carry only the landmarks
        bbox['xmin'] = 0
        bbox['ymin'] = 0
        bbox['xmax'] = 0
        bbox['ymax'] = 0
        bbox['xlefteye'] = 0
        bbox['ylefteye'] = 0
        bbox['xrighteye'] = 0
        bbox['yrighteye'] = 0
        bbox['xnose'] = 0
        bbox['ynose'] = 0
        bbox['xleftmouth'] = 0
        bbox['yleftmouth'] = 0
        bbox['xrightmouth'] = 0
        bbox['yrightmouth'] = 0
        if len(info) == 6:
            # length 6 means a face box annotation only (6 - 2 = 4 values)
            bbox['xmin'] = float(info[2])
            bbox['ymin'] = float(info[3])
            bbox['xmax'] = float(info[4])
            bbox['ymax'] = float(info[5])
        if len(info) == 12:
            # length 12 means facial landmarks (12 - 2 = 10 values)
            bbox['xlefteye'] = float(info[2])
            bbox['ylefteye'] = float(info[3])
            bbox['xrighteye'] = float(info[4])
            bbox['yrighteye'] = float(info[5])
            bbox['xnose'] = float(info[6])
            bbox['ynose'] = float(info[7])
            bbox['xleftmouth'] = float(info[8])
            bbox['yleftmouth'] = float(info[9])
            bbox['xrightmouth'] = float(info[10])
            bbox['yrightmouth'] = float(info[11])
        data_example['bbox'] = bbox
        dataset.append(data_example)
    return dataset


def _add_to_tfrecord(filename, image_example, tfrecord_writer):
    """
    Write one example into the tfrecord file.
    Args:
        filename: image file name
        image_example: the data
        tfrecord_writer: the writer
    """
    image_data, height, width = _process_image_withoutcoder(filename)
    example = _convert_to_example_simple(image_example, image_data)
    tfrecord_writer.write(example.SerializeToString())


def _process_image_withoutcoder(filename):
    """Read an image file and return the raw bytes and image size."""
    image = cv2.imread(filename)
    image_data = image.tostring()
    assert len(image.shape) == 3
    height = image.shape[0]
    width = image.shape[1]
    assert image.shape[2] == 3
    return image_data, height, width


# feature converters for the different data types
def _int64_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _float_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _bytes_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))


def _convert_to_example_simple(image_example, image_buffer):
    """Convert one example into the form tfrecord accepts."""
    class_label = image_example['label']
    bbox = image_example['bbox']
    roi = [bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']]
    landmark = [bbox['xlefteye'], bbox['ylefteye'], bbox['xrighteye'], bbox['yrighteye'],
                bbox['xnose'], bbox['ynose'], bbox['xleftmouth'], bbox['yleftmouth'],
                bbox['xrightmouth'], bbox['yrightmouth']]
    example = tf.train.Example(features=tf.train.Features(feature={
        'image/encoded': _bytes_feature(image_buffer),
        'image/label': _int64_feature(class_label),
        'image/roi': _float_feature(roi),
        'image/landmark': _float_feature(landmark)
    }))
    return example


def parse_arguments(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('input_size', type=int, help='The input size for specific net')
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
Training:
train_model.py

# coding: utf-8
import os
import sys
from datetime import datetime
import numpy as np
import tensorflow as tf
import config as FLAGS
import random
import cv2


def train(net_factory, prefix, end_epoch, base_dir, display, base_lr):
    """Train a model."""
    # infer the network size from base_dir, which is one of
    # ../data/12, ../data/24, ../data/48
    size = int(base_dir.split('/')[-1])
    # the alphas from the paper, denoting task importance; kept consistent
    # with the paper
    if size == 12:
        net = 'PNet'
        radio_cls_loss = 1.0
        radio_bbox_loss = 0.5
        radio_landmark_loss = 0.5
    elif size == 24:
        net = 'RNet'
        radio_cls_loss = 1.0
        radio_bbox_loss = 0.5
        radio_landmark_loss = 0.5
    elif size == 48:
        net = 'ONet'
        radio_cls_loss = 1.0
        radio_bbox_loss = 0.5
        radio_landmark_loss = 1

    if net == 'PNet':
        # count the total number of samples
        label_file = os.path.join(base_dir, 'train_pnet_landmark.txt')
        f = open(label_file, 'r')
        num = len(f.readlines())
        dataset_dir = os.path.join(base_dir, 'tfrecord/train_PNet_landmark.tfrecord_shuffle')
        # read the data from the tfrecord
        image_batch, label_batch, bbox_batch, landmark_batch = read_single_tfrecord(dataset_dir, FLAGS.batch_size, net)
    else:
        # count the total number of samples
        label_file1 = os.path.join(base_dir, 'pos_%d.txt' % size)
        f1 = open(label_file1, 'r')
        label_file2 = os.path.join(base_dir, 'part_%d.txt' % size)
        f2 = open(label_file2, 'r')
        label_file3 = os.path.join(base_dir, 'neg_%d.txt' % size)
        f3 = open(label_file3, 'r')
        label_file4 = os.path.join(base_dir, 'landmark_%d_aug.txt' % size)
        f4 = open(label_file4, 'r')
        num = len(f1.readlines()) + len(f2.readlines()) + len(f3.readlines()) + len(f4.readlines())
        pos_dir = os.path.join(base_dir, 'tfrecord/pos_landmark.tfrecord_shuffle')
        part_dir = os.path.join(base_dir, 'tfrecord/part_landmark.tfrecord_shuffle')
        neg_dir = os.path.join(base_dir, 'tfrecord/neg_landmark.tfrecord_shuffle')
        landmark_dir = os.path.join(base_dir, 'tfrecord/landmark_landmark.tfrecord_shuffle')
        dataset_dirs = [pos_dir, part_dir, neg_dir, landmark_dir]
        # per-type ratios, so every batch has the same composition:
        # pos : part : landmark : neg = 1 : 1 : 1 : 3
        pos_radio, part_radio, landmark_radio, neg_radio = 1.0/6, 1.0/6, 1.0/6, 3.0/6
        pos_batch_size = int(np.ceil(FLAGS.batch_size * pos_radio))
        assert pos_batch_size != 0, "invalid batch size"
        part_batch_size = int(np.ceil(FLAGS.batch_size * part_radio))
        assert part_batch_size != 0, "invalid batch size"
        neg_batch_size = int(np.ceil(FLAGS.batch_size * neg_radio))
        assert neg_batch_size != 0, "invalid batch size"
        landmark_batch_size = int(np.ceil(FLAGS.batch_size * landmark_radio))
        assert landmark_batch_size != 0, "invalid batch size"
        batch_sizes = [pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size]
        image_batch, label_batch, bbox_batch, landmark_batch = read_multi_tfrecords(dataset_dirs, batch_sizes, net)

    # placeholders used during training; the tfrecord batches are fed in later
    input_image = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, size, size, 3], name='input_image')
    label = tf.placeholder(tf.float32, shape=[FLAGS.batch_size], name='label')
    bbox_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 4], name='bbox_target')
    landmark_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 10], name='landmark_target')
    # color distortion of the images
    input_image = image_color_distort(input_image)
    cls_loss_op, bbox_loss_op, landmark_loss_op, L2_loss_op, accuracy_op = net_factory(input_image, label, bbox_target, landmark_target, training=True)
    # total training loss, implementing the formula from the paper
    total_loss_op = radio_cls_loss*cls_loss_op + radio_bbox_loss*bbox_loss_op + radio_landmark_loss*landmark_loss_op + L2_loss_op
    train_op, lr_op = optimize(base_lr, total_loss_op, num)

    # add variables to tensorboard for visualization
    tf.summary.scalar("cls_loss", cls_loss_op)
    tf.summary.scalar("bbox_loss", bbox_loss_op)
    tf.summary.scalar("landmark_loss", landmark_loss_op)
    tf.summary.scalar("cls_accuracy", accuracy_op)
    # cls loss, bbox loss, landmark loss and L2 loss added together
    tf.summary.scalar("total_loss", total_loss_op)
    summary_op = tf.summary.merge_all()
    logs_dir = "../graph/%s" % net
    if not os.path.exists(logs_dir):
        os.mkdir(logs_dir)

    # train the model
    init = tf.global_variables_initializer()
    sess = tf.Session()
    saver = tf.train.Saver(max_to_keep=3)
    sess.run(init)
    # the model graph
    writer = tf.summary.FileWriter(logs_dir, sess.graph)
    # tf.train.Coordinator() creates a coordinator to manage the threads
    coord = tf.train.Coordinator()
    # start the QueueRunner
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    i = 0
    MAX_STEP = int(num / FLAGS.batch_size + 1) * end_epoch
    epoch = 0
    sess.graph.finalize()
    try:
        for step in range(MAX_STEP):
            i = i + 1
            if coord.should_stop():
                break
            image_batch_array, label_batch_array, bbox_batch_array, landmark_batch_array = sess.run([image_batch, label_batch, bbox_batch, landmark_batch])
            # randomly flip the images
            image_batch_array, landmark_batch_array = random_flip_images(image_batch_array, label_batch_array, landmark_batch_array)
            _, _, summary = sess.run([train_op, lr_op, summary_op],
                                     feed_dict={input_image: image_batch_array, label: label_batch_array,
                                                bbox_target: bbox_batch_array, landmark_target: landmark_batch_array})
            # report training progress
            if (step + 1) % display == 0:
                cls_loss, bbox_loss, landmark_loss, L2_loss, lr, acc = sess.run(
                    [cls_loss_op, bbox_loss_op, landmark_loss_op, L2_loss_op, lr_op, accuracy_op],
                    feed_dict={input_image: image_batch_array, label: label_batch_array,
                               bbox_target: bbox_batch_array, landmark_target: landmark_batch_array})
                total_loss = radio_cls_loss*cls_loss + radio_bbox_loss*bbox_loss + radio_landmark_loss*landmark_loss + L2_loss
                print('epoch: %d/%d' % (epoch + 1, end_epoch))
                print("Step: %d/%d, accuracy: %3f, cls loss: %4f, bbox loss: %4f, Landmark loss :%4f, L2 loss: %4f, Total Loss: %4f, lr:%f"
                      % (step + 1, MAX_STEP, acc, cls_loss, bbox_loss, landmark_loss, L2_loss, total_loss, lr))
            # save the model once per epoch
            if i * FLAGS.batch_size > num:
                epoch = epoch + 1
                i = 0
                path_prefix = saver.save(sess, prefix, global_step=epoch)
            writer.add_summary(summary, global_step=step)
    except tf.errors.OutOfRangeError:
        print("Done!")
    finally:
        coord.request_stop()
        writer.close()
        coord.join(threads)
        sess.close()


def optimize(base_lr, loss, data_num):
    """Set up the optimizer."""
    lr_factor = 0.1
    global_step = tf.Variable(0, trainable=False)
    # data_num / batch_size is the number of steps for one pass over the
    # training set; multiplied by the epoch counts in FLAGS.LR_EPOCH it gives
    # the boundary steps for a staircase learning rate:
    # base_lr * lr_factor ^ x, x = 0, 1, 2, 3
    boundaries = [int(epoch * data_num / FLAGS.batch_size) for epoch in FLAGS.LR_EPOCH]
    lr_values = [base_lr * (lr_factor ** x) for x in range(0, len(FLAGS.LR_EPOCH) + 1)]
    lr_op = tf.train.piecewise_constant(global_step, boundaries, lr_values)
    # momentum optimizer
    optimizer = tf.train.MomentumOptimizer(lr_op, 0.9)
    train_op = optimizer.minimize(loss, global_step)
    return train_op, lr_op


def read_single_tfrecord(tfrecord_file, batch_size, net):
    """Read data from a single tfrecord file."""
    filename_queue = tf.train.string_input_producer([tfrecord_file], shuffle=True)
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    image_features = tf.parse_single_example(serialized_example, features={
        'image/encoded': tf.FixedLenFeature([], tf.string),
        'image/label': tf.FixedLenFeature([], tf.int64),
        'image/roi': tf.FixedLenFeature([4], tf.float32),
        'image/landmark': tf.FixedLenFeature([10], tf.float32)})
    if net == 'PNet':
        image_size = 12
    elif net == 'RNet':
        image_size = 24
    elif net == 'ONet':
        image_size = 48
    # _bytes_feature stored the raw image in the tfrecords file; tf.decode_raw
    # turns the string-encoded variable back into image data
    image = tf.decode_raw(image_features['image/encoded'], tf.uint8)
    image = tf.reshape(image, [image_size, image_size, 3])
    # normalize the uint8 values (0-255) into [-1, 1]
    image = (tf.cast(image, tf.float32) - 127.5) / 128
    label = tf.cast(image_features['image/label'], tf.float32)
    roi = tf.cast(image_features['image/roi'], tf.float32)
    landmark = tf.cast(image_features['image/landmark'], tf.float32)
    image, label, roi, landmark = tf.train.batch([image, label, roi, landmark],
                                                 batch_size=batch_size, num_threads=2, capacity=batch_size)
    # tf.train.batch returns one batch of data, so reshape the first
    # dimension to batch_size
    label = tf.reshape(label, [batch_size])
    roi = tf.reshape(roi, [batch_size, 4])
    landmark = tf.reshape(landmark, [batch_size, 10])
    return image, label, roi, landmark


def read_multi_tfrecords(tfrecord_files, batch_sizes, net):
    """Read several tfrecord files and merge them into one batch."""
    pos_dir, part_dir, neg_dir, landmark_dir = tfrecord_files
    pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size = batch_sizes
    pos_image, pos_label, pos_roi, pos_landmark = read_single_tfrecord(pos_dir, pos_batch_size, net)
    part_image, part_label, part_roi, part_landmark = read_single_tfrecord(part_dir, part_batch_size, net)
    neg_image, neg_label, neg_roi, neg_landmark = read_single_tfrecord(neg_dir, neg_batch_size, net)
    landmark_image, landmark_label, landmark_roi, landmark_landmark = read_single_tfrecord(landmark_dir, landmark_batch_size, net)
    images = tf.concat([pos_image, part_image, neg_image, landmark_image], 0, name="concat/image")
    labels = tf.concat([pos_label, part_label, neg_label, landmark_label], 0, name="concat/label")
    assert isinstance(labels, object)
    rois = tf.concat([pos_roi, part_roi, neg_roi, landmark_roi], 0, name="concat/roi")
    landmarks = tf.concat([pos_landmark, part_landmark, neg_landmark, landmark_landmark], 0, name="concat/landmark")
    return images, labels, rois, landmarks


def image_color_distort(inputs):
    """Apply random color distortions."""
    inputs = tf.image.random_contrast(inputs, lower=0.5, upper=1.5)
    inputs = tf.image.random_brightness(inputs, max_delta=0.2)
    inputs = tf.image.random_hue(inputs, max_delta=0.2)
    inputs = tf.image.random_saturation(inputs, lower=0.5, upper=1.5)
    return inputs


def random_flip_images(image_batch, label_batch, landmark_batch):
    """Randomly flip the images in a batch."""
    if random.choice([0, 1]) > 0:
        num_images = image_batch.shape[0]
        fliplandmarkindexes = np.where(label_batch == -2)[0]
        flipposindexes = np.where(label_batch == 1)[0]
        flipindexes = np.concatenate((fliplandmarkindexes, flipposindexes))
        for i in flipindexes:
            cv2.flip(image_batch[i], 1, image_batch[i])
        for i in fliplandmarkindexes:
            landmark_ = landmark_batch[i].reshape((-1, 2))
            landmark_ = np.asarray([(1 - x, y) for (x, y) in landmark_])
            landmark_[[0, 1]] = landmark_[[1, 0]]  # swap left and right eye
            landmark_[[3, 4]] = landmark_[[4, 3]]  # swap left and right mouth corner
            landmark_batch[i] = landmark_.ravel()
    return image_batch, landmark_batch
train.py

# coding: utf-8
from model import P_Net, R_Net, O_Net
import argparse
import os
import sys
import config as FLAGS
from train_model import train

net_factorys = [P_Net, R_Net, O_Net]


def main(args):
    size = args.input_size
    base_dir = os.path.join('../data/', str(size))
    if size == 12:
        net = 'PNet'
        net_factory = net_factorys[0]
        end_epoch = FLAGS.end_epoch[0]
    elif size == 24:
        net = 'RNet'
        net_factory = net_factorys[1]
        end_epoch = FLAGS.end_epoch[1]
    elif size == 48:
        net = 'ONet'
        net_factory = net_factorys[2]
        end_epoch = FLAGS.end_epoch[2]
    model_path = os.path.join('../model/', net)
    if not os.path.exists(model_path):
        os.mkdir(model_path)
    prefix = os.path.join(model_path, net)
    display = FLAGS.display
    lr = FLAGS.lr
    train(net_factory, prefix, end_epoch, base_dir, display, lr)


def parse_arguments(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('input_size', type=int, help='The input size for specific net')
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
The model definition used above:
model.py

# coding: utf-8
import tensorflow as tf
slim = tf.contrib.slim
import numpy as np

# only the hardest 70% of the samples are used for parameter updates
num_keep_radio = 0.7


def P_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """The P-Net architecture."""
    with tf.variable_scope('PNet'):
        # build the network with tensorflow slim
        with slim.arg_scope([slim.conv2d], activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            # layer 1: 10 output channels, kernel size 3
            net = slim.conv2d(inputs, 10, 3, scope='conv1')
            net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 16, 3, scope='conv2')
            net = slim.conv2d(net, 32, 3, scope='conv3')
            # 2 output channels: binary face/non-face classification
            conv4_1 = slim.conv2d(net, 2, 1, activation_fn=tf.nn.softmax, scope='conv4_1')
            # 4-value regression for the face box coordinates
            bbox_pred = slim.conv2d(net, 4, 1, activation_fn=None, scope='conv4_2')
            # 10-value regression for the facial landmark coordinates
            landmark_pred = slim.conv2d(net, 10, 1, activation_fn=None, scope='conv4_3')
            if training:
                # squeeze out the size-1 dims 1 and 2: [batch, 1, 1, 2] -> [batch, 2]
                cls_prob = tf.squeeze(conv4_1, [1, 2], name='cls_prob')
                cls_loss = cls_ohem(cls_prob, label)
                bbox_pred = tf.squeeze(bbox_pred, [1, 2], name='bbox_pred')  # [batch, 4]
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                landmark_pred = tf.squeeze(landmark_pred, [1, 2], name='landmark_pred')  # [batch, 10]
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                # batch_size = 1 at test time
                cls_pro_test = tf.squeeze(conv4_1, axis=0)
                bbox_pred_test = tf.squeeze(bbox_pred, axis=0)
                landmark_pred_test = tf.squeeze(landmark_pred, axis=0)
                return cls_pro_test, bbox_pred_test, landmark_pred_test


def R_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """The R-Net architecture."""
    with tf.variable_scope('RNet'):
        with slim.arg_scope([slim.conv2d], activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 28, 3, scope='conv1')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 48, 3, scope='conv2')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
            net = slim.conv2d(net, 64, 2, scope='conv3')
            fc_flatten = slim.flatten(net)
            fc1 = slim.fully_connected(fc_flatten, num_outputs=128, scope='fc1')
            cls_prob = slim.fully_connected(fc1, num_outputs=2, activation_fn=tf.nn.softmax, scope='cls_fc')
            bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
            landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
            if training:
                cls_loss = cls_ohem(cls_prob, label)
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                return cls_prob, bbox_pred, landmark_pred


def O_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """The O-Net architecture."""
    with tf.variable_scope('ONet'):
        with slim.arg_scope([slim.conv2d], activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 32, 3, scope='conv1')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 64, 3, scope='conv2')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
            net = slim.conv2d(net, 64, 3, scope='conv3')
            net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool3')
            net = slim.conv2d(net, 128, 2, scope='conv4')
            fc_flatten = slim.flatten(net)
            fc1 = slim.fully_connected(fc_flatten, num_outputs=256, scope='fc1')
            cls_prob = slim.fully_connected(fc1, num_outputs=2, activation_fn=tf.nn.softmax, scope='cls_fc')
            bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
            landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
            if training:
                cls_loss = cls_ohem(cls_prob, label)
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                return cls_prob, bbox_pred, landmark_pred


def prelu(inputs):
    """The PReLU activation."""
    alphas = tf.get_variable('alphas', shape=inputs.get_shape()[-1], dtype=tf.float32,
                             initializer=tf.constant_initializer(0.25))
    pos = tf.nn.relu(inputs)
    neg = alphas * (inputs - abs(inputs)) * 0.5
    return pos + neg


def cls_ohem(cls_prob, label):
    """
    Classification loss.
    Args:
        cls_prob: predicted face/non-face probabilities
        label: ground truth
    Returns:
        the loss
    """
    zeros = tf.zeros_like(label)
    # labels: neg 0, pos 1, part -1; only negatives and positives are used
    # for the face classification task, so pos keeps label 1 here while neg
    # and part get label 0
    label_filter_invalid = tf.where(tf.less(label, 0), zeros, label)
    num_cls_prob = tf.size(cls_prob)  # size = batch * 2
    cls_prob_reshape = tf.reshape(cls_prob, [num_cls_prob, -1])  # flatten the class array
    label_int = tf.cast(label_filter_invalid, tf.int32)  # cast the 0/1 labels to int32
    num_row = tf.to_int32(cls_prob.get_shape()[0])  # the batch size
    # for each sample, index batch*2 holds the non-face probability and
    # batch*2+1 holds the face probability; indices_ points at the
    # probability of the true class, used below for the cross-entropy
    row = tf.range(num_row) * 2  # base offsets (0, 2, 4, ..., (num_row-1)*2)
    # base offset plus label_int: label_int = 0 selects neg, 1 selects pos
    indices_ = row + label_int
    # gather the probability assigned to the true label
    label_prob = tf.squeeze(tf.gather(cls_prob_reshape, indices_))
    # -log of the true-class probability; since label_prob is the probability
    # of the true class, this is exactly the cross-entropy term
    loss = -tf.log(label_prob + 1e-10)
    zeros = tf.zeros_like(label_prob, dtype=tf.float32)
    ones = tf.ones_like(label_prob, dtype=tf.float32)
    # keep only the neg and pos losses for this task:
    # label < 0 (i.e. part: -1) -> 0, otherwise (neg and pos) -> 1
    valid_inds = tf.where(label < zeros, zeros, ones)
    num_valid = tf.reduce_sum(valid_inds)
    # select 70% of the samples
    keep_num = tf.cast(num_valid * num_keep_radio, dtype=tf.int32)
    # keep only the top 70% of the neg/pos losses; multiplying by valid_inds
    # zeroes out the part samples
    loss = loss * valid_inds
    loss, _ = tf.nn.top_k(loss, k=keep_num)
    return tf.reduce_mean(loss)


def bbox_ohem(bbox_pred, bbox_target, label):
    """Bounding box loss."""
    zeros_index = tf.zeros_like(label, dtype=tf.float32)
    ones_index = tf.ones_like(label, dtype=tf.float32)
    # keep the pos and part samples
    valid_inds = tf.where(tf.equal(tf.abs(label), 1), ones_index, zeros_index)
    # squared-error loss
    square_error = tf.square(bbox_pred - bbox_target)
    square_error = tf.reduce_sum(square_error, axis=1)
    # number of samples kept
    num_valid = tf.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    # keep only the pos and part losses
    square_error = square_error * valid_inds
    square_error, _ = tf.nn.top_k(square_error, k=keep_num)
    return tf.reduce_mean(square_error)


def landmark_ohem(landmark_pred, landmark_target, label):
    """Landmark loss."""
    ones = tf.ones_like(label, dtype=tf.float32)
    zeros = tf.zeros_like(label, dtype=tf.float32)
    # keep only the landmark samples
    valid_inds = tf.where(tf.equal(label, -2), ones, zeros)
    # squared-error loss
    square_error = tf.square(landmark_pred - landmark_target)
    square_error = tf.reduce_sum(square_error, axis=1)
    # number of samples kept
    num_valid = tf.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    # keep only the landmark losses
    square_error = square_error * valid_inds
    square_error, _ = tf.nn.top_k(square_error, k=keep_num)
    return tf.reduce_mean(square_error)


def cal_accuracy(cls_prob, label):
    """Classification accuracy."""
    # class with the largest predicted probability: 0 non-face, 1 face
    pred = tf.argmax(cls_prob, axis=1)
    label_int = tf.cast(label, tf.int64)
    # keep samples with label >= 0, i.e. the pos and neg samples
    cond = tf.where(tf.greater_equal(label_int, 0))
    picked = tf.squeeze(cond)
    # gather the pos and neg labels and predictions
    label_picked = tf.gather(label_int, picked)
    pred_picked = tf.gather(pred, picked)
    # compute the accuracy
    accuracy_op = tf.reduce_mean(tf.cast(tf.equal(label_picked, pred_picked), tf.float32))
    return accuracy_op
Generating the input for the next network:
gen_hard_example.py

# coding: utf-8
import sys
from utils import *
import numpy as np
import argparse
import os
import pickle
import cv2
from tqdm import tqdm
from loader import TestLoader
sys.path.append('../')
from train.model import P_Net, R_Net, O_Net
import train.config as config
from detection.detector import Detector
from detection.fcn_detector import FcnDetector
from detection.MtcnnDetector import MtcnnDetector


def main(args):
    """Generate the input for the next network with P-Net or R-Net."""
    size = args.input_size
    batch_size = config.batches
    min_face_size = config.min_face
    stride = config.stride
    thresh = config.thresh
    # model paths
    model_path = ['../model/PNet/', '../model/RNet/', '../model/ONet']
    if size == 12:
        net = 'PNet'
        save_size = 24
    elif size == 24:
        net = 'RNet'
        save_size = 48
    # image data path
    base_dir = '../data/WIDER_train/'
    # output path for the processed images
    data_dir = '../data/%d' % save_size
    neg_dir = os.path.join(data_dir, 'negative')
    pos_dir = os.path.join(data_dir, 'positive')
    part_dir = os.path.join(data_dir, 'part')
    for dir_path in [neg_dir, pos_dir, part_dir]:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
    detectors = [None, None, None]
    PNet = FcnDetector(P_Net, model_path[0])
    detectors[0] = PNet
    if net == 'RNet':
        RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
        detectors[1] = RNet
    basedir = '../data/'
    filename = '../data/wider_face_train_bbx_gt.txt'
    # read_annotation (defined in utils) maps the images to their boxes
    data = read_annotation(base_dir, filename)
    mtcnn_detector = MtcnnDetector(detectors, min_face_size=min_face_size, stride=stride, threshold=thresh)
    save_path = data_dir
    save_file = os.path.join(save_path, 'detections.pkl')
    if not os.path.exists(save_file):
        # wrap the data in an iterator
        print('Loading data')
        test_data = TestLoader(data['images'])
        detections, _ = mtcnn_detector.detect_face(test_data)
        print('Detection finished')
        with open(save_file, 'wb') as f:
            pickle.dump(detections, f, 1)
    print('Generating images')
    save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path)


def save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path):
    """Crop the original images with the boxes the network detected, as input for the next network."""
    im_idx_list = data['images']
    gt_boxes_list = data['bboxes']
    num_of_images = len(im_idx_list)
    neg_label_file = "../data/%d/neg_%d.txt" % (save_size, save_size)
    neg_file = open(neg_label_file, 'w')
    pos_label_file = "../data/%d/pos_%d.txt" % (save_size, save_size)
    pos_file = open(pos_label_file, 'w')
    part_label_file = "../data/%d/part_%d.txt" % (save_size, save_size)
    part_file = open(part_label_file, 'w')
    # read the detection results
    det_boxes = pickle.load(open(os.path.join(save_path, 'detections.pkl'), 'rb'))
    assert len(det_boxes) == num_of_images, "detections and images do not match"
    n_idx = 0
    p_idx = 0
    d_idx = 0
    image_done = 0
    for im_idx, dets, gts in tqdm(zip(im_idx_list, det_boxes, gt_boxes_list)):
        gts = np.array(gts, dtype=np.float32).reshape(-1, 4)
        image_done += 1
        if dets.shape[0] == 0:
            continue
        img = cv2.imread(im_idx)
        # convert the boxes to squares
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        neg_num = 0
        for box in dets:
            x_left, y_top, x_right, y_bottom, _ = box.astype(int)
            width = x_right - x_left + 1
            height = y_bottom - y_top + 1
            # drop boxes that are too small or out of bounds
            if width < 20 or x_left < 0 or y_top < 0 or x_right > img.shape[1] - 1 or y_bottom > img.shape[0] - 1:
                continue
            Iou = IOU(box, gts)
            # crop the box from the image
            cropped_im = img[y_top:y_bottom + 1, x_left:x_right + 1, :]
            resized_im = cv2.resize(cropped_im, (save_size, save_size), interpolation=cv2.INTER_LINEAR)
            # assign the sample type; keep up to 60 neg crops per image for
            # training the next network
            if np.max(Iou) < 0.3 and neg_num < 60:
                save_file = os.path.join(neg_dir, "%s.jpg" % n_idx)
                neg_file.write(save_file + ' 0\n')
                cv2.imwrite(save_file, resized_im)
                n_idx += 1
                neg_num += 1
            else:
                idx = np.argmax(Iou)  # index of the largest IoU
                assigned_gt = gts[idx]  # the ground-truth box with the largest IoU
                x1, y1, x2, y2 = assigned_gt
                # normalized offsets
                offset_x1 = (x1 - x_left) / float(width)
                offset_y1 = (y1 - y_top) / float(height)
                offset_x2 = (x2 - x_right) / float(width)
                offset_y2 = (y2 - y_bottom) / float(height)
                # pos and part samples
                if np.max(Iou) >= 0.65:
                    save_file = os.path.join(pos_dir, "%s.jpg" % p_idx)
                    pos_file.write(save_file + ' 1 %.2f %.2f %.2f %.2f\n' % (
                        offset_x1, offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    p_idx += 1
                elif np.max(Iou) >= 0.4:
                    save_file = os.path.join(part_dir, "%s.jpg" % d_idx)
                    part_file.write(save_file + ' -1 %.2f %.2f %.2f %.2f\n' % (
                        offset_x1, offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    d_idx += 1
    neg_file.close()
    part_file.close()
    pos_file.close()


def parse_arguments(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('input_size', type=int, help='The input size for specific net')
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
The training procedure is as follows:
source activate tensorflow
In the preprocess directory:
python gen_12net_data.py generates the three kinds of P-Net data;
python gen_landmark_aug.py 12 generates the P-Net landmark data;
python gen_imglist_pnet.py merges them into one list;
python gen_tfrecords.py 12 generates the tfrecords files.
In the train directory:
python train.py 12 trains P-Net.
TensorBoard shows the loss:
Back in the preprocess directory:
python gen_hard_example.py 12 generates the three kinds of R-Net data;
python gen_landmark_aug.py 24 generates the R-Net landmark data;
python gen_tfrecords.py 24 generates the tfrecords files.
cd into train and run python train.py 24 to train R-Net.
cd into preprocess again:
python gen_hard_example.py 24 generates the three kinds of O-Net data;
python gen_landmark_aug.py 48 generates the O-Net landmark data;
python gen_tfrecords.py 48 generates the tfrecords files.
cd into train and run python train.py 48 to train O-Net.
The test script:
test.py

# coding: utf-8
import sys
from detection.MtcnnDetector import MtcnnDetector
from detection.detector import Detector
from detection.fcn_detector import FcnDetector
from train.model import P_Net, R_Net, O_Net
import cv2
import os
import numpy as np
import train.config as config

test_mode = config.test_mode
thresh = config.thresh
min_face_size = config.min_face
stride = config.stride
detectors = [None, None, None]
# model locations
model_path = ['model/PNet/', 'model/RNet/', 'model/ONet']
batch_size = config.batches
PNet = FcnDetector(P_Net, model_path[0])
detectors[0] = PNet
if test_mode in ["RNet", "ONet"]:
    RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
    detectors[1] = RNet
if test_mode == "ONet":
    ONet = Detector(O_Net, 48, batch_size[2], model_path[2])
    detectors[2] = ONet
mtcnn_detector = MtcnnDetector(detectors=detectors, min_face_size=min_face_size,
                               stride=stride, threshold=thresh)
out_path = config.out_path

if config.input_mode == '1':
    # run on images
    path = config.test_dir
    for item in os.listdir(path):
        img_path = os.path.join(path, item)
        img = cv2.imread(img_path)
        boxes_c, landmarks = mtcnn_detector.detect(img)
        for i in range(boxes_c.shape[0]):
            bbox = boxes_c[i, :4]
            score = boxes_c[i, 4]
            corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]
            # draw the face box
            cv2.rectangle(img, (corpbbox[0], corpbbox[1]), (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
            # confidence that the box is a face
            cv2.putText(img, '{:.2f}'.format(score), (corpbbox[0], corpbbox[1] - 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
        # draw the landmarks
        for i in range(landmarks.shape[0]):
            for j in range(len(landmarks[i]) // 2):
                cv2.circle(img, (int(landmarks[i][2 * j]), int(landmarks[i][2 * j + 1])), 2, (0, 0, 255))
        cv2.imshow('im', img)
        k = cv2.waitKey(0) & 0xFF
        if k == 27:
            cv2.imwrite(out_path + item, img)
    cv2.destroyAllWindows()

if config.input_mode == '2':
    # run on the webcam and record a video
    cap = cv2.VideoCapture(0)
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(out_path + 'out.mp4', fourcc, 10, (640, 480))
    while True:
        t1 = cv2.getTickCount()
        ret, frame = cap.read()
        if ret:
            boxes_c, landmarks = mtcnn_detector.detect(frame)
            t2 = cv2.getTickCount()
            t = (t2 - t1) / cv2.getTickFrequency()
            fps = 1.0 / t
            for i in range(boxes_c.shape[0]):
                bbox = boxes_c[i, :4]
                score = boxes_c[i, 4]
                corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]
                # draw the face box
                cv2.rectangle(frame, (corpbbox[0], corpbbox[1]), (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
                # draw the confidence
                cv2.putText(frame, '{:.2f}'.format(score), (corpbbox[0], corpbbox[1] - 2),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
            # draw the fps value
            cv2.putText(frame, '{:.4f}'.format(t) + " " + '{:.3f}'.format(fps), (10, 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2)
            # draw the landmarks
            for i in range(landmarks.shape[0]):
                for j in range(len(landmarks[i]) // 2):
                    cv2.circle(frame, (int(landmarks[i][2 * j]), int(landmarks[i][2 * j + 1])), 2, (0, 0, 255))
            a = out.write(frame)
            cv2.imshow("result", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        else:
            break
    cap.release()
    out.release()
    cv2.destroyAllWindows()
The modules it uses:
detector.py

# coding: utf-8
import tensorflow as tf
import numpy as np


class Detector:
    """Run detection on batches of images."""

    def __init__(self, net_factory, data_size, batch_size, model_path):
        graph = tf.Graph()
        with graph.as_default():
            self.image_op = tf.placeholder(tf.float32, [None, data_size, data_size, 3])
            self.cls_prob, self.bbox_pred, self.landmark_pred = net_factory(self.image_op, training=False)
            self.sess = tf.Session()
            # reload the model
            saver = tf.train.Saver()
            model_file = tf.train.latest_checkpoint(model_path)
            saver.restore(self.sess, model_file)
        self.data_size = data_size
        self.batch_size = batch_size

    def predict(self, databatch):
        scores = []
        batch_size = self.batch_size
        minibatch = []
        cur = 0
        # total number of samples
        n = databatch.shape[0]
        # split the data into fixed-size batches
        while cur < n:
            minibatch.append(databatch[cur:min(cur + batch_size, n), :, :, :])
            cur += batch_size
        cls_prob_list = []
        bbox_pred_list = []
        landmark_pred_list = []
        for idx, data in enumerate(minibatch):
            m = data.shape[0]
            real_size = self.batch_size
            # pad the last chunk when it does not fill a whole batch
            if m < batch_size:
                keep_inds = np.arange(m)
                gap = self.batch_size - m
                while gap >= len(keep_inds):
                    gap -= len(keep_inds)
                    keep_inds = np.concatenate((keep_inds, keep_inds))
                if gap != 0:
                    keep_inds = np.concatenate((keep_inds, keep_inds[:gap]))
                data = data[keep_inds]
                real_size = m
            cls_prob, bbox_pred, landmark_pred = self.sess.run(
                [self.cls_prob, self.bbox_pred, self.landmark_pred],
                feed_dict={self.image_op: data})
            cls_prob_list.append(cls_prob[:real_size])
            bbox_pred_list.append(bbox_pred[:real_size])
            landmark_pred_list.append(landmark_pred[:real_size])
        return (np.concatenate(cls_prob_list, axis=0),
                np.concatenate(bbox_pred_list, axis=0),
                np.concatenate(landmark_pred_list, axis=0))
fcn_detector.py

# coding: utf-8
import tensorflow as tf
import sys
sys.path.append('../')
import train.config as config


class FcnDetector:
    """Run detection on a single image."""

    def __init__(self, net_factory, model_path):
        graph = tf.Graph()
        with graph.as_default():
            self.image_op = tf.placeholder(tf.float32, name='input_image')
            self.width_op = tf.placeholder(tf.int32, name='image_width')
            self.height_op = tf.placeholder(tf.int32, name='image_height')
            image_reshape = tf.reshape(self.image_op, [1, self.height_op, self.width_op, 3])
            # the predictions
            self.cls_prob, self.bbox_pred, _ = net_factory(image_reshape, training=False)
            self.sess = tf.Session()
            # reload the model
            saver = tf.train.Saver()
            model_file = tf.train.latest_checkpoint(model_path)
            saver.restore(self.sess, model_file)

    def predict(self, databatch):
        height, width, _ = databatch.shape
        cls_prob, bbox_pred = self.sess.run([self.cls_prob, self.bbox_pred],
                                            feed_dict={self.image_op: databatch,
                                                       self.width_op: width,
                                                       self.height_op: height})
        return cls_prob, bbox_pred
MtcnnDetector.py

# coding: utf-8
import cv2
import numpy as np
import sys
sys.path.append('../')
from preprocess.utils import *
from tqdm import tqdm


def py_nms(dets, thresh):
    """Discard boxes that overlap too much."""
    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    scores = dets[:, 4]
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    # sort the scores in descending order
    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter + 1e-10)
        # keep the indices with overlap below the threshold; order[0] was
        # taken out for the comparison, so inds + 1 gives the original indices
        inds = np.where(ovr <= thresh)[0]
        order = order[inds + 1]
    return keep


class MtcnnDetector:
    """Generates the face detections."""

    def __init__(self,
                 detectors,
                 min_face_size=20,
                 stride=2,
                 threshold=[0.6, 0.7, 0.7],
                 scale_factor=0.79  # shrink rate of the image pyramid
                 ):
        self.pnet_detector = detectors[0]
        self.rnet_detector = detectors[1]
        self.onet_detector = detectors[2]
        self.min_face_size = min_face_size
        self.stride = stride
        self.thresh = threshold
        self.scale_factor = scale_factor

    def detect_face(self, test_data):
        all_boxes = []
        landmarks = []
        batch_idx = 0
        num_of_img = test_data.size
        empty_array = np.array([])
        for databatch in tqdm(test_data):
            batch_idx += 1
            im = databatch
            if self.pnet_detector:
                boxes, boxes_c, landmark = self.detect_pnet(im)
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue
            if self.rnet_detector:
                boxes, boxes_c, landmark = self.detect_rnet(im, boxes_c)
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue
            if self.onet_detector:
                boxes, boxes_c, landmark = self.detect_onet(im, boxes_c)
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue
            all_boxes.append(boxes_c)
            landmark = [1]
            landmarks.append(landmark)
        return all_boxes, landmarks

    def detect_pnet(self, im):
        """
        Filter boxes and landmarks with P-Net.
        Args:
            im: input image [h, w, 3]
        """
        h, w, c = im.shape
        net_size = 12
        # ratio of the net size to the smallest detectable face
        current_scale = float(net_size) / self.min_face_size
        im_resized = self.processed_image(im, current_scale)
        current_height, current_width, _ = im_resized.shape
        all_boxes = list()
        # image pyramid: keep resizing the image down
        while min(current_height, current_width) > net_size:
            # classification map and box regression; this is the test flow,
            # so the input is a whole image (not necessarily 12x12), and the
            # output cls_cls_map is a feature map of shape (n, m, 2) while
            # reg has shape (n, m, 4)
            cls_cls_map, reg = self.pnet_detector.predict(im_resized)
            boxes = self.generate_bbox(cls_cls_map[:, :, 1], reg, current_scale, self.thresh[0])
            current_scale *= self.scale_factor  # shrink further for the next pyramid level
            im_resized = self.processed_image(im, current_scale)
            current_height, current_width, _ = im_resized.shape
            if boxes.size == 0:
                continue
            # non-maximum suppression keeps the boxes with low overlap
            keep = py_nms(boxes[:, :5], 0.5)
            boxes = boxes[keep]
            all_boxes.append(boxes)
        if len(all_boxes) == 0:
            return None, None, None
        all_boxes = np.vstack(all_boxes)
        # apply NMS again across all pyramid levels
        keep = py_nms(all_boxes[:, 0:5], 0.7)
        all_boxes = all_boxes[keep]
        boxes = all_boxes[:, :5]
        # box width and height
        bbw = all_boxes[:, 2] - all_boxes[:, 0] + 1
        bbh = all_boxes[:, 3] - all_boxes[:, 1] + 1
        # box coordinates and scores in the original image; the training data
        # used offsets normalized by the face box, so the dx, dy obtained at
        # test time are normalized as well
        boxes_c = np.vstack([all_boxes[:, 0] + all_boxes[:, 5] * bbw,  # all_boxes[:, 5] -> dx1
                             all_boxes[:, 1] + all_boxes[:, 6] * bbh,  # all_boxes[:, 6] -> dy1
                             all_boxes[:, 2] + all_boxes[:, 7] * bbw,  # all_boxes[:, 7] -> dx2
                             all_boxes[:, 3] + all_boxes[:, 8] * bbh,  # all_boxes[:, 8] -> dy2
                             all_boxes[:, 4]])
        boxes_c = boxes_c.T
        return boxes, boxes_c, None

    def detect_rnet(self, im, dets):
        """
        Select boxes with R-Net.
        Args:
            im: input image
            dets: boxes chosen by P-Net, in absolute coordinates of the original image
        Returns:
            absolute box coordinates
        """
        h, w, c = im.shape
        # turn the P-Net boxes into enclosing squares, which avoids information loss
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        # adjust boxes that exceed the image
        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        delete_size = np.ones_like(tmpw) * 20
        ones = np.ones_like(tmpw)
        zeros = np.zeros_like(tmpw)
        num_boxes = np.sum(np.where((np.minimum(tmpw, tmph) >= delete_size), ones, zeros))
        cropped_ims = np.zeros((num_boxes, 24, 24, 3), dtype=np.float32)
        for i in range(num_boxes):
            # crop the P-Net boxes from the original image, padding
            # out-of-image parts with 0
            if tmph[i] < 20 or tmpw[i] < 20:
                continue
            tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
            tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
            cropped_ims[i, :, :, :] = (cv2.resize(tmp, (24, 24)) - 127.5) / 128
        cls_scores, reg, _ = self.rnet_detector.predict(cropped_ims)
        cls_scores = cls_scores[:, 1]
        keep_inds = np.where(cls_scores > self.thresh[1])[0]
        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            boxes[:, 4] = cls_scores[keep_inds]
            reg = reg[keep_inds]
        else:
            return None, None, None
        keep = py_nms(boxes, 0.6)
        boxes = boxes[keep]
        # calibrate the coordinates of the P-Net crops, producing R-Net face
        # boxes in absolute coordinates of the original image
        boxes_c = self.calibrate_box(boxes, reg[keep])
        return boxes, boxes_c, None

    def detect_onet(self, im, dets):
        """Filter the boxes further; much like R-Net, but also returns landmarks."""
        h, w, c = im.shape
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        num_boxes = dets.shape[0]
        cropped_ims = np.zeros((num_boxes, 48, 48, 3), dtype=np.float32)
        for i in range(num_boxes):
            tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
            tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
            cropped_ims[i, :, :, :] = (cv2.resize(tmp, (48, 48)) - 127.5) / 128
        cls_scores, reg, landmark = self.onet_detector.predict(cropped_ims)
        cls_scores = cls_scores[:, 1]
        keep_inds = np.where(cls_scores > self.thresh[2])[0]
        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            boxes[:, 4] = cls_scores[keep_inds]
            reg = reg[keep_inds]
            landmark = landmark[keep_inds]
        else:
            return None, None, None
        w = boxes[:, 2] - boxes[:, 0] + 1
        h = boxes[:, 3] - boxes[:, 1] + 1
        # map the normalized landmarks back to absolute coordinates
        landmark[:, 0::2] = (np.tile(w, (5, 1)) * landmark[:, 0::2].T + np.tile(boxes[:, 0], (5, 1)) - 1).T
        landmark[:, 1::2] = (np.tile(h, (5, 1)) * landmark[:, 1::2].T + np.tile(boxes[:, 1], (5, 1)) - 1).T
        boxes_c = self.calibrate_box(boxes, reg)
        boxes = boxes[py_nms(boxes, 0.6)]
        keep = py_nms(boxes_c, 0.6)
        boxes_c = boxes_c[keep]
        landmark = landmark[keep]
        return boxes, boxes_c, landmark

    def processed_image(self, img, scale):
        """Preprocess the data: rescale the image and normalize pixels to [-1, 1]."""
        height, width, channels = img.shape
        new_height = int(height * scale)
        new_width = int(width * scale)
        new_dim = (new_width, new_height)
        img_resized = cv2.resize(img, new_dim, interpolation=cv2.INTER_LINEAR)
        img_resized = (img_resized - 127.5) / 128
        return img_resized

    def generate_bbox(self, cls_map, reg, scale, threshold):
        """
        Map the feature map back to box coordinates in the original image,
        with classification scores and box offsets.
        cls_map: n * m (the input is cls_cls_map[:, :, 1], the face probability)
        reg: n * m * 4
        """
        # P-Net roughly halves the image size
        stride = 2
        cellsize = 12
        # keep the positions with high confidence; these are the predicted
        # face boxes (2-D). np.where on a 2-D array returns a pair of index
        # arrays, one per dimension
        t_index = np.where(cls_map > threshold)
        # no face (checking t_index[1].size would work just as well)
        if t_index[0].size == 0:
            return np.array([])
        # the offsets
        dx1, dy1, dx2, dy2 = [reg[t_index[0], t_index[1], i] for i in range(4)]
        reg = np.array([dx1, dy1, dx2, dy2])
        score = cls_map[t_index[0], t_index[1]]
        # box coordinates in the original image, classification scores, and
        # box offsets; the coordinates need the inverse mapping: cellsize=12
        # because a 12x12 input turns into 1x1, stride=2 because only one
        # layer in the network has stride 2, and scale identifies the pyramid
        # level the image was on
        boundingbox = np.vstack([np.round((stride * t_index[1]) / scale),
                                 np.round((stride * t_index[0]) / scale),
                                 np.round((stride * t_index[1] + cellsize) / scale),
                                 np.round((stride * t_index[0] + cellsize) / scale),
                                 score,
                                 reg])
        # shape [n, 9]
        return boundingbox.T

    def pad(self, bboxes, w, h):
        """
        Handle boxes that exceed the image.
        Args:
            bboxes: face boxes
            w, h: image width and height
        Returns:
            dy, dx: top-left corner of the adjusted box relative to the original box's top-left corner
            edy, edx: bottom-right corner of the adjusted box relative to the original box's top-left corner
            y, x: top-left corner of the adjusted box in the original image
            ey, ex: bottom-right corner of the adjusted box in the original image
            tmph, tmpw: height and width of the original box
        """
        # box width and height
        tmpw, tmph = bboxes[:, 2] - bboxes[:, 0] + 1, bboxes[:, 3] - bboxes[:, 1] + 1
        num_box = bboxes.shape[0]
        dx, dy = np.zeros((num_box, )), np.zeros((num_box, ))
        edx, edy = tmpw.copy() - 1, tmph.copy() - 1
        # top-left and bottom-right coordinates of the boxes
        x, y, ex, ey = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]
        # find boxes beyond the bottom-right boundary and clamp ex, ey to the
        # image's w, h; edx, edy become the bottom-right corner relative to
        # the original box's top-left corner
        tmp_index = np.where(ex > w - 1)
        # w - 1 + tmpw - 1 - edx = ex  ->  edx = w + tmpw - ex - 2
        edx[tmp_index] = tmpw[tmp_index] + w - 2 - ex[tmp_index]
        ex[tmp_index] = w - 1
        tmp_index = np.where(ey > h - 1)
        # h - 1 + tmph - 1 - edy = ey  ->  edy = h + tmph - ey - 2
        edy[tmp_index] = tmph[tmp_index] + h - 2 - ey[tmp_index]
        ey[tmp_index] = h - 1
        # find boxes beyond the top-left boundary and clamp x, y to 0;
        # dx, dy become the top-left corner relative to the original box
        tmp_index = np.where(x < 0)
        dx[tmp_index] = 0 - x[tmp_index]
        x[tmp_index] = 0
        tmp_index = np.where(y < 0)
        dy[tmp_index] = 0 - y[tmp_index]
        y[tmp_index] = 0
        return_list = [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph]
        return_list = [item.astype(np.int32) for item in return_list]
        return return_list

    def calibrate_box(self, bbox, reg):
        """
        Calibrate the boxes.
        Args:
            bbox: boxes produced by the previous network
            reg: box offsets predicted by the current network
        Returns:
            the adjusted boxes, in absolute coordinates of the original image
        """
        bbox_c = bbox.copy()
        w = bbox[:, 2] - bbox[:, 0] + 1
        w = np.expand_dims(w, 1)
        h = bbox[:, 3] - bbox[:, 1] + 1
        h = np.expand_dims(h, 1)
        reg_m = np.hstack([w, h, w, h])
        aug = reg_m * reg
        bbox_c[:, 0:4] = bbox_c[:, 0:4] + aug
        return bbox_c

    def detect(self, img):
        """Detect a single image."""
        boxes = None
        # P-Net
        if self.pnet_detector:
            boxes, boxes_c, _ = self.detect_pnet(img)
            if boxes_c is None:
                return np.array([]), np.array([])
        # R-Net
        if self.rnet_detector:
            boxes, boxes_c, _ = self.detect_rnet(img, boxes_c)
            if boxes_c is None:
                return np.array([]), np.array([])
        # O-Net
        if self.onet_detector:
            boxes, boxes_c, landmark = self.detect_onet(img, boxes_c)
            if boxes_c is None:
                return np.array([]), np.array([])
        return boxes_c, landmark
The test run:
python test.py
Results:
The images come from the Internet and are for learning purposes only; in case of infringement, please contact me to have them removed, thank you!