在之前的文章中我訓練模型都是使用的 CPU,因為家中黃臉婆不允許我浪費錢買電腦😭。終於的,附近一個廢品回收站的朋友轉讓給我一台破爛舊電腦,所以我現在可以體驗使用 GPU 訓練模型了🥳。
顯卡要求
pytorch, tensorflow 等主流的框架的 GPU 支持都基於 CUDA 框架,而目前提供 CUDA 支持的顯卡只有 nvidia,這次我撿到的破爛是 GTX 1650 4GB 所以滿足最低要求了。簡單描述下目前各種顯卡的支持程度:
Intel 核顯:死心叭
APU:沒法用
Nvidia Geforce
2GB 可以用來跑一些入門例子
4GB 可以跑一些簡單模型
6GB 可以跑一些中級模型
8GB 可以跑一些高級模型
10GB以上 可以跑最前沿的模型
Radeon:要折騰,試試 ROCm
如果真的要玩機器學習推薦購買 RTX 系列,因為有 tensor 核心和 16 位浮點數支持,訓練速度會快很多,並且使用 16 位浮點數可以讓顯存占用少一半。雖然在過幾個星期就可以看到 3000 系列的顯卡了,可惜沒錢買🤒。此外,明年如果出支持機器學習的民用國產顯卡必定會大力支持😡。
安裝顯卡驅動
Windows 的話會通過 Windows Update 自動安裝, pytorch 會自動檢測出顯卡,不需要做任何工作。Linux 需要安裝 Nvidia 官方的閉源驅動 (開源的 Nouveau 驅動不支持 CUDA),如果是 Ubuntu 那么在安裝系統的時候打個勾就可以自動安裝,如果沒打可以參考這篇文章,其他 Linux 系統如果源沒有提供可以去 Nvidia 官方下載驅動。
安裝以后可以執行以下代碼看看 pytorch 是否可以檢測出顯卡:
import torch
判讀是否有 GPU 支持
torch.cuda.is_available()
True
判斷插了幾張可用的顯卡
torch.cuda.device_count()
1
獲取第一張顯卡的名稱
torch.cuda.get_device_name(0)
'GeForce GTX 1650'
如果輸出類似以上的結果,那么就代表沒有問題了。
在 pytorch 中使用 GPU
pytorch 默認會把 tensor 對象的數據保存在內存上,計算會由 CPU 執行,如果我們想使用 GPU,可以調用 tensor 對象的 cuda 方法把對象的數據復制到顯存上,復制以后的 tensor 對象運算會使用 GPU。注意在內存上的 tensor 對象和在顯存上的 tensor 對象之間無法進行運算。
創建一個 tensor,默認會保存在內存上,由 CPU 進行計算
a = torch.tensor([1,2,3])
a
tensor([1, 2, 3])
把 tensor 復制到顯存上,針對此 tensor 的計算將會使用 GPU
b = a.cuda()
b
tensor([1, 2, 3], device='cuda:0')
如果你想編寫同時兼容 GPU 和 CPU 的代碼可以使用以下寫法,如果有支持的 GPU 則會使用 GPU,如果沒有則會使用 CPU:
創建一個 device 對象,如果顯卡可用則指向顯卡,否則指向 CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
創建一個 tensor 並復制到指定 device
a = torch.tensor([1,2,3])
b = a.to(device)
a
tensor([1, 2, 3])b
tensor([1, 2, 3], device='cuda:0')
如果你插了多張顯卡,以上的寫法只會使用第一張,你可以通過 "cuda:序號" 來指定不同的顯卡來實現分布式計算。
device1 = torch.device("cuda:0")
device1
device(type='cuda', index=0)
device2 = torch.device("cuda:1")
device2
device(type='cuda', index=1)
使用 GPU 訓練識別驗證碼的模型
這里我拿前一篇文章的代碼來展示怎樣實際使用 GPU 訓練識別驗證碼的模型,以下是修改后完整的代碼:
如何生成訓練數據和如何使用這份代碼的說明請參考前一篇文章。
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot
分析目標的圖片大小,全部圖片都會先縮放到這個大小
驗證碼原圖是 120x50
IMAGE_SIZE = (56, 24)
分析目標的圖片所在的文件夾
IMAGE_DIR = "./generate-captcha/output/"
字母數字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
驗證碼位數
DIGITS = 4
標簽數量,字母數字混合*位數
NUM_LABELS = len(ALPHA_NUMS)*DIGITS
用於啟用 GPU 支持
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class BasicBlock(nn.Module):
"""ResNet 使用的基礎塊"""
expansion = 1 # 定義這個塊的實際出通道是 channels_out 的幾倍,這里的實現固定是一倍
def init(self, channels_in, channels_out, stride):
super().init()
# 生成 3x3 的卷積層
# 處理間隔 stride = 1 時,輸出的長寬會等於輸入的長寬,例如 (32-3+2)//1+1 == 32
# 處理間隔 stride = 2 時,輸出的長寬會等於輸入的長寬的一半,例如 (32-3+2)//2+1 == 16
# 此外 resnet 的 3x3 卷積層不使用偏移值 bias
self.conv1 = nn.Sequential(
nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
nn.BatchNorm2d(channels_out))
# 再定義一個讓輸出和輸入維度相同的 3x3 卷積層
self.conv2 = nn.Sequential(
nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
nn.BatchNorm2d(channels_out))
# 讓原始輸入和輸出相加的時候,需要維度一致,如果維度不一致則需要整合
self.identity = nn.Sequential()
if stride != 1 or channels_in != channels_out * self.expansion:
self.identity = nn.Sequential(
nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(channels_out * self.expansion))
def forward(self, x):
# x => conv1 => relu => conv2 => + => relu
# | ^
# |==============================|
tmp = self.conv1(x)
tmp = nn.functional.relu(tmp)
tmp = self.conv2(tmp)
tmp += self.identity(x)
y = nn.functional.relu(tmp)
return y
class MyModel(nn.Module):
"""識別驗證碼 (ResNet-18)"""
def init(self, block_type = BasicBlock):
super().init()
# 記錄上一層的出通道數量
self.previous_channels_out = 64
# 把 3 通道轉換到 64 通道,長寬不變
self.conv1 = nn.Sequential(
nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
nn.BatchNorm2d(self.previous_channels_out))
# ResNet 使用的各個層
self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
# 把最后一層的長寬轉換為 1x1 的池化層,Adaptive 表示會自動檢測原有長寬
# 例如 B,512,4,4 的矩陣會轉換為 B,512,1,1,每個通道的單個值會是原有 16 個值的平均
self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
# 全連接層,只使用單層線性模型
self.fc_model = nn.Linear(512 * block_type.expansion, NUM_LABELS)
# 控制輸出在 0 ~ 1 之間,BCELoss 需要
# 因為每組只應該有一個值為真,使用 softmax 效果會比 sigmoid 好
self.softmax = nn.Softmax(dim=2)
def _make_layer(self, block_type, channels_out, num_blocks, stride):
blocks = []
# 添加第一個塊
blocks.append(block_type(self.previous_channels_out, channels_out, stride))
self.previous_channels_out = channels_out * block_type.expansion
# 添加剩余的塊,剩余的塊固定處理間隔為 1,不會改變長寬
for _ in range(num_blocks-1):
blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
self.previous_channels_out *= block_type.expansion
return nn.Sequential(*blocks)
def forward(self, x):
# 轉換出通道到 64
tmp = self.conv1(x)
tmp = nn.functional.relu(tmp)
# 應用 ResNet 的各個層
tmp = self.layer1(tmp)
tmp = self.layer2(tmp)
tmp = self.layer3(tmp)
tmp = self.layer4(tmp)
# 轉換長寬到 1x1
tmp = self.avgPool(tmp)
# 扁平化,維度會變為 B,512
tmp = tmp.view(tmp.shape[0], -1)
# 應用全連接層
tmp = self.fc_model(tmp)
# 划分每個字符對應的組,之后維度為 batch_size, digits, alpha_nums
tmp = tmp.reshape(tmp.shape[0], DIGITS, len(ALPHA_NUMS))
# 應用 softmax 到每一組
tmp = self.softmax(tmp)
# 重新扁平化,之后維度為 batch_size, num_labels
y = tmp.reshape(tmp.shape[0], NUM_LABELS)
return y
def save_tensor(tensor, path):
"""保存 tensor 對象到文件"""
torch.save(tensor, gzip.GzipFile(path, "wb"))
def load_tensor(path):
"""從文件讀取 tensor 對象"""
return torch.load(gzip.GzipFile(path, "rb"))
def image_to_tensor(img):
"""轉換圖片對象到 tensor 對象"""
in_img = img.resize(IMAGE_SIZE)
in_img = in_img.convert("RGB") # 轉換圖片模式到 RGB
arr = numpy.asarray(in_img)
t = torch.from_numpy(arr)
t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
t = t / 255.0 # 正規化數值使得范圍在 0 ~ 1
return t
def code_to_tensor(code):
"""轉換驗證碼到 tensor 對象,使用 onehot 編碼"""
t = torch.zeros((NUM_LABELS,))
code = code.lower() # 驗證碼不分大小寫
for index, c in enumerate(code):
p = ALPHA_NUMS_MAP[c]
t[index*len(ALPHA_NUMS)+p] = 1
return t
def tensor_to_code(tensor):
"""轉換 tensor 對象到驗證碼"""
tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
indices = tensor.max(dim=1).indices
code = "".join(ALPHA_NUMS[index] for index in indices)
return code
def prepare_save_batch(batch, tensor_in, tensor_out):
"""准備訓練 - 保存單個批次的數據"""
# 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
random_indices = torch.randperm(tensor_in.shape[0])
training_indices = random_indices[:int(len(random_indices)0.8)]
validating_indices = random_indices[int(len(random_indices)0.8):int(len(random_indices)0.9):]
testing_indices = random_indices[int(len(random_indices)0.9):]
training_set = (tensor_in[training_indices], tensor_out[training_indices])
validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])
# 保存到硬盤
save_tensor(training_set, f"data/training_set.{batch}.pt")
save_tensor(validating_set, f"data/validating_set.{batch}.pt")
save_tensor(testing_set, f"data/testing_set.{batch}.pt")
print(f"batch {batch} saved")
def prepare():
"""准備訓練"""
# 數據集轉換到 tensor 以后會保存在 data 文件夾下
if not os.path.isdir("data"):
os.makedirs("data")
# 查找所有圖片
image_paths = []
for root, dirs, files in os.walk(IMAGE_DIR):
for filename in files:
path = os.path.join(root, filename)
if not path.endswith(".png"):
continue
# 驗證碼在文件名中,例如
# 00000-R865.png => R865
code = filename.split(".")[0].split("-")[1]
image_paths.append((path, code))
# 打亂圖片順序
random.shuffle(image_paths)
# 分批讀取和保存圖片
batch_size = 1000
for batch in range(0, len(image_paths) // batch_size):
image_tensors = []
image_labels = []
for path, code in image_paths[batch*batch_size:(batch+1)*batch_size]:
with Image.open(path) as img:
image_tensors.append(image_to_tensor(img))
image_labels.append(code_to_tensor(code))
tensor_in = torch.stack(image_tensors) # 維度: B,C,W,H
tensor_out = torch.stack(image_labels) # 維度: B,N
prepare_save_batch(batch, tensor_in, tensor_out)
def train():
"""開始訓練"""
# 創建模型實例
model = MyModel().to(device)
# 創建損失計算器
# 計算多分類輸出最好使用 BCELoss
loss_function = torch.nn.BCELoss()
# 創建參數調整器
optimizer = torch.optim.Adam(model.parameters())
# 記錄訓練集和驗證集的正確率變化
training_accuracy_history = []
validating_accuracy_history = []
# 記錄最高的驗證集正確率
validating_accuracy_highest = -1
validating_accuracy_highest_epoch = 0
# 讀取批次的工具函數
def read_batches(base_path):
for batch in itertools.count():
path = f"{base_path}.{batch}.pt"
if not os.path.isfile(path):
break
yield [ t.to(device) for t in load_tensor(path) ]
# 計算正確率的工具函數
def calc_accuracy(actual, predicted):
# 把每一位的最大值當作正確字符,然后比對有多少個字符相等
actual_indices = actual.reshape(actual.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
predicted_indices = predicted.reshape(predicted.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
matched = (actual_indices - predicted_indices).abs().sum(dim=1) == 0
acc = matched.sum().item() / actual.shape[0]
return acc
# 划分輸入和輸出的工具函數
def split_batch_xy(batch, begin=None, end=None):
# shape = batch_size, channels, width, height
batch_x = batch[0][begin:end]
# shape = batch_size, num_labels
batch_y = batch[1][begin:end]
return batch_x, batch_y
# 開始訓練過程
for epoch in range(1, 10000):
print(f"epoch: {epoch}")
# 根據訓練集訓練並修改參數
# 切換模型到訓練模式,將會啟用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.train()
training_accuracy_list = []
for batch_index, batch in enumerate(read_batches("data/training_set")):
# 切分小批次,有助於泛化模型
training_batch_accuracy_list = []
for index in range(0, batch[0].shape[0], 100):
# 划分輸入和輸出
batch_x, batch_y = split_batch_xy(batch, index, index+100)
# 計算預測值
predicted = model(batch_x)
# 計算損失
loss = loss_function(predicted, batch_y)
# 從損失自動微分求導函數值
loss.backward()
# 使用參數調整器調整參數
optimizer.step()
# 清空導函數值
optimizer.zero_grad()
# 記錄這一個批次的正確率,torch.no_grad 代表臨時禁用自動微分功能
with torch.no_grad():
training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
# 輸出批次正確率
training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
training_accuracy_list.append(training_batch_accuracy)
print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
training_accuracy_history.append(training_accuracy)
print(f"training accuracy: {training_accuracy}")
# 檢查驗證集
# 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
model.eval()
validating_accuracy_list = []
for batch in read_batches("data/validating_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
validating_accuracy_history.append(validating_accuracy)
print(f"validating accuracy: {validating_accuracy}")
# 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練后仍然沒有刷新記錄
if validating_accuracy > validating_accuracy_highest:
validating_accuracy_highest = validating_accuracy
validating_accuracy_highest_epoch = epoch
save_tensor(model.state_dict(), "model.pt")
print("highest validating accuracy updated")
elif epoch - validating_accuracy_highest_epoch > 20:
# 在 20 次訓練后仍然沒有刷新記錄,結束訓練
print("stop training because highest validating accuracy not updated in 20 epoches")
break
# 使用達到最高正確率時的模型狀態
print(f"highest validating accuracy: {validating_accuracy_highest}",
f"from epoch {validating_accuracy_highest_epoch}")
model.load_state_dict(load_tensor("model.pt"))
# 檢查測試集
testing_accuracy_list = []
for batch in read_batches("data/testing_set"):
batch_x, batch_y = split_batch_xy(batch)
predicted = model(batch_x)
testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
print(f"testing accuracy: {testing_accuracy}")
# 顯示訓練集和驗證集的正確率變化
pyplot.plot(training_accuracy_history, label="training")
pyplot.plot(validating_accuracy_history, label="validing")
pyplot.ylim(0, 1)
pyplot.legend()
pyplot.show()
def eval_model():
"""使用訓練好的模型"""
# 創建模型實例,加載訓練好的狀態,然后切換到驗證模式
model = MyModel().to(device)
model.load_state_dict(load_tensor("model.pt"))
model.eval()
# 詢問圖片路徑,並顯示可能的分類一覽
while True:
try:
# 構建輸入
image_path = input("Image path: ")
if not image_path:
continue
with Image.open(image_path) as img:
tensor_in = image_to_tensor(img).to(device).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
# 預測輸出
tensor_out = model(tensor_in)
# 轉換到驗證碼
code = tensor_to_code(tensor_out[0])
print(f"code: {code}")
print()
except Exception as e:
print("error:", e)
def main():
"""主函數"""
if len(sys.argv) < 2:
print(f"Please run: {sys.argv[0]} prepare|train|eval")
exit()
# 給隨機數生成器分配一個初始值,使得每次運行都可以生成相同的隨機數
# 這是為了讓過程可重現,你也可以選擇不這樣做
random.seed(0)
torch.random.manual_seed(0)
# 根據命令行參數選擇操作
operation = sys.argv[1]
if operation == "prepare":
prepare()
elif operation == "train":
train()
elif operation == "eval":
eval_model()
else:
raise ValueError(f"Unsupported operation: {operation}")
if name == "main":
main()
使用 diff 生成相差的部分如下:
$ diff -U3 example.py.old example.py
@@ -23,6 +23,9 @@
標簽數量,字母數字混合*位數
NUM_LABELS = len(ALPHA_NUMS)*DIGITS
+# 用於啟用 GPU 支持
+device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
class BasicBlock(nn.Module):
"""ResNet 使用的基礎塊"""
expansion = 1 # 定義這個塊的實際出通道是 channels_out 的幾倍,這里的實現固定是一倍
@@ -203,7 +206,7 @@
def train():
"""開始訓練"""
# 創建模型實例
- model = MyModel()
-
model = MyModel().to(device)
創建損失計算器
計算多分類輸出最好使用 BCELoss
@@ -226,7 +229,7 @@
path = f"{base_path}.{batch}.pt"
if not os.path.isfile(path):
break
-
yield load_tensor(path)
-
yield [ t.to(device) for t in load_tensor(path) ]
計算正確率的工具函數
def calc_accuracy(actual, predicted):
@@ -327,7 +330,7 @@
def eval_model():
"""使用訓練好的模型"""創建模型實例,加載訓練好的狀態,然后切換到驗證模式
- model = MyModel()
- model = MyModel().to(device)
model.load_state_dict(load_tensor("model.pt"))
model.eval()
@@ -339,7 +342,7 @@
if not image_path:
continue
with Image.open(image_path) as img:
-
tensor_in = image_to_tensor(img).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
-
tensor_in = image_to_tensor(img).to(device).unsqueeze(0) # 維度 C,W,H => 1,C,W,H # 預測輸出 tensor_out = model(tensor_in) # 轉換到驗證碼
可以看到只改動了五個部分,在頭部添加了 device 的定義,然后在加載模型和 tensor 對象的時候使用 .to(device) 即可。
簡單吧☺️。
那么訓練速度相差如何呢?只訓練一個 batch 使用 CPU 和 GPU 消耗的時間分別如下 (單位秒):
CPU: 13.60
GPU: 1.90
差了整整 7 倍😱,,如果是高端的顯卡估計可以看到數十倍的差距。
顯存占用
如果你想查看訓練過程中的顯存占用情況,可以使用 nvidia-smi 命令,命令會輸出以下的信息:
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.57 Driver Version: 450.57 CUDA Version: 11.0 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|=++==============|
| 0 GeForce GTX 1650 Off | 00000000:06:00.0 On | N/A |
| 60% 67C P3 40W / 90W | 3414MiB / 3902MiB | 100% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
| 0 N/A N/A 1237 G /usr/lib/xorg/Xorg 238MiB |
| 0 N/A N/A 2545 G cinnamon 68MiB |
| 0 N/A N/A 2797 G ...AAAAAAAAA= --shared-files 103MiB |
| 0 N/A N/A 18534 G ...AAAAAAAAA= --shared-files 82MiB |
| 0 N/A N/A 20035 C python3 2915MiB |
+-----------------------------------------------------------------------------+
如果訓練過程中出現顯存不足,你會看到以下的異常信息:
RuntimeError: CUDA error: out of memory
如果你遇到顯存不足的問題,那么可以嘗試以下的辦法解決,按實用程度排序:
出錢買新顯卡🤒
減少訓練批次大小 (例如每個批次 100 條數據,減為每個批次 50 條數據)
不使用的對象早回收,例如 predicted = None,pytorch 會在對象聲明周期結束后自動釋放顯存
計算單值的時候使用 item(),例如 acc_total += acc.item(),但配合 backward 生成運算路徑的計算不能用
如果你使用桌面 Linux,試試開機的時候添加 rw init=/bin/bash 進入命令行界面再訓練,這樣可以節省個幾百 MB 顯存
你可能會好奇為什了 pytorch 可以及時釋放顯存,這是因為 python 的對象使用了引用計數 (Reference Counted),GC 基本上只負責回收循環引用的對象,對象的引用計數歸 0 的時候 python 會自動調用析構函數,不需要等待 GC。而 NET 和 Java 等語言則無法做到及時回收,除非你每個 tensor 對象都及時的去調用 Dispose 方法,或者使用 tensorflow 來編譯靜態運算路徑然后把生命周期管理工作全部交給框架。這也是使用 Python 的一大好處🥳