Part 1: Overview
A federated learning (Federated Learning) system consists of a Server and a number of Clients. Throughout training, no user data is ever transmitted to the Server, which protects the privacy of the users' data. Moreover, the parameters exchanged over the network are only those needed to improve the current model, so once they have been applied the Server has no reason to store them, which further improves security.
The overall framework of federated learning consists of a Server and several Clients; the general idea is "the data stays put, the model moves". Specifically, the Server holds the globally shared model; each Client downloads the model, trains it on its own dataset, and updates the parameters. In each round of communication between the Server and the Clients, the Server distributes the current model parameters to the Clients (equivalently, the Clients download the Server-side parameters); after local training, each Client returns its updated parameters to the Server, and the Server fuses the N returned parameter sets into a single set by some aggregation rule, which becomes the updated Server model. This cycle then repeats.
This post uses the FedAvg algorithm and the MNIST dataset as a running example, assuming a total of 100 Clients, to walk through the federated learning workflow in practice; the complete code can be found at the end of the post.
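For reference, the aggregation step in FedAvg is a data-size-weighted average of the client models: with $K$ participating clients holding $n_k$ samples each and $n = \sum_{k=1}^{K} n_k$, the server update is

$$ w_{t+1} \leftarrow \sum_{k=1}^{K} \frac{n_k}{n}\, w_{t+1}^{k} $$

In this post every client holds the same number of samples, so the weighted average reduces to the simple mean used in the code below.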
First, we need to assign data to each client. In a real deployment every client already owns its own data; here, to simulate that setting, we manually partition the dataset across the clients.
The data across clients may be independent and identically distributed (IID) or non-IID.
In the IID case, we first shuffle the dataset and then assign 600 samples to each Client.
In the non-IID case, we first sort the dataset by label (i.e., by digit value in MNIST), split it into 200 shards of 300 samples each, and assign two shards to each Client.
The code for these two partitioning schemes is as follows:
if isIID:
    # IID: shuffle the whole training set before slicing it across clients
    order = np.arange(self.train_data_size)
    np.random.shuffle(order)
    self.train_data = train_images[order]
    self.train_label = train_labels[order]
else:
    # Non-IID: sort by label so that contiguous shards contain only a few digits
    labels = np.argmax(train_labels, axis=1)
    order = np.argsort(labels)
    self.train_data = train_images[order]
    self.train_label = train_labels[order]
A simple loop can then hand the data out to each Client, as sketched below.
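A minimal sketch of such a loop; shard_size, num_of_clients and the Client constructor used here are illustrative assumptions, not necessarily the original repository's identifiers:

# two shards of 300 samples per client (60,000 samples / 100 clients)
shard_size = self.train_data_size // self.num_of_clients // 2
shard_ids = np.random.permutation(self.train_data_size // shard_size)
for i in range(self.num_of_clients):
    # each client receives two shards: after the IID shuffle they are random samples,
    # after the non-IID sort each shard covers only one or two digits
    a, b = shard_ids[2 * i], shard_ids[2 * i + 1]
    data = np.vstack([self.train_data[a * shard_size:(a + 1) * shard_size],
                      self.train_data[b * shard_size:(b + 1) * shard_size]])
    label = np.vstack([self.train_label[a * shard_size:(a + 1) * shard_size],
                       self.train_label[b * shard_size:(b + 1) * shard_size]])
    self.clients_set['client{}'.format(i)] = Client(data, label, self.dev)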
Now to the main part: the federated training loop.
First, the Server initializes its model and shares the parameters.
net = Model()  # initialize the global model
global_parameters = net.state_dict()  # get the model parameters to be shared
With the shared parameters in hand, we can run a number of Server-Client communication rounds. The workflow is explained in the code comments:
# num_comm is the number of communication rounds, set to 1000 here
for i in range(args['num_comm']):
    # Randomly select a fraction of the Clients in each round; selecting all of them
    # would increase the communication cost and may not even improve the results.
    # clients_in_comm is the list of Clients selected for this round.
    order = np.random.permutation(args['num_of_clients'])
    clients_in_comm = ['client{}'.format(i) for i in order[0:num_in_comm]]

    sum_parameters = None
    # Each selected Client trains on its own data starting from the current global
    # parameters and returns its updated parameters.
    for client in tqdm(clients_in_comm):
        # parameters obtained by training on this Client
        local_parameters = myClients.clients_set[client].localUpdate(args['epoch'], args['batchsize'], net, loss_func, opti, global_parameters)
        # accumulate the parameters returned by all Clients (averaged below)
        if sum_parameters is None:
            sum_parameters = local_parameters
        else:
            for var in sum_parameters:
                sum_parameters[var] = sum_parameters[var] + local_parameters[var]

    # average to obtain the Server's updated global parameters for this round
    for var in global_parameters:
        global_parameters[var] = (sum_parameters[var] / num_in_comm)
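The loop above uses an args dictionary and a variable num_in_comm that are defined elsewhere in the full code; a plausible setup (the concrete values are illustrative assumptions, not taken from the original repository) looks like this:

args = {
    'num_of_clients': 100,  # total number of Clients
    'cfraction': 0.1,       # fraction of Clients sampled per round
    'num_comm': 1000,       # number of communication rounds
    'epoch': 5,             # local epochs per Client
    'batchsize': 10,
}
# number of Clients that actually participate in each round
num_in_comm = int(max(args['num_of_clients'] * args['cfraction'], 1))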
The line local_parameters = myClients.clients_set[client].localUpdate(args['epoch'], args['batchsize'], net, loss_func, opti, global_parameters) calls the Client-side training function, which we now expand in detail:
def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters):
    '''
    :param localEpoch: number of local epochs on this Client
    :param localBatchSize: batch size on this Client
    :param Net: the model shared by the Server
    :param lossFun: loss function
    :param opti: optimizer
    :param global_parameters: latest global parameters of the current round
    :return: the new model parameters obtained by training on this Client's own data
    '''
    # load the latest global parameters of the current round
    Net.load_state_dict(global_parameters, strict=True)
    # load this Client's own dataset
    self.train_dl = DataLoader(self.train_ds, batch_size=localBatchSize, shuffle=True)
    # run the local epochs
    for epoch in range(localEpoch):
        for data, label in self.train_dl:
            data, label = data.to(self.dev), label.to(self.dev)
            preds = Net(data)
            loss = lossFun(preds, label)
            loss.backward()
            opti.step()
            opti.zero_grad()
    # return the new parameters trained on this Client's own data
    return Net.state_dict()
After training, we use the test set to check how well the method generalizes. Note that **although the Server never sees a single training sample, the ultimate goal of federated learning is still to obtain a robust model on the Server side, so evaluation is performed on the Server,** as follows:
with torch.no_grad():
    # load the final model parameters held by the Server
    net.load_state_dict(global_parameters, strict=True)
    sum_accu = 0
    num = 0
    # iterate over the test set
    for data, label in testDataLoader:
        data, label = data.to(dev), label.to(dev)
        preds = net(data)
        preds = torch.argmax(preds, dim=1)
        sum_accu += (preds == label).float().mean()
        num += 1
    print('accuracy: {}'.format(sum_accu / num))
Summary:
Looking back at the workflow, a few parameters matter most. One is each Client's number of local epochs: more epochs mean more computation on the Client. The other is the number of communication rounds: more rounds mean a heavier network load and may be subject to bandwidth limits.
Part 2: Hands-On Implementation
Original article: https://towardsdatascience.com/federated-learning-a-simple-implementation-of-fedavg-federated-averaging-with-pytorch-90187c9c9577
Reason for reposting: there was no tutorial on Zhihu with a concrete federated learning code implementation (will remove upon request).
In federated learning, each client trains its model locally on its own decentralized data and sends only the learned model parameters to a trusted Server, where techniques such as differential privacy and secure aggregation can be applied to obtain the main model. The trusted Server then sends the aggregated main model back to the clients, and the process repeats.
Here a simple implementation with IID (independent and identically distributed) data is prepared, to demonstrate how the parameters of hundreds of different models running on different nodes can be combined with the FedAvg method, and whether the resulting model gives reasonable results. The implementation runs on the MNIST dataset, which contains 28x28-pixel grayscale images of the handwritten digits 0 to 9.
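The code below assumes that the MNIST images are already available as flat float tensors x_train, x_valid, x_test with integer label tensors y_train, y_valid, y_test. One way to prepare them (this loading snippet is an assumption added for completeness, not part of the original article) is:

import torch
from torchvision import datasets

train_set = datasets.MNIST('./data', train=True, download=True)
test_set = datasets.MNIST('./data', train=False, download=True)

# flatten to (N, 784) float tensors in [0, 1] and keep integer class labels
x_all = train_set.data.reshape(-1, 784).float() / 255.0
y_all = train_set.targets
x_train, y_train = x_all[:50000], y_all[:50000]   # assumed 50k train / 10k validation split
x_valid, y_valid = x_all[50000:], y_all[50000:]
x_test = test_set.data.reshape(-1, 784).float() / 255.0
y_test = test_set.targets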
The FedAvg training procedure:
- Since the main model's parameters and the parameters of all the local models in the nodes are randomly initialized, they all differ from one another. Therefore, before the local models in the nodes are trained, the main model sends its parameters to the nodes.
- The nodes use these parameters to train their local models on their own data.
- Each node updates its parameters while training its own model. When training is finished, each node sends its parameters back to the main model.
- The main model takes the average of these parameters, sets it as its new weights, and passes them back to the nodes for the next iteration. (A compact sketch of one such round follows this list.)
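These steps map one-to-one onto the helper functions introduced below; one round of the loop shown at the end of the article boils down to:

# one FedAvg round expressed with the helpers defined later in this article
model_dict = send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples)  # broadcast
start_train_end_node_process_without_print(number_of_samples)                                           # local training
main_model = set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples)  # average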
Function overview
Data distribution
- split_and_shuffle_labels(y_data, seed, amount): the labels in the dataset do not all contain the same number of samples, so to distribute the data to the nodes in an IID fashion, an equal amount of data must be taken from each label. This function keeps the per-label counts equal and shuffles the samples within each label. (What is shuffled here are the data indices, which are used later when the data is retrieved.)
def split_and_shuffle_labels(y_data, seed, amount):
    y_data = pd.DataFrame(y_data, columns=["labels"])
    y_data["i"] = np.arange(len(y_data))
    label_dict = dict()
    for i in range(10):
        var_name = "label" + str(i)
        label_info = y_data[y_data["labels"] == i]
        np.random.seed(seed)
        label_info = np.random.permutation(label_info)
        label_info = label_info[0:amount]
        label_info = pd.DataFrame(label_info, columns=["labels", "i"])
        label_dict.update({var_name: label_info})
    return label_dict
- get_iid_subsamples_indices(label_dict, number_of_samples, amount): splits the indices of each label evenly so that every node receives the same number of samples of every label.
def get_iid_subsamples_indices(label_dict, number_of_samples, amount):
    sample_dict = dict()
    batch_size = int(math.floor(amount / number_of_samples))
    for i in range(number_of_samples):
        sample_name = "sample" + str(i)
        dumb = pd.DataFrame()
        for j in range(10):
            label_name = str("label") + str(j)
            a = label_dict[label_name][i * batch_size:(i + 1) * batch_size]
            dumb = pd.concat([dumb, a], axis=0)
        dumb.reset_index(drop=True, inplace=True)
        sample_dict.update({sample_name: dumb})
    return sample_dict
- create_iid_subsamples(sample_dict, x_data, y_data, x_name, y_name): distributes the data to the nodes.
def create_iid_subsamples(sample_dict, x_data, y_data, x_name, y_name):
    x_data_dict = dict()
    y_data_dict = dict()
    for i in range(len(sample_dict)):  # len(sample_dict) = number of nodes
        xname = x_name + str(i)
        yname = y_name + str(i)
        sample_name = "sample" + str(i)
        indices = np.sort(np.array(sample_dict[sample_name]["i"]))
        x_info = x_data[indices, :]
        x_data_dict.update({xname: x_info})
        y_info = y_data[indices]
        y_data_dict.update({yname: y_info})
    return x_data_dict, y_data_dict
Functions for FedAvg
- create_model_optimizer_criterion_dict(number_of_samples): creates a model, an optimizer and a loss function for each node.
def create_model_optimizer_criterion_dict(number_of_samples):
    model_dict = dict()
    optimizer_dict = dict()
    criterion_dict = dict()
    for i in range(number_of_samples):
        model_name = "model" + str(i)
        model_info = Net2nn()
        model_dict.update({model_name: model_info})
        optimizer_name = "optimizer" + str(i)
        optimizer_info = torch.optim.SGD(model_info.parameters(), lr=learning_rate, momentum=momentum)
        optimizer_dict.update({optimizer_name: optimizer_info})
        criterion_name = "criterion" + str(i)
        criterion_info = nn.CrossEntropyLoss()
        criterion_dict.update({criterion_name: criterion_info})
    return model_dict, optimizer_dict, criterion_dict
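Net2nn itself is not shown in this excerpt. Judging from get_averaged_weights below, which averages fc1, fc2 and fc3, it is a three-layer fully connected network; a minimal sketch (the hidden-layer sizes are assumptions) could look like:

import torch.nn as nn
import torch.nn.functional as F

class Net2nn(nn.Module):
    # a plain three-layer MLP over flattened 28x28 MNIST images
    def __init__(self):
        super(Net2nn, self).__init__()
        self.fc1 = nn.Linear(784, 200)
        self.fc2 = nn.Linear(200, 200)
        self.fc3 = nn.Linear(200, 10)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)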
- get_averaged_weights(model_dict, number_of_samples): averages the weights across the nodes.
def get_averaged_weights(model_dict, number_of_samples):
    fc1_mean_weight = torch.zeros(size=model_dict[name_of_models[0]].fc1.weight.shape)
    fc1_mean_bias = torch.zeros(size=model_dict[name_of_models[0]].fc1.bias.shape)
    fc2_mean_weight = torch.zeros(size=model_dict[name_of_models[0]].fc2.weight.shape)
    fc2_mean_bias = torch.zeros(size=model_dict[name_of_models[0]].fc2.bias.shape)
    fc3_mean_weight = torch.zeros(size=model_dict[name_of_models[0]].fc3.weight.shape)
    fc3_mean_bias = torch.zeros(size=model_dict[name_of_models[0]].fc3.bias.shape)

    with torch.no_grad():
        for i in range(number_of_samples):
            fc1_mean_weight += model_dict[name_of_models[i]].fc1.weight.data.clone()
            fc1_mean_bias += model_dict[name_of_models[i]].fc1.bias.data.clone()
            fc2_mean_weight += model_dict[name_of_models[i]].fc2.weight.data.clone()
            fc2_mean_bias += model_dict[name_of_models[i]].fc2.bias.data.clone()
            fc3_mean_weight += model_dict[name_of_models[i]].fc3.weight.data.clone()
            fc3_mean_bias += model_dict[name_of_models[i]].fc3.bias.data.clone()

        fc1_mean_weight = fc1_mean_weight / number_of_samples
        fc1_mean_bias = fc1_mean_bias / number_of_samples
        fc2_mean_weight = fc2_mean_weight / number_of_samples
        fc2_mean_bias = fc2_mean_bias / number_of_samples
        fc3_mean_weight = fc3_mean_weight / number_of_samples
        fc3_mean_bias = fc3_mean_bias / number_of_samples

    return fc1_mean_weight, fc1_mean_bias, fc2_mean_weight, fc2_mean_bias, fc3_mean_weight, fc3_mean_bias
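Note that this function is hard-wired to the three fc layers of Net2nn. A more architecture-agnostic alternative (not part of the original article) averages the state_dicts directly:

def get_averaged_state_dict(model_dict, number_of_samples):
    # average every parameter tensor across the node models, independent of architecture
    avg_state = {k: v.clone().float() for k, v in model_dict[name_of_models[0]].state_dict().items()}
    for i in range(1, number_of_samples):
        for k, v in model_dict[name_of_models[i]].state_dict().items():
            avg_state[k] += v.float()
    for k in avg_state:
        avg_state[k] /= number_of_samples
    return avg_state  # load into the main model via main_model.load_state_dict(avg_state)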
- set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples): sets the averaged node weights as the main model's new weights.
def set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples):
    fc1_mean_weight, fc1_mean_bias, fc2_mean_weight, fc2_mean_bias, fc3_mean_weight, fc3_mean_bias = get_averaged_weights(model_dict, number_of_samples=number_of_samples)
    with torch.no_grad():
        main_model.fc1.weight.data = fc1_mean_weight.data.clone()
        main_model.fc2.weight.data = fc2_mean_weight.data.clone()
        main_model.fc3.weight.data = fc3_mean_weight.data.clone()
        main_model.fc1.bias.data = fc1_mean_bias.data.clone()
        main_model.fc2.bias.data = fc2_mean_bias.data.clone()
        main_model.fc3.bias.data = fc3_mean_bias.data.clone()
    return main_model
- compare_local_and_merged_model_performance(number_of_samples): compares the accuracy of the merged main model with that of the local models running on each node.
def compare_local_and_merged_model_performance(number_of_samples):
    accuracy_table = pd.DataFrame(data=np.zeros((number_of_samples, 3)), columns=["sample", "local_ind_model", "merged_main_model"])
    for i in range(number_of_samples):
        test_ds = TensorDataset(x_test_dict[name_of_x_test_sets[i]], y_test_dict[name_of_y_test_sets[i]])
        test_dl = DataLoader(test_ds, batch_size=batch_size * 2)
        model = model_dict[name_of_models[i]]
        criterion = criterion_dict[name_of_criterions[i]]
        optimizer = optimizer_dict[name_of_optimizers[i]]
        individual_loss, individual_accuracy = validation(model, test_dl, criterion)
        main_loss, main_accuracy = validation(main_model, test_dl, main_criterion)
        accuracy_table.loc[i, "sample"] = "sample " + str(i)
        accuracy_table.loc[i, "local_ind_model"] = individual_accuracy
        accuracy_table.loc[i, "merged_main_model"] = main_accuracy
    return accuracy_table
- send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples): sends the main model's parameters to the nodes.
def send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples):
    with torch.no_grad():
        for i in range(number_of_samples):
            model_dict[name_of_models[i]].fc1.weight.data = main_model.fc1.weight.data.clone()
            model_dict[name_of_models[i]].fc2.weight.data = main_model.fc2.weight.data.clone()
            model_dict[name_of_models[i]].fc3.weight.data = main_model.fc3.weight.data.clone()
            model_dict[name_of_models[i]].fc1.bias.data = main_model.fc1.bias.data.clone()
            model_dict[name_of_models[i]].fc2.bias.data = main_model.fc2.bias.data.clone()
            model_dict[name_of_models[i]].fc3.bias.data = main_model.fc3.bias.data.clone()
    return model_dict
- start_train_end_node_process_without_print(number_of_samples): trains the local model in each node.
def start_train_end_node_process_without_print(number_of_samples):
    for i in range(number_of_samples):
        train_ds = TensorDataset(x_train_dict[name_of_x_train_sets[i]], y_train_dict[name_of_y_train_sets[i]])
        train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
        test_ds = TensorDataset(x_test_dict[name_of_x_test_sets[i]], y_test_dict[name_of_y_test_sets[i]])
        test_dl = DataLoader(test_ds, batch_size=batch_size * 2)
        model = model_dict[name_of_models[i]]
        criterion = criterion_dict[name_of_criterions[i]]
        optimizer = optimizer_dict[name_of_optimizers[i]]
        for epoch in range(numEpoch):
            train_loss, train_accuracy = train(model, train_dl, criterion, optimizer)
            test_loss, test_accuracy = validation(model, test_dl, criterion)
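The train and validation helpers used above and below are not reproduced in this excerpt; a minimal sketch consistent with how they are called (their exact bodies in the original repository may differ) is:

def train(model, train_dl, criterion, optimizer):
    # one epoch of training; returns mean loss and accuracy over the loader
    model.train()
    total_loss, correct, count = 0.0, 0, 0
    for x, y in train_dl:
        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * len(y)
        correct += (output.argmax(dim=1) == y).sum().item()
        count += len(y)
    return total_loss / count, correct / count

def validation(model, test_dl, criterion):
    # evaluation pass without gradient tracking
    model.eval()
    total_loss, correct, count = 0.0, 0, 0
    with torch.no_grad():
        for x, y in test_dl:
            output = model(x)
            total_loss += criterion(output, y).item() * len(y)
            correct += (output.argmax(dim=1) == y).sum().item()
            count += len(y)
    return total_loss / count, correct / count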
How well does a model trained on centralized data perform?
centralized_model = Net2nn()
centralized_optimizer = torch.optim.SGD(centralized_model.parameters(), lr=0.01, momentum=0.9)
centralized_criterion = nn.CrossEntropyLoss()
train_ds = TensorDataset(x_train, y_train)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
valid_ds = TensorDataset(x_valid, y_valid)
valid_dl = DataLoader(valid_ds, batch_size=batch_size * 2)
test_ds = TensorDataset(x_test, y_test)
test_dl = DataLoader(test_ds, batch_size=batch_size * 2)
print("------ Centralized Model ------")
for epoch in range(numEpoch):
    central_train_loss, central_train_accuracy = train(centralized_model, train_dl, centralized_criterion, centralized_optimizer)
    central_test_loss, central_test_accuracy = validation(centralized_model, test_dl, centralized_criterion)
    print("epoch: {:3.0f}".format(epoch+1) + " | train accuracy: {:7.4f}".format(central_train_accuracy) + " | test accuracy: {:7.4f}".format(central_test_accuracy))
— — — Centralized Model — — —
epoch: 1 | train accuracy: 0.8743 | test accuracy: 0.9437
epoch: 2 | train accuracy: 0.9567 | test accuracy: 0.9654
epoch: 3 | train accuracy: 0.9712 | test accuracy: 0.9701
epoch: 4 | train accuracy: 0.9785 | test accuracy: 0.9738
epoch: 5 | train accuracy: 0.9834 | test accuracy: 0.9713
epoch: 6 | train accuracy: 0.9864 | test accuracy: 0.9768
epoch: 7 | train accuracy: 0.9898 | test accuracy: 0.9763
epoch: 8 | train accuracy: 0.9923 | test accuracy: 0.9804
epoch: 9 | train accuracy: 0.9941 | test accuracy: 0.9784
epoch: 10 | train accuracy: 0.9959 | test accuracy: 0.9792
— — — Training finished — — -
The goal here is to compare the performance of the main model, formed by combining the parameters of local models that were each trained only on their own data, against a centralized model trained on all of the training data.
Distributing the data to the nodes
label_dict_train=split_and_shuffle_labels(y_data=y_train, seed=1, amount=train_amount)
sample_dict_train=get_iid_subsamples_indices(label_dict=label_dict_train, number_of_samples=number_of_samples, amount=train_amount)
x_train_dict, y_train_dict = create_iid_subsamples(sample_dict=sample_dict_train, x_data=x_train, y_data=y_train, x_name="x_train", y_name="y_train")
label_dict_valid = split_and_shuffle_labels(y_data=y_valid, seed=1, amount=train_amount)
sample_dict_valid = get_iid_subsamples_indices(label_dict=label_dict_valid, number_of_samples=number_of_samples, amount=valid_amount)
x_valid_dict, y_valid_dict = create_iid_subsamples(sample_dict=sample_dict_valid, x_data=x_valid, y_data=y_valid, x_name="x_valid", y_name="y_valid")
label_dict_test = split_and_shuffle_labels(y_data=y_test, seed=1, amount=test_amount)
sample_dict_test = get_iid_subsamples_indices(label_dict=label_dict_test, number_of_samples=number_of_samples, amount=test_amount)
x_test_dict, y_test_dict = create_iid_subsamples(sample_dict=sample_dict_test, x_data=x_test, y_data=y_test, x_name="x_test", y_name="y_test")
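These calls rely on globals such as number_of_samples, train_amount, valid_amount, test_amount, batch_size, learning_rate, momentum and numEpoch that are set earlier in the original notebook; an illustrative configuration (the values are assumptions, except numEpoch, learning_rate and momentum, which match the snippets above) could be:

number_of_samples = 100   # number of nodes
train_amount = 4500       # training samples drawn per label
valid_amount = 900        # validation samples drawn per label
test_amount = 900         # test samples drawn per label
batch_size = 32
learning_rate = 0.01
momentum = 0.9
numEpoch = 10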
Creating the main model
main_model = Net2nn()
main_optimizer = torch.optim.SGD(main_model.parameters(), lr=learning_rate, momentum=0.9)
main_criterion = nn.CrossEntropyLoss()
Defining the models, optimizers and loss functions in the nodes
model_dict, optimizer_dict, criterion_dict = create_model_optimizer_criterion_dict(number_of_samples)
Turning the dictionary keys into iterable lists
name_of_x_train_sets=list(x_train_dict.keys())
name_of_y_train_sets=list(y_train_dict.keys())
name_of_x_valid_sets=list(x_valid_dict.keys())
name_of_y_valid_sets=list(y_valid_dict.keys())
name_of_x_test_sets=list(x_test_dict.keys())
name_of_y_test_sets=list(y_test_dict.keys())
name_of_models=list(model_dict.keys())
name_of_optimizers=list(optimizer_dict.keys())
name_of_criterions=list(criterion_dict.keys())
Sending the main model's parameters to the nodes
Since the main model's parameters and the parameters of all the local models in the nodes are randomly initialized, they all differ from one another. Therefore, before the local models in the nodes are trained, the main model sends its parameters to the nodes.
model_dict=send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples)
Training the models in the nodes
start_train_end_node_process_without_print(number_of_samples)
Comparing the performance of the federated main model with the centralized model
before_test_loss, before_test_accuracy = validation(main_model, test_dl, main_criterion)
main_model= set_averaged_weights_as_main_model_weights_and_update_main_model(main_model,model_dict, number_of_samples)
after_test_loss, after_test_accuracy = validation(main_model, test_dl, main_criterion)
print("Before 1st iteration main model accuracy on all test data: {:7.4f}".format(before_test_accuracy))
print("After 1st iteration main model accuracy on all test data: {:7.4f}".format(after_test_accuracy))
print("Centralized model accuracy on all test data: {:7.4f}".format(central_test_accuracy))
Before 1st iteration main model accuracy on all test data: 0.1180
After 1st iteration main model accuracy on all test data: 0.8529
Centralized model accuracy on all test data: 0.9790
Performance of the main model when the iteration is repeated 10 more times:
for i in range(10):
    model_dict = send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples)
    start_train_end_node_process_without_print(number_of_samples)
    main_model = set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples)
    test_loss, test_accuracy = validation(main_model, test_dl, main_criterion)
    print("Iteration", str(i+2), ": main_model accuracy on all test data: {:7.4f}".format(test_accuracy))
Iteration 2 : main_model accuracy on all test data: 0.8928
Iteration 3 : main_model accuracy on all test data: 0.9073
Iteration 4 : main_model accuracy on all test data: 0.9150
Iteration 5 : main_model accuracy on all test data: 0.9209
Iteration 6 : main_model accuracy on all test data: 0.9273
Iteration 7 : main_model accuracy on all test data: 0.9321
Iteration 8 : main_model accuracy on all test data: 0.9358
Iteration 9 : main_model accuracy on all test data: 0.9382
Iteration 10 : main_model accuracy on all test data: 0.9411
Iteration 11 : main_model accuracy on all test data: 0.9431
GitHub source code: https://github.com/eceisik/fl_p