A simple PaddlePaddle demo for beginners: Cats vs. Dogs
The goal is to get an overall picture of how PaddlePaddle does image classification with convolutions and, most importantly, to master how to read a custom dataset.
The cat-and-dog dataset was downloaded from the web into the working directory.
Original project address.
import warnings
warnings.filterwarnings('ignore')
import imghdr  # used to detect image formats
import random
import seaborn as sns
from time import time
import paddle.fluid as fluid
import paddle
import numpy as np
from PIL import Image
import os
from multiprocessing import cpu_count
import matplotlib.pyplot as plt

crop_size = 300    # crop size; the final image size is this value, not resize_size
resize_size = 320
is_color = True    # whether to load images in color
USE_GPU = True     # whether to use the GPU
BUF_SIZE = 528
BATCH_SIZE = 128   # images per batch
EPOCH = 20         # number of training passes
Generate the image lists
data_path_cat = '/home/aistudio/work/catdog/PetImages/Cat'
data_path_dog = '/home/aistudio/work/catdog/PetImages/Dog'
data_path = [data_path_cat, data_path_dog]
train_img_num = 0
test_img_num = 0
with open('./train_data.list', 'w') as f_train:
    with open('./test_data.list', 'w') as f_test:
        for label, path in enumerate(data_path):
            data_imgs = os.listdir(path)
            for i in range(len(data_imgs)):
                try:
                    img_path = os.path.join(path, data_imgs[i])  # build the full path
                    img_type = imghdr.what(img_path)  # detect the image type
                    if (img_type == 'jpeg') | (img_type == 'jpg'):  # keep only JPEG images
                        img_arr = np.array(Image.open(img_path))  # load the pixel data
                        if len(img_arr.shape) != 2:  # skip grayscale images
                            if i % 10 == 0:  # every 10th image goes into the test set
                                test_img_num += 1
                                f_test.write(img_path + "\t" + str(label) + '\n')
                            else:
                                train_img_num += 1
                                f_train.write(img_path + "\t" + str(label) + '\n')
                except:  # skip unreadable or corrupt files
                    pass
print('Image lists generated.')
print(f'{train_img_num} training images, {test_img_num} test images.')
Image lists generated.
22256 training images, 2473 test images.
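To make the list format concrete, here is a quick sanity check (a sketch, assuming the script above has already been run) that prints the first few lines of train_data.list; each line is an image path and a label separated by a tab.

```python
# Peek at the generated list file: each line is "<image path>\t<label>"
with open('./train_data.list', 'r') as f:
    for _ in range(3):
        print(f.readline().strip())
```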
Define the data readers
PaddlePaddle reads both training and test data through readers, so we need to define our own. First we define a train_mapper() function, which preprocesses each image: the paddle.dataset.image.simple_transform interface resizes the image and then crops it (it also handles grayscale when is_color is False). When the is_train parameter is True the crop is random; otherwise it is a center crop, which is what testing and prediction normally use. The train_r() function reads image paths and labels from the image list generated above and passes each path to train_mapper() for preprocessing. The test data goes through the same steps.
def train_mapper(sample):  # the mapper
    img, label = sample
    img = paddle.dataset.image.load_image(file=img, is_color=is_color)  # load the image at this path, in color or grayscale
    img = paddle.dataset.image.simple_transform(im=img,
                                                resize_size=resize_size, crop_size=crop_size,
                                                is_color=is_color, is_train=True)  # simple image transform: resize, then crop
    img = img.astype('float32') / 255.0
    return img, label

def train_r(train_list_path):  # defining a function inside a function like this is called a closure; look it up if you're curious
    def reader():
        with open(train_list_path, 'r') as f:
            lines = f.readlines()
            random.shuffle(lines)  # very important; has a big effect on the model
            for line in lines:
                img, label = line.split('\t')  # split path and label
                yield img, int(label)  # this makes reader a generator
    return paddle.reader.xmap_readers(train_mapper, reader, cpu_count(), 256)  # map the data
# xmap_readers(): uses multiple threads and a user-defined mapper to map the samples returned by reader (into an output queue).
# I'm not entirely sure what "mapping" means here, but my guess is that it loads, for each path stored in the image list, the corresponding simply-transformed image data.
# mapper (callable) - a function that maps the reader's data.
# reader (callable) - a reader that produces the data.
# process_num (int) - number of threads used to process the samples.
# buffer_size (int) - size of the queue holding the data waiting to be read.
# My understanding: reader yields image paths and labels, and train_mapper maps those values into image data.

def test_mapper(sample):  # sample is presumably the (img, label) returned by reader
    img, label = sample
    img = paddle.dataset.image.load_image(file=img, is_color=is_color)
    img = paddle.dataset.image.simple_transform(im=img,
                                                resize_size=resize_size, crop_size=crop_size,
                                                is_color=is_color, is_train=False)
    img = img.astype('float32') / 255.0
    return img, label

def test_r(test_list_path):
    def reader():
        with open(test_list_path, 'r') as f:
            lines = f.readlines()
            random.shuffle(lines)
            for line in lines:
                img, label = line.split('\t')
                yield img, int(label)
    return paddle.reader.xmap_readers(test_mapper, reader, cpu_count(), 256)
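Before wiring the readers into training, it helps to pull a single preprocessed sample and confirm its shape. This is a minimal sketch, assuming the list file from the previous step exists; simple_transform returns a CHW array, so the expected shape is (3, crop_size, crop_size).

```python
# Sanity check (sketch): draw one sample from the training reader
sample_reader = train_r('./train_data.list')
img, label = next(sample_reader())
print(img.shape, label)  # expected: (3, 300, 300) and an integer label
```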
Define the convolutional neural network
def cnn(ipt):
    # first conv-pool layer
    conv_pool_1 = fluid.nets.simple_img_conv_pool(
        input=ipt,        # input image
        filter_size=5,    # filter size
        num_filters=20,   # number of filters, equal to the number of output channels
        pool_size=2,      # 2x2 pooling kernel
        pool_stride=2,    # pooling stride
        act="relu")       # activation type
    conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
    # second conv-pool layer
    conv_pool_2 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_1,
        filter_size=5,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu")
    conv_pool_2 = fluid.layers.batch_norm(conv_pool_2)
    # third conv-pool layer
    conv_pool_3 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_2,
        filter_size=5,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu")
    # fully connected output layer with softmax activation; 2 classes, so 2 outputs
    prediction = fluid.layers.fc(input=conv_pool_3, size=2, act='softmax')
    return prediction
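To see what the fc layer receives, here is a rough back-of-the-envelope calculation of the spatial sizes flowing through the three stages. It assumes simple_img_conv_pool uses its defaults (convolution padding 0, stride 1; 2x2 max pooling with stride 2); treat those defaults as an assumption.

```python
# Sketch: spatial size after each conv(5x5, valid) + pool(2x2, stride 2) stage
size = crop_size  # 300
for stage in range(3):
    size = (size - 5 + 1) // 2  # 5x5 convolution, then 2x2 pooling
    print(f"after stage {stage+1}: {size}x{size}")
# 148 -> 72 -> 34, so conv_pool_3 is roughly 50 x 34 x 34 before the fc layer
```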
Get the network
Use the convolutional neural network defined above to get a classifier. The network's input layer is defined through the fluid.layers.data() interface; for example, a shape of [1, 128, 128] means a single-channel image whose width and height are both 128. Here the input is a [3, 300, 300] color crop.
c = 3 if is_color else 1
image = fluid.layers.data(name='image', shape=[c,crop_size,crop_size], dtype='float32')
net = cnn(image)
Define the loss function
We use the cross-entropy loss, fluid.layers.cross_entropy, and also the fluid.layers.accuracy interface, which makes it convenient to report the average accuracy during training and testing.
label = fluid.layers.data(name='label', shape=[1], dtype='int64')  # training data labels
cost = fluid.layers.cross_entropy(input=net, label=label)  # takes the network output and labels; per-sample losses for one batch
avg_cost = fluid.layers.mean(cost)  # average the losses over the batch
acc = fluid.layers.accuracy(input=net, label=label)
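As a reminder of what cross_entropy computes per sample, here is a tiny worked example in plain numpy (a sketch with made-up probabilities, not PaddlePaddle API):

```python
# For one sample, cross entropy is -log(probability assigned to the true class)
probs = np.array([0.8, 0.2])  # hypothetical softmax output: [cat, dog]
true_label = 0                # the image is a cat
print(-np.log(probs[true_label]))  # ~0.223; a perfect prediction would give 0
```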
Clone the test program
After defining the loss and before defining the optimization method, clone a test program from the main program.
test_program = fluid.default_main_program().clone(for_test=True)
Define the optimization method
We use the Adam optimizer.
optimizer = fluid.optimizer.AdamOptimizer(learning_rate=0.001)
opt = optimizer.minimize(avg_cost)  # the optimizer iteratively drives avg_cost down
Create the executor
place = fluid.CUDAPlace(0) if USE_GPU else fluid.CPUPlace()  # train on the GPU if USE_GPU is set, otherwise on the CPU
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
Wrap the readers into batch readers
Group the readers defined above into batches of the configured size.
train_reader = paddle.batch(  # creates a batched reader that combines the samples from the input reader into batches,
    # so train_reader is itself a reader
    reader=paddle.reader.shuffle(  # create a decorated (shuffling) reader
        reader=train_r('./train_data.list'),
        buf_size=BUF_SIZE),
    batch_size=BATCH_SIZE)  # BATCH_SIZE images per training batch
test_reader = paddle.batch(
    reader=paddle.reader.shuffle(
        reader=test_r('./test_data.list'),
        buf_size=BUF_SIZE),
    batch_size=BATCH_SIZE)
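A quick way to verify the batching is to fetch a single batch and inspect it (a sketch; note it spins up the whole reading pipeline once):

```python
# Sketch: one batch is a list of (image, label) tuples
batch = next(train_reader())
print(len(batch))         # up to BATCH_SIZE samples
print(batch[0][0].shape)  # (3, 300, 300)
```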
Define the input data dimensions
The first entry is the image data, the second the label.
feeder = fluid.DataFeeder(place=place, feed_list=[image, label])
Start training
After each training pass, evaluate the model's accuracy on the test set.
training_costs = []
training_accs = []
testing_costs = []
testing_accs = []
start = time()
for pass_id in range(1, EPOCH+1):
    train_costs, train_accs = [], []
    for batch_id, data in enumerate(train_reader()):
        train_cost, train_acc = exe.run(
            program=fluid.default_main_program(),
            feed=feeder.feed(data),
            fetch_list=[avg_cost, acc])
        train_costs.append(train_cost)
        train_accs.append(train_acc)
        training_costs.append(train_cost)
        training_accs.append(train_acc)
    train_cost = sum(train_costs) / len(train_costs)
    train_acc = sum(train_accs) / len(train_accs)
    # print("Pass:%d\tTrain_cost:%.5f\tAccuracy:%.5f" % (pass_id, train_cost, train_acc))  # per-pass progress output
    test_costs, test_accs = [], []
    for batch_id, data in enumerate(test_reader()):
        test_cost, test_acc = exe.run(
            program=test_program,
            feed=feeder.feed(data),
            fetch_list=[avg_cost, acc])
        test_costs.append(test_cost)
        test_accs.append(test_acc)
        testing_costs.append(test_cost)
        testing_accs.append(test_acc)
    test_cost = sum(test_costs) / len(test_costs)
    test_acc = sum(test_accs) / len(test_accs)
    print("\nPass:%d \tTrain_cost:%.5f\tTrain_acc:%.5f\tTest_cost:%.5f\tTest_acc:%.5f\n"
          % (pass_id, train_cost, train_acc, test_cost, test_acc))
end = time()
Pass:1 Train_cost:1.80947 Train_acc:0.58116 Test_cost:0.61831 Test_acc:0.64787
Pass:2 Train_cost:0.63346 Train_acc:0.66226 Test_cost:0.58241 Test_acc:0.71032
Pass:3 Train_cost:0.56434 Train_acc:0.71170 Test_cost:0.53824 Test_acc:0.72857
Pass:4 Train_cost:0.52023 Train_acc:0.74307 Test_cost:0.47614 Test_acc:0.77935
Pass:5 Train_cost:0.49171 Train_acc:0.76639 Test_cost:0.48825 Test_acc:0.77423
Pass:6 Train_cost:0.45704 Train_acc:0.78754 Test_cost:0.41720 Test_acc:0.81207
Pass:7 Train_cost:0.42528 Train_acc:0.80819 Test_cost:0.39377 Test_acc:0.82823
Pass:8 Train_cost:0.40764 Train_acc:0.81685 Test_cost:0.38794 Test_acc:0.82759
Pass:9 Train_cost:0.38059 Train_acc:0.83253 Test_cost:0.34883 Test_acc:0.85167
Pass:10 Train_cost:0.35983 Train_acc:0.84485 Test_cost:0.37238 Test_acc:0.83126
Pass:11 Train_cost:0.34370 Train_acc:0.85182 Test_cost:0.33813 Test_acc:0.85684
Pass:12 Train_cost:0.33027 Train_acc:0.86123 Test_cost:0.29966 Test_acc:0.87711
Pass:13 Train_cost:0.30136 Train_acc:0.87309 Test_cost:0.28069 Test_acc:0.87984
Pass:14 Train_cost:0.30478 Train_acc:0.87161 Test_cost:0.29137 Test_acc:0.87901
Pass:15 Train_cost:0.29061 Train_acc:0.87896 Test_cost:0.30017 Test_acc:0.87369
Pass:16 Train_cost:0.27127 Train_acc:0.88819 Test_cost:0.26217 Test_acc:0.89092
Pass:17 Train_cost:0.26203 Train_acc:0.89054 Test_cost:0.24602 Test_acc:0.89326
Pass:18 Train_cost:0.24977 Train_acc:0.90027 Test_cost:0.28517 Test_acc:0.88028
Pass:19 Train_cost:0.24926 Train_acc:0.89830 Test_cost:0.26093 Test_acc:0.88975
Pass:20 Train_cost:0.23507 Train_acc:0.90277 Test_cost:0.24906 Test_acc:0.89268
print(f"用時{end-start}s")
用時1483.9844856262207s
training_costs = [x[0] for x in training_costs]
training_accs = [x[0] for x in training_accs]
testing_costs = [x[0] for x in testing_costs]
testing_accs = [x[0] for x in testing_accs]
plt.figure(figsize=(16,10))
plt.subplot(221)
plt.title('train_cost')
sns.lineplot(x=list(range(1,1+len(training_costs))),y=training_costs)
plt.subplot(222)
plt.title('train_acc')
sns.lineplot(x=list(range(1,1+len(training_accs))),y=training_accs,color='r')
plt.subplot(223)
plt.title('test_cost')
sns.lineplot(x=list(range(1,1+len(testing_costs))),y=testing_costs,color='c')
plt.subplot(224)
plt.title('test_acc')
sns.lineplot(x=list(range(1,1+len(testing_accs))),y=testing_accs,color='k')

Save the model
save_model_dir = "./model/Cat&Dog"
if not os.path.exists(save_model_dir):
    os.makedirs(save_model_dir)
print(f"Saved model to {save_model_dir}")
# save the model to the given directory
fluid.io.save_inference_model(dirname=save_model_dir,
                              feeded_var_names=['image'],  # list of strings: names of the variables to feed during inference
                              target_vars=[net],           # list of variables: net holds the inference results
                              executor=exe)                # the executor that saves the inference program
Saved model to ./model/Cat&Dog
['save_infer_model/scale_0']
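To confirm what was written, you can list the directory (a quick check; the exact file names depend on the Paddle version):

```python
# Inspect the saved inference model files
print(os.listdir(save_model_dir))
```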
Load the model and predict
```python
# build an executor for inference
infer_exe = fluid.Executor(place)
# use a dedicated scope
inference_scope = fluid.core.Scope()
with fluid.scope_guard(scope=inference_scope):
    # load the model
    save_model_dir = "./model/Cat&Dog"
    [inference_program,   # the inference program
     feed_target_names,   # list of strings: names of the data to feed at inference time
     fetch_targets] = fluid.io.load_inference_model(  # fetch_targets is a list of variables holding the predictions
        dirname=save_model_dir, executor=infer_exe)
    infer_reader = paddle.batch(reader=paddle.reader.shuffle(
        reader=test_r('./test_data.list'),
        buf_size=BUF_SIZE),
        batch_size=BATCH_SIZE)
    all_result = []  # store the predictions
    test_y = []      # store the ground-truth labels
    for data in infer_reader():
        test_x = np.array([x[0] for x in data]).astype("float32")  # extract the image data
        test_y.append([x[1] for x in data])  # extract the labels
        result = infer_exe.run(program=inference_program,
                               feed={feed_target_names[0]: test_x},
                               fetch_list=fetch_targets)
        all_result.append(result[0])
# flatten the predictions and true labels into 1-D numpy arrays
infer_lab = np.array([np.argmax(x) for batch in all_result for x in batch]).astype("int32")
true_lab = np.array([x for batch in test_y for x in batch]).astype("int32")
print(f"Prediction accuracy: {sum(infer_lab==true_lab)/len(true_lab)}")
```
Prediction accuracy: 0.8940558026688233
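Beyond a single accuracy number, a confusion matrix shows which class is misclassified more often. This sketch reuses the infer_lab and true_lab arrays computed above (0 = cat, 1 = dog):

```python
# Rows are true labels, columns are predicted labels
conf = np.zeros((2, 2), dtype=int)
for t, p in zip(true_lab, infer_lab):
    conf[t, p] += 1
print(conf)
```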
Predict on images casually downloaded from the web
def load_img(img):
    img = paddle.dataset.image.load_image(file=img, is_color=is_color)
    img = paddle.dataset.image.simple_transform(im=img,
                                                resize_size=resize_size, crop_size=crop_size,
                                                is_color=is_color, is_train=False)
    img = img.astype('float32') / 255.0
    return img

place = fluid.CPUPlace()
infer_exe = fluid.Executor(place)
inference_scope = fluid.core.Scope()
with fluid.scope_guard(scope=inference_scope):
    [inference_program,
     feed_target_names,
     fetch_targets] = fluid.io.load_inference_model(
        dirname="./model/Cat&Dog", executor=infer_exe)
    test_imgs_dir = './work/catdog/test_imgs'
    img_data, label, img_paths = [], [], []
    for img_name in os.listdir(test_imgs_dir):
        label.append(0 if 'cat' in img_name else 1)  # true label inferred from the file name
        img_path = os.path.join(test_imgs_dir, img_name)
        img_paths.append(img_path)
        img_data.append(load_img(img_path))
    img_data = np.array(img_data).astype("float32")
    result = infer_exe.run(program=inference_program,
                           feed={feed_target_names[0]: img_data},
                           fetch_list=fetch_targets)
    infer_label = [np.argmax(x) for x in result[0]]
plt.figure(figsize=(15, 9))
for i in range(len(infer_label)):
    plt.subplot(3, 5, i+1)
    plt.title(f"INFER:{infer_label[i]} TRUE:{label[i]}")
    plt.imshow(np.array(Image.open(img_paths[i])))
plt.show()

