import os from torchvision import models from torchvision import transforms, datasets from torchvision.utils import make_grid import torch from torch.utils.data import DataLoader from torch.autograd import Variable import time import matplotlib.pyplot as plt use_gpu = torch.cuda.is_available() print(use_gpu) ###################### 模型訓練前的數據整理和准備 ###################### path = '../data/dogs_vs_cat' def spilt_train_val(path, number): # 將train數據集分割一部分出來作為val數據集 以Kaggle中Dogs vs. Cats的數據集為例子 train_path = os.path.join(path, 'train') val_path = os.path.join(path, 'val') if not os.path.exists(val_path): return os.mkdir(val_path) import random import shutil file_list = os.listdir(train_path) random.shuffle(file_list)# 將列表隨機打亂 print('Before', len(file_list)) file_list = file_list[-number:] for img in file_list: img_train_path = os.path.join(train_path, img) img_val_path = os.path.join(val_path, img) shutil.copy(img_train_path, img_val_path) # 刪除src文件 os.remove(img_train_path) print('After', len(os.listdir(train_path))) def creat_dogcat_mov(path=path, str_tvt='train'): train_path = os.path.join(path, str_tvt) file_list = os.listdir(train_path) dog = 'dog' cat = 'cat' dog_path = os.path.join(train_path, dog) cat_path = os.path.join(train_path, cat) if not os.path.exists(dog_path): os.mkdir(dog_path) if not os.path.exists(cat_path): os.mkdir(cat_path) import shutil for img in file_list: img_name = img.split('.')[0] if img_name == 'dog': img_train_path = os.path.join(train_path, img) shutil.copy(img_train_path, dog_path) if img_name == 'cat': img_train_path = os.path.join(train_path, img) shutil.copy(img_train_path, cat_path) os.remove(img_train_path) # 只需調用1次 # spilt_train_val(path=path, number=5000) path = '../data/dog_cat' # 只需調用1次 # creat_dogcat_mov(path=path, str_tvt='train') # creat_dogcat_mov(path=path, str_tvt='val') ###### @@@@@@數據目錄組織結構,便於ImageFolder讀取@@@@ ###### # ../data/dog_cat/train # ../data/dog_cat/train/cat # ../data/dog_cat/train/dog # ../data/dog_cat/val # ../data/dog_cat/val/cat # ../data/dog_cat/val/dog # ../data/dog_cat/test/test # 輸入的是一個224*224*3的圖片(224*224位分辨率,3為RGB 3個通道) # 定義轉換操作運算 # 將圖像RGB三個通道的像素值分別減去0.5再除以0.5,從而將所有像素值 # 固定到[-1.0, 1.0]范圍內 transform = transforms.Compose([transforms.CenterCrop(224),# 對原始圖片進行裁剪 transforms.ToTensor(),# 轉化成張量數據結構 transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])# 用均值和標准差對張量圖像進行歸一化 # 讀取數據並轉化成字典類型{'train': data1, 'val': data2} data_img = {x: datasets.ImageFolder(root=os.path.join(path, x), transform=transform) for x in ['train', 'val']} # 查看訓練數據的類別信息 classes = data_img["train"].classes classes_index = data_img["train"].class_to_idx print(classes) print(classes_index) # DataLoader 加載數據 data_loader_imge = {x: DataLoader(data_img[x], batch_size=4, shuffle=True) for x in ['train', 'val']} # 預覽訓練數據集 x_train, y_train = next(iter(data_loader_imge['train'])) mean = [0.5, 0.5, 0.5] std = [0.5, 0.5, 0.5] img = make_grid(x_train) img = img.numpy().transpose((1, 2, 0)) img = img*std + mean print([classes[i] for i in y_train]) plt.imshow(img) # plt.show() ###################### 以上 模型訓練前的數據整理和准備 ###################### # 遷移學習 ##################################### 遷移學習 ##################################### # 以分類任務的VGG16模型為例 ###################### 遷移學習 第一步:下載預訓練模型並查看其網絡結構 ###################### # download the pretrained model model = models.vgg16(pretrained=True) print(model) ###################### 遷移學習 第二步:針對網絡結構進行修改等操作 ###################### # 比如,對VGG16模型中最后的全連接層進行改寫,使得最后輸出的結果只有兩個(只需要對貓狗進行分辨就可以了) # 將最后的全連接層 # (classifier): Sequential( # (0): Linear(in_features=25s=True) # (1): ReLU(inplace=True) # (2): Dropout(p=0.5, inplace=False) # (3): Linear(in_features=4096, out_features=4096, bias=True) # (4): ReLU(inplace=True) # (5): Dropout(p=0.5, inplace=False) # (6): Linear(in_features=4096, out_features=1000, bias=True) ) # 改寫為: # (classifier): Sequential( # (0): Linear(in_features=25088, out_features=4096, bias=True) # (1): ReLU() # (2): Dropout(p=0.5, inplace=False) # (3): Linear(in_features=4096, out_features=4096, bias=True) # (4): ReLU() # (5): Dropout(p=0.5, inplace=False) # (6): Linear(in_features=4096, out_features=2, bias=True) ) # 第二步(1) 首先凍結參數,即使發生新的訓練也不會進行參數的更新 # 目的是凍結參數,即使發生新的訓練也不會進行參數的更新 for parma in model.parameters(): parma.requires_grad = False # 第二步(2) 對網絡結構進行修改 # 對全連接層的最后一層進行了改寫,torch.nn.Linear(4096, 2)使得最后輸出的結果只有兩個(只需要對貓狗進行分辨就可以了)。 model.classifier = torch.nn.Sequential(torch.nn.Linear(25088, 4096), torch.nn.ReLU(), torch.nn.Dropout(p=0.5), torch.nn.Linear(4096, 4096), torch.nn.ReLU(), torch.nn.Dropout(p=0.5), torch.nn.Linear(4096, 2)) # 對改寫后的模型進行查看 print(model) if use_gpu: model = model.cuda() ''' ###################### 遷移學習 第三步:進行對修改的模型進行訓練(其實只對修改后的部分進行權重學習)###################### # 交叉熵 loss cost = torch.nn.CrossEntropyLoss() # 只對全連接層參數進行更新優化,loss計算依然使用交叉熵 optimizer = torch.optim.Adam(model.classifier.parameters()) # 進行訓練 進行 n_epochs 次訓練 n_epochs = 1 for epoch in range(n_epochs): since = time.time() print("Epoch{}/{}".format(epoch, n_epochs)) print("-" * 10) for param in ["train", "val"]: if param == "train": model.train = True else: model.train = False running_loss = 0.0 running_correct = 0 batch = 0 for data in data_loader_imge[param]: batch += 1 X, y = data if use_gpu: X, y = Variable(X.cuda()), Variable(y.cuda()) else: X, y = Variable(X), Variable(y) optimizer.zero_grad() y_pred = model(X) _, pred = torch.max(y_pred.data, 1) loss = cost(y_pred, y) if param == "train": loss.backward() optimizer.step() running_loss += loss.data running_correct += torch.sum(pred == y.data) if batch % 500 == 0 and param == "train": print("Batch {}, Train Loss:{:.4f}, Train ACC:{:.4f}".format( batch, running_loss / (4 * batch), 100 * running_correct / (4 * batch))) epoch_loss = running_loss / len(data_img[param]) epoch_correct = 100 * running_correct / len(data_img[param]) print("{} Loss:{:.4f}, Correct{:.4f}".format(param, epoch_loss, epoch_correct)) now_time = time.time() - since print("Training time is:{:.0f}m {:.0f}s".format(now_time // 60, now_time % 60)) ''' ###################### 遷移學習 第四步:對完成訓練的修改的模型權重進行保存 ###################### # 保存網絡中的參數, 速度快,占空間少 torch.save(model.state_dict(), "model_vgg16_finetune.pk") # 讀取網絡中的參數 model.load_state_dict(torch.load("model_vgg16_finetune.pk")) # 參數總量: pytorch自帶方法,計算模型參數總量 total = sum([param.nelement() for param in model.parameters()]) print("Number of parameter: %.2fM" % (total / 1e6)) ###################### 遷移學習 第五步:對完成訓練的修改的模型進行測試驗證 ###################### # 測試,固定BN和DropOut,不用再次參與計算,使其使用訓練好的參數值 model.eval() # 測試時,不再計算梯度提高運算效率 with torch.no_grad(): batch_size = 16 data_test_img = datasets.ImageFolder(root=os.path.join(path, 'test'), transform=transform) data_loader_test_img = torch.utils.data.DataLoader(dataset=data_test_img, batch_size=batch_size) image, label = next(iter(data_loader_test_img)) images = Variable(image.cuda()) ###### 提取任意層特征 1 (batch_size = 1) ###### ''' # 模塊 features模塊 avgpool模塊 classifier模塊 for name, module in model._modules.items(): print(name) # features # avgpool # classifier if name == 'features': feature_img = module(images) print(feature_img.shape) # features 模塊 第0層 的特征 for name, module in model._modules['features']._modules.items(): print(name) images = module(images) print(images.shape) if name == '0': break ''' ###### 提取任意層特征 2 ###### 推薦使用 features = [] def hook(module, input, output): # module: model.features # input: in forward function # output self.features() print('module: ', module) print('input val: ', input) print('output val ', output) features.append(output.clone().detach()) # features avgpool classifier # 替換 module-name 即可 ---> model.module-name.register_forward_hook handle = model.features.register_forward_hook(hook) model_out = model(images) features_out = features[0] print(features_out.size()) handle.remove() ###### end 提取任意層特征 2 ###### 推薦使用 infer_time_start = time.time() y_pred = model(images) infer_time = time.time() - infer_time_start print('infer time: {0} ms'.format(infer_time*1000)) # 每行 最大值索引 _, pred = torch.max(y_pred.data, 1) print(pred) img = make_grid(image)# image shape (B x C x H x W) img = img.numpy().transpose(1, 2, 0)# H x W x C mean = [0.5, 0.5, 0.5] std = [0.5, 0.5, 0.5] img = img*std+mean print("Pred Label:", [classes[i] for i in pred]) plt.imshow(img) # plt.show() def print_hi(name): print(f'Hi, {name}') if __name__ == '__main__': print_hi('transfer learning....')