Pytorch 是目前最好用的神經網絡庫之一,最近我寫了一個pytorch的簡單代碼,在這里對其做一個全面的介紹。
在pytorch 中一些常用的功能都已經被封裝成了模塊,所以我們只需要繼承並重寫部分函數即可。首先介紹一下本文最終希望實現的目標, 對本地的一維數據 (1xn)的ndarry 進行一個多分類,數據集為mn的數據,標簽為m1的數組。下面是結合代碼記錄一下踩坑過程。
- 繼承Dataset類,可以看到我這里重寫了三個函數,init 函數用於載入numpy數據並將其轉化為相應的tensor,__gititem__函數用於定義訓練時會返回的單個數據與標簽,__len__表示數據數量m。
class MyDataSet(torch.utils.data.Dataset):
def __init__(self, data, label):
self.data = torch.from_numpy(data).float()
self.label = torch.from_numpy(label)
self.length = label.shape[0]
def __getitem__(self, index):
return self.data[index], self.label[index]
def __len__(self):
return self.length
- 通過繼承nn.Module來自定義神經網絡
其中__init__函數來自定義定義我們需要的網絡參數,這里我們block1 的in_channels為1,輸出參數可根據需要自己設定,但而且當前層的輸出channel應該和下一層的輸入channel相同,
注意:MaxPool1d的inchannel需要自己計算一下,當然如果你不想算,可以給個參數直接運行,看報錯信息的提示
__forward__ 函數定義了網絡的連接方式,注意此處應返回x。
class NeuralNetwork(nn.Module):
def __init__(self):
super(NeuralNetwork, self).__init__()
self.block1 = nn.Sequential(
nn.Conv1d(in_channels=filter_num[0], out_channels=filter_num[1], padding='same', stride=conv_stride_size,
kernel_size=conv_kernel_size, dtype=dtype),
nn.BatchNorm1d(filter_num[1],dtype=dtype),
nn.ReLU(),
nn.Conv1d(filter_num[1], filter_num[1], kernel_size=conv_kernel_size, stride=conv_stride_size, padding='same',dtype=dtype),
nn.BatchNorm1d(filter_num[1],dtype=dtype),
nn.ReLU(),
nn.MaxPool1d(kernel_size=pool_kernel_size, stride=pool_stride, padding=pool_padding),
nn.Dropout(drop_float),
)
self.block2=nn.Sequential(
nn.Conv1d(in_channels=filter_num[1],out_channels=filter_num[2], padding='same', stride=conv_stride_size,
kernel_size=conv_kernel_size, dtype=dtype),
nn.BatchNorm1d(num_features=filter_num[2],dtype=dtype),
nn.ReLU(),
nn.Conv1d(filter_num[2],filter_num[2],kernel_size=conv_kernel_size,stride=conv_stride_size,
padding='same',dtype=dtype),
nn.BatchNorm1d(filter_num[2],dtype=dtype),
nn.ReLU(),
nn.MaxPool1d(kernel_size=pool_kernel_size, stride=pool_stride,padding=pool_padding),
nn.Dropout(drop_float),
)
self.block3=nn.Sequential(
nn.Conv1d(in_channels=filter_num[2],out_channels=filter_num[3], padding='same', stride=conv_stride_size,
kernel_size=conv_kernel_size, dtype=dtype),
nn.BatchNorm1d(num_features=filter_num[3],dtype=dtype),
nn.ReLU(),
nn.Conv1d(filter_num[3],filter_num[3],kernel_size=conv_kernel_size,stride=conv_stride_size,
padding='same',dtype=dtype),
nn.BatchNorm1d(filter_num[3],dtype=dtype),
nn.ReLU(),
nn.MaxPool1d(kernel_size=pool_kernel_size, stride=pool_stride,padding=pool_padding),
nn.Dropout(drop_float),
)
self.block4=nn.Sequential(
nn.Conv1d(in_channels=filter_num[3],out_channels=filter_num[4], padding='same', stride=conv_stride_size,
kernel_size=conv_kernel_size, dtype=dtype),
nn.BatchNorm1d(num_features=filter_num[4],dtype=dtype),
nn.ReLU(),
nn.Conv1d(filter_num[4],filter_num[4],kernel_size=conv_kernel_size,stride=conv_stride_size,
padding='same',dtype=dtype),
nn.BatchNorm1d(filter_num[4],dtype=dtype),
nn.ReLU(),
nn.MaxPool1d(kernel_size=pool_kernel_size, stride=pool_stride,padding=pool_padding),
nn.Dropout(drop_float),
)
self.connected = nn.Sequential(
nn.Flatten(),
nn.BatchNorm1d(num_features=1280 ,dtype=dtype),
nn.ReLU(),
nn.Dropout(0.7),
nn.Flatten(),
nn.BatchNorm1d(num_features=1280 ,dtype=dtype),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(1280, 180,dtype=dtype)
)
def forward(self, x):
x = self.block1(x)
x= self.block2(x)
x= self.block3(x)
x= self.block4(x)
x = self.connected(x)
return x
- 主程序。為了更好的說明,先放一下主程序。這里的程序是已經載入了數據的,data是mn 數組,label為m1數組。
實例化DataLoader的第一個參數是Dataset的實例,通過DataLoader,其功能是為下文訓練和測試過程提供數據。
print("data loading ...")
train_data_set = MyDataSet(trainingDataLoadProcess.data, trainingDataLoadProcess.label)
test_data_set = MyDataSet(testDataLoadProcess.data, testDataLoadProcess.label)
train_dataloader = DataLoader(train_data_set, batch_size=batch_size, shuffle=shuffle)
test_dataloader = DataLoader(test_data_set, batch_size=batch_size, shuffle=shuffle)
print("model constructing...")
model_test = NeuralNetwork()
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adamax(model_test.parameters(), lr=learning_rate, betas=(beta_1, beta_2), weight_decay=0.0)
print(model_test)
for t in range(epochs):
print(f'Epoch{t + 1}\n-------------')
train_loop(train_dataloader, model_test, loss_fn, optimizer)
test_loop(test_dataloader, model_test, loss_fn)
print("Done!")
- 定義訓練階段,從DataLoader中取出數據,這里X,y分別為batch_sizen,batch_size1的數據。
首先要進行一個調整,將X調整為batch_size1n的float,設置float的轉化過程放在Dataset的初始化函數里完成了
注意:如果沒有這一步會報錯
期望是long但得到了float的錯誤。(雖然我也不明白為啥錯誤不是期望float...)
y為1*batch的數組並轉化成long(這里y的形式可能與損失函數有關)
def train_loop(dataloader, model, loss_fn, optimizer):
size = len(dataloader.dataset)
for batch, (X, y) in enumerate(dataloader):
X = X.unsqueeze(1)
y=y.squeeze(1).long()
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
- 定義測試過程(同上)
def test_loop(dataloader, model, loss_fn):
size = len(dataloader.dataset)
num_batches = len(dataloader)
test_loss, correct = 0, 0
with torch.no_grad():
for X, y in dataloader:
X = X.unsqueeze(1)
y=y.squeeze(1).long()
pred = model(X)
test_loss = loss_fn(pred, y).item()
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
大體流程就是這些,最后記得修改加入輸出語句與保存模型等操作。