這一篇將會介紹卷積神經網絡 (CNN),CNN 模型非常適合用來進行圖片相關的學習,例如圖片分類和驗證碼識別,也可以配合其他模型實現 OCR。
使用 Python 處理圖片
在具體介紹 CNN 之前,我們先來看看怎樣使用 Python 處理圖片。Python 處理圖片最主要使用的類庫是 Pillow (Python2 PIL 的 fork),使用以下命令即可安裝:
pip3 install Pillow
一些簡單操作的例子如下,如果你想了解更多可以參考 Pillow 的文檔:
# 打開圖片
>>> from PIL import Image
>>> img = Image.open("1.png")
# 查看圖片信息
>>> img.size
(175, 230)
>>> img.mode
'RGB'
>>> img
<PIL.PngImagePlugin.PngImageFile image mode=RGB size=175x230 at 0x10B807B50>
# 縮放圖片
>>> img1 = img.resize((20, 30))
>>> img1
<PIL.Image.Image image mode=RGB size=20x30 at 0x106426FD0>
# 裁剪圖片
>>> img2 = img.crop((0, 0, 16, 16))
>>> img2
<PIL.Image.Image image mode=RGB size=16x16 at 0x105E0EFD0>
# 保存圖片
>>> img1.save("11.png")
>>> img2.save("12.png")
使用 pytorch 處理圖片時要首先獲取圖片的數據,即各個像素對應的顏色值,例如大小為 175 * 230,模式是 RGB 的圖片會擁有 175 * 230 * 3 的數據,3 分別代表紅綠藍的值,范圍是 0 ~ 255,把圖片轉換為 pytorch 的 tensor 對象需要經過 numpy 中轉,以下是轉換的例子:
>>> import numpy
>>> import torch
>>> v = numpy.asarray(img)
>>> t = torch.tensor(v)
>>> t
tensor([[[255, 253, 254],
[255, 253, 254],
[255, 253, 254],
...,
[255, 253, 254],
[255, 253, 254],
[255, 253, 254]],
[[255, 253, 254],
[255, 253, 254],
[255, 253, 254],
...,
[255, 253, 254],
[255, 253, 254],
[255, 253, 254]],
[[255, 253, 254],
[255, 253, 254],
[255, 253, 254],
...,
[255, 253, 254],
[255, 253, 254],
[255, 253, 254]],
...,
[[255, 253, 254],
[255, 253, 254],
[255, 253, 254],
...,
[255, 253, 254],
[255, 253, 254],
[255, 253, 254]],
[[255, 253, 254],
[255, 253, 254],
[255, 253, 254],
...,
[255, 253, 254],
[255, 253, 254],
[255, 253, 254]],
[[255, 253, 254],
[255, 253, 254],
[255, 253, 254],
...,
[255, 253, 254],
[255, 253, 254],
[255, 253, 254]]], dtype=torch.uint8)
>>> t.shape
torch.Size([230, 175, 3])
可以看到 tensor 的維度是 高度 x 寬度 x 通道數
(RGB 圖片為 3,黑白圖片為 1),可是 pytorch 的 CNN 模型會要求維度為 通道數 x 寬度 x 高度
,並且數值應該正規化到 0 ~ 1 的范圍內,使用以下代碼可以實現:
# 交換維度 0 (高度) 和 維度 2 (通道數)
>>> t1 = t.transpose(0, 2)
>>> t1.shape
torch.Size([3, 175, 230])
>>> t2 = t1 / 255.0
>>> t2
tensor([[[1.0000, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000],
[1.0000, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000],
[1.0000, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000],
...,
[1.0000, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000],
[1.0000, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000],
[1.0000, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000]],
[[0.9922, 0.9922, 0.9922, ..., 0.9922, 0.9922, 0.9922],
[0.9922, 0.9922, 0.9922, ..., 0.9922, 0.9922, 0.9922],
[0.9922, 0.9922, 0.9922, ..., 0.9922, 0.9922, 0.9922],
...,
[0.9922, 0.9922, 0.9922, ..., 0.9922, 0.9922, 0.9922],
[0.9922, 0.9922, 0.9922, ..., 0.9922, 0.9922, 0.9922],
[0.9922, 0.9922, 0.9922, ..., 0.9922, 0.9922, 0.9922]],
[[0.9961, 0.9961, 0.9961, ..., 0.9961, 0.9961, 0.9961],
[0.9961, 0.9961, 0.9961, ..., 0.9961, 0.9961, 0.9961],
[0.9961, 0.9961, 0.9961, ..., 0.9961, 0.9961, 0.9961],
...,
[0.9961, 0.9961, 0.9961, ..., 0.9961, 0.9961, 0.9961],
[0.9961, 0.9961, 0.9961, ..., 0.9961, 0.9961, 0.9961],
[0.9961, 0.9961, 0.9961, ..., 0.9961, 0.9961, 0.9961]]])
之后就可以圍繞類似上面例子中 t2
這樣的 tensor 對象做文章了🥳。
卷積神經網絡 (CNN)
卷積神經網絡 (CNN) 會從圖片的各個部分提取特征,然后再從一級特征提取二級特征,如有必要再提取三級特征 (以此類推),提取結束以后扁平化到最終特征,然后使用多層或單層線性模型來實現分類識別。提取各級特征會使用卷積層 (Convolution Layer) 和池化層 (Pooling Layer),提取特征時可以選擇添加通道數量以增加各個部分的信息量,分類識別最終特征使用的線性模型又稱全連接層 (Fully Connected Layer),下圖是流程示例:
之前的文章介紹線性模型和遞歸模型的時候我使用了數學公式,但只用數學公式說明 CNN 將會非常難以理解,所以接下來我會伴隨例子逐步講解各個層具體做了怎樣的運算。
卷積層 (Convolution Layer)
卷積層會對圖片的各個部分做矩陣乘法操作,然后把結果作為一個新的矩陣,每個卷積層有兩個主要的參數,一個是內核大小 (kernel_size
),一個是處理間隔 (stride
),下圖是一個非常簡單的計算流程例子:
如果增加處理間隔會怎樣呢?下圖展示了不同處理間隔的計算部分和輸出結果維度的區別:
我們可以看到處理間隔決定了每次向右或者向下移動的距離,輸出長度可以使用公式 (長度 - 內核大小) / 處理間隔 + 1
計算,輸出寬度可以使用公式 (長度 - 內核大小) / 處理間隔 + 1
計算。
現在再來看看 pytorch 中怎樣使用卷積層,創建卷積層可以使用 torch.nn.Conv2d
:
# 創建卷積層,入通道 = 1,出通道 = 1,內核大小 = 2,處理間隔 = 1
>>> conv2d = torch.nn.Conv2d(in_channels = 1, out_channels = 1, kernel_size = 2, stride = 1)
# 查看卷積層內部的參數,第一個是內核對應的權重矩陣,第二個是偏移值
>>> p = list(conv2d.parameters())
>>> p
[Parameter containing:
tensor([[[[-0.0650, -0.0575],
[-0.0313, -0.3539]]]], requires_grad=True), Parameter containing:
tensor([0.1482], requires_grad=True)]
# 現在生成一個 5 x 5,單通道的圖片數據,為了方便理解這里使用了 1 ~ 25,實際應該使用 0 ~ 1 之間的值
>>> x = torch.tensor(list(range(1, 26)), dtype=torch.float).reshape(1, 1, 5, 5)
>>> x
tensor([[[[ 1., 2., 3., 4., 5.],
[ 6., 7., 8., 9., 10.],
[11., 12., 13., 14., 15.],
[16., 17., 18., 19., 20.],
[21., 22., 23., 24., 25.]]]])
# 使用卷積層計算輸出
>>> y = conv2d(x)
>>> y
tensor([[[[ -2.6966, -3.2043, -3.7119, -4.2196],
[ -5.2349, -5.7426, -6.2502, -6.7579],
[ -7.7732, -8.2809, -8.7885, -9.2962],
[-10.3115, -10.8192, -11.3268, -11.8345]]]],
grad_fn=<MkldnnConvolutionBackward>)
# 我們可以模擬一下處理單個部分的計算,看看和上面的輸出是否一致
# 第 1 部分
>>> x[0,0,0:2,0:2]
tensor([[1., 2.],
[6., 7.]])
>>> (p[0][0,0,:,:] * x[0,0,0:2,0:2]).sum() + p[1]
tensor([-2.6966], grad_fn=<AddBackward0>)
# 第 2 部分
>>> x[0,0,0:2,1:3]
tensor([[2., 3.],
[7., 8.]])
>>> (p[0][0,0,:,:] * x[0,0,0:2,1:3]).sum() + p[1]
tensor([-3.2043], grad_fn=<AddBackward0>)
# 第 3 部分
>>> (p[0][0,0,:,:] * x[0,0,0:2,2:4]).sum() + p[1]
tensor([-3.7119], grad_fn=<AddBackward0>)
# 一致吧🥳
到這里你應該了解單通道的卷積層是怎樣計算的,那么多通道呢?如果有多個入通道,那么卷積層的權重矩陣會相應有多份,如果有多個出通道,那么卷積層的權重矩陣數量也會乘以出通道的倍數,例如有 3 個入通道,2 個出通道時,卷積層的權重矩陣會有 6 個 (3 * 2
),偏移值會有 2 個,計算規則如下:
部分輸出[出通道1] = 部分輸入[入通道1] * 權重矩陣[0][0] + 部分輸入[入通道2] * 權重矩陣[0][1] + 部分輸入[入通道3] * 權重矩陣[0][2] + 偏移值1
部分輸出[出通道2] = 部分輸入[入通道1] * 權重矩陣[1][0] + 部分輸入[入通道2] * 權重矩陣[1][1] + 部分輸入[入通道3] * 權重矩陣[1][2] + 偏移值2
從計算規則可以看出,出通道越多每個部分可提取的特征數量 (信息量) 也就越多,但計算量也會相應增大。
最后看看卷積層的數學公式 (基本和 pytorch 文檔的公式相同),現在應該可以理解了吧🤢?
池化層 (Pooling Layer)
池化層的處理比較好理解,它會對每個圖片每個區域進行求最大值或者求平均值等運算,如下圖所示:
現在再來看看 pytorch 中怎樣使用卷積層,創建求最大值的池化層可以使用 torch.nn.MaxPool2d
,創建求平均值的池化層可以使用 torch.nn.AvgPool2d
:
# 創建池化層,內核大小 = 2,處理間隔 = 2
>>> maxPool = torch.nn.MaxPool2d(2, stride=2)
# 生成一個 6 x 6,單通道的圖片數據
>>> x = torch.tensor(range(1, 37), dtype=float).reshape(1, 1, 6, 6)
>>> x
tensor([[[[ 1., 2., 3., 4., 5., 6.],
[ 7., 8., 9., 10., 11., 12.],
[13., 14., 15., 16., 17., 18.],
[19., 20., 21., 22., 23., 24.],
[25., 26., 27., 28., 29., 30.],
[31., 32., 33., 34., 35., 36.]]]], dtype=torch.float64)
# 使用池化層計算輸出
>>> maxPool(x)
tensor([[[[ 8., 10., 12.],
[20., 22., 24.],
[32., 34., 36.]]]], dtype=torch.float64)
# 很好理解吧🥳
# 創建和使用求平均值的池化層也很簡單
>>> avgPool = torch.nn.AvgPool2d(2, stride=2)
>>> avgPool(x)
tensor([[[[ 4.5000, 6.5000, 8.5000],
[16.5000, 18.5000, 20.5000],
[28.5000, 30.5000, 32.5000]]]], dtype=torch.float64)
全連接層 (Fully Connected Layer)
全連接層實際上就是多層或單層線性模型,但把特征傳到全連接層之前還需要進行扁平化 (Flatten),例子如下所示:
# 模擬創建一個批次數量為 2,通道數為 3,長寬各為 2 的特征
>>> x = torch.rand((2, 3, 2, 2))
>>> x
tensor([[[[0.6395, 0.6240],
[0.4194, 0.6054]],
[[0.4798, 0.4690],
[0.2647, 0.6087]],
[[0.5727, 0.7567],
[0.8287, 0.1382]]],
[[[0.7903, 0.8635],
[0.0053, 0.6417]],
[[0.7093, 0.7740],
[0.3115, 0.7587]],
[[0.5875, 0.8268],
[0.2923, 0.6016]]]])
# 對它進行扁平化,維度會變為 批次數量, 通道數*長*寬
>>> x_flatten = x.view(x.shape[0], -1)
>>> x_flatten
tensor([[0.6395, 0.6240, 0.4194, 0.6054, 0.4798, 0.4690, 0.2647, 0.6087, 0.5727,
0.7567, 0.8287, 0.1382],
[0.7903, 0.8635, 0.0053, 0.6417, 0.7093, 0.7740, 0.3115, 0.7587, 0.5875,
0.8268, 0.2923, 0.6016]])
# 之后再傳給線性模型即可
>>> linear = torch.nn.Linear(in_features=12, out_features=2)
>>> linear(x_flatten)
tensor([[-0.3067, -0.5534],
[-0.1876, -0.6523]], grad_fn=<AddmmBackward>)
填充處理
在看前面提到的卷積層操作的時候,你可能會發現如果處理間隔 (stride) 小於內核大小 (kernel_size),那么圖片邊緣的像素參與運算的次數會比圖片中間的像素要少,也就是說圖片邊緣對運算結果的影響會更小,如果圖片邊緣的信息同樣比較重要,那么就會影響預測輸出的精度。為了解決這個問題發明的就是填充處理,填充處理簡單的來說就是在卷積層初期前給圖片的周邊添加 0,如果填充量等於 1,那么長寬會各增加 2,如下圖所示:
在 pytorch 中添加填充處理可以在創建 Conv2d
的時候指定 padding
參數:
# 創建卷積層,入通道 = 1,出通道 = 1,內核大小 = 2,處理間隔 = 1, 填充量 = 1
>>> conv2d = torch.nn.Conv2d(in_channels = 1, out_channels = 1, kernel_size = 2, stride = 1, padding = 1)
使用 CNN 實現圖片分類 (LeNet)
接下來我們試試使用 CNN 實現圖片分類,也就是給出一張圖片讓程序識別里面的是什么東西,使用的數據集是 cifar-10,這是一個很經典的數據集,包含了 60000 張 32x32 的小圖片,圖片有十個分類 (飛機,汽車,鳥,貓,鹿,狗,青蛙,馬,船,貨車),官方下載地址在這里。
需要注意的是,官方下載地址只包含二進制數據,通常很多文章或者教程都會讓我們使用 torchvision.datasets.CIFAR10
等現成的加載器來加載這個數據集,但我不推薦使用這種方法,因為如果我們需要訓練實際業務上的數據,那么肯定不會有現成的加載器可以用,還是得一張張圖片的加載和轉換。所以這里我使用了 cifar-10 的原始圖片庫,然后演示怎樣從代碼加載圖片和標簽,然后轉換到訓練使用的 tensor 對象。
以下的代碼使用了 LeNet 模型,這是 30 年前就已經被提出的模型,結構和本文第一個圖片介紹的一樣。此外還有一些需要注意的地方:
- cifar-10 官方默認划分了 50000 張圖片作為訓練集,10000 張圖片作為驗證集;而我的代碼划分了 48000 張圖片作為訓練集,6000 張圖片作為驗證集,6000 張圖片作為測試集,所以正確率等數據會和其他文章或者論文不一致
- 訓練時的損失計算器使用了
CrossEntropyLoss
, 這個計算器的特征是要求預測輸出是 onehot,實際輸出是索引值 (只有一個分類是正確輸出),例如圖片分類為鳥
時,預測輸出應該為[0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
實際輸出應該為2
- 轉換各個分類的數值到概率使用了 Softmax 函數, 這個函數必須放在模型之外,如果放在模型內部會導致訓練效果變差,因為
CrossEntropyLoss
損失計算器會盡量讓正確輸出的數值更高,錯誤輸出的數值更低,而不是分別接近 1 和 0,使用 softmax 會干擾損失的計算
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot
# 分析目標的圖片大小,全部圖片都會先縮放到這個大小
IMAGE_SIZE = (32, 32)
# 分析目標的圖片所在的文件夾
IMAGE_DIR = "./cifar"
# 包含所有圖片標簽的文本文件
IMAGE_LABELS_PATH = "./cifar/labels.txt"
class MyModel(nn.Module):
"""圖片分類 (LeNet)"""
def __init__(self, num_labels):
super().__init__()
# 卷積層和池化層
self.cnn_model = nn.Sequential(
nn.Conv2d(3, 6, kernel_size=5), # 維度: B,3,32,32 => B,6,28,28
nn.ReLU(),
nn.MaxPool2d(2, stride=2), # 維度: B,6,14,14
nn.Conv2d(6, 16, kernel_size=5), # 維度: B,16,10,10
nn.ReLU(),
nn.MaxPool2d(2, stride=2) # 維度: B,16,5,5
)
# 全連接層
self.fc_model = nn.Sequential(
nn.Linear(16 * 5 * 5, 120), # 維度: B,120
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(120, 60), # 維度: B,60
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(60, num_labels), # 維度: B,num_labels
)
def forward(self, x):
# 應用卷積層和池化層
cnn_features = self.cnn_model(x)
# 扁平化輸出的特征
cnn_features_flatten = cnn_features.view(cnn_features.shape[0], -1)
# 應用全連接層
y = self.fc_model(cnn_features_flatten)
return y
def save_tensor(tensor, path):
"""保存 tensor 對象到文件"""
torch.save(tensor, gzip.GzipFile(path, "wb"))
def load_tensor(path):
"""從文件讀取 tensor 對象"""
return torch.load(gzip.GzipFile(path, "rb"))
def image_to_tensor(img):
"""轉換圖片對象到 tensor 對象"""
in_img = img.resize(IMAGE_SIZE)
arr = numpy.asarray(in_img)
t = torch.from_numpy(arr)
t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
t = t / 255.0 # 正規化數值使得范圍在 0 ~ 1
return t
def load_image_labels():
"""讀取圖片分類列表"""
return list(filter(None, open(IMAGE_LABELS_PATH).read().split()))
def prepare_save_batch(batch, tensor_in, tensor_out):
"""准備訓練 - 保存單個批次的數據"""
# 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
random_indices = torch.randperm(tensor_in.shape[0])
training_indices = random_indices[:int(len(random_indices)*0.8)]
validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
testing_indices = random_indices[int(len(random_indices)*0.9):]
training_set = (tensor_in[training_indices], tensor_out[training_indices])
validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])
# 保存到硬盤
save_tensor(training_set, f"data/training_set.{batch}.pt")
save_tensor(validating_set, f"data/validating_set.{batch}.pt")
save_tensor(testing_set, f"data/testing_set.{batch}.pt")
print(f"batch {batch} saved")
def prepare():
"""准備訓練"""
# 數據集轉換到 tensor 以后會保存在 data 文件夾下
if not os.path.isdir("data"):
os.makedirs("data")
# 准備圖片分類到序號的索引
labels_to_index = { label: index for index, label in enumerate(load_image_labels()) }
# 查找所有圖片
image_paths = []
for root, dirs, files in os.walk(IMAGE_DIR):
for filename in files:
path = os.path.join(root, filename)
if not path.endswith(".png"):
continue
# 分類名稱在文件名中,例如
# 2598_cat.png => cat
label = filename.split(".")[0].split("_")[1]
label_index = labels_to_index.get(label)
if label_index is None:
continue
image_paths.append((path, label_index))
# 打亂圖片順序
random.shuffle(image_paths)
# 分批讀取和保存圖片
batch_size = 1000
for batch in range(0, len(image_paths) // batch_size):
image_tensors = []
image_labels = []
for path, label_index in image_paths[batch*batch_size:(batch+1)*batch_size]:
with Image.open(path) as img:
t = image_to_tensor(img)
image_tensors.append(t)
image_labels.append(label_index)
tensor_in = torch.stack(image_tensors) # 維度: B,C,W,H
tensor_out = torch.tensor(image_labels) # 維度: B
prepare_save_batch(batch, tensor_in, tensor_out)
def train():
"""開始訓練"""
# 創建模型實例
num_labels = len(load_image_labels())
model = MyModel(num_labels)
# 創建損失計算器
# 計算單分類輸出最好使用 CrossEntropyLoss, 多分類輸出最好使用 BCELoss
# 使用 CrossEntropyLoss 時實際輸出應該為標簽索引值,不需要轉換為 onehot
loss_function = torch.nn.CrossEntropyLoss()
# 創建參數調整器
optimizer = torch.optim.Adam(model.parameters())
# 記錄訓練集和驗證集的正確率變化
training_accuracy_history = []
validating_accuracy_history = []
# 記錄最高的驗證集正確率
validating_accuracy_highest = -1
validating_accuracy_highest_epoch = 0
# 讀取批次的工具函數
def read_batches(base_path):
for batch in itertools.count():
path = f"{base_path}.{batch}.pt"
if not os.path.isfile(path):
break
yield load_tensor(path)
# 計算正確率的工具函數
def calc_accuracy(actual, predicted):
# 把最大的值當作正確分類,然后比對有多少個分類相等
predicted_labels = predicted.argmax(dim=1)
acc = (actual == predicted_labels).sum().item() / actual.shape[0]
return acc
# 划分輸入和輸出的工具函數
def split_batch_xy(batch, begin=None, end=None):
# shape = batch_size, channels, width, height
batch_x = batch[0][begin:end]
# shape = batch_size
batch_y = batch[1][begin:end]
return batch_x, batch_y
# 開始訓練過程
for epoch in range(1, 10000):
print(f"epoch: {epoch}")
# 根據訓練集訓練並修改參數
# 切換模型到訓練模式,將會啟用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.train()
training_accuracy_list = []
for batch_index, batch in enumerate(read_batches("data/training_set")):
# 切分小批次,有助於泛化模型
training_batch_accuracy_list = []
for index in range(0, batch[0].shape[0], 100):
# 划分輸入和輸出
batch_x, batch_y = split_batch_xy(batch, index, index+100)
# 計算預測值
predicted = model(batch_x)
# 計算損失
loss = loss_function(predicted, batch_y)
# 從損失自動微分求導函數值
loss.backward()
# 使用參數調整器調整參數
optimizer.step()
# 清空導函數值
optimizer.zero_grad()
# 記錄這一個批次的正確率,torch.no_grad 代表臨時禁用自動微分功能
with torch.no_grad():
training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
# 輸出批次正確率
training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
training_accuracy_list.append(training_batch_accuracy)
print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
training_accuracy_history.append(training_accuracy)
print(f"training accuracy: {training_accuracy}")
# 檢查驗證集
# 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.eval()
validating_accuracy_list = []
for batch in read_batches("data/validating_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
validating_accuracy_history.append(validating_accuracy)
print(f"validating accuracy: {validating_accuracy}")
# 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練后仍然沒有刷新記錄
if validating_accuracy > validating_accuracy_highest:
validating_accuracy_highest = validating_accuracy
validating_accuracy_highest_epoch = epoch
save_tensor(model.state_dict(), "model.pt")
print("highest validating accuracy updated")
elif epoch - validating_accuracy_highest_epoch > 20:
# 在 20 次訓練后仍然沒有刷新記錄,結束訓練
print("stop training because highest validating accuracy not updated in 20 epoches")
break
# 使用達到最高正確率時的模型狀態
print(f"highest validating accuracy: {validating_accuracy_highest}",
f"from epoch {validating_accuracy_highest_epoch}")
model.load_state_dict(load_tensor("model.pt"))
# 檢查測試集
testing_accuracy_list = []
for batch in read_batches("data/testing_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
print(f"testing accuracy: {testing_accuracy}")
# 顯示訓練集和驗證集的正確率變化
pyplot.plot(training_accuracy_history, label="training")
pyplot.plot(validating_accuracy_history, label="validing")
pyplot.ylim(0, 1)
pyplot.legend()
pyplot.show()
def eval_model():
"""使用訓練好的模型"""
# 創建模型實例,加載訓練好的狀態,然后切換到驗證模式
labels = load_image_labels()
num_labels = len(labels)
model = MyModel(num_labels)
model.load_state_dict(load_tensor("model.pt"))
model.eval()
# 詢問圖片路徑,並顯示可能的分類一覽
while True:
try:
# 構建輸入
image_path = input("Image path: ")
if not image_path:
continue
with Image.open(image_path) as img:
tensor_in = image_to_tensor(img).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
# 預測輸出
tensor_out = model(tensor_in)
# 轉換到各個分類對應的概率
tensor_out = nn.functional.softmax(tensor_out, dim=1)
# 顯示按概率排序后的分類一覽
rates = (t.item() for t in tensor_out[0])
label_with_rates = list(zip(labels, rates))
label_with_rates.sort(key=lambda p:-p[1])
for label, rate in label_with_rates[:5]:
rate = rate * 100
print(f"{label}: {rate:0.2f}%")
print()
except Exception as e:
print("error:", e)
def main():
"""主函數"""
if len(sys.argv) < 2:
print(f"Please run: {sys.argv[0]} prepare|train|eval")
exit()
# 給隨機數生成器分配一個初始值,使得每次運行都可以生成相同的隨機數
# 這是為了讓過程可重現,你也可以選擇不這樣做
random.seed(0)
torch.random.manual_seed(0)
# 根據命令行參數選擇操作
operation = sys.argv[1]
if operation == "prepare":
prepare()
elif operation == "train":
train()
elif operation == "eval":
eval_model()
else:
raise ValueError(f"Unsupported operation: {operation}")
if __name__ == "__main__":
main()
准備訓練使用的數據和開始訓練需要分別執行以下命令:
python3 example.py prepare
python3 example.py train
最終輸出結果如下,可以看到訓練集正確率達到了 71%,驗證集和測試集正確率達到了 61%,這個正確率代表可以精准說出圖片所屬的分類,也稱 top 1 正確率;此外計算正確分類在概率排前三的分類之中的比率稱為 top 3 正確率,如果是電商上傳圖片以后給出三個可能的商品分類讓商家選擇,那么計算 top 3 正確率就有意義了。
training accuracy: 0.7162083333333331
validating accuracy: 0.6134999999999998
stop training because highest validating accuracy not updated in 20 epoches
highest validating accuracy: 0.6183333333333333 from epoch 40
testing accuracy: 0.6168333333333332
訓練集與驗證集正確率變化如下圖所示:
實際使用模型的例子如下,輸出代表預測圖片有 79.23% 的概率是飛機,你也可以試試在互聯網上隨便找一張圖片讓這個模型識別:
$ python3 example.py eval
Image path: ./cifar/test/2257_airplane.png
airplane: 79.23%
deer: 6.06%
automobile: 4.04%
cat: 2.89%
frog: 2.11%
使用 CNN 實現圖片分類 (ResNet)
上述的模型 top 1 正確率只達到了 61%, 畢竟是 30 年前的老模型了🧔,這里我再介紹一個相對比較新的模型,ResNet 是在 2015 年中提出的模型,論文地址在這里,特征是會把輸入和輸出結合在一塊,例如原來計算 y = f(x)
會變為 y = f(x) + x
,從而抵消層數變多帶來的梯度消失問題 (參考我之前寫的訓練過程中常用的技巧)。
下圖是 ResNet-18 模型的結構,內部可以分為 4 組,每個組都包括 2 個基礎塊和 4 個卷積層,並且每個基礎塊會把輸入和輸出結合在一起,層數合計一共有 16,加上最開始轉換輸入的層和全連接層一共有 18 層,所以稱為 ResNet-18,除此之外還有 ResNet-34,ResNet-50 等等變種,如果有興趣可以參考本節末尾給出的 torchvision
的實現代碼。
從圖中可以看到,從第二組開始會把長寬變為一半,同時通道數增加一倍,然后維持通道數和長寬不變,所有組結束后使用一個 AvgPool2d
來讓長寬強制變為 1x1
,最后交給全連接層。計算卷積層輸出長寬的公式是 (長度 - 內核大小 + 填充量*2) / 處理間隔 + 1
,讓長寬變為一半會使用內核大小 3,填充量 1,處理間隔 2 ,例如長度為 32 可以計算得出 (32 - 3 + 2) / 2 + 1 == 16
;而維持長寬的則會使用內核大小 3,填充量 1,處理間隔 1,例如長度為 32 可以計算得出 (32 - 3 + 2) / 1 + 1 == 32
。
以下是使用 ResNet-18 進行訓練的代碼:
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot
# 分析目標的圖片大小,全部圖片都會先縮放到這個大小
IMAGE_SIZE = (32, 32)
# 分析目標的圖片所在的文件夾
IMAGE_DIR = "./cifar"
# 包含所有圖片標簽的文本文件
IMAGE_LABELS_PATH = "./cifar/labels.txt"
class BasicBlock(nn.Module):
"""ResNet 使用的基礎塊"""
expansion = 1 # 定義這個塊的實際出通道是 channels_out 的幾倍,這里的實現固定是一倍
def __init__(self, channels_in, channels_out, stride):
super().__init__()
# 生成 3x3 的卷積層
# 處理間隔 stride = 1 時,輸出的長寬會等於輸入的長寬,例如 (32-3+2)//1+1 == 32
# 處理間隔 stride = 2 時,輸出的長寬會等於輸入的長寬的一半,例如 (32-3+2)//2+1 == 16
# 此外 resnet 的 3x3 卷積層不使用偏移值 bias
self.conv1 = nn.Sequential(
nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
nn.BatchNorm2d(channels_out))
# 再定義一個讓輸出和輸入維度相同的 3x3 卷積層
self.conv2 = nn.Sequential(
nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
nn.BatchNorm2d(channels_out))
# 讓原始輸入和輸出相加的時候,需要維度一致,如果維度不一致則需要整合
self.identity = nn.Sequential()
if stride != 1 or channels_in != channels_out * self.expansion:
self.identity = nn.Sequential(
nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(channels_out * self.expansion))
def forward(self, x):
# x => conv1 => relu => conv2 => + => relu
# | ^
# |==============================|
tmp = self.conv1(x)
tmp = nn.functional.relu(tmp)
tmp = self.conv2(tmp)
tmp += self.identity(x)
y = nn.functional.relu(tmp)
return y
class MyModel(nn.Module):
"""圖片分類 (ResNet-18)"""
def __init__(self, num_labels, block_type = BasicBlock):
super().__init__()
# 記錄上一層的出通道數量
self.previous_channels_out = 64
# 把 3 通道轉換到 64 通道,長寬不變
self.conv1 = nn.Sequential(
nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
nn.BatchNorm2d(self.previous_channels_out))
# ResNet 使用的各個層
self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
# 把最后一層的長寬轉換為 1x1 的池化層,Adaptive 表示會自動檢測原有長寬
# 例如 B,512,4,4 的矩陣會轉換為 B,512,1,1,每個通道的單個值會是原有 16 個值的平均
self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
# 全連接層,只使用單層線性模型
self.fc_model = nn.Linear(512 * block_type.expansion, num_labels)
def _make_layer(self, block_type, channels_out, num_blocks, stride):
blocks = []
# 添加第一個塊
blocks.append(block_type(self.previous_channels_out, channels_out, stride))
self.previous_channels_out = channels_out * block_type.expansion
# 添加剩余的塊,剩余的塊固定處理間隔為 1,不會改變長寬
for _ in range(num_blocks-1):
blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
self.previous_channels_out *= block_type.expansion
return nn.Sequential(*blocks)
def forward(self, x):
# 轉換出通道到 64
tmp = self.conv1(x)
tmp = nn.functional.relu(tmp)
# 應用 ResNet 的各個層
tmp = self.layer1(tmp)
tmp = self.layer2(tmp)
tmp = self.layer3(tmp)
tmp = self.layer4(tmp)
# 轉換長寬到 1x1
tmp = self.avgPool(tmp)
# 扁平化,維度會變為 B,512
tmp = tmp.view(tmp.shape[0], -1)
# 應用全連接層
y = self.fc_model(tmp)
return y
def save_tensor(tensor, path):
"""保存 tensor 對象到文件"""
torch.save(tensor, gzip.GzipFile(path, "wb"))
def load_tensor(path):
"""從文件讀取 tensor 對象"""
return torch.load(gzip.GzipFile(path, "rb"))
def image_to_tensor(img):
"""轉換圖片對象到 tensor 對象"""
in_img = img.resize(IMAGE_SIZE)
arr = numpy.asarray(in_img)
t = torch.from_numpy(arr)
t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
t = t / 255.0 # 正規化數值使得范圍在 0 ~ 1
return t
def load_image_labels():
"""讀取圖片分類列表"""
return list(filter(None, open(IMAGE_LABELS_PATH).read().split()))
def prepare_save_batch(batch, tensor_in, tensor_out):
"""准備訓練 - 保存單個批次的數據"""
# 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
random_indices = torch.randperm(tensor_in.shape[0])
training_indices = random_indices[:int(len(random_indices)*0.8)]
validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
testing_indices = random_indices[int(len(random_indices)*0.9):]
training_set = (tensor_in[training_indices], tensor_out[training_indices])
validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])
# 保存到硬盤
save_tensor(training_set, f"data/training_set.{batch}.pt")
save_tensor(validating_set, f"data/validating_set.{batch}.pt")
save_tensor(testing_set, f"data/testing_set.{batch}.pt")
print(f"batch {batch} saved")
def prepare():
"""准備訓練"""
# 數據集轉換到 tensor 以后會保存在 data 文件夾下
if not os.path.isdir("data"):
os.makedirs("data")
# 准備圖片分類到序號的索引
labels_to_index = { label: index for index, label in enumerate(load_image_labels()) }
# 查找所有圖片
image_paths = []
for root, dirs, files in os.walk(IMAGE_DIR):
for filename in files:
path = os.path.join(root, filename)
if not path.endswith(".png"):
continue
# 分類名稱在文件名中,例如
# 2598_cat.png => cat
label = filename.split(".")[0].split("_")[1]
label_index = labels_to_index.get(label)
if label_index is None:
continue
image_paths.append((path, label_index))
# 打亂圖片順序
random.shuffle(image_paths)
# 分批讀取和保存圖片
batch_size = 1000
for batch in range(0, len(image_paths) // batch_size):
image_tensors = []
image_labels = []
for path, label_index in image_paths[batch*batch_size:(batch+1)*batch_size]:
with Image.open(path) as img:
t = image_to_tensor(img)
image_tensors.append(t)
image_labels.append(label_index)
tensor_in = torch.stack(image_tensors) # 維度: B,C,W,H
tensor_out = torch.tensor(image_labels) # 維度: B
prepare_save_batch(batch, tensor_in, tensor_out)
def train():
"""開始訓練"""
# 創建模型實例
num_labels = len(load_image_labels())
model = MyModel(num_labels)
# 創建損失計算器
# 計算單分類輸出最好使用 CrossEntropyLoss, 多分類輸出最好使用 BCELoss
# 使用 CrossEntropyLoss 時實際輸出應該為標簽索引值,不需要轉換為 onehot
loss_function = torch.nn.CrossEntropyLoss()
# 創建參數調整器
optimizer = torch.optim.Adam(model.parameters())
# 記錄訓練集和驗證集的正確率變化
training_accuracy_history = []
validating_accuracy_history = []
# 記錄最高的驗證集正確率
validating_accuracy_highest = -1
validating_accuracy_highest_epoch = 0
# 讀取批次的工具函數
def read_batches(base_path):
for batch in itertools.count():
path = f"{base_path}.{batch}.pt"
if not os.path.isfile(path):
break
yield load_tensor(path)
# 計算正確率的工具函數
def calc_accuracy(actual, predicted):
# 把最大的值當作正確分類,然后比對有多少個分類相等
predicted_labels = predicted.argmax(dim=1)
acc = (actual == predicted_labels).sum().item() / actual.shape[0]
return acc
# 划分輸入和輸出的工具函數
def split_batch_xy(batch, begin=None, end=None):
# shape = batch_size, channels, width, height
batch_x = batch[0][begin:end]
# shape = batch_size
batch_y = batch[1][begin:end]
return batch_x, batch_y
# 開始訓練過程
for epoch in range(1, 10000):
print(f"epoch: {epoch}")
# 根據訓練集訓練並修改參數
# 切換模型到訓練模式,將會啟用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.train()
training_accuracy_list = []
for batch_index, batch in enumerate(read_batches("data/training_set")):
# 切分小批次,有助於泛化模型
training_batch_accuracy_list = []
for index in range(0, batch[0].shape[0], 100):
# 划分輸入和輸出
batch_x, batch_y = split_batch_xy(batch, index, index+100)
# 計算預測值
predicted = model(batch_x)
# 計算損失
loss = loss_function(predicted, batch_y)
# 從損失自動微分求導函數值
loss.backward()
# 使用參數調整器調整參數
optimizer.step()
# 清空導函數值
optimizer.zero_grad()
# 記錄這一個批次的正確率,torch.no_grad 代表臨時禁用自動微分功能
with torch.no_grad():
training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
# 輸出批次正確率
training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
training_accuracy_list.append(training_batch_accuracy)
print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
training_accuracy_history.append(training_accuracy)
print(f"training accuracy: {training_accuracy}")
# 檢查驗證集
# 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.eval()
validating_accuracy_list = []
for batch in read_batches("data/validating_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
validating_accuracy_history.append(validating_accuracy)
print(f"validating accuracy: {validating_accuracy}")
# 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練后仍然沒有刷新記錄
if validating_accuracy > validating_accuracy_highest:
validating_accuracy_highest = validating_accuracy
validating_accuracy_highest_epoch = epoch
save_tensor(model.state_dict(), "model.pt")
print("highest validating accuracy updated")
elif epoch - validating_accuracy_highest_epoch > 20:
# 在 20 次訓練后仍然沒有刷新記錄,結束訓練
print("stop training because highest validating accuracy not updated in 20 epoches")
break
# 使用達到最高正確率時的模型狀態
print(f"highest validating accuracy: {validating_accuracy_highest}",
f"from epoch {validating_accuracy_highest_epoch}")
model.load_state_dict(load_tensor("model.pt"))
# 檢查測試集
testing_accuracy_list = []
for batch in read_batches("data/testing_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
print(f"testing accuracy: {testing_accuracy}")
# 顯示訓練集和驗證集的正確率變化
pyplot.plot(training_accuracy_history, label="training")
pyplot.plot(validating_accuracy_history, label="validing")
pyplot.ylim(0, 1)
pyplot.legend()
pyplot.show()
def eval_model():
"""使用訓練好的模型"""
# 創建模型實例,加載訓練好的狀態,然后切換到驗證模式
labels = load_image_labels()
num_labels = len(labels)
model = MyModel(num_labels)
model.load_state_dict(load_tensor("model.pt"))
model.eval()
# 詢問圖片路徑,並顯示可能的分類一覽
while True:
try:
# 構建輸入
image_path = input("Image path: ")
if not image_path:
continue
with Image.open(image_path) as img:
tensor_in = image_to_tensor(img).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
# 預測輸出
tensor_out = model(tensor_in)
# 轉換到各個分類對應的概率
tensor_out = nn.functional.softmax(tensor_out, dim=1)
# 顯示按概率排序后的分類一覽
rates = (t.item() for t in tensor_out[0])
label_with_rates = list(zip(labels, rates))
label_with_rates.sort(key=lambda p:-p[1])
for label, rate in label_with_rates[:5]:
rate = rate * 100
print(f"{label}: {rate:0.2f}%")
print()
except Exception as e:
print("error:", e)
def main():
"""主函數"""
if len(sys.argv) < 2:
print(f"Please run: {sys.argv[0]} prepare|train|eval")
exit()
# 給隨機數生成器分配一個初始值,使得每次運行都可以生成相同的隨機數
# 這是為了讓過程可重現,你也可以選擇不這樣做
random.seed(0)
torch.random.manual_seed(0)
# 根據命令行參數選擇操作
operation = sys.argv[1]
if operation == "prepare":
prepare()
elif operation == "train":
train()
elif operation == "eval":
eval_model()
else:
raise ValueError(f"Unsupported operation: {operation}")
if __name__ == "__main__":
main()
最終輸出結果如下,可以看到訓練集正確率達到了 99%,驗證集正確率達到了 85%,測試集正確率達到了 84%,比起上面的 LeNet 模型改進了很多吧🤗。
training accuracy: 0.9972708333333337
validating accuracy: 0.8373333333333337
stop training because highest validating accuracy not updated in 20 epoches
highest validating accuracy: 0.8521666666666667 from epoch 38
testing accuracy: 0.8464999999999996
隨便在網上找的貓狗圖片:
輸出結果如下,不錯吧:
Image path: BlogArchive/ml-08/cat.jpg
cat: 100.00%
dog: 0.00%
frog: 0.00%
deer: 0.00%
horse: 0.00%
Image path: BlogArchive/ml-08/dog.jpg
dog: 100.00%
bird: 0.00%
deer: 0.00%
frog: 0.00%
horse: 0.00%
pytorch 有專門用於處理視覺信息的 torchvision,其中包含了 ResNet 的實現,也就是說其實我們不用自己去寫🤒,如果你有興趣可以參考里面的實現代碼,再試試 ResNet-50 等層數更多的模型是否可以帶來更好的效果。
AI 鑒黃
相信很多人都看過 AI 鑒黃的新聞🥴🤭🥺,如果你想自己實現一個,可以從 nsfw_data_scraper 下載圖片資源然后使用上面介紹的方法訓練,識別起來會比 cifar 簡單很多。因為實際只需要兩個標簽(1 黃色圖片,0 正常圖片),所以也可以使用單個值代表結果,然后用 sigmoid 代替 softmax。此外你也可以在 github 上搜索 nsfw 找到現成的模型。
使用 CNN 實現驗證碼識別 (ResNet-18)
最后再給出一個實用的例子。很多網站為了防機器人操作會使用驗證碼機制,傳統的驗證碼會顯示一張包含數字字母的圖片,然后讓用戶填寫里面的內容再對比是否正確,來判斷用戶是普通人還是機器人,這樣的驗證碼可以用本篇介紹的 CNN 模型識別出來😈。
首先我們來選一個生成驗證碼的類庫,github 上搜索 captcha c# 里面難度相對比較高的是 Hei.Captcha,這篇就使用 CNN 模型識別這個類庫生成的驗證碼。(我的 zkweb 里面也有生成驗證碼的模塊,但難度比較低所以就不用了)
以下步驟和代碼會生成十萬張用於訓練和測試使用的驗證碼圖片:
mkdir generate-captcha
cd generate-captcha
dotnet new console
dotnet add package Hei.Captcha
mkdir output
mkdir fonts
cd fonts
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/Candara.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/STCAIYUN.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/impact.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/monbaiti.ttf?raw=true
cd ..
# 添加程序代碼
dotnet run -c Release
using System;
using System.IO;
using Hei.Captcha;
namespace generate_captcha
{
class Program
{
static void Main(string[] args)
{
var helper = new SecurityCodeHelper();
var iterations = 100000;
for (var x = 0; x < iterations; ++x)
{
var code = helper.GetRandomEnDigitalText(4);
var bytes = helper.GetEnDigitalCodeByte(code);
File.WriteAllBytes($"output/{x:D5}-{code}.png", bytes);
if (x % 100 == 0)
Console.WriteLine($"{x}/{iterations}");
}
}
}
}
以下是生成的驗證碼圖片例子,變形旋轉干擾線動態背景色該有的都有😠:
接下來我們想想應該用什么數據結構來表達驗證碼。在圖片識別的例子中有十個分類,我們用了 onehot 編碼,即使用長度為 10 的 tensor 對象來表示結果,正確的分類為 1,不正確的分類為 0。換成驗證碼以后,可以用長度為 36 的 tensor 對象來表示 1 位驗證碼 (26 個英文數字 + 10 個字母,假設驗證碼不分大小寫),如果有多位則可以 36 * 位數的 tensor 對象來表達多位驗證碼。以下函數可以把驗證碼轉換為對應的 tensor 對象:
# 字母數字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 驗證碼位數
DIGITS = 4
# 標簽數量,字母數字混合*位數
NUM_LABELS = len(ALPHA_NUMS)*DIGITS
def code_to_tensor(code):
"""轉換驗證碼到 tensor 對象,使用 onehot 編碼"""
t = torch.zeros((NUM_LABELS,))
code = code.lower() # 驗證碼不分大小寫
for index, c in enumerate(code):
p = ALPHA_NUMS_MAP[c]
t[index*len(ALPHA_NUMS)+p] = 1
return t
轉換例子如下:
>>> code_to_tensor("abcd")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
>>> code_to_tensor("a123")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.])
反過來也一樣,我們可以把 tensor 的長度按 36 分為多組,然后求每一組最大的值所在的索引,再根據該索引找到對應的字母或者數字,就可以把 tensor 對象轉換回驗證碼:
def tensor_to_code(tensor):
"""轉換 tensor 對象到驗證碼"""
tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
indices = tensor.max(dim=1).indices
code = "".join(ALPHA_NUMS[index] for index in indices)
return code
接下來就可以用前面介紹過的 ResNet-18 模型進行訓練了😎,相比前面的圖片分類,這份代碼有以下幾點不同:
- 因為是多分類,損失計算器應該使用
BCELoss
代替CrossEntropyLoss
BCELoss
要求模型輸出值范圍在 0 ~ 1 之間,所以需要在模型內部添加控制函數 (CrossEntropyLoss
這么做會影響訓練效果,但BCELoss
不會)- 因為每一組都只有一個值是正確的,用
softmax
效果會比sigmoid
要好 (普通的多分類問題會使用sigmoid
)
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot
# 分析目標的圖片大小,全部圖片都會先縮放到這個大小
# 驗證碼原圖是 120x50
IMAGE_SIZE = (56, 24)
# 分析目標的圖片所在的文件夾
IMAGE_DIR = "./generate-captcha/output/"
# 字母數字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 驗證碼位數
DIGITS = 4
# 標簽數量,字母數字混合*位數
NUM_LABELS = len(ALPHA_NUMS)*DIGITS
class BasicBlock(nn.Module):
"""ResNet 使用的基礎塊"""
expansion = 1 # 定義這個塊的實際出通道是 channels_out 的幾倍,這里的實現固定是一倍
def __init__(self, channels_in, channels_out, stride):
super().__init__()
# 生成 3x3 的卷積層
# 處理間隔 stride = 1 時,輸出的長寬會等於輸入的長寬,例如 (32-3+2)//1+1 == 32
# 處理間隔 stride = 2 時,輸出的長寬會等於輸入的長寬的一半,例如 (32-3+2)//2+1 == 16
# 此外 resnet 的 3x3 卷積層不使用偏移值 bias
self.conv1 = nn.Sequential(
nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
nn.BatchNorm2d(channels_out))
# 再定義一個讓輸出和輸入維度相同的 3x3 卷積層
self.conv2 = nn.Sequential(
nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
nn.BatchNorm2d(channels_out))
# 讓原始輸入和輸出相加的時候,需要維度一致,如果維度不一致則需要整合
self.identity = nn.Sequential()
if stride != 1 or channels_in != channels_out * self.expansion:
self.identity = nn.Sequential(
nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(channels_out * self.expansion))
def forward(self, x):
# x => conv1 => relu => conv2 => + => relu
# | ^
# |==============================|
tmp = self.conv1(x)
tmp = nn.functional.relu(tmp)
tmp = self.conv2(tmp)
tmp += self.identity(x)
y = nn.functional.relu(tmp)
return y
class MyModel(nn.Module):
"""識別驗證碼 (ResNet-18)"""
def __init__(self, block_type = BasicBlock):
super().__init__()
# 記錄上一層的出通道數量
self.previous_channels_out = 64
# 把 3 通道轉換到 64 通道,長寬不變
self.conv1 = nn.Sequential(
nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
nn.BatchNorm2d(self.previous_channels_out))
# ResNet 使用的各個層
self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
# 把最后一層的長寬轉換為 1x1 的池化層,Adaptive 表示會自動檢測原有長寬
# 例如 B,512,4,4 的矩陣會轉換為 B,512,1,1,每個通道的單個值會是原有 16 個值的平均
self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
# 全連接層,只使用單層線性模型
self.fc_model = nn.Linear(512 * block_type.expansion, NUM_LABELS)
# 控制輸出在 0 ~ 1 之間,BCELoss 需要
# 因為每組只應該有一個值為真,使用 softmax 效果會比 sigmoid 好
self.softmax = nn.Softmax(dim=2)
def _make_layer(self, block_type, channels_out, num_blocks, stride):
blocks = []
# 添加第一個塊
blocks.append(block_type(self.previous_channels_out, channels_out, stride))
self.previous_channels_out = channels_out * block_type.expansion
# 添加剩余的塊,剩余的塊固定處理間隔為 1,不會改變長寬
for _ in range(num_blocks-1):
blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
self.previous_channels_out *= block_type.expansion
return nn.Sequential(*blocks)
def forward(self, x):
# 轉換出通道到 64
tmp = self.conv1(x)
tmp = nn.functional.relu(tmp)
# 應用 ResNet 的各個層
tmp = self.layer1(tmp)
tmp = self.layer2(tmp)
tmp = self.layer3(tmp)
tmp = self.layer4(tmp)
# 轉換長寬到 1x1
tmp = self.avgPool(tmp)
# 扁平化,維度會變為 B,512
tmp = tmp.view(tmp.shape[0], -1)
# 應用全連接層
tmp = self.fc_model(tmp)
# 划分每個字符對應的組,之后維度為 batch_size, digits, alpha_nums
tmp = tmp.reshape(tmp.shape[0], DIGITS, len(ALPHA_NUMS))
# 應用 softmax 到每一組
tmp = self.softmax(tmp)
# 重新扁平化,之后維度為 batch_size, num_labels
y = tmp.reshape(tmp.shape[0], NUM_LABELS)
return y
def save_tensor(tensor, path):
"""保存 tensor 對象到文件"""
torch.save(tensor, gzip.GzipFile(path, "wb"))
def load_tensor(path):
"""從文件讀取 tensor 對象"""
return torch.load(gzip.GzipFile(path, "rb"))
def image_to_tensor(img):
"""轉換圖片對象到 tensor 對象"""
in_img = img.resize(IMAGE_SIZE)
in_img = in_img.convert("RGB") # 轉換圖片模式到 RGB
arr = numpy.asarray(in_img)
t = torch.from_numpy(arr)
t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
t = t / 255.0 # 正規化數值使得范圍在 0 ~ 1
return t
def code_to_tensor(code):
"""轉換驗證碼到 tensor 對象,使用 onehot 編碼"""
t = torch.zeros((NUM_LABELS,))
code = code.lower() # 驗證碼不分大小寫
for index, c in enumerate(code):
p = ALPHA_NUMS_MAP[c]
t[index*len(ALPHA_NUMS)+p] = 1
return t
def tensor_to_code(tensor):
"""轉換 tensor 對象到驗證碼"""
tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
indices = tensor.max(dim=1).indices
code = "".join(ALPHA_NUMS[index] for index in indices)
return code
def prepare_save_batch(batch, tensor_in, tensor_out):
"""准備訓練 - 保存單個批次的數據"""
# 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
random_indices = torch.randperm(tensor_in.shape[0])
training_indices = random_indices[:int(len(random_indices)*0.8)]
validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
testing_indices = random_indices[int(len(random_indices)*0.9):]
training_set = (tensor_in[training_indices], tensor_out[training_indices])
validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])
# 保存到硬盤
save_tensor(training_set, f"data/training_set.{batch}.pt")
save_tensor(validating_set, f"data/validating_set.{batch}.pt")
save_tensor(testing_set, f"data/testing_set.{batch}.pt")
print(f"batch {batch} saved")
def prepare():
"""准備訓練"""
# 數據集轉換到 tensor 以后會保存在 data 文件夾下
if not os.path.isdir("data"):
os.makedirs("data")
# 查找所有圖片
image_paths = []
for root, dirs, files in os.walk(IMAGE_DIR):
for filename in files:
path = os.path.join(root, filename)
if not path.endswith(".png"):
continue
# 驗證碼在文件名中,例如
# 00000-R865.png => R865
code = filename.split(".")[0].split("-")[1]
image_paths.append((path, code))
# 打亂圖片順序
random.shuffle(image_paths)
# 分批讀取和保存圖片
batch_size = 1000
for batch in range(0, len(image_paths) // batch_size):
image_tensors = []
image_labels = []
for path, code in image_paths[batch*batch_size:(batch+1)*batch_size]:
with Image.open(path) as img:
image_tensors.append(image_to_tensor(img))
image_labels.append(code_to_tensor(code))
tensor_in = torch.stack(image_tensors) # 維度: B,C,W,H
tensor_out = torch.stack(image_labels) # 維度: B,N
prepare_save_batch(batch, tensor_in, tensor_out)
def train():
"""開始訓練"""
# 創建模型實例
model = MyModel()
# 創建損失計算器
# 計算多分類輸出最好使用 BCELoss
loss_function = torch.nn.BCELoss()
# 創建參數調整器
optimizer = torch.optim.Adam(model.parameters())
# 記錄訓練集和驗證集的正確率變化
training_accuracy_history = []
validating_accuracy_history = []
# 記錄最高的驗證集正確率
validating_accuracy_highest = -1
validating_accuracy_highest_epoch = 0
# 讀取批次的工具函數
def read_batches(base_path):
for batch in itertools.count():
path = f"{base_path}.{batch}.pt"
if not os.path.isfile(path):
break
yield load_tensor(path)
# 計算正確率的工具函數
def calc_accuracy(actual, predicted):
# 把每一位的最大值當作正確字符,然后比對有多少個字符相等
actual_indices = actual.reshape(actual.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
predicted_indices = predicted.reshape(predicted.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
matched = (actual_indices - predicted_indices).abs().sum(dim=1) == 0
acc = matched.sum().item() / actual.shape[0]
return acc
# 划分輸入和輸出的工具函數
def split_batch_xy(batch, begin=None, end=None):
# shape = batch_size, channels, width, height
batch_x = batch[0][begin:end]
# shape = batch_size, num_labels
batch_y = batch[1][begin:end]
return batch_x, batch_y
# 開始訓練過程
for epoch in range(1, 10000):
print(f"epoch: {epoch}")
# 根據訓練集訓練並修改參數
# 切換模型到訓練模式,將會啟用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.train()
training_accuracy_list = []
for batch_index, batch in enumerate(read_batches("data/training_set")):
# 切分小批次,有助於泛化模型
training_batch_accuracy_list = []
for index in range(0, batch[0].shape[0], 100):
# 划分輸入和輸出
batch_x, batch_y = split_batch_xy(batch, index, index+100)
# 計算預測值
predicted = model(batch_x)
# 計算損失
loss = loss_function(predicted, batch_y)
# 從損失自動微分求導函數值
loss.backward()
# 使用參數調整器調整參數
optimizer.step()
# 清空導函數值
optimizer.zero_grad()
# 記錄這一個批次的正確率,torch.no_grad 代表臨時禁用自動微分功能
with torch.no_grad():
training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
# 輸出批次正確率
training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
training_accuracy_list.append(training_batch_accuracy)
print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
training_accuracy_history.append(training_accuracy)
print(f"training accuracy: {training_accuracy}")
# 檢查驗證集
# 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.eval()
validating_accuracy_list = []
for batch in read_batches("data/validating_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
validating_accuracy_history.append(validating_accuracy)
print(f"validating accuracy: {validating_accuracy}")
# 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練后仍然沒有刷新記錄
if validating_accuracy > validating_accuracy_highest:
validating_accuracy_highest = validating_accuracy
validating_accuracy_highest_epoch = epoch
save_tensor(model.state_dict(), "model.pt")
print("highest validating accuracy updated")
elif epoch - validating_accuracy_highest_epoch > 20:
# 在 20 次訓練后仍然沒有刷新記錄,結束訓練
print("stop training because highest validating accuracy not updated in 20 epoches")
break
# 使用達到最高正確率時的模型狀態
print(f"highest validating accuracy: {validating_accuracy_highest}",
f"from epoch {validating_accuracy_highest_epoch}")
model.load_state_dict(load_tensor("model.pt"))
# 檢查測試集
testing_accuracy_list = []
for batch in read_batches("data/testing_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
print(f"testing accuracy: {testing_accuracy}")
# 顯示訓練集和驗證集的正確率變化
pyplot.plot(training_accuracy_history, label="training")
pyplot.plot(validating_accuracy_history, label="validing")
pyplot.ylim(0, 1)
pyplot.legend()
pyplot.show()
def eval_model():
"""使用訓練好的模型"""
# 創建模型實例,加載訓練好的狀態,然后切換到驗證模式
model = MyModel()
model.load_state_dict(load_tensor("model.pt"))
model.eval()
# 詢問圖片路徑,並顯示可能的分類一覽
while True:
try:
# 構建輸入
image_path = input("Image path: ")
if not image_path:
continue
with Image.open(image_path) as img:
tensor_in = image_to_tensor(img).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
# 預測輸出
tensor_out = model(tensor_in)
# 轉換到驗證碼
code = tensor_to_code(tensor_out[0])
print(f"code: {code}")
print()
except Exception as e:
print("error:", e)
def main():
"""主函數"""
if len(sys.argv) < 2:
print(f"Please run: {sys.argv[0]} prepare|train|eval")
exit()
# 給隨機數生成器分配一個初始值,使得每次運行都可以生成相同的隨機數
# 這是為了讓過程可重現,你也可以選擇不這樣做
random.seed(0)
torch.random.manual_seed(0)
# 根據命令行參數選擇操作
operation = sys.argv[1]
if operation == "prepare":
prepare()
elif operation == "train":
train()
elif operation == "eval":
eval_model()
else:
raise ValueError(f"Unsupported operation: {operation}")
if __name__ == "__main__":
main()
因為訓練需要大量時間而我機器只有 CPU 可以用,所以這次我就只訓練到 epoch 23 🤢,訓練結果如下。可以看到訓練集正確率達到了 98%,驗證集正確率達到了 91%,已經是實用的級別了。
epoch: 23, batch: 98: batch accuracy: 0.99125
epoch: 23, batch: 99: batch accuracy: 0.9862500000000001
training accuracy: 0.9849874999999997
validating accuracy: 0.9103000000000003
highest validating accuracy updated
使用訓練好的模型識別驗證碼,你可以對比上面的圖片看看是不是識別對了 (第二張的 P 看起來很像 D 🤒):
$ python3 example.py eval
Image path: BlogArchive/ml-08/captcha-1.png
code: 8ca6
Image path: BlogArchive/ml-08/captcha-2.png
code: tp8s
Image path: BlogArchive/ml-08/captcha-3.png
code: k225
注意這里介紹出來的模型只能識別這一種驗證碼,其他不同種類的驗證碼需要分別訓練和生成模型,做打碼平台的話會先識別驗證碼種類再使用該種類對應的模型識別驗證碼內容。如果你的目標只是單種驗證碼,那么用這篇文章介紹的方法應該可以幫你節省調打碼平台的錢 🤠。如果你機器有好顯卡,也可以試試用更高級的模型提升正確率。
此外,有很多人問我現在流行的滑動驗證碼如何破解,其實破解這種驗證碼只需要做簡單的圖片分析,例如這里和這里都沒有使用機器學習。但滑動驗證碼一般會配合瀏覽器指紋和鼠標軌跡采集一起使用,后台會根據大量數據分析用戶是普通人還是機器人,所以破解幾次很簡單,但一直破解下去則會有很大幾率被檢測出來。
使用 torchvision 里面的 resnet 模型
在前文我們看到了怎么組合卷積層和池化層自己實現 LeNet 和 ResNet-18,我們還可以使用 torchvision 中現成的模型,以下是修改識別驗證碼的模型到 torchvision 提供的 ResNet 實現的代碼:
# 文件開頭引用 torchvision 庫
import torchvision
# 替換原有代碼中的 MyModel 類,BasicBlock 可以刪掉
class MyModel(nn.Module):
"""識別驗證碼 (ResNet-18)"""
def __init__(self):
super().__init__()
# Resnet 的實現
self.resnet = torchvision.models.resnet18(num_classes=NUM_LABELS)
# 控制輸出在 0 ~ 1 之間,BCELoss 需要
# 因為每組只應該有一個值為真,使用 softmax 效果會比 sigmoid 好
self.softmax = nn.Softmax(dim=2)
def forward(self, x):
# 應用 ResNet
tmp = self.resnet(x)
# 划分每個字符對應的組,之后維度為 batch_size, digits, alpha_nums
tmp = tmp.reshape(tmp.shape[0], DIGITS, len(ALPHA_NUMS))
# 應用 softmax 到每一組
tmp = self.softmax(tmp)
# 重新扁平化,之后維度為 batch_size, num_labels
y = tmp.reshape(tmp.shape[0], NUM_LABELS)
return y
是不是簡單了很多?如果我們想使用 ResNet-50 可以把 resnet18
改為 resnet50
即可切換。雖然使用現成的模型方便,但了解下它們的原理和計算方式總是有好處的😇。
寫在最后
這個系列中預定要寫的內容已經全部寫出來了,接下來要寫什么還不確定,有時間可能會重新維護那些放了半年以上的項目,也可能會想辦法搞好飯店的生意,最近生意實在不好啊🤒。