Paper: "Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Networks"
Paper URL: https://arxiv.org/abs/1604.02878v1
1. Overall Framework
MTCNN uses a cascade of convolutional neural networks to perform face detection together with facial keypoint detection. The overall framework is as follows:
Figure 1: Pipeline
Figure 1 shows the overall MTCNN framework (the detection pipeline, i.e. the flow used at test time).
Given an image, it is first resized to several different sizes to build an image pyramid. These multi-scale images are the input to the three stages below.
Stage 1: a fully convolutional network (P-Net) produces candidate windows and their bounding-box regression vectors. The estimated regression vectors are then used to calibrate the candidates, and non-maximum suppression (NMS) merges highly overlapping candidates.
Stage 2: a refinement network (R-Net). All candidates from stage 1 are fed into R-Net, which, again followed by bounding-box regression and NMS, rejects a large number of false candidates.
Stage 3: an output network (O-Net). This stage is similar to stage 2, but its goal is to describe the face in more detail; in particular, the network outputs the positions of five facial landmarks.
2. CNN Architecture
Many earlier works have designed CNNs for face detection, but their performance may be limited by the following factors:
1) Some filters lack diversity in their weights, which limits their ability to produce discriminative descriptions.
2) Compared with general multi-class object detection and classification, face detection is a challenging binary classification task, so it may require fewer filters per layer, but those filters need to be more discriminative for faces. To this end, the number of filters is reduced and 5×5 filters are replaced with 3×3 filters to cut computation, while the depth of the networks is increased to obtain better performance. With these improvements, better performance is obtained with a shorter runtime. The CNN architectures are shown in Figure 2.
Figure 2: CNN architectures (MP: max pooling, Conv: convolution; the step sizes of convolution and pooling are 1 and 2, respectively)
3. Training
The CNN detectors are trained on three tasks: face/non-face classification, bounding-box regression, and facial landmark localization.
1) Face classification
The learning objective is formulated as a binary classification problem. For each sample $x_{i}$, the cross-entropy loss is used:
$L_{i}^{det}=-\left(y_{i}^{det}\log(p_{i})+(1-y_{i}^{det})\log(1-p_{i})\right)$ (1)
where $p_{i}$ is the probability, produced by the network, that sample $x_{i}$ is a face, and $y_{i}^{det}\in \left \{0, 1\right \}$ is the ground-truth label.
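As a quick sanity check on Eq. (1), the loss can be written in a few lines of NumPy (a minimal standalone sketch, not the project code; the small epsilon for numerical stability is my own addition):

import numpy as np

def face_cls_loss(p, y, eps=1e-10):
    # p: predicted probability of being a face, y: ground-truth label in {0, 1}
    return -(y * np.log(p + eps) + (1 - y) * np.log(1 - p + eps))

# A confident, correct prediction gives a small loss; a confident wrong one a large loss.
print(face_cls_loss(np.array([0.9, 0.1]), np.array([1, 0])))  # ~[0.105, 0.105]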
2) Bounding-box regression
For each candidate window, the network predicts its offset to the nearest ground-truth box (the left-top coordinates, height, and width). The learning objective is formulated as a regression problem, and for each sample $x_{i}$ the Euclidean loss is used:
$L_{i}^{box}=\left \| \hat{y}_{i}^{box} - y_{i}^{box}\right \|_{2}^{2}$ (2)
where $\hat{y}_{i}^{box}$ is the regression target obtained from the network (i.e. the network output) and $y_{i}^{box}$ is the ground truth. There are four coordinates (left, top, height, and width), so $y_{i}^{box}\in \mathbb{R}^{4}$.
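In the data-generation code later in this post, the regression target is stored as the offset of the ground-truth box relative to the cropped window, normalized by the crop size. A minimal sketch of that computation (the helper name and example numbers are mine):

def bbox_offsets(gt, crop):
    # gt, crop: (x1, y1, x2, y2); crop is a square window of side `size`
    size = float(crop[2] - crop[0])
    return tuple((gt[i] - crop[i]) / size for i in range(4))

# Ground truth shifted 6 px right/down inside a 120 px crop gives offsets of 0.05
print(bbox_offsets((106, 106, 220, 220), (100, 100, 220, 220)))  # (0.05, 0.05, 0.0, 0.0)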
3) Facial landmark localization
Similar to bounding-box regression, facial landmark localization is formulated as a regression problem that minimizes the Euclidean loss:
$L_{i}^{landmark}=\left \| \hat{y}_{i}^{landmark} - y_{i}^{landmark} \right \|_{2}^{2}$ (3)
where $\hat{y}_{i}^{landmark}$ are the landmark coordinates output by the network and $y_{i}^{landmark}$ is the ground truth. There are five landmarks (left eye, right eye, nose, left mouth corner, and right mouth corner), so $y_{i}^{landmark}\in \mathbb{R}^{10}$.
4) Multi-source training
Since each CNN performs different tasks, different types of training images are used during training, for example face, non-face, and partially aligned face samples. As a consequence, some of the losses (Eqs. 1-3) are not used for some samples; for instance, for a background region only $L_{i}^{det}$ is computed and the other two losses are set to 0. This is implemented with a sample type indicator. The overall learning objective is:
$\min\sum_{i=1}^{N}\sum_{j\in\{det,box,landmark\}}\alpha_{j}\beta_{i}^{j}L_{i}^{j}$ (4)
where $N$ is the number of training samples and $\alpha_{j}$ denotes the importance of each task (in P-Net and R-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=0.5$; in O-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=1$ in order to obtain more accurate facial landmark localization). $\beta_{i}^{j}\in \left \{ 0,1 \right \}$ is the sample type indicator. The CNNs are trained with stochastic gradient descent (SGD).
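A minimal sketch of how the indicator $\beta$ masks the per-task losses before they are weighted and summed as in Eq. (4) (a toy example of my own, not the project code; the project implements the same idea with the *_ohem functions in model.py below):

import numpy as np

def total_loss(losses, betas, alphas):
    # losses/betas: dicts keyed by task name; alphas: task weights from the paper
    return sum(alphas[t] * np.sum(betas[t] * losses[t]) for t in losses)

alphas = {'det': 1.0, 'box': 0.5, 'landmark': 0.5}   # P-Net / R-Net weights
losses = {'det': np.array([0.2, 0.7]),               # two samples in this toy batch
          'box': np.array([0.1, 0.4]),
          'landmark': np.array([0.0, 0.0])}
betas  = {'det': np.array([1, 1]),                   # both samples have a face/non-face label
          'box': np.array([1, 0]),                   # only sample 0 has a box target
          'landmark': np.array([0, 0])}              # no landmark samples here
print(total_loss(losses, betas, alphas))             # 1.0*0.9 + 0.5*0.1 + 0 = 0.95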
5) Online hard sample mining
Unlike traditional hard sample mining, which is carried out after the original classifier has been trained, online hard sample mining is applied here during training so that it adapts to the training process.
Specifically, in each mini-batch the losses computed in the forward pass are sorted, and only the top 70% of samples are kept as hard samples. In the backward pass, gradients are computed only for these hard samples. In other words, the easy samples, which are less helpful for strengthening the detector during training, are ignored.
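A minimal NumPy sketch of this selection step (my own illustration; the project does the equivalent in TensorFlow with tf.nn.top_k, see cls_ohem in model.py below):

import numpy as np

def ohem_keep(losses, keep_ratio=0.7):
    # Keep the 70% of samples with the largest loss; only they contribute gradients.
    keep_num = int(np.ceil(len(losses) * keep_ratio))
    hard_idx = np.argsort(losses)[::-1][:keep_num]
    return hard_idx, losses[hard_idx].mean()

losses = np.array([2.3, 0.1, 1.7, 0.05, 0.9, 0.4, 1.2, 0.02, 0.6, 0.3])
idx, mean_hard_loss = ohem_keep(losses)
print(idx)             # indices of the 7 hardest samples
print(mean_hard_loss)  # this mean is what would be backpropagated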
6) Training data
Since face detection and face alignment are performed jointly, four different kinds of annotated data are used during training:
6.1 Negatives: regions whose IoU with every ground-truth box in the image is less than 0.3;
6.2 Positives: regions whose IoU with a ground-truth box is greater than 0.65;
6.3 Part faces: regions whose IoU lies between 0.4 and 0.65;
6.4 Landmark faces: images annotated with the positions of the five facial landmarks.
Negatives and positives are used for the face classification task (face vs. non-face); positives and part faces are used for bounding-box regression; landmark faces are used for facial landmark localization (a small IoU-labelling sketch is given after the list below). The training data for each network is prepared as follows:
① P-Net: positives, negatives and part faces are obtained by randomly cropping images from the WIDER FACE dataset; landmark faces are cropped from the CelebA data. All crops are resized to 12×12;
② R-Net: the proposals output by the first stage of the pipeline are used as input to R-Net, resized to 24×24;
③ O-Net: the input consists of the face boxes filtered and refined by the second stage; they are likewise cropped from the original images and resized to 48×48 before being fed into O-Net in batches.
Each later stage thus adjusts and refines the results produced by the earlier stages.
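To make the IoU thresholds above concrete, here is a minimal labelling sketch (the IoU helper mirrors the one in the project's utils module, but this standalone version and the example numbers are my own):

import numpy as np

def iou(box, gt_boxes):
    # box: (x1, y1, x2, y2); gt_boxes: (N, 4) array of ground-truth boxes
    ix1 = np.maximum(box[0], gt_boxes[:, 0]); iy1 = np.maximum(box[1], gt_boxes[:, 1])
    ix2 = np.minimum(box[2], gt_boxes[:, 2]); iy2 = np.minimum(box[3], gt_boxes[:, 3])
    iw = np.maximum(0., ix2 - ix1 + 1); ih = np.maximum(0., iy2 - iy1 + 1)
    inter = iw * ih
    area = (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
    gt_area = (gt_boxes[:, 2] - gt_boxes[:, 0] + 1) * (gt_boxes[:, 3] - gt_boxes[:, 1] + 1)
    return inter / (area + gt_area - inter)

def label_crop(crop, gt_boxes):
    best = np.max(iou(crop, gt_boxes))
    if best < 0.3:
        return 'negative'
    if best >= 0.65:
        return 'positive'
    if best >= 0.4:
        return 'part'
    return 'discard'  # crops with 0.3 <= IoU < 0.4 are simply not used

gts = np.array([[100, 100, 220, 220]])
print(label_crop((110, 110, 230, 230), gts))  # heavily overlapping crop, labelled 'positive'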
4. Testing Phase
As described in the overall framework of Section 1, the input image is first turned into an image pyramid of multiple scales, which is fed into P-Net (P-Net is fully convolutional, so every point of its output feature map corresponds to a 12×12 region of the input image). Because P-Net is very small it proposes candidate regions very quickly, but not very accurately. The face boxes detected at the different scales are merged with NMS; the image patches corresponding to the surviving candidates are then cropped and resized to 24×24 as input to R-Net. R-Net selects boxes much more precisely, usually leaving only a few. These are resized to 48×48 and fed into O-Net, which decides whether each remaining candidate really is a face. O-Net is slower, but because the previous two networks have already kept only high-probability boxes, it only has to process a small number of patches. O-Net finally outputs the refined boxes together with the keypoint information; facial landmarks are only produced in this third stage, while the first two stages only classify and regress boxes without outputting landmark results.
Reference: https://blog.csdn.net/wfei101/article/details/79935037
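A minimal sketch of the scale computation used to build the image pyramid for P-Net (the scale factor 0.79 and the min_face_size default of 20 match the project's MtcnnDetector; the helper functions themselves are my own):

import cv2

def pyramid_scales(h, w, min_face_size=20, net_size=12, factor=0.79):
    # Start so that a min_face_size face maps onto the 12x12 P-Net receptive field,
    # then keep shrinking until the image would become smaller than 12 pixels.
    scale = net_size / float(min_face_size)
    scales = []
    while min(h, w) * scale > net_size:
        scales.append(scale)
        scale *= factor
    return scales

def build_pyramid(img):
    h, w = img.shape[:2]
    return [cv2.resize(img, (int(w * s), int(h * s))) for s in pyramid_scales(h, w)]

print(pyramid_scales(480, 640))  # e.g. [0.6, 0.474, 0.374, ...]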
5. Hands-on Project
Reference project address: GitHub
The model implementation below makes a few adjustments to the reference project.
Dataset download:
The datasets used here are WIDER FACE and CelebA.
The code walkthrough follows this reference: https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html
The main pieces of code are explained below.
Generating the P-Net data:
gen_12net_data.py

# coding: utf-8 """ 截取pos,neg,part三種類型圖片並resize成12x12大小作為PNet的輸入 """ import os import cv2 import numpy as np npr = np.random from tqdm import tqdm from utils import IOU # face的id對應label的txt anno_file = '../data/wider_face_train.txt' # 圖片地址 im_dir = '../data/WIDER_train/images' # pos,part,neg裁剪圖片放置位置 pos_save_dir = '../data/12/positive' part_save_dir = '../data/12/part' neg_save_dir = '../data/12/negative' # PNet數據地址 save_dir = '../data/12' if not os.path.exists(save_dir): os.mkdir(save_dir) if not os.path.exists(pos_save_dir): os.mkdir(pos_save_dir) if not os.path.exists(part_save_dir): os.mkdir(part_save_dir) if not os.path.exists(neg_save_dir): os.mkdir(neg_save_dir) f1 = open(os.path.join(save_dir, 'pos_12.txt'), 'w') f2 = open(os.path.join(save_dir, 'neg_12.txt'), 'w') f3 = open(os.path.join(save_dir, 'part_12.txt'), 'w') with open(anno_file, 'r') as f: annotations = f.readlines() num = len(annotations) print('總共的圖片數: %d' % num) # 記錄pos, neg, part三類生成數 p_idx = 0 n_idx = 0 d_idx = 0 # 記錄讀取圖片數 idx = 0 for annotation in tqdm(annotations): # 進度條顯示 annotation = annotation.strip().split(' ') im_path = annotation[0] box = list(map(float, annotation[1:])) boxes = np.array(box, dtype=np.float32).reshape(-1, 4) # numpy.array.reshape -> 4列, 每一行是box img = cv2.imread(os.path.join(im_dir, im_path+'.jpg')) idx += 1 height, width, channel = img.shape neg_num = 0 # 先采樣一定數量neg圖片 while neg_num < 50: # 隨機選取截取圖像大小 size = npr.randint(12, min(width, height)/2) # 隨機選取左上坐標 nx = npr.randint(0, width-size) ny = npr.randint(0, height-size) # 截取box crop_box = np.array([nx, ny, nx+size, ny+size]) # 計算iou值 Iou = IOU(crop_box, boxes) # 截取圖片並resize成12x12大小 cropped_im = img[ny:ny+size, nx:nx+size, :] # cv2.imread讀取的圖片第一維度是y resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR) # P-Net的訓練輸入圖像大小為12 × 12 # iou值小於0.3判定為neg圖像 if np.max(Iou) < 0.3: save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx) # neg的圖片的絕對路徑 f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n') # neg_12.txt文件保存neg的圖片的絕對路徑 cv2.imwrite(save_file, resized_im) # 將截取的圖片保存 n_idx += 1 neg_num += 1 for box in boxes: # 以每個box為基礎選取截圖 # 左上右下坐標 x1, y1, x2, y2 = box w = x2 - x1 + 1 h = y2 - y1 + 1 # 舍去圖像過小和box在圖片外的圖像 if max(w, h) < 20 or x1 < 0 or y1 < 0: continue for i in range(5): # 每個box附近截取5個截圖用於判斷是否為negative訓練樣本 size = npr.randint(12, min(width, height)/2) # 隨機生成的關於x1, y1的偏移量,並且保證x1+delta_x>0,y1+delta_y>0 delta_x = npr.randint(max(-size, -x1), w) delta_y = npr.randint(max(-size, -y1), h) # 截取后的左上角坐標 # 這里面是獲取negative的截圖, 所以可以(最好是)隨意選取, 因此左上角坐標和偏移量都是隨意選取的. nx1 = int(max(0, x1+delta_x)) ny1 = int(max(0, y1+delta_y)) # 排除大於圖片尺度的 if nx1 + size > width or ny1 + size > height: continue crop_box = np.array([nx1, ny1, nx1+size, ny1+size]) Iou = IOU(crop_box, boxes) cropped_im = img[ny1:ny1+size, nx1:nx1+size, :] resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR) if np.max(Iou) < 0.3: save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx) f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n') cv2.imwrite(save_file, resized_im) n_idx += 1 for i in range(20): # 每個box附近截取20個截圖用於判斷是否為positive或者是part訓練樣本 # 這里是截取positive和part圖片, 目的是需要截取box附近的圖片, 因此下面size的大小也需要接近w, h. 不然取不到positive、part的幾率大. size = npr.randint(int(min(w, h)*0.8), np.ceil(1.25*max(w, h))) # 除去尺度小的box # 注意:w, h是box的尺寸. width、height是整個訓練圖片的尺寸. if w < 5: continue # 在box附近截取圖片, 偏移量取值, 稍微小一點好. 
delta_x = npr.randint(-w*0.2, w*0.2) delta_y = npr.randint(-h*0.2, h*0.2) # 截取圖像左上坐標計算是先計算x1+w/2表示的中心坐標,再+delta_x偏移量,再-size/2, nx1 = int(max(x1+w/2+delta_x-size/2, 0)) ny1 = int(max(y1+h/2+delta_y-size/2, 0)) nx2 = nx1 + size ny2 = ny1 + size # 排除超出的圖像 if nx2 > width or ny2 > height: continue crop_box = np.array([nx1, ny1, nx2, ny2]) # 人臉框相對於截取圖片的偏移量並做歸一化處理 # 這里訓練數據使用相對於人臉框歸一化處理的offset, 實際測試時得到的也是歸一化的offset. 因此訓練就是獲取歸一化的offset. offset_x1 = (x1-nx1)/float(size) offset_y1 = (y1-ny1)/float(size) offset_x2 = (x2-nx2)/float(size) offset_y2 = (y2-ny2)/float(size) cropped_im = img[ny1:ny2, nx1:nx2, :] resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR) # box擴充一個維度作為iou輸入 box_ = box.reshape(1, -1) # 這里是每一個box, 對每一個box和截取的圖像進行IOU計算 iou = IOU(crop_box, box_) if iou >= 0.65: save_file = os.path.join(pos_save_dir, '%s.jpg'%p_idx) f1.write(pos_save_dir+'/%s.jpg'%p_idx+' 1 %.2f %.2f %.2f %.2f\n'%(offset_x1, offset_y1, offset_x2, offset_y2)) cv2.imwrite(save_file, resized_im) p_idx += 1 elif iou >= 0.4: save_file = os.path.join(part_save_dir, '%s.jpg'%d_idx) f3.write(part_save_dir+'/%s.jpg'%d_idx+' -1 %.2f %.2f %.2f %.2f\n'%(offset_x1, offset_y1, offset_x2, offset_y2)) cv2.imwrite(save_file, resized_im) d_idx += 1 print('%s 個圖片已處理,pos:%s part: %s neg:%s' %(idx, p_idx, d_idx, n_idx)) f1.close() f2.close() f3.close()
Generating the landmark data:
gen_landmark_aug.py

# coding: utf-8 import os import random import sys import cv2 import numpy as np npr = np.random import argparse from tqdm import tqdm from utils import IOU from BBox_utils import getDataFromTxt, BBox data_dir = '../data' def main(args): """ 用於處理帶有landmark的數據 """ size = args.input_size # 是否對圖像變換 argument = True if size == 12: net = 'PNet' elif size == 24: net = 'RNet' elif size == 48: net = 'ONet' image_id = 0 # 數據輸出路徑 OUTPUT = os.path.join(data_dir, str(size)) if not os.path.exists(OUTPUT): os.mkdir(OUTPUT) # 圖片處理后輸出路徑 dstdir = os.path.join(OUTPUT, 'train_%s_landmark_aug' %(net)) if not os.path.exists(dstdir): os.mkdir(dstdir) # label記錄txt ftxt = os.path.join(data_dir, 'trainImageList.txt') # trainImageList.txt記錄了CelebA數據的路徑以及關鍵點信息. # 記錄label的txt f = open(os.path.join(OUTPUT, 'landmark_%d_aug.txt' %(size)), 'w') # 獲取圖像路徑,box,關鍵點 data = getDataFromTxt(ftxt, data_dir) idx = 0 for (imgPath, box, landmarkGt) in tqdm(data): # 存儲人臉圖片和關鍵點 F_imgs = [] F_landmarks = [] img = cv2.imread(imgPath) img_h, img_w, img_c = img.shape gt_box = np.array([box.left, box.top, box.right, box.bottom]) # 人臉圖片 f_face = img[box.top:box.bottom+1, box.left:box.right+1] # resize成網絡輸入大小 f_face = cv2.resize(f_face, (size, size)) landmark = np.zeros((5, 2)) for index, one in enumerate(landmarkGt): # 關鍵點相對於左上坐標偏移量並歸一化 rv = ((one[0]-gt_box[0])/(gt_box[2]-gt_box[0]), (one[1]-gt_box[1])/(gt_box[3]-gt_box[1])) landmark[index] = rv F_imgs.append(f_face) F_landmarks.append(landmark.reshape(10)) landmark = np.zeros((5, 2)) if argument: # 對圖像變換 idx = idx+1 x1, y1, x2, y2 = gt_box gt_w = x2 - x1 + 1 gt_h = y2 - y1 + 1 # 除去過小的人臉圖像 if max(gt_w, gt_h) < 40 or x1 < 0 or y1 < 0: continue for i in range(10): # 隨機裁剪圖像大小 # 每張圖片截取10個, x下面計算方法類似於在positive和part的截圖過程. box_size = npr.randint(int(min(gt_w, gt_h)*0.8), np.ceil(1.25*max(gt_w, gt_h))) # 隨機左上坐標偏移量 delta_x = npr.randint(-gt_w*0.2, gt_w*0.2) delta_y = npr.randint(-gt_h*0.2, gt_h*0.2) # 計算左上坐標 nx1 = int(max(x1+gt_w/2-box_size/2+delta_x, 0)) ny1 = int(max(y1+gt_h/2-box_size/2+delta_y, 0)) nx2 = nx1 + box_size ny2 = ny1 + box_size # 除去超過邊界的 if nx2 > img_w or ny2 > img_h: continue # 裁剪邊框, 圖片 crop_box = np.array([nx1, ny1, nx2, ny2]) cropped_im = img[ny1:ny2+1, nx1:nx2+1, :] resized_im = cv2.resize(cropped_im, (size, size)) iou = IOU(crop_box, np.expand_dims(gt_box, 0)) # 擴展數組形狀. 
-> 1 * 1 * 4 # 只保留pos圖像 if iou > 0.65: F_imgs.append(resized_im) # 關鍵點相對偏移 for index, one in enumerate(landmarkGt): rv = ((one[0]-nx1)/box_size, (one[1]-ny1)/box_size) landmark[index] = rv F_landmarks.append(landmark.reshape(10)) landmark = np.zeros((5, 2)) landmark_ = F_landmarks[-1].reshape(-1, 2) box = BBox([nx1, ny1, nx2, ny2]) # 鏡像 if random.choice([0, 1]) > 0: face_flipped, landmark_flipped = flip(resized_im, landmark_) face_flipped = cv2.resize(face_flipped, (size, size)) F_imgs.append(face_flipped) F_landmarks.append(landmark_flipped.reshape(10)) # 逆時針翻轉 if random.choice([0, 1]) > 0: face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), 5) # 關鍵點偏移 landmark_rorated = box.projectLandmark(landmark_rorated) face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size)) F_imgs.append(face_rotated_by_alpha) F_landmarks.append(landmark_rorated.reshape(10)) # 左右翻轉 face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated) face_flipped = cv2.resize(face_flipped, (size, size)) F_imgs.append(face_flipped) F_landmarks.append(landmark_flipped.reshape(10)) # 順時針翻轉 if random.choice([0, 1]) > 0: face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), -5) # 關鍵點偏移 landmark_rorated = box.projectLandmark(landmark_rorated) face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size)) F_imgs.append(face_rotated_by_alpha) F_landmarks.append(landmark_rorated.reshape(10)) # 左右翻轉 face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated) face_flipped = cv2.resize(face_flipped, (size, size)) F_imgs.append(face_flipped) F_landmarks.append(landmark_flipped.reshape(10)) F_imgs, F_landmarks = np.asarray(F_imgs), np.asarray(F_landmarks) for i in range(len(F_imgs)): # 剔除數據偏移量在[0,1]之間 if np.sum(np.where(F_landmarks[i] <= 0, 1, 0)) > 0: continue if np.sum(np.where(F_landmarks[i] >= 1, 1, 0)) > 0: continue cv2.imwrite(os.path.join(dstdir, '%d.jpg' %(image_id)), F_imgs[i]) landmarks = list(map(str, list(F_landmarks[i]))) f.write(os.path.join(dstdir, '%d.jpg' %(image_id))+' -2 '+' '.join(landmarks)+'\n') image_id += 1 f.close() return F_imgs, F_landmarks def flip(face, landmark): # 鏡像 face_flipped_by_x = cv2.flip(face, 1) landmark_ = np.asarray([(1-x, y) for (x, y) in landmark]) landmark_[[0, 1]] = landmark_[[1, 0]] landmark_[[3, 4]] = landmark_[[4, 3]] return (face_flipped_by_x, landmark_) def rotate(img, box, landmark, alpha): # 旋轉 center = ((box.left+box.right)/2, (box.top+box.bottom)/2) rot_mat = cv2.getRotationMatrix2D(center, alpha, 1) img_rotated_by_alpha = cv2.warpAffine(img, rot_mat, (img.shape[1], img.shape[0])) landmark_ = np.asarray([(rot_mat[0][0]*x+rot_mat[0][1]*y+rot_mat[0][2], rot_mat[1][0]*x+rot_mat[1][1]*y+rot_mat[1][2]) for (x, y) in landmark]) face = img_rotated_by_alpha[box.top:box.bottom+1, box.left:box.right+1] return (face, landmark_) def parse_arguments(argv): parser = argparse.ArgumentParser() parser.add_argument('input_size', type=int, help='The input size for specific net') return parser.parse_args(argv) if __name__ == '__main__': main(parse_arguments(sys.argv[1:]))
Merging the data into the list used for P-Net training:
gen_imglist_pnet.py

# coding: utf-8 import numpy as np npr = np.random import os data_dir = '../data/' """ 將pos, part, neg, landmark四者混在一起 """ size = 12 with open(os.path.join(data_dir, '12/pos_12.txt'), 'r') as f: pos = f.readlines() with open(os.path.join(data_dir, '12/neg_12.txt'), 'r') as f: neg = f.readlines() with open(os.path.join(data_dir, '12/part_12.txt'), 'r') as f: part = f.readlines() with open(os.path.join(data_dir, '12/landmark_12_aug.txt'), 'r') as f: landmark = f.readlines() dir_path = os.path.join(data_dir, '12') if not os.path.exists(dir_path): os.makedirs(dir_path) with open(os.path.join(dir_path, 'train_pnet_landmark.txt'), 'w') as f: nums = [len(neg), len(pos), len(part)] base_num = 250000 print('neg數量:{} pos數量:{} part數量:{} 基數:{}'.format(len(neg), len(pos), len(part), base_num)) if len(neg) > base_num*3: neg_keep = npr.choice(len(neg), size=base_num*3, replace=True) else: neg_keep = npr.choice(len(neg), size=len(neg), replace=True) sum_p = len(neg_keep)//3 # pos : part : neg = 1 : 1 : 3 pos_keep = npr.choice(len(pos), sum_p, replace=True) part_keep = npr.choice(len(part), sum_p, replace=True) print('neg數量:{} pos數量:{} part數量:{}'.format(len(neg_keep), len(pos_keep), len(part_keep))) for i in pos_keep: f.write(pos[i]) for i in neg_keep: f.write(neg[i]) for i in part_keep: f.write(part[i]) for item in landmark: f.write(item)
Converting the training data into TFRecord files:
gen_tfrecords.py

# coding: utf-8 import os import random import sys import time import tensorflow as tf import cv2 from tqdm import tqdm import argparse def main(args): """ 生成tfrecords文件 """ size = args.input_size # 數據存放地址 dataset_dir = '../data/' # tfrecord存放地址 output_dir = os.path.join(dataset_dir, str(size)+'/tfrecord') if not os.path.exists(output_dir): os.mkdir(output_dir) # pnet只生成一個混合的tfrecords, rnet和onet要分別生成4個 if size == 12: net = 'PNet' tf_filenames = [os.path.join(output_dir, 'train_%s_landmark.tfrecord' % net)] items = ['12/train_pnet_landmark.txt'] elif size == 24: net = 'RNet' tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord') item1 = '%d/pos_%d.txt' % (size, size) tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord') item2 = '%d/part_%d.txt' % (size, size) tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord') item3 = '%d/neg_%d.txt' % (size, size) tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord') item4 = '%d/landmark_%d_aug.txt' % (size, size) tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4] items = [item1, item2, item3, item4] elif size == 48: net = 'ONet' tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord') item1 = '%d/pos_%d.txt' % (size, size) tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord') item2 = '%d/part_%d.txt' % (size, size) tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord') item3 = '%d/neg_%d.txt' % (size, size) tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord') item4 = '%d/landmark_%d_aug.txt' % (size, size) tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4] items = [item1, item2, item3, item4] if tf.gfile.Exists(tf_filenames[0]): print('tfrecords文件早已生成,無需此操作') return # 獲取數據 for tf_filename, item in zip(tf_filenames, items): print('開始讀取數據') dataset = get_dataset(dataset_dir, item) tf_filename = tf_filename+'_shuffle' random.shuffle(dataset) # 數據進行打亂 print('開始轉換tfrecords') with tf.python_io.TFRecordWriter(tf_filename) as tfrecord_writer: for image_example in tqdm(dataset): filename = image_example['filename'] try: _add_to_tfrecord(filename, image_example, tfrecord_writer) except: print(filename) print('完成轉換') def get_dataset(dir, item): """ 從txt獲取數據 參數: dir:存放數據目錄 item:txt目錄 返回值: 包含label,box,關鍵點的data """ dataset_dir = os.path.join(dir, item) imagelist = open(dataset_dir, 'r') dataset = [] for line in tqdm(imagelist.readlines()): # 進度條顯示 info = line.strip().split(' ') data_example = dict() bbox = dict() data_example['filename'] = info[0] data_example['label'] = int(info[1]) # neg的box默認為0,part,pos的box只包含人臉框,landmark的box只包含關鍵點 bbox['xmin'] = 0 bbox['ymin'] = 0 bbox['xmax'] = 0 bbox['ymax'] = 0 bbox['xlefteye'] = 0 bbox['ylefteye'] = 0 bbox['xrighteye'] = 0 bbox['yrighteye'] = 0 bbox['xnose'] = 0 bbox['ynose'] = 0 bbox['xleftmouth'] = 0 bbox['yleftmouth'] = 0 bbox['xrightmouth'] = 0 bbox['yrightmouth'] = 0 if len(info) == 6: # 長度為6, 說明只有人臉框標記(6-2) bbox['xmin'] = float(info[2]) bbox['ymin'] = float(info[3]) bbox['xmax'] = float(info[4]) bbox['ymax'] = float(info[5]) if len(info) == 12: # 長度為12, 說明是人臉關鍵點關鍵點(12-2) bbox['xlefteye'] = float(info[2]) bbox['ylefteye'] = float(info[3]) bbox['xrighteye'] = float(info[4]) bbox['yrighteye'] = float(info[5]) bbox['xnose'] = float(info[6]) bbox['ynose'] = float(info[7]) bbox['xleftmouth'] = float(info[8]) bbox['yleftmouth'] = float(info[9]) bbox['xrightmouth'] = float(info[10]) bbox['yrightmouth'] = float(info[11]) data_example['bbox'] = bbox dataset.append(data_example) return dataset 
def _add_to_tfrecord(filename, image_example, tfrecord_writer): """ 轉換成tfrecord文件 參數: filename:圖片文件名 image_example:數據 tfrecord_writer:寫入文件 """ image_data, height, width = _process_image_withoutcoder(filename) example = _convert_to_example_simple(image_example, image_data) tfrecord_writer.write(example.SerializeToString()) def _process_image_withoutcoder(filename): """ 讀取圖片文件,返回圖片大小 """ image = cv2.imread(filename) image_data = image.tostring() assert len(image.shape) == 3 height = image.shape[0] width = image.shape[1] assert image.shape[2] == 3 return image_data, height, width # 不同類型數據的轉換 def _int64_feature(value): if not isinstance(value, list): value = [value] return tf.train.Feature(int64_list=tf.train.Int64List(value=value)) def _float_feature(value): if not isinstance(value, list): value = [value] return tf.train.Feature(float_list=tf.train.FloatList(value=value)) def _bytes_feature(value): if not isinstance(value, list): value = [value] return tf.train.Feature(bytes_list=tf.train.BytesList(value=value)) def _convert_to_example_simple(image_example, image_buffer): """ 轉換成tfrecord接受形式 """ class_label = image_example['label'] bbox = image_example['bbox'] roi = [bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']] landmark = [bbox['xlefteye'], bbox['ylefteye'], bbox['xrighteye'], bbox['yrighteye'], bbox['xnose'], bbox['ynose'], bbox['xleftmouth'], bbox['yleftmouth'], bbox['xrightmouth'], bbox['yrightmouth']] example = tf.train.Example(features=tf.train.Features(feature={ 'image/encoded': _bytes_feature(image_buffer), 'image/label': _int64_feature(class_label), 'image/roi': _float_feature(roi), 'image/landmark': _float_feature(landmark) })) return example def parse_arguments(argv): parser = argparse.ArgumentParser() parser.add_argument('input_size', type=int, help='The input size for specific net') return parser.parse_args(argv) if __name__ == '__main__': main(parse_arguments(sys.argv[1:]))
Training:
train_model.py

# coding: utf-8 import os import sys from datetime import datetime import numpy as np import tensorflow as tf import config as FLAGS import random import cv2 def train(net_factory, prefix, end_epoch, base_dir, display, base_lr): """ 訓練模型 """ size = int(base_dir.split('/')[-1]) # 獲取得到網絡大小(因為base_dir保存的路徑為:../data/12, ../data/24, ../data/48) # 論文中的alpha, 代表了任務的重要性. 和論文中保持一致. if size == 12: net = 'PNet' radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5; elif size == 24: net = 'RNet' radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5; elif size == 48: net = 'ONet' radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 1; if net == 'PNet': # 計算一共多少組數據 label_file = os.path.join(base_dir, 'train_pnet_landmark.txt') f = open(label_file, 'r') num = len(f.readlines()) dataset_dir = os.path.join(base_dir, 'tfrecord/train_PNet_landmark.tfrecord_shuffle') # 從tfrecord讀取數據 image_batch, label_batch, bbox_batch, landmark_batch = read_single_tfrecord(dataset_dir, FLAGS.batch_size, net) else: # 計算一共多少組數據 label_file1 = os.path.join(base_dir, 'pos_%d.txt' % size) f1 = open(label_file1, 'r') label_file2 = os.path.join(base_dir, 'part_%d.txt' % size) f2 = open(label_file2, 'r') label_file3 = os.path.join(base_dir, 'neg_%d.txt' % size) f3 = open(label_file3, 'r') label_file4 = os.path.join(base_dir, 'landmark_%d_aug.txt' % size) f4 = open(label_file4, 'r') num = len(f1.readlines())+len(f2.readlines())+len(f3.readlines())+len(f4.readlines()) pos_dir = os.path.join(base_dir, 'tfrecord/pos_landmark.tfrecord_shuffle') part_dir = os.path.join(base_dir, 'tfrecord/part_landmark.tfrecord_shuffle') neg_dir = os.path.join(base_dir, 'tfrecord/neg_landmark.tfrecord_shuffle') landmark_dir = os.path.join(base_dir, 'tfrecord/landmark_landmark.tfrecord_shuffle') dataset_dirs = [pos_dir, part_dir, neg_dir, landmark_dir] # 各數據占比 # 目的是使每一個batch的數據占比都相同 # 訓練數據的比例, pos : part : landmark, neg = 1 : 1 : 1 : 3. pos_radio, part_radio, landmark_radio, neg_radio = 1.0/6, 1.0/6, 1.0/6, 3.0/6 pos_batch_size = int(np.ceil(FLAGS.batch_size*pos_radio)) assert pos_batch_size != 0, "Batch Size 有誤 " part_batch_size = int(np.ceil(FLAGS.batch_size*part_radio)) assert part_batch_size != 0, "BBatch Size 有誤 " neg_batch_size = int(np.ceil(FLAGS.batch_size*neg_radio)) assert neg_batch_size != 0, "Batch Size 有誤 " landmark_batch_size = int(np.ceil(FLAGS.batch_size*landmark_radio)) assert landmark_batch_size != 0, "Batch Size 有誤 " batch_sizes = [pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size] image_batch, label_batch, bbox_batch, landmark_batch = read_multi_tfrecords(dataset_dirs, batch_sizes, net) # 定義占位符, 訓練時使用, 后續將讀取的tfrecords數據傳入. input_image = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, size, size, 3], name='input_image') label = tf.placeholder(tf.float32, shape=[FLAGS.batch_size], name='label') bbox_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 4], name='bbox_target') landmark_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 10], name='landmark_target') # 圖像色相變換 input_image = image_color_distort(input_image) cls_loss_op, bbox_loss_op, landmark_loss_op, L2_loss_op, accuracy_op = net_factory(input_image, label, bbox_target, landmark_target, training=True) # 計算訓練損失, 論文中公式實現. total_loss_op = radio_cls_loss*cls_loss_op+radio_bbox_loss*bbox_loss_op+radio_landmark_loss*landmark_loss_op+L2_loss_op train_op, lr_op = optimize(base_lr, total_loss_op, num) # 將變量添加到tensorboard, 實現可視化. 
tf.summary.scalar("cls_loss", cls_loss_op) # cls_loss tf.summary.scalar("bbox_loss", bbox_loss_op) # bbox_loss tf.summary.scalar("landmark_loss", landmark_loss_op) # landmark_loss tf.summary.scalar("cls_accuracy", accuracy_op) # cls_acc tf.summary.scalar("total_loss", total_loss_op) # cls_loss, bbox loss, landmark loss and L2 loss add together summary_op = tf.summary.merge_all() logs_dir = "../graph/%s" % net if not os.path.exists(logs_dir): # if os.path.exists(logs_dir) == False: os.mkdir(logs_dir) # 模型訓練 init = tf.global_variables_initializer() sess = tf.Session() saver = tf.train.Saver(max_to_keep=3) sess.run(init) # 模型的graph writer = tf.summary.FileWriter(logs_dir, sess.graph) # 使用 tf.train.Coordinator()來創建一個線程管理器(協調器)對象, 管理線程. coord = tf.train.Coordinator() # 啟動QueueRunner threads = tf.train.start_queue_runners(sess=sess, coord=coord) i = 0 MAX_STEP = int(num / FLAGS.batch_size + 1) * end_epoch epoch = 0 sess.graph.finalize() try: for step in range(MAX_STEP): i = i + 1 if coord.should_stop(): break image_batch_array, label_batch_array, bbox_batch_array, landmark_batch_array = sess.run([image_batch, label_batch, bbox_batch, landmark_batch]) # 隨機翻轉圖像 image_batch_array, landmark_batch_array = random_flip_images(image_batch_array, label_batch_array, landmark_batch_array) _, _, summary = sess.run([train_op, lr_op, summary_op], feed_dict={input_image: image_batch_array, label: label_batch_array, bbox_target: bbox_batch_array, landmark_target: landmark_batch_array}) # 訓練過程 if (step+1) % display == 0: cls_loss, bbox_loss, landmark_loss, L2_loss, lr, acc = sess.run([cls_loss_op, bbox_loss_op, landmark_loss_op, L2_loss_op, lr_op, accuracy_op], feed_dict={input_image: image_batch_array, label: label_batch_array, bbox_target: bbox_batch_array, landmark_target: landmark_batch_array}) total_loss = radio_cls_loss*cls_loss + radio_bbox_loss*bbox_loss + radio_landmark_loss*landmark_loss + L2_loss print('epoch: %d/%d' % (epoch+1, end_epoch)) print("Step: %d/%d, accuracy: %3f, cls loss: %4f, bbox loss: %4f, Landmark loss :%4f, L2 loss: %4f, Total Loss: %4f, lr:%f" % (step+1, MAX_STEP, acc, cls_loss, bbox_loss, landmark_loss, L2_loss, total_loss, lr)) # 每一次epoch保留一次模型 if i * FLAGS.batch_size > num: epoch = epoch + 1 i = 0 path_prefix = saver.save(sess, prefix, global_step=epoch) writer.add_summary(summary, global_step=step) except tf.errors.OutOfRangeError: print("完成!!!") finally: coord.request_stop() writer.close() coord.join(threads) sess.close() def optimize(base_lr, loss, data_num): """ 參數優化 """ lr_factor = 0.1 global_step = tf.Variable(0, trainable=False) # 計算訓練次數 data_num / batch 為整個訓練集完成一次訓練需要的次數. 再乘以epoch(整個數據集訓練次數), 即為總的訓練次數. # 這里使用階梯式的學習率lr, 所以lr也區分三個. 
base_lr * lr_factor ^ x ---> x=(0, 1, 2, 3) boundaries = [int(epoch * data_num / FLAGS.batch_size) for epoch in FLAGS.LR_EPOCH] lr_values = [base_lr * (lr_factor ** x) for x in range(0, len(FLAGS.LR_EPOCH) + 1)] lr_op = tf.train.piecewise_constant(global_step, boundaries, lr_values) # 使用momentum優化器 optimizer = tf.train.MomentumOptimizer(lr_op, 0.9) train_op = optimizer.minimize(loss, global_step) return train_op, lr_op def read_single_tfrecord(tfrecord_file, batch_size, net): """ 讀取tfrecord數據 """ filename_queue = tf.train.string_input_producer([tfrecord_file], shuffle=True) reader = tf.TFRecordReader() _, serialized_example = reader.read(filename_queue) image_features = tf.parse_single_example(serialized_example, features={ 'image/encoded': tf.FixedLenFeature([], tf.string), 'image/label': tf.FixedLenFeature([], tf.int64), 'image/roi': tf.FixedLenFeature([4], tf.float32), 'image/landmark': tf.FixedLenFeature([10], tf.float32)}) if net == 'PNet': image_size = 12 elif net == 'RNet': image_size = 24 elif net == 'ONet': image_size = 48 # _bytes_feature將原始圖像進行轉換保存到tfrecords文件, tf.decode_raw將原來編碼為字符串類型的變量重新變回來原始圖像數據 image = tf.decode_raw(image_features['image/encoded'], tf.uint8) image = tf.reshape(image, [image_size, image_size, 3]) # 將值規划在[-1,1]內 image = (tf.cast(image, tf.float32)-127.5)/128 # 上面將數據轉換成uint8, 即8位無符號整型(0-255). label = tf.cast(image_features['image/label'], tf.float32) roi = tf.cast(image_features['image/roi'], tf.float32) landmark = tf.cast(image_features['image/landmark'], tf.float32) image, label, roi, landmark = tf.train.batch([image, label, roi, landmark], batch_size=batch_size, num_threads=2, capacity=batch_size) # tf.train.batch獲取一個batch的數據, 所以下面將數據的第一維reshape成batch_size. label = tf.reshape(label, [batch_size]) roi = tf.reshape(roi, [batch_size, 4]) landmark = tf.reshape(landmark, [batch_size, 10]) return image, label, roi, landmark def read_multi_tfrecords(tfrecord_files, batch_sizes, net): """ 讀取多個tfrecord文件放一起 """ pos_dir, part_dir, neg_dir, landmark_dir = tfrecord_files pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size = batch_sizes pos_image, pos_label, pos_roi, pos_landmark = read_single_tfrecord(pos_dir, pos_batch_size, net) part_image, part_label, part_roi, part_landmark = read_single_tfrecord(part_dir, part_batch_size, net) neg_image, neg_label, neg_roi, neg_landmark = read_single_tfrecord(neg_dir, neg_batch_size, net) landmark_image, landmark_label, landmark_roi, landmark_landmark = read_single_tfrecord(landmark_dir, landmark_batch_size, net) images = tf.concat([pos_image, part_image, neg_image, landmark_image], 0, name="concat/image") labels = tf.concat([pos_label, part_label, neg_label, landmark_label], 0, name="concat/label") assert isinstance(labels, object) rois = tf.concat([pos_roi, part_roi, neg_roi, landmark_roi], 0, name="concat/roi") landmarks = tf.concat([pos_landmark, part_landmark, neg_landmark, landmark_landmark], 0, name="concat/landmark") return images, labels, rois, landmarks def image_color_distort(inputs): inputs = tf.image.random_contrast(inputs, lower=0.5, upper=1.5) inputs = tf.image.random_brightness(inputs, max_delta=0.2) inputs = tf.image.random_hue(inputs,max_delta= 0.2) inputs = tf.image.random_saturation(inputs,lower = 0.5, upper= 1.5) return inputs def random_flip_images(image_batch,label_batch,landmark_batch): '''隨機翻轉圖像''' if random.choice([0,1]) > 0: num_images = image_batch.shape[0] fliplandmarkindexes = np.where(label_batch==-2)[0] flipposindexes = np.where(label_batch==1)[0] flipindexes = 
np.concatenate((fliplandmarkindexes,flipposindexes)) for i in flipindexes: cv2.flip(image_batch[i],1,image_batch[i]) for i in fliplandmarkindexes: landmark_ = landmark_batch[i].reshape((-1,2)) landmark_ = np.asarray([(1-x, y) for (x, y) in landmark_]) landmark_[[0, 1]] = landmark_[[1, 0]] landmark_[[3, 4]] = landmark_[[4, 3]] landmark_batch[i] = landmark_.ravel() return image_batch,landmark_batch
train.py

# coding: utf-8
from model import P_Net, R_Net, O_Net
import argparse
import os
import sys
import config as FLAGS
from train_model import train

net_factorys = [P_Net, R_Net, O_Net]


def main(args):
    size = args.input_size
    base_dir = os.path.join('../data/', str(size))
    if size == 12:
        net = 'PNet'
        net_factory = net_factorys[0]
        end_epoch = FLAGS.end_epoch[0]
    elif size == 24:
        net = 'RNet'
        net_factory = net_factorys[1]
        end_epoch = FLAGS.end_epoch[1]
    elif size == 48:
        net = 'ONet'
        net_factory = net_factorys[2]
        end_epoch = FLAGS.end_epoch[2]
    model_path = os.path.join('../model/', net)
    if not os.path.exists(model_path):
        os.mkdir(model_path)
    prefix = os.path.join(model_path, net)
    display = FLAGS.display
    lr = FLAGS.lr
    train(net_factory, prefix, end_epoch, base_dir, display, lr)


def parse_arguments(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('input_size', type=int, help='The input size for specific net')
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
The model definition file used above:
model.py

# coding: utf-8 # In[1]: import tensorflow as tf slim = tf.contrib.slim import numpy as np # 只把70%數據用作參數更新 num_keep_radio = 0.7 def P_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True): """ PNet的結構 """ with tf.variable_scope('PNet'): # 使用tensorflow slim構建神經網絡 with slim.arg_scope([slim.conv2d], activation_fn=prelu, weights_initializer=slim.xavier_initializer(), weights_regularizer=slim.l2_regularizer(0.0005), padding='VALID'): net = slim.conv2d(inputs, 10, 3, scope='conv1') # 第一層:輸出為10, kernel_size為3 net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool1') net = slim.conv2d(net, 16, 3, scope='conv2') net = slim.conv2d(net, 32, 3, scope='conv3') # 二分類輸出通道數為2 conv4_1 = slim.conv2d(net, 2, 1, activation_fn=tf.nn.softmax, scope='conv4_1') # 二分類預測是不是人臉框 bbox_pred = slim.conv2d(net, 4, 1, activation_fn=None, scope='conv4_2') # 4回歸獲取人臉框坐標 landmark_pred = slim.conv2d(net, 10, 1, activation_fn=None, scope='conv4_3') # 10回歸獲取人臉特征點坐標 if training: # 刪除維度1, 2, size為1的維度, 即:[batch 1 1 2] -> [batch, 2] cls_prob = tf.squeeze(conv4_1, [1, 2], name='cls_prob') cls_loss = cls_ohem(cls_prob, label) bbox_pred = tf.squeeze(bbox_pred, [1, 2], name='bbox_pred') # [batch, 4] bbox_loss = bbox_ohem(bbox_pred, bbox_target, label) landmark_pred = tf.squeeze(landmark_pred, [1, 2], name='landmark_pred') # [batch, 10] landmark_loss = landmark_ohem(landmark_pred, landmark_target, label) accuracy = cal_accuracy(cls_prob, label) L2_loss = tf.add_n(slim.losses.get_regularization_losses()) return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy else: # 測試時batch_size=1 cls_pro_test = tf.squeeze(conv4_1, axis=0) bbox_pred_test = tf.squeeze(bbox_pred, axis=0) landmark_pred_test = tf.squeeze(landmark_pred, axis=0) return cls_pro_test, bbox_pred_test, landmark_pred_test def R_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True): """ RNet的結構 """ with tf.variable_scope('RNet'): with slim.arg_scope([slim.conv2d], activation_fn=prelu, weights_initializer=slim.xavier_initializer(), weights_regularizer=slim.l2_regularizer(0.0005), padding='VALID'): net = slim.conv2d(inputs, 28, 3, scope='conv1') net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1') net = slim.conv2d(net, 48, 3, scope='conv2') net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2') net = slim.conv2d(net, 64, 2, scope='conv3') fc_flatten = slim.flatten(net) fc1 = slim.fully_connected(fc_flatten, num_outputs=128, scope='fc1') cls_prob = slim.fully_connected(fc1, num_outputs=2,activation_fn=tf.nn.softmax, scope='cls_fc') bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc') landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc') if training: cls_loss = cls_ohem(cls_prob, label) bbox_loss = bbox_ohem(bbox_pred, bbox_target, label) landmark_loss = landmark_ohem(landmark_pred, landmark_target, label) accuracy = cal_accuracy(cls_prob, label) L2_loss = tf.add_n(slim.losses.get_regularization_losses()) return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy else: return cls_prob, bbox_pred, landmark_pred def O_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True): """ ONet結構 """ with tf.variable_scope('ONet'): with slim.arg_scope([slim.conv2d], activation_fn=prelu, weights_initializer=slim.xavier_initializer(), weights_regularizer=slim.l2_regularizer(0.0005), padding='VALID'): net = slim.conv2d(inputs, 32, 3, scope='conv1') net = 
slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1') net = slim.conv2d(net, 64, 3, scope='conv2') net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2') net = slim.conv2d(net, 64, 3, scope='conv3') net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool3') net = slim.conv2d(net, 128, 2, scope='conv4') fc_flatten = slim.flatten(net) fc1 = slim.fully_connected(fc_flatten, num_outputs=256, scope='fc1') cls_prob = slim.fully_connected(fc1, num_outputs=2, activation_fn=tf.nn.softmax, scope='cls_fc') bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc') landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc') if training: cls_loss = cls_ohem(cls_prob, label) bbox_loss = bbox_ohem(bbox_pred, bbox_target, label) landmark_loss = landmark_ohem(landmark_pred, landmark_target, label) accuracy = cal_accuracy(cls_prob, label) L2_loss = tf.add_n(slim.losses.get_regularization_losses()) return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy else: return cls_prob, bbox_pred, landmark_pred def prelu(inputs): """ prelu函數定義 """ alphas = tf.get_variable('alphas', shape=inputs.get_shape()[-1], dtype=tf.float32, initializer=tf.constant_initializer(0.25)) pos = tf.nn.relu(inputs) neg = alphas*(inputs-abs(inputs))*0.5 return pos+neg def cls_ohem(cls_prob, label): """ 計算類別損失 參數: cls_prob:預測類別,是否有人 label:真實值 返回值: 損失 """ zeros = tf.zeros_like(label) # neg: 0, pos: 1, part: -1 # negatives and positives are used for face classification tasks # 這里只把pos的label置1, neg和part的label置0. # neg: label->0, pos: label->1, part: 0 label_filter_invalid = tf.where(tf.less(label, 0), zeros, label) num_cls_prob = tf.size(cls_prob) # 計算類別的size=batch*2 cls_prob_reshape = tf.reshape(cls_prob, [num_cls_prob, -1]) # 將類別數組轉換成1維的 label_int = tf.cast(label_filter_invalid, tf.int32) # 將置0, 1的數組轉換成int32的 num_row = tf.to_int32(cls_prob.get_shape()[0]) # 獲取batch數 # 對應某一batch而言,batch*2為非人類別概率,batch*2+1為人概率類別,indices為對應 cls_prob_reshape # 應該的真實值,后續用交叉熵計算損失 row = tf.range(num_row) * 2 # 生成每一個類別的基址:(0, 2, 4, 6, ..., (num_row - 1) * 2) # 以上面為基址, 即每個樣本的neg類別, label_int為是neg還是pos. 訓練樣本的label_int=0, neg; label_int=1, pos. indices_ = row + label_int # 獲取真實標簽對應的概率, indices_顯示了實際標簽的類別. 是neg還是pos. label_prob = tf.squeeze(tf.gather(cls_prob_reshape, indices_)) loss = -tf.log(label_prob+1e-10) # 這里有點疑問, 交叉熵損失函數公式不是這樣的吧?????? zeros = tf.zeros_like(label_prob, dtype=tf.float32) ones = tf.ones_like(label_prob, dtype=tf.float32) # 統計neg和pos的數量loss, 這里篩選neg和pos的loss用於后續訓練 # label小於0(即part: -1)-> 0, 否則:pos、part均為1. 
# 上面全部計算了所有的 valid_inds = tf.where(label < zeros, zeros, ones) num_valid = tf.reduce_sum(valid_inds) # 選取70%的數據 keep_num = tf.cast(num_valid*num_keep_radio, dtype=tf.int32) # 只選取neg, pos的70%損失 # loss * valid_inds 數組想乘只保留valid_inds為1的元素 loss = loss * valid_inds loss, _ = tf.nn.top_k(loss, k=keep_num) return tf.reduce_mean(loss) def bbox_ohem(bbox_pred, bbox_target, label): """ 計算box的損失 """ zeros_index = tf.zeros_like(label, dtype=tf.float32) ones_index = tf.ones_like(label, dtype=tf.float32) # 保留pos和part的數據 valid_inds = tf.where(tf.equal(tf.abs(label), 1), ones_index, zeros_index) # 計算平方差損失 square_error = tf.square(bbox_pred-bbox_target) square_error = tf.reduce_sum(square_error, axis=1) # 保留的數據的個數 num_valid = tf.reduce_sum(valid_inds) keep_num = tf.cast(num_valid, dtype=tf.int32) # 保留pos和part部分的損失 square_error = square_error*valid_inds square_error, _ = tf.nn.top_k(square_error, k=keep_num) return tf.reduce_mean(square_error) def landmark_ohem(landmark_pred, landmark_target, label): """ 計算關鍵點損失 """ ones = tf.ones_like(label, dtype=tf.float32) zeros = tf.zeros_like(label, dtype=tf.float32) # 只保留landmark數據 valid_inds = tf.where(tf.equal(label, -2), ones, zeros) # 計算平方差損失 square_error = tf.square(landmark_pred-landmark_target) square_error = tf.reduce_sum(square_error, axis=1) # 保留數據個數 num_valid = tf.reduce_sum(valid_inds) keep_num = tf.cast(num_valid, dtype=tf.int32) # 保留landmark部分數據損失 square_error = square_error*valid_inds square_error, _ = tf.nn.top_k(square_error, k=keep_num) return tf.reduce_mean(square_error) def cal_accuracy(cls_prob, label): """ 計算分類准確率 """ # 預測最大概率的類別,0代表無人,1代表有人 pred = tf.argmax(cls_prob, axis=1) label_int = tf.cast(label, tf.int64) # 保留label>=0的數據,即pos和neg的數據 cond = tf.where(tf.greater_equal(label_int, 0)) picked = tf.squeeze(cond) # 獲取pos和neg的label值 label_picked = tf.gather(label_int, picked) pred_picked = tf.gather(pred, picked) # 計算准確率 accuracy_op = tf.reduce_mean(tf.cast(tf.equal(label_picked, pred_picked), tf.float32)) return accuracy_op
Generating the input data for the next network:
gen_hard_example.py

# coding: utf-8 import sys from utils import * import numpy as np import argparse import os import pickle import cv2 from tqdm import tqdm from loader import TestLoader sys.path.append('../') from train.model import P_Net, R_Net, O_Net import train.config as config from detection.detector import Detector from detection.fcn_detector import FcnDetector from detection.MtcnnDetector import MtcnnDetector def main(args): """ 通過PNet或RNet生成下一個網絡的輸入 """ size = args.input_size batch_size = config.batches min_face_size = config.min_face stride = config.stride thresh = config.thresh # 模型地址 model_path = ['../model/PNet/', '../model/RNet/', '../model/ONet'] if size == 12: net = 'PNet' save_size = 24 elif size == 24: net = 'RNet' save_size = 48 # 圖片數據地址 base_dir = '../data/WIDER_train/' # 處理后的圖片存放地址 data_dir = '../data/%d' % save_size neg_dir = os.path.join(data_dir, 'negative') pos_dir = os.path.join(data_dir, 'positive') part_dir = os.path.join(data_dir, 'part') for dir_path in [neg_dir, pos_dir, part_dir]: if not os.path.exists(dir_path): os.makedirs(dir_path) detectors = [None, None, None] PNet = FcnDetector(P_Net, model_path[0]) detectors[0] = PNet if net == 'RNet': RNet = Detector(R_Net, 24, batch_size[1], model_path[1]) detectors[1] = RNet basedir = '../data/' filename = '../data/wider_face_train_bbx_gt.txt' # 讀取文件的image和box對應函數在utils中 data = read_annotation(base_dir, filename) mtcnn_detector = MtcnnDetector(detectors, min_face_size=min_face_size, stride=stride, threshold=thresh) save_path = data_dir save_file = os.path.join(save_path, 'detections.pkl') if not os.path.exists(save_file): # 將data制作成迭代器 print('載入數據') test_data = TestLoader(data['images']) detectors, _ = mtcnn_detector.detect_face(test_data) print('完成識別') with open(save_file, 'wb') as f: pickle.dump(detectors, f, 1) print('開始生成圖像') save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path) def save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path): """ 將網絡識別的box用來裁剪原圖像作為下一個網絡的輸入 """ im_idx_list = data['images'] gt_boxes_list = data['bboxes'] num_of_images = len(im_idx_list) neg_label_file = "../data/%d/neg_%d.txt" % (save_size, save_size) neg_file = open(neg_label_file, 'w') pos_label_file = "../data/%d/pos_%d.txt" % (save_size, save_size) pos_file = open(pos_label_file, 'w') part_label_file = "../data/%d/part_%d.txt" % (save_size, save_size) part_file = open(part_label_file, 'w') # read detect result det_boxes = pickle.load(open(os.path.join(save_path, 'detections.pkl'), 'rb')) # print(len(det_boxes), num_of_images) assert len(det_boxes) == num_of_images, "弄錯了" n_idx = 0 p_idx = 0 d_idx = 0 image_done = 0 for im_idx, dets, gts in tqdm(zip(im_idx_list, det_boxes, gt_boxes_list)): gts = np.array(gts, dtype=np.float32).reshape(-1, 4) image_done += 1 if dets.shape[0] == 0: continue img = cv2.imread(im_idx) # 轉換成正方形 dets = convert_to_square(dets) dets[:, 0:4] = np.round(dets[:, 0:4]) neg_num = 0 for box in dets: x_left, y_top, x_right, y_bottom, _ = box.astype(int) width = x_right - x_left + 1 height = y_bottom - y_top + 1 # 除去過小的box框 if width < 20 or x_left < 0 or y_top < 0 or x_right > img.shape[1] - 1 or y_bottom > img.shape[0] - 1: continue Iou = IOU(box, gts) cropped_im = img[y_top:y_bottom + 1, x_left:x_right + 1, :] # 截取圖片得到box. resized_im = cv2.resize(cropped_im, (save_size, save_size), interpolation=cv2.INTER_LINEAR) # 划分種類, 選取60張neg人臉框用於后續網絡的訓練. 
if np.max(Iou) < 0.3 and neg_num < 60: save_file = os.path.join(neg_dir, "%s.jpg" % n_idx) neg_file.write(save_file + ' 0\n') cv2.imwrite(save_file, resized_im) n_idx += 1 neg_num += 1 else: idx = np.argmax(Iou) # 獲取IOU最大的索引 assigned_gt = gts[idx] # 得到IOU最大的人臉框 x1, y1, x2, y2 = assigned_gt # 偏移量 offset_x1 = (x1 - x_left) / float(width) offset_y1 = (y1 - y_top) / float(height) offset_x2 = (x2 - x_right) / float(width) offset_y2 = (y2 - y_bottom) / float(height) # pos和part if np.max(Iou) >= 0.65: save_file = os.path.join(pos_dir, "%s.jpg" % p_idx) pos_file.write(save_file + ' 1 %.2f %.2f %.2f %.2f\n' % ( offset_x1, offset_y1, offset_x2, offset_y2)) cv2.imwrite(save_file, resized_im) p_idx += 1 elif np.max(Iou) >= 0.4: save_file = os.path.join(part_dir, "%s.jpg" % d_idx) part_file.write(save_file + ' -1 %.2f %.2f %.2f %.2f\n' % ( offset_x1, offset_y1, offset_x2, offset_y2)) cv2.imwrite(save_file, resized_im) d_idx += 1 neg_file.close() part_file.close() pos_file.close() def parse_arguments(argv): parser = argparse.ArgumentParser() parser.add_argument('input_size', type=int, help='The input size for specific net') return parser.parse_args(argv) if __name__ == '__main__': main(parse_arguments(sys.argv[1:]))
The training procedure is as follows:
source activate tensorflow
In the preprocess directory:
python gen_12net_data.py  (generates the three kinds of P-Net data)
python gen_landmark_aug.py 12  (generates the P-Net landmark data)
python gen_imglist_pnet.py  (merges everything into one image list)
python gen_tfrecords.py 12  (generates the tfrecords file)
In the train directory:
python train.py 12  (trains P-Net)
TensorBoard can then be used to display the loss.
Back in the preprocess directory:
python gen_hard_example.py 12  (generates the three kinds of R-Net data)
python gen_landmark_aug.py 24  (generates the R-Net landmark data)
python gen_tfrecords.py 24  (generates the tfrecords file)
cd to the train directory and run python train.py 24 to train R-Net.
cd back to the preprocess directory:
python gen_hard_example.py 24  (generates the three kinds of O-Net data)
python gen_landmark_aug.py 48  (generates the O-Net landmark data)
python gen_tfrecords.py 48  (generates the tfrecords file)
cd to the train directory and run python train.py 48 to train O-Net.
Test script:
test.py

# coding: utf-8 import sys from detection.MtcnnDetector import MtcnnDetector from detection.detector import Detector from detection.fcn_detector import FcnDetector from train.model import P_Net, R_Net, O_Net import cv2 import os import numpy as np import train.config as config test_mode = config.test_mode thresh = config.thresh min_face_size = config.min_face stride = config.stride detectors = [None, None, None] # 模型放置位置 model_path = ['model/PNet/', 'model/RNet/', 'model/ONet'] batch_size = config.batches PNet = FcnDetector(P_Net, model_path[0]) detectors[0] = PNet if test_mode in ["RNet", "ONet"]: RNet = Detector(R_Net, 24, batch_size[1], model_path[1]) detectors[1] = RNet if test_mode == "ONet": ONet = Detector(O_Net, 48, batch_size[2], model_path[2]) detectors[2] = ONet mtcnn_detector = MtcnnDetector(detectors=detectors, min_face_size=min_face_size, stride=stride, threshold=thresh) out_path = config.out_path if config.input_mode == '1': # 選用圖片 path = config.test_dir # print(path) for item in os.listdir(path): img_path = os.path.join(path, item) img = cv2.imread(img_path) boxes_c, landmarks = mtcnn_detector.detect(img) for i in range(boxes_c.shape[0]): bbox = boxes_c[i, :4] score = boxes_c[i, 4] corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])] # 畫人臉框 cv2.rectangle(img, (corpbbox[0], corpbbox[1]), (corpbbox[2], corpbbox[3]), (255, 0, 0), 1) # 判別為人臉的置信度 cv2.putText(img, '{:.2f}'.format(score), (corpbbox[0], corpbbox[1] - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) # 畫關鍵點 for i in range(landmarks.shape[0]): for j in range(len(landmarks[i])//2): cv2.circle(img, (int(landmarks[i][2*j]), int(int(landmarks[i][2*j+1]))), 2, (0, 0, 255)) cv2.imshow('im', img) k = cv2.waitKey(0) & 0xFF if k == 27: cv2.imwrite(out_path + item, img) cv2.destroyAllWindows() if config.input_mode == '2': cap = cv2.VideoCapture(0) fourcc = cv2.VideoWriter_fourcc(*'XVID') out = cv2.VideoWriter(out_path+'out.mp4', fourcc, 10, (640, 480)) while True: t1 = cv2.getTickCount() ret, frame = cap.read() if ret: boxes_c, landmarks = mtcnn_detector.detect(frame) t2 = cv2.getTickCount() t = (t2-t1)/cv2.getTickFrequency() fps = 1.0/t for i in range(boxes_c.shape[0]): bbox = boxes_c[i, :4] score = boxes_c[i, 4] corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])] # 畫人臉框 cv2.rectangle(frame, (corpbbox[0], corpbbox[1]), (corpbbox[2], corpbbox[3]), (255, 0, 0), 1) # 畫置信度 cv2.putText(frame, '{:.2f}'.format(score), (corpbbox[0], corpbbox[1] - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) # 畫fps值 cv2.putText(frame, '{:.4f}'.format(t) + " " + '{:.3f}'.format(fps), (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2) # 畫關鍵點 for i in range(landmarks.shape[0]): for j in range(len(landmarks[i])//2): cv2.circle(frame, (int(landmarks[i][2*j]), int(int(landmarks[i][2*j+1]))), 2, (0, 0, 255)) a = out.write(frame) cv2.imshow("result", frame) if cv2.waitKey(1) & 0xFF == ord('q'): break else: break cap.release() out.release() cv2.destroyAllWindows()
The modules it uses:
detector.py

# coding: utf-8 import tensorflow as tf import numpy as np class Detector: """ 識別多組圖片 """ def __init__(self, net_factory, data_size, batch_size, model_path): graph = tf.Graph() with graph.as_default(): self.image_op = tf.placeholder(tf.float32, [None, data_size, data_size, 3]) self.cls_prob, self.bbox_pred, self.landmark_pred = net_factory(self.image_op, training=False) self.sess = tf.Session() # 重載模型 saver = tf.train.Saver() model_file = tf.train.latest_checkpoint(model_path) saver.restore(self.sess, model_file) self.data_size = data_size self.batch_size = batch_size def predict(self, databatch): scores = [] batch_size = self.batch_size minibatch = [] cur = 0 # 所有數據總數 n = databatch.shape[0] # 將數據整理成固定batch while cur < n: minibatch.append(databatch[cur:min(cur+batch_size, n), :, :, :]) cur += batch_size cls_prob_list = [] bbox_pred_list = [] landmark_pred_list = [] for idx, data in enumerate(minibatch): m = data.shape[0] real_size = self.batch_size # 最后一組數據不夠一個batch的處理 if m < batch_size: keep_inds = np.arange(m) gap = self.batch_size-m while gap >= len(keep_inds): gap -= len(keep_inds) keep_inds = np.concatenate((keep_inds, keep_inds)) if gap != 0: keep_inds = np.concatenate((keep_inds, keep_inds[:gap])) data = data[keep_inds] real_size = m cls_prob, bbox_pred, landmark_pred = self.sess.run([self.cls_prob, self.bbox_pred, self.landmark_pred], feed_dict={self.image_op: data}) cls_prob_list.append(cls_prob[:real_size]) bbox_pred_list.append(bbox_pred[:real_size]) landmark_pred_list.append(landmark_pred[:real_size]) return np.concatenate(cls_prob_list, axis=0), np.concatenate(bbox_pred_list, axis=0), np.concatenate(landmark_pred_list, axis=0)
fcn_detector.py

# coding: utf-8
import tensorflow as tf
import sys
sys.path.append('../')
import train.config as config


class FcnDetector:
    """
    Detector for a single image (fully convolutional, arbitrary input size)
    """
    def __init__(self, net_factory, model_path):
        graph = tf.Graph()
        with graph.as_default():
            self.image_op = tf.placeholder(tf.float32, name='input_image')
            self.width_op = tf.placeholder(tf.int32, name='image_width')
            self.height_op = tf.placeholder(tf.int32, name='image_height')
            image_reshape = tf.reshape(self.image_op, [1, self.height_op, self.width_op, 3])
            # predictions
            self.cls_prob, self.bbox_pred, _ = net_factory(image_reshape, training=False)
            self.sess = tf.Session()
            # restore the trained model
            saver = tf.train.Saver()
            model_file = tf.train.latest_checkpoint(model_path)
            saver.restore(self.sess, model_file)

    def predict(self, databatch):
        height, width, _ = databatch.shape
        cls_prob, bbox_pred = self.sess.run([self.cls_prob, self.bbox_pred],
                                            feed_dict={self.image_op: databatch,
                                                       self.width_op: width,
                                                       self.height_op: height})
        return cls_prob, bbox_pred
MtcnnDetector.py

# coding: utf-8 import cv2 import numpy as np import sys sys.path.append('../') from preprocess.utils import * from tqdm import tqdm def py_nms(dets, thresh): """ 剔除太相似的box """ x1 = dets[:, 0] y1 = dets[:, 1] x2 = dets[:, 2] y2 = dets[:, 3] scores = dets[:, 4] areas = (x2 - x1 + 1) * (y2 - y1 + 1) # 將概率值從大到小排列 order = scores.argsort()[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x1[i], x1[order[1:]]) yy1 = np.maximum(y1[i], y1[order[1:]]) xx2 = np.minimum(x2[i], x2[order[1:]]) yy2 = np.minimum(y2[i], y2[order[1:]]) w = np.maximum(0.0, xx2 - xx1 + 1) h = np.maximum(0.0, yy2 - yy1 + 1) inter = w * h ovr = inter / (areas[i] + areas[order[1:]] - inter+1e-10) # 保留小於閾值的下標,因為order[0]拿出來做比較了,所以inds+1是原來對應的下標 inds = np.where(ovr <= thresh)[0] order = order[inds + 1] return keep class MtcnnDetector: """ 來生成人臉的圖像 """ def __init__(self, detectors, min_face_size=20, stride=2, threshold=[0.6, 0.7, 0.7], scale_factor=0.79 # 圖像金字塔的縮小率 ): self.pnet_detector = detectors[0] self.rnet_detector = detectors[1] self.onet_detector = detectors[2] self.min_face_size = min_face_size self.stride = stride self.thresh = threshold self.scale_factor = scale_factor def detect_face(self, test_data): all_boxes = [] landmarks = [] batch_idx = 0 num_of_img = test_data.size empty_array = np.array([]) for databatch in tqdm(test_data): batch_idx += 1 im = databatch if self.pnet_detector: boxes, boxes_c, landmark = self.detect_pnet(im) if boxes_c is None: all_boxes.append(empty_array) landmarks.append(empty_array) continue if self.rnet_detector: boxes, boxes_c, landmark = self.detect_rnet(im, boxes_c) if boxes_c is None: all_boxes.append(empty_array) landmarks.append(empty_array) continue if self.onet_detector: boxes, boxes_c, landmark = self.detect_onet(im, boxes_c) if boxes_c is None: all_boxes.append(empty_array) landmarks.append(empty_array) continue all_boxes.append(boxes_c) landmark = [1] landmarks.append(landmark) return all_boxes, landmarks def detect_pnet(self, im): """ 通過PNet篩選box和landmark 參數: im:輸入圖像[h,2,3] """ h, w, c = im.shape net_size = 12 # 人臉和輸入圖像的比率 current_scale = float(net_size) / self.min_face_size im_resized = self.processed_image(im, current_scale) current_height, current_width, _ = im_resized.shape all_boxes = list() # 圖像金字塔, 不斷地去resize圖片 while min(current_height, current_width) > net_size: # 類別和box # 這里是測試流程, 輸入是一張圖片(size不一定是12*12) # 因此這里面輸出得到的cls_cls_map形狀是feature map(n * m * 2) # reg形狀是是(n * m * 4) cls_cls_map, reg = self.pnet_detector.predict(im_resized) boxes = self.generate_bbox(cls_cls_map[:, :, 1], reg, current_scale, self.thresh[0]) current_scale *= self.scale_factor # 繼續縮小圖像做金字塔 im_resized = self.processed_image(im, current_scale) current_height, current_width, _ = im_resized.shape if boxes.size == 0: continue # 非極大值抑制留下重復低的box keep = py_nms(boxes[:, :5], 0.5) boxes = boxes[keep] all_boxes.append(boxes) if len(all_boxes) == 0: return None, None, None all_boxes = np.vstack(all_boxes) # 將金字塔之后的box也進行非極大值抑制 keep = py_nms(all_boxes[:, 0:5], 0.7) all_boxes = all_boxes[keep] boxes = all_boxes[:, :5] # box的長寬 bbw = all_boxes[:, 2] - all_boxes[:, 0] + 1 bbh = all_boxes[:, 3] - all_boxes[:, 1] + 1 # 對應原圖的box坐標和分數, 訓練數據是相對於人臉框bbox的歸一化的offset, 因此這里dx、dy也都是歸一化的. 
boxes_c = np.vstack([all_boxes[:, 0] + all_boxes[:, 5] * bbw, # all_boxes[:, 5]--> dx1 all_boxes[:, 1] + all_boxes[:, 6] * bbh, # all_boxes[:, 6]--> dy1 all_boxes[:, 2] + all_boxes[:, 7] * bbw, # all_boxes[:, 7]--> dx2 all_boxes[:, 3] + all_boxes[:, 8] * bbh, # all_boxes[:, 8]--> dy2 all_boxes[:, 4]]) boxes_c = boxes_c.T return boxes, boxes_c, None def detect_rnet(self, im, dets): """ 通過rent選擇box 參數: im:輸入圖像 dets: PNet選擇的box,是相對原圖的絕對坐標 返回值: box絕對坐標 """ h, w, c = im.shape # 將PNet的box變成包含它的正方形,可以避免信息損失 dets = convert_to_square(dets) dets[:, 0:4] = np.round(dets[:, 0:4]) # 調整超出圖像的box [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h) delete_size = np.ones_like(tmpw)*20 ones = np.ones_like(tmpw) zeros = np.zeros_like(tmpw) num_boxes = np.sum(np.where((np.minimum(tmpw, tmph) >= delete_size), ones, zeros)) cropped_ims = np.zeros((num_boxes, 24, 24, 3), dtype=np.float32) for i in range(num_boxes): # 將PNet生成的box相對與原圖進行裁剪, 超出部分用0補 if tmph[i] < 20 or tmpw[i] < 20: continue tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8) tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :] cropped_ims[i, :, :, :] = (cv2.resize(tmp, (24, 24)) - 127.5) / 128 cls_scores, reg, _ = self.rnet_detector.predict(cropped_ims) cls_scores = cls_scores[:, 1] keep_inds = np.where(cls_scores > self.thresh[1])[0] if len(keep_inds) > 0: boxes = dets[keep_inds] boxes[:, 4] = cls_scores[keep_inds] reg = reg[keep_inds] else: return None, None, None keep = py_nms(boxes, 0.6) boxes = boxes[keep] # 對PNet截取的圖像的坐標進行校准,生成RNet的人臉框對於原圖的絕對坐標 boxes_c = self.calibrate_box(boxes, reg[keep]) return boxes, boxes_c, None def detect_onet(self, im, dets): """ 將ONet的選框繼續篩選基本和RNet差不多但多返回了landmark """ h, w, c = im.shape dets = convert_to_square(dets) dets[:, 0:4] = np.round(dets[:, 0:4]) [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h) num_boxes = dets.shape[0] cropped_ims = np.zeros((num_boxes, 48, 48, 3), dtype=np.float32) for i in range(num_boxes): tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8) tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :] cropped_ims[i, :, :, :] = (cv2.resize(tmp, (48, 48)) - 127.5) / 128 cls_scores, reg, landmark = self.onet_detector.predict(cropped_ims) cls_scores = cls_scores[:, 1] keep_inds = np.where(cls_scores > self.thresh[2])[0] if len(keep_inds) > 0: boxes = dets[keep_inds] boxes[:, 4] = cls_scores[keep_inds] reg = reg[keep_inds] landmark = landmark[keep_inds] else: return None, None, None w = boxes[:, 2] - boxes[:, 0] + 1 h = boxes[:, 3] - boxes[:, 1] + 1 landmark[:, 0::2] = (np.tile(w, (5, 1)) * landmark[:, 0::2].T + np.tile(boxes[:, 0], (5, 1)) - 1).T landmark[:, 1::2] = (np.tile(h, (5, 1)) * landmark[:, 1::2].T + np.tile(boxes[:, 1], (5, 1)) - 1).T boxes_c = self.calibrate_box(boxes, reg) boxes = boxes[py_nms(boxes, 0.6)] keep = py_nms(boxes_c, 0.6) boxes_c = boxes_c[keep] landmark = landmark[keep] return boxes, boxes_c, landmark def processed_image(self, img, scale): """ 預處理數據,轉化圖像尺度並對像素歸一到[-1, 1] """ height, width, channels = img.shape new_height = int(height * scale) new_width = int(width * scale) new_dim = (new_width, new_height) img_resized = cv2.resize(img, new_dim, interpolation=cv2.INTER_LINEAR) img_resized = (img_resized - 127.5) / 128 return img_resized def generate_bbox(self, cls_map, reg, scale, threshold): """ 得到對應原圖的box坐標,分類分數,box偏移量 cls_map: n * m(輸入是cls_cls_map[:, :, 1], 第一維, 人臉框的概率.) reg: n * m * 4 """ # pnet大致將圖像size縮小2倍 stride = 2 cellsize = 12 # 將置信度高的留下, 即為預測的人臉框. 二維的. 
t_index = np.where(cls_map > threshold) # 沒有人臉, 這里也可以是t_index[1].size # 使用np.where(二維數組), 得到包括兩個元素的列表, 第一個元素是第一維的坐標, 第二個元素是第二維的坐標. if t_index[0].size == 0: return np.array([]) # 偏移量 dx1, dy1, dx2, dy2 = [reg[t_index[0], t_index[1], i] for i in range(4)] reg = np.array([dx1, dy1, dx2, dy2]) score = cls_map[t_index[0], t_index[1]] # 對應原圖的box坐標,分類分數,box偏移量 # 原始圖片中回歸框坐標需要經過反向運算,計算方式如下,其中cellSize=12,是因為12*12的圖片進去后變成1*1 # stride=2是因為幾層卷積中只有一個stride為2,scale代表的是我們在哪個尺度金字塔的圖像, boundingbox = np.vstack([np.round((stride * t_index[1]) / scale), np.round((stride * t_index[0]) / scale), np.round((stride * t_index[1] + cellsize) / scale), np.round((stride * t_index[0] + cellsize) / scale), score, reg]) # shape[n,9] return boundingbox.T def pad(self, bboxes, w, h): """ 將超出圖像的box進行處理 參數: bboxes: 人臉框 w, h: 圖像長寬 返回值: dy, dx : 為調整后的box的左上角坐標相對於原box左上角的坐標 edy, edx : n為調整后的box右下角相對原box左上角的相對坐標 y, x : 調整后的box在原圖上左上角的坐標 ey, ex : 調整后的box在原圖上右下角的坐標 tmph, tmpw: 原始box的長寬 """ # box的長寬 tmpw, tmph = bboxes[:, 2] - bboxes[:, 0] + 1, bboxes[:, 3] - bboxes[:, 1] + 1 num_box = bboxes.shape[0] dx, dy = np.zeros((num_box, )), np.zeros((num_box, )) edx, edy = tmpw.copy() - 1, tmph.copy() - 1 # box左上右下的坐標 x, y, ex, ey = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3] # 找到超出右下邊界的box並將ex, ey歸為圖像的w, h # edx, edy為調整后的box右下角相對原box左上角的相對坐標 tmp_index = np.where(ex > w - 1) # w -1 + tmpw -1 - edx= ex -> edx = w + tmpw - ex - 2 edx[tmp_index] = tmpw[tmp_index] + w - 2 - ex[tmp_index] ex[tmp_index] = w - 1 tmp_index = np.where(ey > h - 1) # h -1 + tmph -1 - edy = ey -> edy = h + tmph - ey - 2 edy[tmp_index] = tmph[tmp_index] + h - 2 - ey[tmp_index] ey[tmp_index] = h - 1 # 找到超出左上角的box並將x,y歸為0 # dx, dy為調整后的box的左上角坐標相對於原box左上角的坐標 tmp_index = np.where(x < 0) dx[tmp_index] = 0 - x[tmp_index] x[tmp_index] = 0 tmp_index = np.where(y < 0) dy[tmp_index] = 0 - y[tmp_index] y[tmp_index] = 0 return_list = [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] return_list = [item.astype(np.int32) for item in return_list] return return_list def calibrate_box(self, bbox, reg): """ 校准box 參數: bbox: PNet生成的box reg: RNet生成的box偏移值 返回值: 調整后的box是針對原圖的絕對坐標 """ bbox_c = bbox.copy() w = bbox[:, 2] - bbox[:, 0] + 1 w = np.expand_dims(w, 1) h = bbox[:, 3] - bbox[:, 1] + 1 h = np.expand_dims(h, 1) reg_m = np.hstack([w, h, w, h]) aug = reg_m * reg bbox_c[:, 0:4] = bbox_c[:, 0:4] + aug return bbox_c def detect(self, img): """ 用於測試當個圖像的 """ boxes = None # PNet if self.pnet_detector: boxes, boxes_c, _ = self.detect_pnet(img) if boxes_c is None: return np.array([]), np.array([]) # RNet if self.rnet_detector: boxes, boxes_c, _ = self.detect_rnet(img, boxes_c) if boxes_c is None: return np.array([]), np.array([]) # ONet if self.onet_detector: boxes, boxes_c, landmark = self.detect_onet(img, boxes_c) if boxes_c is None: return np.array([]), np.array([]) return boxes_c, landmark
Testing and validation:
python test.py
Result:
The image data comes from the internet and is used for learning purposes only; if there is any infringement, please contact me to have it removed. Thank you!
Reference: https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html