Face Detection Based on the MTCNN Algorithm


Paper: "Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Networks"

Paper link: https://arxiv.org/abs/1604.02878v1

1. Overall Framework

MTCNN performs face detection and facial landmark localization with a cascade of convolutional neural networks. The overall framework is shown below:

                                                                    Figure 1: Pipeline

Figure 1 shows the overall MTCNN framework (the detection flow at test time).

Given an image, it is first resized to several different sizes to build an image pyramid. These multi-scale images are the inputs to the three stages below.

Stage 1: a fully convolutional network (P-Net) produces candidate windows together with their bounding box regression vectors. The estimated regression vectors are used to calibrate the candidates, and non-maximum suppression (NMS) then merges highly overlapping ones.

Stage 2: all candidates from stage 1 are fed to a refinement network (R-Net), which rejects a large number of false candidates, again applying bounding box regression and NMS.

Stage 3: an output network (O-Net) is used. This stage is similar to stage 2, but its goal is to describe the face in more detail; in particular, the network outputs the positions of five facial landmarks.
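As a concrete illustration of the image pyramid, the sketch below computes the sequence of scales applied to an input image. The minimum face size of 20 pixels and the scale factor 0.709 are assumptions borrowed from common MTCNN implementations, not values fixed by the paper:

def pyramid_scales(height, width, min_face_size=20, factor=0.709, net_size=12):
    # scale so that a min_face_size face maps onto the 12x12 P-Net input,
    # then shrink repeatedly until the image is smaller than 12 pixels
    scales = []
    scale = net_size / float(min_face_size)
    min_side = min(height, width) * scale
    while min_side >= net_size:
        scales.append(scale)
        scale *= factor
        min_side *= factor
    return scales

Each scale produces one resized copy of the image, and every copy is fed to P-Net in stage 1.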

 

2. CNN Architecture

Many papers have designed CNNs for face detection, but their performance may be limited for the following reasons:

1) Some filters lack diversity in their weights, which limits their power to produce discriminative descriptions.

2) Compared with multi-class object detection and classification tasks, face detection is a challenging binary classification task, so it may need fewer filters, but these filters must be more discriminative for faces. To this end, the number of filters is reduced and the 5×5 filters are replaced by 3×3 filters to cut computation, while the depth is increased to obtain better performance. With these improvements, better performance is achieved with shorter runtime. The CNN architectures are shown in Figure 2.

                                        Figure 2: CNN architectures (MP = max pooling, Conv = convolution; the strides of convolution and pooling are 1 and 2, respectively)

 

3. Training

The CNN detectors are trained with three tasks: face/non-face classification, bounding box regression, and facial landmark localization.

1) Face classification

The learning objective is formulated as a binary classification problem. For each sample $x_{i}$, the cross-entropy loss is used:

$L_{i}^{det}=-\left(y_{i}^{det}\log(p_{i})+(1-y_{i}^{det})\log(1-p_{i})\right)$    (1)

where $p_{i}$ is the probability produced by the network that sample $x_{i}$ is a face, and $y_{i}^{det}\in \left \{0, 1\right \}$ is the ground-truth label.
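For example, a face sample ($y_{i}^{det}=1$) predicted with $p_{i}=0.9$ incurs a loss of $-\log 0.9 \approx 0.105$, while the same sample predicted with $p_{i}=0.1$ incurs $-\log 0.1 \approx 2.3$: confident mistakes are penalized heavily.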

2) Bounding box regression

For each candidate window, the offset between it and the nearest ground truth is predicted: the left-top coordinates, the height, and the width. The learning objective is formulated as a regression problem, and for each sample $x_{i}$ the Euclidean loss is used:

$L_{i}^{box}=\left \| \hat{y}_{i}^{box} - y_{i}^{box}\right \|_{2}^{2}$    (2)

where $\hat{y}_{i}^{box}$ is the regression target obtained from the network (i.e., the network output) and $y_{i}^{box}$ is the ground truth. There are four coordinates (left-top x and y, height, and width), so $y_{i}^{box}\in \mathbb{R}^{4}$.

3) Facial landmark localization

Similar to bounding box regression, facial landmark localization is formulated as a regression problem, minimizing the Euclidean loss:

$L_{i}^{landmark}=\left \| \hat{y}_{i}^{landmark} - y_{i}^{landmark} \right \|_{2}^{2}$   (3)

where $\hat{y}_{i}^{landmark}$ is the landmark coordinate obtained from the network output and $y_{i}^{landmark}$ is the ground truth. There are five landmarks (left eye, right eye, nose, left mouth corner, and right mouth corner), so $y_{i}^{landmark}\in \mathbb{R}^{10}$.

4) Multi-source training

Since different tasks are performed in different CNNs, different types of training images are used in the process, e.g., faces, non-faces, and partially aligned faces. As a result, some of the loss functions (Eqs. 1-3) are not used for some samples. For example, for a background region only $L_{i}^{det}$ is computed and the other two losses are set to 0; this is implemented with a sample type indicator. The overall learning objective is:

$\min \sum_{i=1}^{N}\sum_{j\in \{det,\, box,\, landmark\}}\alpha_{j}\beta_{i}^{j}L_{i}^{j}$    (4)

where $N$ is the number of training samples and $\alpha_{j}$ denotes the importance of the task (in P-Net and R-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=0.5$; in O-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=1$ to obtain more accurate landmark localization). $\beta_{i}^{j}\in \left \{ 0,1 \right \}$ is the sample type indicator. The CNNs are trained with stochastic gradient descent (SGD).
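A minimal numpy sketch of how the indicator $\beta_{i}^{j}$ can be applied, assuming per-sample loss vectors and the integer sample-type labels used by the code later in this post (pos: 1, neg: 0, part: -1, landmark: -2); the weights mirror Eq. (4) for P-Net/R-Net:

import numpy as np

def total_loss(det_loss, box_loss, lm_loss, label, a_det=1.0, a_box=0.5, a_lm=0.5):
    # beta indicators derived from the sample type
    beta_det = np.isin(label, [0, 1]).astype(np.float32)  # pos and neg only
    beta_box = (np.abs(label) == 1).astype(np.float32)    # pos and part only
    beta_lm = (label == -2).astype(np.float32)            # landmark faces only
    return np.mean(a_det*beta_det*det_loss + a_box*beta_box*box_loss + a_lm*beta_lm*lm_loss)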

5) Online hard example mining

Unlike traditional hard example mining, which runs after the original classifier has been trained, online hard example mining is performed adaptively during training.

Specifically, in each mini-batch the losses computed in the forward pass are sorted, and only the top 70% are kept as hard examples. In the backward pass, gradients are computed only for these hard examples. In other words, easy samples, which contribute little to strengthening the detector during training, are ignored.
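A minimal numpy sketch of this selection (the TensorFlow code in model.py below achieves the same effect with tf.nn.top_k):

import numpy as np

def ohem_loss(losses, keep_ratio=0.7):
    # keep only the largest 70% of the per-sample losses
    keep = int(len(losses) * keep_ratio)
    return np.sort(losses)[::-1][:keep].mean()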

6) Training data

Since face detection and face alignment are performed jointly, four different kinds of annotated data are used during training:

6.1 Negatives: regions whose IoU with every ground truth in the image is less than 0.3;

6.2 Positives: regions whose IoU with a ground truth in the image is greater than 0.65;

6.3 Part faces: regions whose IoU is between 0.4 and 0.65;

6.4 Landmark faces: images annotated with the positions of the five facial landmarks.

Negatives and positives are used for the face classification task (face vs. non-face); positives and part faces are used for bounding box regression; landmark faces are used for facial landmark localization (a sketch of the IoU computation used by the scripts below is given after this list). The training data of each network is collected as follows:

① P-Net: positives, negatives, and part faces are randomly cropped from the WIDER FACE dataset; landmark faces are cropped from the CelebA data. All crops are resized to 12×12;

② R-Net: the proposals output by the first stage of the framework are used as R-Net input, resized to 24×24;

③ O-Net: the input consists of the face boxes filtered and refined in the second stage, likewise cropped from the original image, uniformly resized to 48×48, and fed to O-Net in batches.

Each later stage builds on, and refines, the results of the previous one.
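The scripts below import an IOU helper from utils that is not listed in this post; the sketch below shows what it is assumed to compute, namely the IoU of one box against an array of boxes, with boxes stored as [x1, y1, x2, y2]:

import numpy as np

def IOU(box, boxes):
    # areas of the candidate box and of every ground-truth box
    box_area = (box[2]-box[0]+1) * (box[3]-box[1]+1)
    areas = (boxes[:, 2]-boxes[:, 0]+1) * (boxes[:, 3]-boxes[:, 1]+1)
    # intersection rectangle
    xx1 = np.maximum(box[0], boxes[:, 0])
    yy1 = np.maximum(box[1], boxes[:, 1])
    xx2 = np.minimum(box[2], boxes[:, 2])
    yy2 = np.minimum(box[3], boxes[:, 3])
    inter = np.maximum(0, xx2-xx1+1) * np.maximum(0, yy2-yy1+1)
    return inter / (box_area + areas - inter)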

 

4. Testing

As in the overall framework of Section 1, the input image is first turned into an image pyramid of multiple scales, which is fed to P-Net. Because P-Net is fully convolutional, every point on its output feature map corresponds to a 12×12 region of the input image. Thanks to its small size, P-Net selects candidate regions very quickly, but with limited accuracy. The face boxes detected at the different scales are merged with NMS, the corresponding image patches are cropped, resized to 24×24, and passed to R-Net, which refines the boxes precisely; usually only a few boxes remain. These are then resized to 48×48 and fed to O-Net, which decides whether each remaining candidate is a face. O-Net is slower, but since the first two networks have already narrowed the input down to a few high-probability boxes, it only has to process a small number of patches; it outputs the final refined boxes together with the landmark positions. Facial landmarks are produced only in this third stage; the first two stages only classify and do not output landmark results.

Reference: https://blog.csdn.net/wfei101/article/details/79935037
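A minimal sketch of the mapping from a P-Net output location back to a 12×12 window of the original image; the stride of 2 follows from P-Net's single stride-2 pooling layer, while the confidence threshold of 0.6 is an illustrative assumption:

import numpy as np

def generate_bbox(cls_map, scale, threshold=0.6, stride=2, cell_size=12):
    # cls_map: P-Net face-probability map for one pyramid scale
    ys, xs = np.where(cls_map > threshold)
    if ys.size == 0:
        return np.empty((0, 4))
    # map each feature-map point back to a 12x12 window in the original image
    x1 = np.round(stride * xs / scale)
    y1 = np.round(stride * ys / scale)
    x2 = np.round((stride * xs + cell_size) / scale)
    y2 = np.round((stride * ys + cell_size) / scale)
    return np.stack([x1, y1, x2, y2], axis=1)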

 

5. Project Practice

Reference project: GitHub

The implementation below follows the reference project with a few adjustments.

Dataset downloads:

The datasets used here are WIDER FACE and CelebA.

 

The code walkthrough follows:

Reference: https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html

 

The main pieces of code are explained below.

 

Generating P-Net data:

gen_12net_data.py

# coding: utf-8

"""
截取pos,neg,part三種類型圖片並resize成12x12大小作為PNet的輸入
"""
import os
import cv2
import numpy as np
npr = np.random
from tqdm import  tqdm
from utils import IOU 

# txt mapping each face id to its label
anno_file = '../data/wider_face_train.txt'
# image directory
im_dir = '../data/WIDER_train/images'
# where the cropped pos, part, and neg images are saved
pos_save_dir = '../data/12/positive'
part_save_dir = '../data/12/part'
neg_save_dir = '../data/12/negative'
# P-Net data directory
save_dir = '../data/12'

if not os.path.exists(save_dir):
    os.mkdir(save_dir)
if not os.path.exists(pos_save_dir):
    os.mkdir(pos_save_dir)
if not os.path.exists(part_save_dir):
    os.mkdir(part_save_dir)
if not os.path.exists(neg_save_dir):
    os.mkdir(neg_save_dir)
    
f1 = open(os.path.join(save_dir, 'pos_12.txt'), 'w')
f2 = open(os.path.join(save_dir, 'neg_12.txt'), 'w')
f3 = open(os.path.join(save_dir, 'part_12.txt'), 'w')

with open(anno_file, 'r') as f:
    annotations = f.readlines()
num = len(annotations)
print('Total number of images: %d' % num)
# counters for the generated pos, neg, and part samples
p_idx = 0
n_idx = 0
d_idx = 0
# counter for the images read
idx = 0
for annotation in tqdm(annotations):  # progress bar
    annotation = annotation.strip().split(' ')
    im_path = annotation[0]
    box = list(map(float, annotation[1:]))
    boxes = np.array(box, dtype=np.float32).reshape(-1, 4)  # reshape to 4 columns; each row is one box
    
    img = cv2.imread(os.path.join(im_dir, im_path+'.jpg'))
    idx += 1
    height, width, channel = img.shape
    
    neg_num = 0
    # first sample a fixed number of neg patches per image
    while neg_num < 50:
        # random crop size
        size = npr.randint(12, min(width, height)/2)
        # random top-left corner
        nx = npr.randint(0, width-size)
        ny = npr.randint(0, height-size)
        # the cropped box
        crop_box = np.array([nx, ny, nx+size, ny+size])
        # IoU against all ground-truth boxes
        Iou = IOU(crop_box, boxes)
        # crop the patch and resize it to 12x12
        cropped_im = img[ny:ny+size, nx:nx+size, :]  # images read with cv2.imread are indexed y first
        resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)  # P-Net training input is 12x12

        # an IoU below 0.3 against every ground truth makes the patch a neg sample
        if np.max(Iou) < 0.3:
            save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx)  # path of the neg image
            f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n')  # neg_12.txt records the path of each neg image
            cv2.imwrite(save_file, resized_im)  # save the cropped patch
            n_idx += 1
            neg_num += 1
    
    for box in boxes:  # crop patches around each ground-truth box
        # top-left and bottom-right coordinates
        x1, y1, x2, y2 = box
        w = x2 - x1 + 1
        h = y2 - y1 + 1
        # skip boxes that are too small or that lie partly outside the image
        if max(w, h) < 20 or x1 < 0 or y1 < 0:
            continue
        for i in range(5):  # take 5 crops near each box as candidate negative samples
            size = npr.randint(12, min(width, height)/2)

            # random offsets of x1, y1, constrained so that x1+delta_x>0 and y1+delta_y>0
            delta_x = npr.randint(max(-size, -x1), w)
            delta_y = npr.randint(max(-size, -y1), h)
            # top-left corner after the shift
            # these crops become negatives, so the corner and offset can be chosen freely
            nx1 = int(max(0, x1+delta_x))
            ny1 = int(max(0, y1+delta_y))
            # discard crops that run past the image border
            if nx1 + size > width or ny1 + size > height:
                continue
            crop_box = np.array([nx1, ny1, nx1+size, ny1+size])
            Iou = IOU(crop_box, boxes)
            cropped_im = img[ny1:ny1+size, nx1:nx1+size, :]
            resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)
            
            if np.max(Iou) < 0.3:
                save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx)
                f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n')
                cv2.imwrite(save_file, resized_im)
                n_idx += 1
        for i in range(20):  # take 20 crops near each box as candidate positive or part samples
            # positives and parts must lie near the box, so size stays close to w and h;
            # otherwise the chance of getting a positive or part crop is small
            size = npr.randint(int(min(w, h)*0.8), np.ceil(1.25*max(w, h)))

            # skip boxes that are too small
            # note: w, h are the box dimensions; width, height are the whole image dimensions
            if w < 5:
                continue
            # crop near the box; a fairly small offset works best
            delta_x = npr.randint(-w*0.2, w*0.2)
            delta_y = npr.randint(-h*0.2, h*0.2)
            # top-left corner of the crop: x1+w/2 is the box center; add delta_x, then subtract size/2
            nx1 = int(max(x1+w/2+delta_x-size/2, 0))
            ny1 = int(max(y1+h/2+delta_y-size/2, 0))
            nx2 = nx1 + size
            ny2 = ny1 + size
            
            # discard crops that exceed the image
            if nx2 > width or ny2 > height:
                continue
            crop_box = np.array([nx1, ny1, nx2, ny2])
            # offsets of the face box relative to the crop, normalized by the crop size;
            # training learns these normalized offsets, and at test time the network
            # predicts offsets normalized the same way
            offset_x1 = (x1-nx1)/float(size)
            offset_y1 = (y1-ny1)/float(size)
            offset_x2 = (x2-nx2)/float(size)
            offset_y2 = (y2-ny2)/float(size)
            
            cropped_im = img[ny1:ny2, nx1:nx2, :]
            resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)
            # add a leading dimension so the single box can be fed to IOU
            box_ = box.reshape(1, -1)  # IoU between the crop and this particular box
            iou = IOU(crop_box, box_)
            if iou >= 0.65:
                save_file = os.path.join(pos_save_dir, '%s.jpg'%p_idx)
                f1.write(pos_save_dir+'/%s.jpg'%p_idx+' 1 %.2f %.2f %.2f %.2f\n'%(offset_x1,
                         offset_y1, offset_x2, offset_y2))
                cv2.imwrite(save_file, resized_im)
                p_idx += 1
            elif iou >= 0.4:
                save_file = os.path.join(part_save_dir, '%s.jpg'%d_idx)
                f3.write(part_save_dir+'/%s.jpg'%d_idx+' -1 %.2f %.2f %.2f %.2f\n'%(offset_x1,
                         offset_y1, offset_x2, offset_y2))
                cv2.imwrite(save_file, resized_im)
                d_idx += 1

print('%s images processed, pos: %s  part: %s  neg: %s' % (idx, p_idx, d_idx, n_idx))
f1.close()
f2.close()
f3.close()

 

Generating landmark data:

gen_landmark_aug.py

# coding: utf-8

import os
import random
import sys
import cv2
import numpy as np
npr = np.random
import argparse
from tqdm import tqdm
from utils import IOU
from BBox_utils import getDataFromTxt, BBox
data_dir = '../data'


def main(args):
    """
    Process the data annotated with facial landmarks
    """
    size = args.input_size
    # whether to augment the images
    argument = True
    if size == 12:
        net = 'PNet'
    elif size == 24:
        net = 'RNet'
    elif size == 48:
        net = 'ONet'
    image_id = 0
    # data output path
    OUTPUT = os.path.join(data_dir, str(size))
    if not os.path.exists(OUTPUT):
        os.mkdir(OUTPUT)
    # output path for the processed images
    dstdir = os.path.join(OUTPUT, 'train_%s_landmark_aug' %(net))
    if not os.path.exists(dstdir):
        os.mkdir(dstdir)
    # txt with the source labels
    ftxt = os.path.join(data_dir, 'trainImageList.txt')  # trainImageList.txt records the image paths and landmarks of the CelebA data
    # txt recording the generated labels
    f = open(os.path.join(OUTPUT, 'landmark_%d_aug.txt' %(size)), 'w')
    # get the image paths, boxes, and landmarks
    data = getDataFromTxt(ftxt, data_dir)
    idx = 0
    for (imgPath, box, landmarkGt) in tqdm(data):
        # collected face patches and landmarks
        F_imgs = []
        F_landmarks = []
        img = cv2.imread(imgPath)
        
        img_h, img_w, img_c = img.shape
        gt_box = np.array([box.left, box.top, box.right, box.bottom])
        # face patch
        f_face = img[box.top:box.bottom+1, box.left:box.right+1]
        # resize to the network input size
        f_face = cv2.resize(f_face, (size, size))
        
        landmark = np.zeros((5, 2))
        for index, one in enumerate(landmarkGt):
            # landmark offset relative to the top-left corner, normalized by the box size
            rv = ((one[0]-gt_box[0])/(gt_box[2]-gt_box[0]), (one[1]-gt_box[1])/(gt_box[3]-gt_box[1]))
            landmark[index] = rv
        F_imgs.append(f_face)
        F_landmarks.append(landmark.reshape(10))
        landmark = np.zeros((5, 2))
        if argument:
            # augment the image
            idx = idx+1
            x1, y1, x2, y2 = gt_box
            gt_w = x2 - x1 + 1
            gt_h = y2 - y1 + 1
            # skip face patches that are too small or out of bounds
            if max(gt_w, gt_h) < 40 or x1 < 0 or y1 < 0:
                continue
            for i in range(10):
                # random crop size; 10 crops per image, computed like the positive/part crops above
                box_size = npr.randint(int(min(gt_w, gt_h)*0.8), np.ceil(1.25*max(gt_w, gt_h)))
                # random top-left offset
                delta_x = npr.randint(-gt_w*0.2, gt_w*0.2)
                delta_y = npr.randint(-gt_h*0.2, gt_h*0.2)
                # top-left corner of the crop
                nx1 = int(max(x1+gt_w/2-box_size/2+delta_x, 0))
                ny1 = int(max(y1+gt_h/2-box_size/2+delta_y, 0))
                nx2 = nx1 + box_size
                ny2 = ny1 + box_size
                # skip crops beyond the image border
                if nx2 > img_w or ny2 > img_h:
                    continue
                # crop box and patch
                crop_box = np.array([nx1, ny1, nx2, ny2])
                cropped_im = img[ny1:ny2+1, nx1:nx2+1, :]
                resized_im = cv2.resize(cropped_im, (size, size))
                iou = IOU(crop_box, np.expand_dims(gt_box, 0))  # expand the box to shape (1, 4) for IOU
                # keep only positive crops
                if iou > 0.65:
                    F_imgs.append(resized_im)
                    # landmark offsets relative to the crop
                    for index, one in enumerate(landmarkGt):
                        rv = ((one[0]-nx1)/box_size, (one[1]-ny1)/box_size)
                        landmark[index] = rv
                    F_landmarks.append(landmark.reshape(10))
                    landmark = np.zeros((5, 2))
                    landmark_ = F_landmarks[-1].reshape(-1, 2)
                    box = BBox([nx1, ny1, nx2, ny2])
                    # mirror
                    if random.choice([0, 1]) > 0:
                        face_flipped, landmark_flipped = flip(resized_im, landmark_)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))
                    # rotate by +5 degrees (counterclockwise)
                    if random.choice([0, 1]) > 0:
                        face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), 5)
                        # re-normalize the rotated landmarks to the box
                        landmark_rorated = box.projectLandmark(landmark_rorated)
                        face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                        F_imgs.append(face_rotated_by_alpha)
                        F_landmarks.append(landmark_rorated.reshape(10))
                        
                        # horizontal flip of the rotated face
                        face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))
                    # rotate by -5 degrees (clockwise)
                    if random.choice([0, 1]) > 0:
                        face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), -5)
                        # re-normalize the rotated landmarks to the box
                        landmark_rorated = box.projectLandmark(landmark_rorated)
                        face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                        F_imgs.append(face_rotated_by_alpha)
                        F_landmarks.append(landmark_rorated.reshape(10))
                        
                        # horizontal flip of the rotated face
                        face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))

        F_imgs, F_landmarks = np.asarray(F_imgs), np.asarray(F_landmarks)
        for i in range(len(F_imgs)):
            # discard samples whose landmark offsets fall outside [0, 1]
            if np.sum(np.where(F_landmarks[i] <= 0, 1, 0)) > 0:
                continue
            if np.sum(np.where(F_landmarks[i] >= 1, 1, 0)) > 0:
                continue
            cv2.imwrite(os.path.join(dstdir, '%d.jpg' %(image_id)), F_imgs[i])
            landmarks = list(map(str, list(F_landmarks[i])))
            f.write(os.path.join(dstdir, '%d.jpg' %(image_id))+' -2 '+' '.join(landmarks)+'\n')
            image_id += 1
    f.close()
    return F_imgs, F_landmarks


def flip(face, landmark):
    # horizontal mirror
    face_flipped_by_x = cv2.flip(face, 1)
    landmark_ = np.asarray([(1-x, y) for (x, y) in landmark])
    landmark_[[0, 1]] = landmark_[[1, 0]]  # swap left and right eye
    landmark_[[3, 4]] = landmark_[[4, 3]]  # swap left and right mouth corner
    return (face_flipped_by_x, landmark_)


def rotate(img, box, landmark, alpha):
    # rotate the image (and landmarks) around the box center by alpha degrees
    center = ((box.left+box.right)/2, (box.top+box.bottom)/2)
    rot_mat = cv2.getRotationMatrix2D(center, alpha, 1)
    img_rotated_by_alpha = cv2.warpAffine(img, rot_mat, (img.shape[1], img.shape[0]))
    landmark_ = np.asarray([(rot_mat[0][0]*x+rot_mat[0][1]*y+rot_mat[0][2],
                            rot_mat[1][0]*x+rot_mat[1][1]*y+rot_mat[1][2]) for (x, y) in landmark])
    face = img_rotated_by_alpha[box.top:box.bottom+1, box.left:box.right+1]
    return (face, landmark_)


def parse_arguments(argv):

    parser = argparse.ArgumentParser()
    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
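A quick sanity check of the mirror augmentation above, assuming flip is importable from this script: after flipping, every x coordinate is reflected and the left/right eye and mouth-corner rows are swapped, so the label layout (left eye, right eye, nose, left mouth corner, right mouth corner) stays consistent:

import numpy as np
# normalized landmarks: left eye, right eye, nose, left mouth, right mouth
lm = np.array([[0.3, 0.3], [0.7, 0.3], [0.5, 0.5], [0.35, 0.7], [0.65, 0.7]])
face = np.zeros((48, 48, 3), dtype=np.uint8)  # dummy patch
flipped_face, flipped_lm = flip(face, lm)
print(flipped_lm[0])  # [0.3 0.3]: the left-eye row again holds the left eye of the mirrored face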

 

Merging the data used for P-Net training:

gen_imglist_pnet.py

# coding: utf-8

import numpy as np
npr = np.random
import os
data_dir = '../data/'


"""
將pos, part, neg, landmark四者混在一起
"""

size = 12
with open(os.path.join(data_dir, '12/pos_12.txt'), 'r') as f:
    pos = f.readlines()
with open(os.path.join(data_dir, '12/neg_12.txt'), 'r') as f:
    neg = f.readlines()
with open(os.path.join(data_dir, '12/part_12.txt'), 'r') as f:
    part = f.readlines()
with open(os.path.join(data_dir, '12/landmark_12_aug.txt'), 'r') as f:
    landmark = f.readlines()
dir_path = os.path.join(data_dir, '12')
if not os.path.exists(dir_path):
    os.makedirs(dir_path)
with open(os.path.join(dir_path, 'train_pnet_landmark.txt'), 'w') as f:
    nums = [len(neg), len(pos), len(part)]
    base_num = 250000
    print('neg: {}  pos: {}  part: {}  base: {}'.format(len(neg), len(pos), len(part), base_num))
    if len(neg) > base_num*3:
        neg_keep = npr.choice(len(neg), size=base_num*3, replace=True)
    else:
        neg_keep = npr.choice(len(neg), size=len(neg), replace=True)
    sum_p = len(neg_keep)//3  # pos : part : neg = 1 : 1 : 3
    pos_keep = npr.choice(len(pos), sum_p, replace=True)
    part_keep = npr.choice(len(part), sum_p, replace=True)
    print('kept neg: {}  pos: {}  part: {}'.format(len(neg_keep), len(pos_keep), len(part_keep)))
    for i in pos_keep:
        f.write(pos[i])
    for i in neg_keep:
        f.write(neg[i])
    for i in part_keep:
        f.write(part[i])
    for item in landmark:
        f.write(item) 

 

Converting the training data into TFRecord files:

gen_tfrecords.py

# coding: utf-8

import os
import random
import sys
import time
import tensorflow as tf
import cv2
from tqdm import tqdm
import argparse


def main(args):
    """
    Generate the tfrecord files
    """
    size = args.input_size
    # data directory
    dataset_dir = '../data/'
    # tfrecord output directory
    output_dir = os.path.join(dataset_dir, str(size)+'/tfrecord')
    if not os.path.exists(output_dir):
        os.mkdir(output_dir)
    # P-Net uses a single mixed tfrecord; R-Net and O-Net each use 4 separate ones
    if size == 12:
        net = 'PNet'
        tf_filenames = [os.path.join(output_dir, 'train_%s_landmark.tfrecord' % net)]
        items = ['12/train_pnet_landmark.txt']
    elif size == 24:
        net = 'RNet'
        tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
        item1 = '%d/pos_%d.txt' % (size, size)
        tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
        item2 = '%d/part_%d.txt' % (size, size)
        tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
        item3 = '%d/neg_%d.txt' % (size, size)
        tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
        item4 = '%d/landmark_%d_aug.txt' % (size, size)
        tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
        items = [item1, item2, item3, item4]
    elif size == 48:
        net = 'ONet'
        tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
        item1 = '%d/pos_%d.txt' % (size, size)
        tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
        item2 = '%d/part_%d.txt' % (size, size)
        tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
        item3 = '%d/neg_%d.txt' % (size, size)
        tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
        item4 = '%d/landmark_%d_aug.txt' % (size, size)
        tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
        items = [item1, item2, item3, item4]
    
    if tf.gfile.Exists(tf_filenames[0]):
        print('tfrecords already generated; nothing to do')
        return
    # read and convert the data
    for tf_filename, item in zip(tf_filenames, items):
        print('reading data')
        dataset = get_dataset(dataset_dir, item)
        tf_filename = tf_filename+'_shuffle'
        random.shuffle(dataset)  # shuffle the data
        print('converting to tfrecords')
        with tf.python_io.TFRecordWriter(tf_filename) as tfrecord_writer:
            for image_example in tqdm(dataset):
                filename = image_example['filename']
                try:
                    _add_to_tfrecord(filename, image_example, tfrecord_writer)
                except:
                    print(filename)
    print('conversion finished')


def get_dataset(dir, item):
    """
    Read the data from a txt file
    Args:
      dir: data directory
      item: txt path
    Returns:
      data entries containing the label, box, and landmarks
    """
    dataset_dir = os.path.join(dir, item)
    imagelist = open(dataset_dir, 'r')
    dataset = []
    for line in tqdm(imagelist.readlines()):  # progress bar
        info = line.strip().split(' ')
        data_example = dict()
        bbox = dict()
        data_example['filename'] = info[0]
        data_example['label'] = int(info[1])
        # neg boxes default to 0; part/pos entries carry only the face box; landmark entries carry only the landmarks
        bbox['xmin'] = 0
        bbox['ymin'] = 0
        bbox['xmax'] = 0
        bbox['ymax'] = 0
        bbox['xlefteye'] = 0
        bbox['ylefteye'] = 0
        bbox['xrighteye'] = 0
        bbox['yrighteye'] = 0
        bbox['xnose'] = 0
        bbox['ynose'] = 0
        bbox['xleftmouth'] = 0
        bbox['yleftmouth'] = 0
        bbox['xrightmouth'] = 0
        bbox['yrightmouth'] = 0        
        if len(info) == 6:  # length 6: the entry carries only a face box (6-2=4 values)
            bbox['xmin'] = float(info[2])
            bbox['ymin'] = float(info[3])
            bbox['xmax'] = float(info[4])
            bbox['ymax'] = float(info[5])
        if len(info) == 12:  # length 12: the entry carries the facial landmarks (12-2=10 values)
            bbox['xlefteye'] = float(info[2])
            bbox['ylefteye'] = float(info[3])
            bbox['xrighteye'] = float(info[4])
            bbox['yrighteye'] = float(info[5])
            bbox['xnose'] = float(info[6])
            bbox['ynose'] = float(info[7])
            bbox['xleftmouth'] = float(info[8])
            bbox['yleftmouth'] = float(info[9])
            bbox['xrightmouth'] = float(info[10])
            bbox['yrightmouth'] = float(info[11])
        data_example['bbox'] = bbox
        dataset.append(data_example)
    return dataset


def _add_to_tfrecord(filename, image_example, tfrecord_writer):
    """
    Write one example to the tfrecord file
    Args:
      filename: image filename
      image_example: the data entry
      tfrecord_writer: the writer
    """
    image_data, height, width = _process_image_withoutcoder(filename)
    example = _convert_to_example_simple(image_example, image_data)
    tfrecord_writer.write(example.SerializeToString())


def _process_image_withoutcoder(filename):
    """
    Read an image file; return its raw bytes and size
    """
    image = cv2.imread(filename)
    image_data = image.tostring()
    assert len(image.shape) == 3
    height = image.shape[0]
    width = image.shape[1]
    assert image.shape[2] == 3
    return image_data, height, width


# conversion helpers for the different feature types
def _int64_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _float_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _bytes_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))


def _convert_to_example_simple(image_example, image_buffer):
    """
    Convert one entry into the tf.train.Example form accepted by tfrecord
    """
    class_label = image_example['label']
    bbox = image_example['bbox']
    roi = [bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']]
    landmark = [bbox['xlefteye'], bbox['ylefteye'], bbox['xrighteye'], bbox['yrighteye'], bbox['xnose'], bbox['ynose'],
                bbox['xleftmouth'], bbox['yleftmouth'], bbox['xrightmouth'], bbox['yrightmouth']]

    example = tf.train.Example(features=tf.train.Features(feature={
        'image/encoded': _bytes_feature(image_buffer),
        'image/label': _int64_feature(class_label),
        'image/roi': _float_feature(roi),
        'image/landmark': _float_feature(landmark)
    }))
    return example


def parse_arguments(argv):

    parser = argparse.ArgumentParser()

    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))

 

Training:

train_model.py

# coding: utf-8


import os
import sys
from datetime import datetime
import numpy as np
import tensorflow as tf
import config as FLAGS
import random
import cv2


def train(net_factory, prefix, end_epoch, base_dir, display, base_lr):
    """
    Train the model
    """
    size = int(base_dir.split('/')[-1])  # derive the network size (base_dir is ../data/12, ../data/24, or ../data/48)

    # the alphas from the paper, i.e. the task importances; kept consistent with the paper
    if size == 12:
        net = 'PNet'
        radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5
    elif size == 24:
        net = 'RNet'
        radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5
    elif size == 48:
        net = 'ONet'
        radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 1.0
        
    if net == 'PNet':
        # count the total number of samples
        label_file = os.path.join(base_dir, 'train_pnet_landmark.txt')
        f = open(label_file, 'r')

        num = len(f.readlines())
        dataset_dir = os.path.join(base_dir, 'tfrecord/train_PNet_landmark.tfrecord_shuffle')
        # read the data from the tfrecord
        image_batch, label_batch, bbox_batch, landmark_batch = read_single_tfrecord(dataset_dir, FLAGS.batch_size, net)
    else:
        # count the total number of samples
        label_file1 = os.path.join(base_dir, 'pos_%d.txt' % size)
        f1 = open(label_file1, 'r')
        label_file2 = os.path.join(base_dir, 'part_%d.txt' % size)
        f2 = open(label_file2, 'r')
        label_file3 = os.path.join(base_dir, 'neg_%d.txt' % size)
        f3 = open(label_file3, 'r')
        label_file4 = os.path.join(base_dir, 'landmark_%d_aug.txt' % size)
        f4 = open(label_file4, 'r')

        num = len(f1.readlines())+len(f2.readlines())+len(f3.readlines())+len(f4.readlines())
    
        pos_dir = os.path.join(base_dir, 'tfrecord/pos_landmark.tfrecord_shuffle')
        part_dir = os.path.join(base_dir, 'tfrecord/part_landmark.tfrecord_shuffle')
        neg_dir = os.path.join(base_dir, 'tfrecord/neg_landmark.tfrecord_shuffle')
        landmark_dir = os.path.join(base_dir, 'tfrecord/landmark_landmark.tfrecord_shuffle')
        dataset_dirs = [pos_dir, part_dir, neg_dir, landmark_dir]
        # proportion of each data type, so that every batch has the same composition
        # training data ratio: pos : part : landmark : neg = 1 : 1 : 1 : 3
        pos_radio, part_radio, landmark_radio, neg_radio = 1.0/6, 1.0/6, 1.0/6, 3.0/6
        pos_batch_size = int(np.ceil(FLAGS.batch_size*pos_radio))
        assert pos_batch_size != 0, "invalid batch size"
        part_batch_size = int(np.ceil(FLAGS.batch_size*part_radio))
        assert part_batch_size != 0, "invalid batch size"
        neg_batch_size = int(np.ceil(FLAGS.batch_size*neg_radio))
        assert neg_batch_size != 0, "invalid batch size"
        landmark_batch_size = int(np.ceil(FLAGS.batch_size*landmark_radio))
        assert landmark_batch_size != 0, "invalid batch size"
        batch_sizes = [pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size]
        image_batch, label_batch, bbox_batch, landmark_batch = read_multi_tfrecords(dataset_dirs, batch_sizes, net)

    # placeholders used during training; the batches read from the tfrecords are fed in later
    input_image = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, size, size, 3], name='input_image')
    label = tf.placeholder(tf.float32, shape=[FLAGS.batch_size], name='label')
    bbox_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 4], name='bbox_target')
    landmark_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 10], name='landmark_target')
    # random color distortion of the images
    input_image = image_color_distort(input_image)
    cls_loss_op, bbox_loss_op, landmark_loss_op, L2_loss_op, accuracy_op = net_factory(input_image,
                                                                                       label,
                                                                                       bbox_target,
                                                                                       landmark_target,
                                                                                       training=True)

    # total training loss: the paper's Eq. (4) plus L2 regularization
    total_loss_op = radio_cls_loss*cls_loss_op+radio_bbox_loss*bbox_loss_op+radio_landmark_loss*landmark_loss_op+L2_loss_op
    train_op, lr_op = optimize(base_lr, total_loss_op, num)

    # add the scalars to tensorboard for visualization
    tf.summary.scalar("cls_loss", cls_loss_op)  # classification loss
    tf.summary.scalar("bbox_loss", bbox_loss_op)  # bbox loss
    tf.summary.scalar("landmark_loss", landmark_loss_op)  # landmark loss
    tf.summary.scalar("cls_accuracy", accuracy_op)  # classification accuracy
    tf.summary.scalar("total_loss", total_loss_op)  # cls, bbox, landmark, and L2 losses added together
    summary_op = tf.summary.merge_all()
    logs_dir = "../graph/%s" % net
    if not os.path.exists(logs_dir):
        os.mkdir(logs_dir)
    # train the model
    init = tf.global_variables_initializer()
    sess = tf.Session()

    saver = tf.train.Saver(max_to_keep=3)
    sess.run(init)
    # write the graph for visualization
    writer = tf.summary.FileWriter(logs_dir, sess.graph)
    # tf.train.Coordinator() creates a coordinator object that manages the input threads
    coord = tf.train.Coordinator()
    # start the QueueRunners
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    i = 0
    
    MAX_STEP = int(num / FLAGS.batch_size + 1) * end_epoch
    epoch = 0
    sess.graph.finalize()
    try:
        for step in range(MAX_STEP):
            i = i + 1
            if coord.should_stop():
                break
            image_batch_array, label_batch_array, bbox_batch_array, landmark_batch_array = sess.run([image_batch,
                                                                                                     label_batch,
                                                                                                     bbox_batch,
                                                                                                     landmark_batch])
            # randomly flip the images
            image_batch_array, landmark_batch_array = random_flip_images(image_batch_array,
                                                                         label_batch_array,
                                                                         landmark_batch_array)

            _, _, summary = sess.run([train_op, lr_op, summary_op],
                                     feed_dict={input_image: image_batch_array,
                                                label: label_batch_array,
                                                bbox_target: bbox_batch_array,
                                                landmark_target: landmark_batch_array})
            # log the training progress periodically
            if (step+1) % display == 0:
                cls_loss, bbox_loss, landmark_loss, L2_loss, lr, acc = sess.run([cls_loss_op,
                                                                                 bbox_loss_op,
                                                                                 landmark_loss_op,
                                                                                 L2_loss_op,
                                                                                 lr_op,
                                                                                 accuracy_op],
                                                                                feed_dict={input_image: image_batch_array,
                                                                                           label: label_batch_array,
                                                                                           bbox_target: bbox_batch_array,
                                                                                           landmark_target: landmark_batch_array})

                total_loss = radio_cls_loss*cls_loss + radio_bbox_loss*bbox_loss + radio_landmark_loss*landmark_loss + L2_loss
                print('epoch: %d/%d' % (epoch+1, end_epoch))
                print("Step: %d/%d, accuracy: %3f, cls loss: %4f, bbox loss: %4f, Landmark loss :%4f, L2 loss: %4f, Total Loss: %4f, lr:%f"
                      % (step+1, MAX_STEP, acc, cls_loss, bbox_loss, landmark_loss, L2_loss, total_loss, lr))

            # save the model once per epoch
            if i * FLAGS.batch_size > num:
                epoch = epoch + 1
                i = 0
                path_prefix = saver.save(sess, prefix, global_step=epoch)
            writer.add_summary(summary, global_step=step)
    except tf.errors.OutOfRangeError:
        print("完成!!!")
    finally:
        coord.request_stop()
        writer.close()
    coord.join(threads)
    sess.close()


def optimize(base_lr, loss, data_num):
    """
    Set up the optimizer
    """
    lr_factor = 0.1
    global_step = tf.Variable(0, trainable=False)
    # data_num / batch_size is the number of steps per epoch; multiplying it by the epoch
    # counts in LR_EPOCH gives the step boundaries of the learning-rate schedule.
    # a staircase learning rate is used: base_lr * lr_factor ^ x, x = 0, 1, 2, 3
    boundaries = [int(epoch * data_num / FLAGS.batch_size) for epoch in FLAGS.LR_EPOCH]
    lr_values = [base_lr * (lr_factor ** x) for x in range(0, len(FLAGS.LR_EPOCH) + 1)]
    lr_op = tf.train.piecewise_constant(global_step, boundaries, lr_values)
    # momentum optimizer
    optimizer = tf.train.MomentumOptimizer(lr_op, 0.9)
    train_op = optimizer.minimize(loss, global_step)
    return train_op, lr_op


def read_single_tfrecord(tfrecord_file, batch_size, net):
    """
    Read data from a single tfrecord file
    """
    filename_queue = tf.train.string_input_producer([tfrecord_file], shuffle=True)
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    image_features = tf.parse_single_example(serialized_example,
                                             features={
                                                       'image/encoded': tf.FixedLenFeature([], tf.string),
                                                       'image/label': tf.FixedLenFeature([], tf.int64),
                                                       'image/roi': tf.FixedLenFeature([4], tf.float32),
                                                       'image/landmark': tf.FixedLenFeature([10], tf.float32)})

    if net == 'PNet':
        image_size = 12
    elif net == 'RNet':
        image_size = 24
    elif net == 'ONet':
        image_size = 48

    # _bytes_feature stored the raw image bytes in the tfrecord; tf.decode_raw turns the
    # string-encoded bytes back into the original image data
    image = tf.decode_raw(image_features['image/encoded'], tf.uint8)
    image = tf.reshape(image, [image_size, image_size, 3])
    # normalize the values to [-1, 1]
    image = (tf.cast(image, tf.float32)-127.5)/128  # the stored data is uint8, i.e. 8-bit unsigned integers (0-255)
    
    label = tf.cast(image_features['image/label'], tf.float32)
    roi = tf.cast(image_features['image/roi'], tf.float32)
    landmark = tf.cast(image_features['image/landmark'], tf.float32)
    image, label, roi, landmark = tf.train.batch([image, label, roi, landmark],
                                                 batch_size=batch_size,
                                                 num_threads=2,
                                                 capacity=batch_size)

    # tf.train.batch returns one batch of data, so reshape the first dimension to batch_size
    label = tf.reshape(label, [batch_size])
    roi = tf.reshape(roi, [batch_size, 4])
    landmark = tf.reshape(landmark, [batch_size, 10])
    return image, label, roi, landmark


def read_multi_tfrecords(tfrecord_files, batch_sizes, net):
    """
    Read several tfrecord files and merge them into one batch
    """
    pos_dir, part_dir, neg_dir, landmark_dir = tfrecord_files
    pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size = batch_sizes
   
    pos_image, pos_label, pos_roi, pos_landmark = read_single_tfrecord(pos_dir, pos_batch_size, net)
  
    part_image, part_label, part_roi, part_landmark = read_single_tfrecord(part_dir, part_batch_size, net)
  
    neg_image, neg_label, neg_roi, neg_landmark = read_single_tfrecord(neg_dir, neg_batch_size, net)

    landmark_image, landmark_label, landmark_roi, landmark_landmark = read_single_tfrecord(landmark_dir, landmark_batch_size, net)

    images = tf.concat([pos_image, part_image, neg_image, landmark_image], 0, name="concat/image")
   
    labels = tf.concat([pos_label, part_label, neg_label, landmark_label], 0, name="concat/label")
 
    assert isinstance(labels, object)

    rois = tf.concat([pos_roi, part_roi, neg_roi, landmark_roi], 0, name="concat/roi")
    
    landmarks = tf.concat([pos_landmark, part_landmark, neg_landmark, landmark_landmark], 0, name="concat/landmark")
    return images, labels, rois, landmarks


def image_color_distort(inputs):
    inputs = tf.image.random_contrast(inputs, lower=0.5, upper=1.5)
    inputs = tf.image.random_brightness(inputs, max_delta=0.2)
    inputs = tf.image.random_hue(inputs, max_delta=0.2)
    inputs = tf.image.random_saturation(inputs, lower=0.5, upper=1.5)

    return inputs


def random_flip_images(image_batch, label_batch, landmark_batch):
    '''Randomly flip the images'''
    if random.choice([0, 1]) > 0:
        num_images = image_batch.shape[0]
        fliplandmarkindexes = np.where(label_batch == -2)[0]
        flipposindexes = np.where(label_batch == 1)[0]
        
        flipindexes = np.concatenate((fliplandmarkindexes, flipposindexes))
        
        # flip the selected images in place
        for i in flipindexes:
            cv2.flip(image_batch[i], 1, image_batch[i])
        
        # mirror the landmark coordinates and swap the left/right points
        for i in fliplandmarkindexes:
            landmark_ = landmark_batch[i].reshape((-1, 2))
            landmark_ = np.asarray([(1-x, y) for (x, y) in landmark_])
            landmark_[[0, 1]] = landmark_[[1, 0]]
            landmark_[[3, 4]] = landmark_[[4, 3]]
            landmark_batch[i] = landmark_.ravel()
        
    return image_batch, landmark_batch
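As a worked example of the staircase schedule in optimize(), assume base_lr = 0.001, a batch size of 384, 250,000 training samples, and LR_EPOCH = [6, 14, 20] (the config module is not listed in this post, so these values are purely illustrative):

boundaries = [int(e * 250000 / 384) for e in [6, 14, 20]]  # [3906, 9114, 13020]
lr_values = [0.001 * 0.1**x for x in range(4)]             # [0.001, 0.0001, 1e-05, 1e-06]
# the learning rate drops by a factor of 10 at steps 3906, 9114, and 13020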

 

train.py

# coding: utf-8

from model import P_Net, R_Net, O_Net
import argparse
import os
import sys
import config as FLAGS
from train_model import train
net_factorys = [P_Net, R_Net, O_Net]


def main(args):
    size = args.input_size
    base_dir = os.path.join('../data/', str(size))
    
    if size == 12:
        net = 'PNet'
        net_factory = net_factorys[0]
        end_epoch = FLAGS.end_epoch[0]
    elif size == 24:
        net = 'RNet'
        net_factory = net_factorys[1]
        end_epoch = FLAGS.end_epoch[1]
    elif size == 48:
        net = 'ONet'
        net_factory = net_factorys[2]
        end_epoch = FLAGS.end_epoch[2]
    model_path = os.path.join('../model/', net)
    if not os.path.exists(model_path):
        os.mkdir(model_path)
    prefix = os.path.join(model_path, net)
    display = FLAGS.display
    lr = FLAGS.lr
    train(net_factory, prefix, end_epoch, base_dir, display, lr)


def parse_arguments(argv):

    parser = argparse.ArgumentParser()

    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))

 

The model definition:

model.py

 

# coding: utf-8


import tensorflow as tf
slim = tf.contrib.slim
import numpy as np
# only the top 70% of the samples are kept for the parameter update
num_keep_radio = 0.7


def P_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """
    P-Net structure
    """
    with tf.variable_scope('PNet'):
        # build the network with tensorflow slim
        with slim.arg_scope([slim.conv2d], activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 10, 3, scope='conv1')  # first layer: 10 output channels, kernel size 3
            net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 16, 3, scope='conv2')
            net = slim.conv2d(net, 32, 3, scope='conv3')
            # the binary classifier has 2 output channels
            conv4_1 = slim.conv2d(net, 2, 1, activation_fn=tf.nn.softmax, scope='conv4_1')  # face / non-face classification
            bbox_pred = slim.conv2d(net, 4, 1, activation_fn=None, scope='conv4_2')  # 4-value regression for the face box
            landmark_pred = slim.conv2d(net, 10, 1, activation_fn=None, scope='conv4_3')  # 10-value regression for the landmarks
            
            if training:
                # squeeze out the size-1 dimensions 1 and 2: [batch, 1, 1, 2] -> [batch, 2]
                cls_prob = tf.squeeze(conv4_1, [1, 2], name='cls_prob')
                cls_loss = cls_ohem(cls_prob, label)
                
                bbox_pred = tf.squeeze(bbox_pred, [1, 2], name='bbox_pred')  # [batch, 4]
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                
                landmark_pred = tf.squeeze(landmark_pred, [1, 2], name='landmark_pred')  # [batch, 10]
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                # at test time batch_size = 1
                cls_pro_test = tf.squeeze(conv4_1, axis=0)
                bbox_pred_test = tf.squeeze(bbox_pred, axis=0)
                landmark_pred_test = tf.squeeze(landmark_pred, axis=0)
                return cls_pro_test, bbox_pred_test, landmark_pred_test


def R_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """
    R-Net structure
    """
    with tf.variable_scope('RNet'):
        with slim.arg_scope([slim.conv2d],
                            activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 28, 3, scope='conv1')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 48, 3, scope='conv2')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
            net = slim.conv2d(net, 64, 2, scope='conv3')
            fc_flatten = slim.flatten(net)
            fc1 = slim.fully_connected(fc_flatten, num_outputs=128, scope='fc1')
            
            cls_prob = slim.fully_connected(fc1, num_outputs=2,activation_fn=tf.nn.softmax, scope='cls_fc')
            bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
            landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
            if training:
                cls_loss = cls_ohem(cls_prob, label)
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                return cls_prob, bbox_pred, landmark_pred


def O_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """
    O-Net structure
    """
    with tf.variable_scope('ONet'):
        with slim.arg_scope([slim.conv2d],
                            activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 32, 3, scope='conv1')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 64, 3, scope='conv2')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
            net = slim.conv2d(net, 64, 3, scope='conv3')
            net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool3')
            net = slim.conv2d(net, 128, 2, scope='conv4')
            fc_flatten = slim.flatten(net)
            fc1 = slim.fully_connected(fc_flatten, num_outputs=256, scope='fc1')
            
            cls_prob = slim.fully_connected(fc1, num_outputs=2, activation_fn=tf.nn.softmax, scope='cls_fc')
            bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
            landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
            if training:
                cls_loss = cls_ohem(cls_prob, label)
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                return cls_prob, bbox_pred, landmark_pred


def prelu(inputs):
    """
    PReLU activation
    """
    alphas = tf.get_variable('alphas', shape=inputs.get_shape()[-1], dtype=tf.float32,
                             initializer=tf.constant_initializer(0.25))
    pos = tf.nn.relu(inputs)
    neg = alphas*(inputs-abs(inputs))*0.5
    return pos+neg


def cls_ohem(cls_prob, label):
    """
    Classification loss with online hard example mining
    Args:
      cls_prob: predicted class probabilities (face / non-face)
      label: ground truth
    Returns:
      the loss
    """
    zeros = tf.zeros_like(label)

    # labels: neg: 0, pos: 1, part: -1
    # only negatives and positives are used for the face classification task,
    # so pos keeps label 1 while neg and part get label 0
    label_filter_invalid = tf.where(tf.less(label, 0), zeros, label)
    num_cls_prob = tf.size(cls_prob)  # total size = batch * 2
    cls_prob_reshape = tf.reshape(cls_prob, [num_cls_prob, -1])  # flatten the class probabilities
    label_int = tf.cast(label_filter_invalid, tf.int32)  # cast the 0/1 labels to int32
    num_row = tf.to_int32(cls_prob.get_shape()[0])  # batch size
    # for sample k, index 2k holds the non-face probability and 2k+1 the face probability;
    # indices_ selects the probability of the true class, used for the cross-entropy below
    row = tf.range(num_row) * 2  # base index of each sample: (0, 2, 4, ..., (num_row-1)*2)
    # add the label to the base: label_int=0 selects the neg probability, label_int=1 the pos one
    indices_ = row + label_int
    # gather the probability assigned to the true class
    label_prob = tf.squeeze(tf.gather(cls_prob_reshape, indices_))
    # since the labels are one-hot (0 or 1) after the filtering above, -log(p_true) is
    # exactly the cross-entropy of Eq. (1); the 1e-10 guards against log(0)
    loss = -tf.log(label_prob+1e-10)
    zeros = tf.zeros_like(label_prob, dtype=tf.float32)
    ones = tf.ones_like(label_prob, dtype=tf.float32)
    # keep only the neg and pos losses for training:
    # label < 0 (part: -1, landmark: -2) -> 0, otherwise (pos, neg) -> 1
    valid_inds = tf.where(label < zeros, zeros, ones)
    num_valid = tf.reduce_sum(valid_inds)
    # keep 70% of the valid samples
    keep_num = tf.cast(num_valid*num_keep_radio, dtype=tf.int32)
    # keep only the 70% largest neg/pos losses;
    # multiplying by valid_inds zeroes out the losses of the other sample types
    loss = loss * valid_inds
    loss, _ = tf.nn.top_k(loss, k=keep_num)
    return tf.reduce_mean(loss)


def bbox_ohem(bbox_pred, bbox_target, label):
    """
    Bounding box regression loss
    """
    zeros_index = tf.zeros_like(label, dtype=tf.float32)
    ones_index = tf.ones_like(label, dtype=tf.float32)
    # keep only the pos and part samples (|label| == 1)
    valid_inds = tf.where(tf.equal(tf.abs(label), 1), ones_index, zeros_index)
    # squared error
    square_error = tf.square(bbox_pred-bbox_target)
    square_error = tf.reduce_sum(square_error, axis=1)
    # number of samples kept
    num_valid = tf.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    # keep only the pos and part losses
    square_error = square_error*valid_inds
    square_error, _ = tf.nn.top_k(square_error, k=keep_num)
    return tf.reduce_mean(square_error)


def landmark_ohem(landmark_pred, landmark_target, label):
    """
    Landmark localization loss
    """
    ones = tf.ones_like(label, dtype=tf.float32)
    zeros = tf.zeros_like(label, dtype=tf.float32)
    # keep only the landmark samples (label == -2)
    valid_inds = tf.where(tf.equal(label, -2), ones, zeros)
    # squared error
    square_error = tf.square(landmark_pred-landmark_target)
    square_error = tf.reduce_sum(square_error, axis=1)
    # number of samples kept
    num_valid = tf.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    # keep only the landmark losses
    square_error = square_error*valid_inds
    square_error, _ = tf.nn.top_k(square_error, k=keep_num)
    return tf.reduce_mean(square_error)


def cal_accuracy(cls_prob, label):
    """
    Classification accuracy
    """
    # predicted class with the highest probability: 0 = non-face, 1 = face
    pred = tf.argmax(cls_prob, axis=1)
    label_int = tf.cast(label, tf.int64)
    # keep only the samples with label >= 0, i.e. pos and neg
    cond = tf.where(tf.greater_equal(label_int, 0))
    picked = tf.squeeze(cond)
    # gather the pos/neg labels and predictions
    label_picked = tf.gather(label_int, picked)
    pred_picked = tf.gather(pred, picked)
    # accuracy
    accuracy_op = tf.reduce_mean(tf.cast(tf.equal(label_picked, pred_picked), tf.float32))
    return accuracy_op
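It is worth verifying how P-Net maps its 12×12 training input down to a 1×1 output, which is what makes the fully convolutional behavior of Section 4 work: conv1 (3×3, VALID) gives 12 → 10, pool1 (2×2, stride 2, SAME) gives 10 → 5, conv2 (3×3, VALID) gives 5 → 3, and conv3 (3×3, VALID) gives 3 → 1. On a larger test image, every output location therefore corresponds to one 12×12 window, shifted with an effective stride of 2 by the single pooling layer.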

 

Generating the input of the next network:

gen_hard_example.py

# coding: utf-8


import sys
from utils import *
import numpy as np
import argparse
import os
import pickle
import cv2
from tqdm import tqdm
from loader import TestLoader
sys.path.append('../')
from train.model import P_Net, R_Net, O_Net
import train.config as config
from detection.detector import Detector
from detection.fcn_detector import FcnDetector
from detection.MtcnnDetector import MtcnnDetector


def main(args):
    """
    Generate the input of the next network with P-Net or R-Net
    """
    size = args.input_size
    batch_size = config.batches
    min_face_size = config.min_face
    stride = config.stride
    thresh = config.thresh
    # model paths
    model_path = ['../model/PNet/', '../model/RNet/', '../model/ONet']
    if size == 12:
        net = 'PNet'
        save_size = 24
    elif size == 24:
        net = 'RNet'
        save_size = 48
    # image data directory
    base_dir = '../data/WIDER_train/'
    # where the processed images are saved
    data_dir = '../data/%d' % save_size
    neg_dir = os.path.join(data_dir, 'negative')
    pos_dir = os.path.join(data_dir, 'positive')
    part_dir = os.path.join(data_dir, 'part')
    for dir_path in [neg_dir, pos_dir, part_dir]:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
    detectors = [None, None, None]
    PNet = FcnDetector(P_Net, model_path[0])
    detectors[0] = PNet
    if net == 'RNet':
        RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
        detectors[1] = RNet
    basedir = '../data/'
    filename = '../data/wider_face_train_bbx_gt.txt'
    # read_annotation (defined in utils) maps each image to its boxes
    data = read_annotation(base_dir, filename)
    mtcnn_detector = MtcnnDetector(detectors, min_face_size=min_face_size,
                                   stride=stride, threshold=thresh)
    save_path = data_dir
    save_file = os.path.join(save_path, 'detections.pkl')
    if not os.path.exists(save_file):
        # wrap the data in an iterator
        print('loading data')
        test_data = TestLoader(data['images'])
        detections, _ = mtcnn_detector.detect_face(test_data)
        print('detection finished')

        with open(save_file, 'wb') as f:
            pickle.dump(detections, f, 1)
    print('generating images')
    save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path)


def save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path):
    """
    Crop the original images with the boxes found by the network,
    producing the input of the next network
    """
    im_idx_list = data['images']
    gt_boxes_list = data['bboxes']
    num_of_images = len(im_idx_list)

    neg_label_file = "../data/%d/neg_%d.txt" % (save_size, save_size)
    neg_file = open(neg_label_file, 'w')

    pos_label_file = "../data/%d/pos_%d.txt" % (save_size, save_size)
    pos_file = open(pos_label_file, 'w')

    part_label_file = "../data/%d/part_%d.txt" % (save_size, save_size)
    part_file = open(part_label_file, 'w')
    # read the detection results
    det_boxes = pickle.load(open(os.path.join(save_path, 'detections.pkl'), 'rb'))
    # print(len(det_boxes), num_of_images)
   
    assert len(det_boxes) == num_of_images, "detection count does not match image count"

    n_idx = 0
    p_idx = 0
    d_idx = 0
    image_done = 0
    
    for im_idx, dets, gts in tqdm(zip(im_idx_list, det_boxes, gt_boxes_list)):
        gts = np.array(gts, dtype=np.float32).reshape(-1, 4)
        image_done += 1

        if dets.shape[0] == 0:
            continue
        img = cv2.imread(im_idx)
        # convert the boxes to squares
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        neg_num = 0
        for box in dets:
            x_left, y_top, x_right, y_bottom, _ = box.astype(int)
            width = x_right - x_left + 1
            height = y_bottom - y_top + 1

            # drop boxes that are too small or out of bounds
            if width < 20 or x_left < 0 or y_top < 0 or x_right > img.shape[1] - 1 or y_bottom > img.shape[0] - 1:
                continue

            Iou = IOU(box, gts)
            cropped_im = img[y_top:y_bottom + 1, x_left:x_right + 1, :]  # crop the patch for this box
            resized_im = cv2.resize(cropped_im, (save_size, save_size),
                                    interpolation=cv2.INTER_LINEAR)

            # classify the patch; keep at most 60 neg patches per image for the next network
            if np.max(Iou) < 0.3 and neg_num < 60:
                save_file = os.path.join(neg_dir, "%s.jpg" % n_idx)
                neg_file.write(save_file + ' 0\n')
                cv2.imwrite(save_file, resized_im)
                n_idx += 1
                neg_num += 1
            else:
                idx = np.argmax(Iou)  # index of the ground truth with the largest IoU
                assigned_gt = gts[idx]  # the best-matching face box
                x1, y1, x2, y2 = assigned_gt

                # normalized offsets
                offset_x1 = (x1 - x_left) / float(width)
                offset_y1 = (y1 - y_top) / float(height)
                offset_x2 = (x2 - x_right) / float(width)
                offset_y2 = (y2 - y_bottom) / float(height)

                # pos and part samples
                if np.max(Iou) >= 0.65:
                    save_file = os.path.join(pos_dir, "%s.jpg" % p_idx)
                    pos_file.write(save_file + ' 1 %.2f %.2f %.2f %.2f\n' % (
                        offset_x1, offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    p_idx += 1

                elif np.max(Iou) >= 0.4:
                    save_file = os.path.join(part_dir, "%s.jpg" % d_idx)
                    part_file.write(save_file + ' -1 %.2f %.2f %.2f %.2f\n' % (
                        offset_x1, offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    d_idx += 1
    neg_file.close()
    part_file.close()
    pos_file.close()


def parse_arguments(argv):

    parser = argparse.ArgumentParser()

    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
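convert_to_square is also imported from utils and not listed in this post; a minimal sketch of what it is assumed to do (grow each box into a square around its center, so that the later resize does not distort the crop):

import numpy as np

def convert_to_square(bbox):
    square_bbox = bbox.copy()
    w = bbox[:, 2] - bbox[:, 0] + 1
    h = bbox[:, 3] - bbox[:, 1] + 1
    max_side = np.maximum(w, h)
    # keep the center, expand the shorter side to max_side
    square_bbox[:, 0] = bbox[:, 0] + w*0.5 - max_side*0.5
    square_bbox[:, 1] = bbox[:, 1] + h*0.5 - max_side*0.5
    square_bbox[:, 2] = square_bbox[:, 0] + max_side - 1
    square_bbox[:, 3] = square_bbox[:, 1] + max_side - 1
    return square_bbox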

 

The training procedure is as follows:

source activate tensorflow

In the preprocess directory:

python gen_12net_data.py        # generate the three kinds of P-Net data
python gen_landmark_aug.py 12   # generate the P-Net landmark data
python gen_imglist_pnet.py      # merge everything into one list
python gen_tfrecords.py 12      # generate the tfrecord file

 

In the train directory:

python train.py 12              # train P-Net

 

TensorBoard loss curves:

 

Back in the preprocess directory:

python gen_hard_example.py 12   # generate the three kinds of R-Net data
python gen_landmark_aug.py 24   # generate the R-Net landmark data
python gen_tfrecords.py 24      # generate the tfrecord files

Then in the train directory:

python train.py 24              # train R-Net

將目錄cd到preprocess上,
python gen_hard_example.py 24 生成三種onet數據,
python gen_landmark_aug.py 48 生成onet的landmark數據,
python gen_tfrecords.py 48生成tfrecords文件
將目錄cd到train上python train.py 48 訓練onet
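
The whole bootstrap can also be chained into one script. Below is a minimal sketch that just runs the commands above with subprocess; the script names and size arguments are taken verbatim from the commands, while the run helper itself is hypothetical and the relative paths may need adjusting to your checkout:

# coding: utf-8
# Hypothetical driver that chains the three-stage training bootstrap.
import subprocess


def run(cmd, cwd):
    print('>>', ' '.join(cmd))
    subprocess.check_call(cmd, cwd=cwd)


# PNet: random crops + landmark data, then train at 12x12
run(['python', 'gen_12net_data.py'], cwd='preprocess')
run(['python', 'gen_landmark_aug.py', '12'], cwd='preprocess')
run(['python', 'gen_imglist_pnet.py'], cwd='preprocess')
run(['python', 'gen_tfrecords.py', '12'], cwd='preprocess')
run(['python', 'train.py', '12'], cwd='train')

# RNet: mine hard examples with the trained PNet, then train at 24x24
run(['python', 'gen_hard_example.py', '12'], cwd='preprocess')
run(['python', 'gen_landmark_aug.py', '24'], cwd='preprocess')
run(['python', 'gen_tfrecords.py', '24'], cwd='preprocess')
run(['python', 'train.py', '24'], cwd='train')

# ONet: mine hard examples with PNet + RNet, then train at 48x48
run(['python', 'gen_hard_example.py', '24'], cwd='preprocess')
run(['python', 'gen_landmark_aug.py', '48'], cwd='preprocess')
run(['python', 'gen_tfrecords.py', '48'], cwd='preprocess')
run(['python', 'train.py', '48'], cwd='train')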

 

Test script:

test.py

# coding: utf-8

import sys
from detection.MtcnnDetector import MtcnnDetector
from detection.detector import Detector
from detection.fcn_detector import FcnDetector
from train.model import P_Net, R_Net, O_Net
import cv2
import os
import numpy as np
import train.config as config


test_mode = config.test_mode
thresh = config.thresh
min_face_size = config.min_face
stride = config.stride
detectors = [None, None, None]
# model checkpoint locations
model_path = ['model/PNet/', 'model/RNet/', 'model/ONet']
batch_size = config.batches
PNet = FcnDetector(P_Net, model_path[0])
detectors[0] = PNet


if test_mode in ["RNet", "ONet"]:
    RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
    detectors[1] = RNet


if test_mode == "ONet":
    ONet = Detector(O_Net, 48, batch_size[2], model_path[2])
    detectors[2] = ONet

mtcnn_detector = MtcnnDetector(detectors=detectors, min_face_size=min_face_size,
                               stride=stride, threshold=thresh)
out_path = config.out_path
if config.input_mode == '1':
    # run on still images
    path = config.test_dir
    # print(path)
    for item in os.listdir(path):
        img_path = os.path.join(path, item)
        img = cv2.imread(img_path)
        boxes_c, landmarks = mtcnn_detector.detect(img)
        for i in range(boxes_c.shape[0]):
            bbox = boxes_c[i, :4]
            score = boxes_c[i, 4]
            corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]
            # draw the face bounding box
            cv2.rectangle(img, (corpbbox[0], corpbbox[1]),
                          (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
            # face classification confidence
            cv2.putText(img, '{:.2f}'.format(score), 
                        (corpbbox[0], corpbbox[1] - 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
        # draw the facial landmarks
        for i in range(landmarks.shape[0]):
            for j in range(len(landmarks[i])//2):
                cv2.circle(img, (int(landmarks[i][2*j]), int(landmarks[i][2*j+1])), 2, (0, 0, 255))
        cv2.imshow('im', img)
        k = cv2.waitKey(0) & 0xFF
        if k == 27:  # press Esc to save the annotated image
            cv2.imwrite(out_path + item, img)
    cv2.destroyAllWindows()

if config.input_mode == '2':
    cap = cv2.VideoCapture(0)
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    # XVID is an AVI codec; pair it with an .avi container (writing it into
    # .mp4 often fails silently on some OpenCV builds)
    out = cv2.VideoWriter(out_path + 'out.avi', fourcc, 10, (640, 480))
    while True:
            t1 = cv2.getTickCount()
            ret, frame = cap.read()
            if ret:
                boxes_c, landmarks = mtcnn_detector.detect(frame)
                t2 = cv2.getTickCount()
                t = (t2-t1)/cv2.getTickFrequency()
                fps = 1.0/t
                for i in range(boxes_c.shape[0]):
                    bbox = boxes_c[i, :4]
                    score = boxes_c[i, 4]
                    corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]
                
                    # draw the face bounding box
                    cv2.rectangle(frame, (corpbbox[0], corpbbox[1]),
                          (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
                    # draw the confidence score
                    cv2.putText(frame, '{:.2f}'.format(score), 
                                (corpbbox[0], corpbbox[1] - 2), 
                                cv2.FONT_HERSHEY_SIMPLEX,
                                0.5, (0, 0, 255), 2)
                # draw the per-frame time and fps
                cv2.putText(frame, '{:.4f}'.format(t) + " " + '{:.3f}'.format(fps), (10, 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2)
                # draw the facial landmarks
                for i in range(landmarks.shape[0]):
                    for j in range(len(landmarks[i])//2):
                        cv2.circle(frame, (int(landmarks[i][2*j]), int(landmarks[i][2*j+1])), 2, (0, 0, 255))
                out.write(frame)
                cv2.imshow("result", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            else:
                break
    cap.release()
    out.release()
    cv2.destroyAllWindows()

 

The modules it uses:

 

detector.py

# coding: utf-8


import tensorflow as tf
import numpy as np


class Detector:
    """
    識別多組圖片
    """
    def __init__(self, net_factory, data_size, batch_size, model_path):
        graph = tf.Graph()
        with graph.as_default():
            self.image_op = tf.placeholder(tf.float32, [None, data_size, data_size, 3])
            self.cls_prob, self.bbox_pred, self.landmark_pred = net_factory(self.image_op, training=False)
            self.sess = tf.Session()
            # restore the model from the latest checkpoint
            saver = tf.train.Saver()
            model_file = tf.train.latest_checkpoint(model_path)
            saver.restore(self.sess, model_file)
        self.data_size = data_size
        self.batch_size = batch_size

    def predict(self, databatch):
        scores = []
        batch_size = self.batch_size
        minibatch = []
        cur = 0
        # total number of samples
        n = databatch.shape[0]
        # split the data into fixed-size minibatches
        while cur < n:
            minibatch.append(databatch[cur:min(cur+batch_size, n), :, :, :])
            cur += batch_size
        cls_prob_list = []
        bbox_pred_list = []
        landmark_pred_list = []
        for idx, data in enumerate(minibatch):
            m = data.shape[0]
            real_size = self.batch_size
            # pad the last, incomplete minibatch by repeating sample indices
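            # e.g. batch_size=7 with a final chunk of m=3: keep_inds grows
            # [0,1,2] -> [0,1,2,0,1,2] -> [0,1,2,0,1,2,0], the duplicated rows
            # are fed through the net, and only the first real_size=3 outputs
            # are kept below (hypothetical numbers, same logic as the code)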
            if m < batch_size:
                keep_inds = np.arange(m)
                gap = self.batch_size-m
                while gap >= len(keep_inds):
                    gap -= len(keep_inds)
                    keep_inds = np.concatenate((keep_inds, keep_inds))
                if gap != 0:
                    keep_inds = np.concatenate((keep_inds, keep_inds[:gap]))
                data = data[keep_inds]
                real_size = m
            cls_prob, bbox_pred, landmark_pred = self.sess.run([self.cls_prob, self.bbox_pred, self.landmark_pred],
                                                               feed_dict={self.image_op: data})
            
            cls_prob_list.append(cls_prob[:real_size])
            bbox_pred_list.append(bbox_pred[:real_size])
            landmark_pred_list.append(landmark_pred[:real_size])
        
        return np.concatenate(cls_prob_list, axis=0), np.concatenate(bbox_pred_list, axis=0), np.concatenate(landmark_pred_list, axis=0)

 

fcn_detector.py

# coding: utf-8

import tensorflow as tf
import sys
sys.path.append('../')
import train.config as config


class FcnDetector:
    """
    識別單張圖片
    """
    def __init__(self, net_factory, model_path):
        graph = tf.Graph()
        with graph.as_default():
            self.image_op = tf.placeholder(tf.float32, name='input_image')
            self.width_op = tf.placeholder(tf.int32, name='image_width')
            self.height_op = tf.placeholder(tf.int32, name='image_height')
            image_reshape = tf.reshape(self.image_op, [1, self.height_op, self.width_op, 3])
            # network outputs
            self.cls_prob, self.bbox_pred, _ = net_factory(image_reshape, training=False)
            self.sess = tf.Session()
            # restore the model from the latest checkpoint
            saver = tf.train.Saver()
            model_file = tf.train.latest_checkpoint(model_path)
            saver.restore(self.sess, model_file)

    def predict(self, databatch):
        height, width, _ = databatch.shape
        cls_prob, bbox_pred = self.sess.run([self.cls_prob, self.bbox_pred],
                                            feed_dict={self.image_op: databatch,
                                                       self.width_op: width,
                                                       self.height_op: height})
        
        return cls_prob, bbox_pred

 

MtcnnDetector.py

# coding: utf-8


import cv2
import numpy as np
import sys

sys.path.append('../')
from preprocess.utils import *
from tqdm import tqdm


def py_nms(dets, thresh):
    """
    剔除太相似的box
    """
    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    scores = dets[:, 4]

    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    # sort scores in descending order
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        
        ovr = inter / (areas[i] + areas[order[1:]] - inter+1e-10)
       
        # keep boxes whose overlap is below the threshold; order[0] was taken
        # out for comparison, so inds + 1 maps back to positions in order
        inds = np.where(ovr <= thresh)[0]
        order = order[inds + 1]

    return keep
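
# Example (hypothetical numbers): two heavily overlapping boxes and one
# separate box.
#   dets = np.array([[0, 0, 10, 10, 0.9],
#                    [1, 1, 11, 11, 0.8],
#                    [50, 50, 60, 60, 0.7]])
#   py_nms(dets, 0.5) -> [0, 2]: box 1 overlaps box 0 with IoU ~ 0.70 > 0.5
#   and is suppressed; box 2 overlaps nothing and survives.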


class MtcnnDetector:
    """
    來生成人臉的圖像
    """
    def __init__(self, detectors,
                 min_face_size=20,
                 stride=2,
                 threshold=[0.6, 0.7, 0.7],
                 scale_factor=0.79  # shrink ratio between image-pyramid levels
                 ):
        self.pnet_detector = detectors[0]
        self.rnet_detector = detectors[1]
        self.onet_detector = detectors[2]
        self.min_face_size = min_face_size
        self.stride = stride
        self.thresh = threshold
        self.scale_factor = scale_factor

    def detect_face(self, test_data):
        all_boxes = []
        landmarks = []
        batch_idx = 0
        num_of_img = test_data.size
        empty_array = np.array([])
        for databatch in tqdm(test_data):
            batch_idx += 1
            im = databatch
            if self.pnet_detector:
                boxes, boxes_c, landmark = self.detect_pnet(im)
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue
            if self.rnet_detector:
                boxes, boxes_c, landmark = self.detect_rnet(im, boxes_c)
                
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue

            if self.onet_detector:
                
                boxes, boxes_c, landmark = self.detect_onet(im, boxes_c)
               
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue

            all_boxes.append(boxes_c)
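            # landmarks are not needed here (this path is used when mining
            # hard examples), so store a placeholder value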
            landmark = [1]
            landmarks.append(landmark)
        return all_boxes, landmarks

    def detect_pnet(self, im):
        """
        通過PNet篩選box和landmark
        參數:
          im:輸入圖像[h,2,3]
        """
        h, w, c = im.shape
        net_size = 12
        # ratio between the net input size and the smallest face to detect
        current_scale = float(net_size) / self.min_face_size
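        # e.g. with min_face_size=20 the scale starts at 12/20 = 0.6 and then
        # shrinks by scale_factor=0.79 per level (0.6, 0.474, 0.374, ...), so
        # a 20-pixel face eventually fits the 12x12 receptive field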
        im_resized = self.processed_image(im, current_scale)
        current_height, current_width, _ = im_resized.shape
        all_boxes = list()
        # image pyramid: keep shrinking the image
        while min(current_height, current_width) > net_size:
            # class map and box regression
            # this is the test path: the input is a whole image, not a 12x12
            # crop, so cls_cls_map is a feature map of shape (n, m, 2)
            # and reg has shape (n, m, 4)
            cls_cls_map, reg = self.pnet_detector.predict(im_resized)
            boxes = self.generate_bbox(cls_cls_map[:, :, 1], reg, current_scale, self.thresh[0])
            current_scale *= self.scale_factor  # shrink further for the next pyramid level
            im_resized = self.processed_image(im, current_scale)
            current_height, current_width, _ = im_resized.shape
            
            if boxes.size == 0:
                continue
            # NMS within this pyramid level to drop highly overlapping boxes
            keep = py_nms(boxes[:, :5], 0.5)
            boxes = boxes[keep]
            all_boxes.append(boxes)
        if len(all_boxes) == 0:
            return None, None, None
        all_boxes = np.vstack(all_boxes)

        # NMS again across all pyramid levels
        keep = py_nms(all_boxes[:, 0:5], 0.7)
        all_boxes = all_boxes[keep]
        boxes = all_boxes[:, :5]

        # box widths and heights
        bbw = all_boxes[:, 2] - all_boxes[:, 0] + 1
        bbh = all_boxes[:, 3] - all_boxes[:, 1] + 1

        # box coordinates and scores mapped back to the original image; the
        # training targets are offsets normalized by the face box size, so the
        # dx/dy values below are normalized as well
        boxes_c = np.vstack([all_boxes[:, 0] + all_boxes[:, 5] * bbw,  # all_boxes[:, 5]--> dx1
                             all_boxes[:, 1] + all_boxes[:, 6] * bbh,  # all_boxes[:, 6]--> dy1
                             all_boxes[:, 2] + all_boxes[:, 7] * bbw,  # all_boxes[:, 7]--> dx2
                             all_boxes[:, 3] + all_boxes[:, 8] * bbh,  # all_boxes[:, 8]--> dy2
                             all_boxes[:, 4]])
        boxes_c = boxes_c.T
        return boxes, boxes_c, None

    def detect_rnet(self, im, dets):
        """
        通過rent選擇box
        參數:
          im:輸入圖像
          dets: PNet選擇的box,是相對原圖的絕對坐標
        返回值:
          box絕對坐標
        """
        h, w, c = im.shape
        # expand the PNet boxes to enclosing squares to avoid losing information
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        # clip boxes that extend beyond the image
        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        # keep only boxes whose shorter side is at least 20 pixels; filtering
        # first keeps the crop array aligned with the network outputs (the
        # original loop counted the valid boxes but indexed the unfiltered
        # list, which could misalign them)
        keep_size = np.where(np.minimum(tmpw, tmph) >= 20)[0]
        num_boxes = keep_size.size
        cropped_ims = np.zeros((num_boxes, 24, 24, 3), dtype=np.float32)
        for n, i in enumerate(keep_size):
            # crop the PNet box from the original image, zero-padding the
            # part that falls outside it
            tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
            tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
            cropped_ims[n, :, :, :] = (cv2.resize(tmp, (24, 24)) - 127.5) / 128
        dets = dets[keep_size]
        cls_scores, reg, _ = self.rnet_detector.predict(cropped_ims)
        cls_scores = cls_scores[:, 1]
        keep_inds = np.where(cls_scores > self.thresh[1])[0]
        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            boxes[:, 4] = cls_scores[keep_inds]
            reg = reg[keep_inds]
        else:
            return None, None, None

        keep = py_nms(boxes, 0.6)
        boxes = boxes[keep]
        # apply the RNet regression offsets to calibrate the boxes into
        # absolute coordinates in the original image
        boxes_c = self.calibrate_box(boxes, reg[keep])
        return boxes, boxes_c, None
    
    def detect_onet(self, im, dets):
        """
        將ONet的選框繼續篩選基本和RNet差不多但多返回了landmark
        """
        h, w, c = im.shape
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        num_boxes = dets.shape[0]
        cropped_ims = np.zeros((num_boxes, 48, 48, 3), dtype=np.float32)
        for i in range(num_boxes):
            tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
            tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
            cropped_ims[i, :, :, :] = (cv2.resize(tmp, (48, 48)) - 127.5) / 128

        cls_scores, reg, landmark = self.onet_detector.predict(cropped_ims)
        
        cls_scores = cls_scores[:, 1]
        keep_inds = np.where(cls_scores > self.thresh[2])[0]
        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            boxes[:, 4] = cls_scores[keep_inds]
            reg = reg[keep_inds]
            landmark = landmark[keep_inds]
        else:
            return None, None, None

        w = boxes[:, 2] - boxes[:, 0] + 1
        h = boxes[:, 3] - boxes[:, 1] + 1
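        # landmarks are predicted as coordinates normalized by the box size,
        # so scale them by the box width/height and shift by the box corner
        # to get absolute pixel positions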
        landmark[:, 0::2] = (np.tile(w, (5, 1)) * landmark[:, 0::2].T + np.tile(boxes[:, 0], (5, 1)) - 1).T
        landmark[:, 1::2] = (np.tile(h, (5, 1)) * landmark[:, 1::2].T + np.tile(boxes[:, 1], (5, 1)) - 1).T
        boxes_c = self.calibrate_box(boxes, reg)

        # note: this NMS on `boxes` is independent of the one on `boxes_c`
        # below; only boxes_c and landmark are consumed downstream in detect()
        boxes = boxes[py_nms(boxes, 0.6)]
        keep = py_nms(boxes_c, 0.6)
        boxes_c = boxes_c[keep]
        landmark = landmark[keep]
        return boxes, boxes_c, landmark

    def processed_image(self, img, scale):
        """
        預處理數據,轉化圖像尺度並對像素歸一到[-1, 1]
        """
        height, width, channels = img.shape
        new_height = int(height * scale)  
        new_width = int(width * scale)  
        new_dim = (new_width, new_height)
        img_resized = cv2.resize(img, new_dim, interpolation=cv2.INTER_LINEAR) 
        img_resized = (img_resized - 127.5) / 128
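        # a pixel value of 0 maps to (0 - 127.5) / 128 ~ -0.996 and 255 to
        # ~ 0.996, matching the range the networks were trained with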
        return img_resized

    def generate_bbox(self, cls_map, reg, scale, threshold):
        """
         得到對應原圖的box坐標,分類分數,box偏移量
         cls_map: n * m(輸入是cls_cls_map[:, :, 1], 第一維, 人臉框的概率.)
         reg: n * m * 4
        """

        # PNet downsamples the image by a factor of about 2
        stride = 2

        cellsize = 12

        # keep the high-confidence cells: these are the predicted face windows
        t_index = np.where(cls_map > threshold)

        # no face found (t_index[1].size would work equally well);
        # np.where on a 2D array returns (row indices, column indices)
        if t_index[0].size == 0:
            return np.array([])
        # regression offsets
        dx1, dy1, dx2, dy2 = [reg[t_index[0], t_index[1], i] for i in range(4)]

        reg = np.array([dx1, dy1, dx2, dy2])
        score = cls_map[t_index[0], t_index[1]]
        # map each hit back to the original image: cellsize=12 because a
        # 12x12 input shrinks to 1x1 at the output; stride=2 because exactly
        # one layer in PNet has stride 2; dividing by scale undoes the
        # pyramid resizing for this level
        boundingbox = np.vstack([np.round((stride * t_index[1]) / scale),
                                 np.round((stride * t_index[0]) / scale),
                                 np.round((stride * t_index[1] + cellsize) / scale),
                                 np.round((stride * t_index[0] + cellsize) / scale),
                                 score,
                                 reg])
        # shape [n, 9]: x1, y1, x2, y2, score, dx1, dy1, dx2, dy2
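        # worked example (hypothetical numbers): a hit at feature-map cell
        # (row=3, col=5) on the pyramid level with scale=0.5 maps back to
        # x1 = round(2*5/0.5) = 20, y1 = round(2*3/0.5) = 12,
        # x2 = round((2*5+12)/0.5) = 44, y2 = round((2*3+12)/0.5) = 36,
        # i.e. a 24x24 window in the original image, as expected at scale 0.5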
        return boundingbox.T

    def pad(self, bboxes, w, h):
        """
        將超出圖像的box進行處理
        參數:
          bboxes: 人臉框
          w, h: 圖像長寬
        返回值:
          dy, dx : 為調整后的box的左上角坐標相對於原box左上角的坐標
          edy, edx : n為調整后的box右下角相對原box左上角的相對坐標
          y, x : 調整后的box在原圖上左上角的坐標
          ey, ex : 調整后的box在原圖上右下角的坐標
          tmph, tmpw: 原始box的長寬
        """
        # original box widths and heights
        tmpw, tmph = bboxes[:, 2] - bboxes[:, 0] + 1, bboxes[:, 3] - bboxes[:, 1] + 1
        num_box = bboxes.shape[0]

        dx, dy = np.zeros((num_box, )), np.zeros((num_box, ))
        edx, edy = tmpw.copy() - 1, tmph.copy() - 1
        # top-left and bottom-right coordinates of each box
        x, y, ex, ey = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]
        # for boxes that run past the bottom-right edge, clamp ex, ey to
        # w-1, h-1 and compute edx, edy, the bottom-right corner of the
        # clipped region relative to the box's own origin
        tmp_index = np.where(ex > w - 1)
        # w - 1 + tmpw - 1 - edx = ex  ->  edx = w + tmpw - ex - 2
        edx[tmp_index] = tmpw[tmp_index] + w - 2 - ex[tmp_index]
        ex[tmp_index] = w - 1

        tmp_index = np.where(ey > h - 1)
        # h - 1 + tmph - 1 - edy = ey  ->  edy = h + tmph - ey - 2
        edy[tmp_index] = tmph[tmp_index] + h - 2 - ey[tmp_index]
        ey[tmp_index] = h - 1
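        # e.g. w=100 and a box with x=90, ex=105 (tmpw=16): edx becomes
        # 16 + 100 - 2 - 105 = 9 and ex is clamped to 99, so the 10 valid
        # columns 90..99 are later copied into target columns 0..9 and the
        # remainder of the crop stays zero-padded (hypothetical numbers)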
        # for boxes that run past the top-left corner, clamp x, y to 0 and
        # record in dx, dy how much was cut off
        tmp_index = np.where(x < 0)
        dx[tmp_index] = 0 - x[tmp_index]
        x[tmp_index] = 0

        tmp_index = np.where(y < 0)
        dy[tmp_index] = 0 - y[tmp_index]
        y[tmp_index] = 0

        return_list = [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph]
        return_list = [item.astype(np.int32) for item in return_list]

        return return_list

    def calibrate_box(self, bbox, reg):
        """
        校准box
        參數:
          bbox: PNet生成的box
          reg: RNet生成的box偏移值
        返回值:
          調整后的box是針對原圖的絕對坐標
        """
        bbox_c = bbox.copy()
        w = bbox[:, 2] - bbox[:, 0] + 1
        w = np.expand_dims(w, 1)
        h = bbox[:, 3] - bbox[:, 1] + 1
        h = np.expand_dims(h, 1)
        reg_m = np.hstack([w, h, w, h])
        aug = reg_m * reg
        bbox_c[:, 0:4] = bbox_c[:, 0:4] + aug
        return bbox_c

    def detect(self, img):
        """
        用於測試當個圖像的
        """
        boxes = None

        # PNet
        if self.pnet_detector:
            boxes, boxes_c, _ = self.detect_pnet(img)
            if boxes_c is None:
                return np.array([]), np.array([])

        # RNet
        if self.rnet_detector:
            boxes, boxes_c, _ = self.detect_rnet(img, boxes_c)
            if boxes_c is None:
                return np.array([]), np.array([])

        # ONet
        if self.onet_detector:
            boxes, boxes_c, landmark = self.detect_onet(img, boxes_c)
            if boxes_c is None:
                return np.array([]), np.array([])

        return boxes_c, landmark

 

To run the test:

python test.py

 

Results:

 

 

The image data comes from the internet and is for learning purposes only; if it infringes any rights, please contact me to have it removed. Thank you!

 

Reference: https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html

 

