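Models
Architectural assumptions
Basic TFF assumption: model code must be serializable as a TensorFlow graph so that it can run on resource-constrained mobile and embedded devices. Serialization in TFF currently follows the TF 1.0 pattern, in which all code must be constructed inside a tf.Graph that TFF controls. As a strongly typed environment, TFF also needs some additional metadata, such as a specification of your model's input type.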
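Federated learning involves at least two layers of aggregation: local on-device aggregation and cross-device aggregation.
- Local aggregation: aggregation across the multiple batches of examples owned by an individual client.
  - The model first constructs tf.Variable objects to hold the aggregates.
  - TFF calls the forward_pass method on the Model multiple times, sequentially over successive batches of client data, which lets you update the variables holding the various aggregates as a side effect.
  - Finally, TFF calls the report_local_outputs method on your model so that the model can report all the summary statistics it has collected.
- Federated aggregation: aggregation across multiple clients (devices) in the system.
  - The initial model (and any parameters required for training) is distributed by the server to the subset of clients that will take part in a round of training or evaluation.
  - On each client, independently and in parallel, your model code is invoked repeatedly on a stream of local data batches, producing a new set of model parameters (when training) and a new set of local metrics.
  - TFF runs a distributed aggregation protocol to accumulate and aggregate the model parameters and the locally exported metrics across the system. This logic is expressed declaratively in TFF's own federated computation language (not in TensorFlow), in the model's federated_output_computation.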
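The two aggregation layers can be illustrated without TFF at all. The following is a minimal plain-Python sketch, not TFF code: the helper names local_metrics and federated_metrics and the (mean_loss, batch_size) batch format are assumptions made only for this illustration.

```python
from collections import OrderedDict


def local_metrics(batches):
    """Local aggregation: accumulate over one client's batches, then report."""
    num_examples = 0.0
    loss_sum = 0.0
    for batch_loss, batch_size in batches:
        # Plays the role of forward_pass: update running aggregates per batch.
        num_examples += batch_size
        loss_sum += batch_loss * batch_size
    # Plays the role of report_local_outputs: export a compact set of metrics.
    return OrderedDict(num_examples=num_examples, loss=loss_sum / num_examples)


def federated_metrics(client_metrics):
    """Federated aggregation: example-weighted mean of the clients' metrics."""
    total = sum(m["num_examples"] for m in client_metrics)
    loss = sum(m["loss"] * m["num_examples"] for m in client_metrics) / total
    return OrderedDict(num_examples=total, loss=loss)


# Each batch is a hypothetical (mean_loss, batch_size) pair.
client_a = local_metrics([(1.0, 2), (3.0, 2)])
client_b = local_metrics([(4.0, 4)])
global_metrics = federated_metrics([client_a, client_b])
print(global_metrics)
```

Weighting each client's loss by its example count keeps a client with a single small batch from counting as much as a client with many examples.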
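Abstract interfaces
This basic constructor + metadata interface is represented by the interface tff.learning.Model:
- The constructor, forward_pass and report_local_outputs methods construct the model variables, the forward pass, and the statistics to report, respectively. As noted above, the TensorFlow constructed by these methods must be serializable.
- The input_spec property, together with the three properties that return the trainable, non-trainable and local variables, represents the metadata. TFF uses this information to determine how to connect the parts of your model to the federated optimization algorithms, and to define internal type signatures that help verify the correctness of the constructed system.
The abstract interface tff.learning.Model also exposes the property federated_output_computation.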
import collections

import tensorflow as tf
import tensorflow_federated as tff

# Container for all model state: `weights` and `bias` are the model variables,
# the remaining fields accumulate metrics.
MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')


# All state that your model uses must be captured as TensorFlow variables.
def create_mnist_variable():
    """Creates and initializes the model and metric variables."""
    return MnistVariables(
        weights=tf.Variable(
            lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
            name='weights',
            trainable=True),
        bias=tf.Variable(
            lambda: tf.zeros(dtype=tf.float32, shape=(10,)),
            name='bias',
            trainable=True),
        num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
        loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
        accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))
# Prediction on a single batch, written exactly as in plain TensorFlow.
def predict_on_batch(variables, x):
    return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)


# Forward-pass pieces, reused by every client.
def mnist_forward_pass(variables, batch):
    y = predict_on_batch(variables, batch['x'])
    predictions = tf.cast(tf.argmax(y, 1), tf.int32)
    flat_labels = tf.reshape(batch['y'], [-1])
    loss = -tf.reduce_mean(
        tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
    accuracy = tf.reduce_mean(
        tf.cast(tf.equal(predictions, flat_labels), tf.float32))
    num_examples = tf.cast(tf.size(batch['y']), tf.float32)
    # Update the running aggregates as a side effect.
    variables.num_examples.assign_add(num_examples)
    variables.loss_sum.assign_add(loss * num_examples)
    variables.accuracy_sum.assign_add(accuracy * num_examples)
    return loss, predictions


# Local metrics reported by each client.
def get_local_mnist_metrics(variables):
    return collections.OrderedDict(
        num_examples=variables.num_examples,
        loss=variables.loss_sum / variables.num_examples,
        accuracy=variables.accuracy_sum / variables.num_examples)
# Cross-client aggregation protocol used as federated_output_computation.
# The body uses TFF federated operators, not TensorFlow ops.
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
    return collections.OrderedDict(
        num_examples=tff.federated_sum(metrics.num_examples),
        loss=tff.federated_mean(metrics.loss, metrics.num_examples),
        accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))
from typing import Callable, List
# Create the model by subclassing tff.learning.Model.
class MnistModel(tff.learning.Model):

    def __init__(self) -> None:
        self._variables = create_mnist_variable()  # initialize variables

    @property
    def trainable_variables(self):
        return [self._variables.weights, self._variables.bias]

    @property
    def non_trainable_variables(self):
        return []

    @property
    def local_variables(self):
        return [self._variables.num_examples, self._variables.loss_sum,
                self._variables.accuracy_sum]

    # Describes the form of data the model accepts.
    @property
    def input_spec(self):
        return collections.OrderedDict(
            x=tf.TensorSpec([None, 784], tf.float32),
            y=tf.TensorSpec([None, 1], tf.int32))

    # Wrapped in tf.function so that the method can be serialized as a graph.
    @tf.function
    def predict_on_batch(self, x, training=True):
        del training
        return predict_on_batch(self._variables, x)

    # forward_pass implements the local, per-batch computation.
    @tf.function
    def forward_pass(self, batch, training=True):
        del training
        loss, predictions = mnist_forward_pass(self._variables, batch)
        num_examples = tf.shape(batch['x'])[0]
        return tff.learning.BatchOutput(
            loss=loss, predictions=predictions, num_examples=num_examples)

    # report_local_outputs summarizes the locally collected metrics.
    @tf.function
    def report_local_outputs(self):
        return get_local_mnist_metrics(self._variables)

    # federated_output_computation exposes the cross-client aggregation protocol.
    @property
    def federated_output_computation(self):
        return aggregate_mnist_metrics_across_clients

    @tf.function
    def report_local_unfinalized_metrics(self):
        """Creates an `OrderedDict` of metric names to unfinalized values."""
        return collections.OrderedDict(
            num_examples=[self._variables.num_examples],
            loss=[self._variables.loss_sum, self._variables.num_examples],
            accuracy=[self._variables.accuracy_sum, self._variables.num_examples])

    def metric_finalizers(self):
        """Creates an `OrderedDict` of metric names to finalizers."""
        return collections.OrderedDict(
            num_examples=tf.function(func=lambda x: x[0]),
            loss=tf.function(func=lambda x: x[0] / x[1]),
            accuracy=tf.function(func=lambda x: x[0] / x[1]))
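The split between unfinalized metrics and finalizers is easier to see in isolation. Below is a minimal plain-Python sketch of one plausible way the two pieces fit together: the unfinalized values are summed across clients, and each finalizer is applied once to the summed result. The helper name sum_then_finalize_sketch and the sample numbers are assumptions for illustration, not TFF code.

```python
from collections import OrderedDict

# Hypothetical unfinalized metrics reported by two clients, in the same
# layout as above: loss is a [loss_sum, num_examples] pair.
client_reports = [
    OrderedDict(num_examples=[10.0], loss=[5.0, 10.0]),
    OrderedDict(num_examples=[30.0], loss=[45.0, 30.0]),
]

finalizers = OrderedDict(
    num_examples=lambda x: x[0],
    loss=lambda x: x[0] / x[1],
)


def sum_then_finalize_sketch(reports, finalizers):
    """Sums unfinalized values across clients, then finalizes each metric."""
    summed = OrderedDict()
    for name in finalizers:
        # Element-wise sum of the per-client lists for this metric.
        columns = zip(*(report[name] for report in reports))
        summed[name] = [sum(column) for column in columns]
    return OrderedDict(
        (name, finalize(summed[name])) for name, finalize in finalizers.items())


final_metrics = sum_then_finalize_sketch(client_reports, finalizers)
print(final_metrics)
```

Dividing only after summing gives the example-weighted loss over all clients, rather than an unweighted average of per-client averages.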
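Keras converters
Nearly all the information TFF requires can be derived by calling tf.keras interfaces, so if you have a Keras model you can rely on tff.learning.from_keras_model to construct a tff.learning.Model. Compared with subclassing tff.learning.Model and overriding forward_pass, report_local_outputs and federated_output_computation, this is a higher-level API.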
'''
# Template
def model_fn():
    keras_model = ...
    return tff.learning.from_keras_model(keras_model, input_spec=..., loss=...)
'''
# Returns a fresh Keras model instance.
def create_keras_model():
    return tf.keras.models.Sequential([
        tf.keras.layers.InputLayer(input_shape=(784,)),
        tf.keras.layers.Dense(10, kernel_initializer='zeros'),
        tf.keras.layers.Softmax(),
    ])


def model_fn():
    # We _must_ create a new model here, and _not_ capture it from an external
    # scope. TFF will call this within different graph contexts.
    keras_model = create_keras_model()
    return tff.learning.from_keras_model(
        keras_model,
        input_spec=preprocessed_example_dataset.element_spec,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
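Federated computation builders
Architectural assumptions
Running a federated computation involves two distinct phases:
- Compile: TFF first compiles the federated learning algorithm into an abstract serialized representation of the entire distributed computation. This is when TensorFlow serialization happens. We refer to the serialized representation emitted by the compiler as a federated computation.
- Execute: TFF provides ways to execute these computations. For now, execution is only supported via local simulation.
Because TFF is a functional programming environment, the computation takes the current state as input and returns the updated state as output. To fully define a stateful process, you also need to specify where the initial state comes from (otherwise the process cannot be bootstrapped). This is captured in the definition of the helper class tff.templates.IterativeProcess, which has two properties, initialize and next, corresponding to initialization and iteration respectively.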
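The initialize/next contract can be mimicked in a few lines of plain Python. The sketch below is a toy stand-in, not tff.templates.IterativeProcess itself: ToyIterativeProcess, build_toy_averaging_process, the scalar "model" and the metric names are all assumptions made for illustration.

```python
from typing import Callable, Dict, List, NamedTuple, Tuple


class ToyIterativeProcess(NamedTuple):
    """A pair of pure functions: one creates state, one advances it."""
    initialize: Callable[[], float]
    next: Callable[[float, List[List[float]]], Tuple[float, Dict[str, float]]]


def build_toy_averaging_process(server_lr: float = 1.0) -> ToyIterativeProcess:
    def initialize() -> float:
        # The "model" here is a single scalar, starting at zero.
        return 0.0

    def next_fn(state, federated_data):
        # Each client proposes the mean of its local data as its model.
        client_models = [sum(data) / len(data) for data in federated_data]
        weights = [len(data) for data in federated_data]
        average = sum(
            m * w for m, w in zip(client_models, weights)) / sum(weights)
        # The server moves the old state toward the weighted average.
        new_state = state + server_lr * (average - state)
        metrics = {"num_clients": len(federated_data),
                   "distance": abs(average - state)}
        return new_state, metrics

    return ToyIterativeProcess(initialize=initialize, next=next_fn)


process = build_toy_averaging_process()
state = process.initialize()
state, metrics = process.next(state, [[1.0, 3.0], [5.0, 5.0]])
print(state, metrics)
```

Because next never mutates its input, the caller decides what to do with the returned state: keep it, checkpoint it, or discard it and retry the round.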
The aggregation method, that is, how the clients' model parameters are aggregated, is set through the model_update_aggregation_factory parameter of tff.learning.build_federated_averaging_process.
mean = tff.aggregators.MeanFactory()
iterative_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=mean)
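Besides MeanFactory, TFF also provides aggregators for the following:
- percentile (quantile) matching
- zeroing out excessively large values
- clipping
- differential privacy (DP)
- lossy compression
- secure aggregation
These aggregation methods can also be composed.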
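Composition here means wrapping one aggregator around another. The plain-Python sketch below illustrates the idea only. It does not use the tff.aggregators API: with_zeroing, with_clipping and mean_aggregator are hypothetical names, and using the L2 norm for both thresholds is a simplifying assumption.

```python
import math
from typing import Callable, List

Vector = List[float]
Aggregator = Callable[[List[Vector]], Vector]


def l2_norm(vector: Vector) -> float:
    return math.sqrt(sum(x * x for x in vector))


def mean_aggregator(updates: List[Vector]) -> Vector:
    """Innermost aggregator: coordinate-wise unweighted mean."""
    return [sum(column) / len(updates) for column in zip(*updates)]


def with_clipping(clip_norm: float, inner: Aggregator) -> Aggregator:
    """Scales every update down to at most clip_norm before calling inner."""
    def aggregate(updates: List[Vector]) -> Vector:
        clipped = []
        for update in updates:
            scale = min(1.0, clip_norm / max(l2_norm(update), 1e-12))
            clipped.append([x * scale for x in update])
        return inner(clipped)
    return aggregate


def with_zeroing(zero_norm: float, inner: Aggregator) -> Aggregator:
    """Replaces implausibly large updates with zeros before calling inner."""
    def aggregate(updates: List[Vector]) -> Vector:
        zeroed = [u if l2_norm(u) <= zero_norm else [0.0] * len(u)
                  for u in updates]
        return inner(zeroed)
    return aggregate


# Zero out extreme values first, then clip, then average.
aggregator = with_zeroing(100.0, with_clipping(5.0, mean_aggregator))
aggregated = aggregator([[3.0, 4.0], [6.0, 8.0], [1e6, 0.0]])
print(aggregated)
```

The order of wrapping matters: zeroing runs on the raw updates, so a corrupted value is dropped entirely instead of being clipped into a full-size update.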
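Available builders
TFF provides two builder functions that generate federated computations for federated training and evaluation:
- tff.learning.build_federated_averaging_process takes a model function and a client optimizer, and returns a stateful tff.templates.IterativeProcess.
- tff.learning.build_federated_evaluation takes a model function and returns a single federated computation for federated evaluation of the model, since evaluation is not stateful.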
iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,  # must be a constructor, not an already-constructed instance
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),  # local optimizer
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),  # global optimizer
    use_experimental_simulation_loop=True)
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round 1, metrics={}'.format(metrics))
for round_num in range(2, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    print('round {:2d}, metrics={}'.format(round_num, metrics))
evaluation = tff.learning.build_federated_evaluation(
    MnistModel, use_experimental_simulation_loop=True)
test_metrics = evaluation(state.model, federated_test_data)
# Evaluation every round.
# First load pretrained Keras weights into the server state; this returns a
# ServerState with updated model weights.
state = tff.learning.state_with_new_model_weights(
    state,
    trainable_weights=[v.numpy() for v in keras_model.trainable_weights],
    non_trainable_weights=[
        v.numpy() for v in keras_model.non_trainable_weights
    ])


def keras_evaluate(state, round_num):
    # Take our global model weights and push them back into a Keras model to
    # use its standard `.evaluate()` method.
    keras_model = load_model(batch_size=BATCH_SIZE)  # load pretrained model
    keras_model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[FlattenedCategoricalAccuracy()])
    # assign_weights_to is a ModelWeights method; it copies the weights into a
    # Keras model or a tff.learning.Model.
    state.model.assign_weights_to(keras_model)
    # tff.learning.assign_weights_to_keras_model(keras_model, state.model)
    # example_dataset is a specific client's dataset.
    loss, accuracy = keras_model.evaluate(example_dataset, steps=2, verbose=0)
    print('\tEval: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))


for round_num in range(NUM_ROUNDS):
    print('Round {r}'.format(r=round_num))
    keras_evaluate(state, round_num)
    state, metrics = fed_avg.next(state, train_datasets)
    print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(
        l=metrics["train"]["loss"], a=metrics["train"]["accuracy"]))
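Datasets
Architectural assumptions
To simulate a realistic deployment of federated learning code, you generally write a training loop like this:
trainer = tff.learning.build_federated_averaging_process(...)
state = trainer.initialize()
federated_training_data = ...
def sample(federate_data):
    return ...
while True:
    data_for_this_round = sample(federated_training_data)
    state, metrics = trainer.next(state, data_for_this_round)
To make this possible, when TFF is used in simulation, federated data is treated as a Python list in which each element represents the local tf.data.Dataset of one participating device.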
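The body of sample is left open in the loop above. One simple possibility is uniform sampling without replacement, sketched here in plain Python. The extra num_clients and rng parameters, and the use of short lists as stand-ins for per-client tf.data.Dataset objects, are assumptions for illustration.

```python
import random


def sample(federated_data, num_clients, rng):
    """Picks num_clients distinct client datasets uniformly at random."""
    return rng.sample(federated_data, num_clients)


# Stand-ins for per-client datasets: one short list per client.
federated_training_data = [[float(i)] * 3 for i in range(10)]

rng = random.Random(0)  # fixed seed so that a simulation can be repeated
for round_num in range(3):
    data_for_this_round = sample(federated_training_data, 4, rng)
    print(round_num, [client[0] for client in data_for_this_round])
```

Drawing a fresh subset every round mimics the fact that, in a real deployment, a different set of devices is available each time.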
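Abstract interfaces
To standardize the handling of simulated federated data sets, TFF provides the abstract interface tff.simulation.ClientData. It lets you enumerate the set of clients and construct a tf.data.Dataset that contains the data of a particular client. In eager mode, these tf.data.Dataset objects can be fed directly as input to the generated federated computations.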
# load_data() returns instances of the tff.simulation.ClientData class.
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
len(emnist_train.client_ids)  # 3383 clients
emnist_train.element_type_structure
# OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)),
#              ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
# ClientData.create_tf_dataset_for_client returns the tf.data.Dataset of one client.
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])
example_element = next(iter(example_dataset))
example_element['label'].numpy()
# One way to feed federated data to TFF in a simulation is simply as a Python
# list, with each element of the list holding the data of an individual user,
# whether as a list or as a tf.data.Dataset.
NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10


def preprocess(dataset):

    def batch_format_fn(element):
        """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
        return collections.OrderedDict(
            x=tf.reshape(element['pixels'], [-1, 784]),
            y=tf.reshape(element['label'], [-1, 1]))

    return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
        BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)


def make_federated_data(client_data, client_ids):
    return [
        preprocess(client_data.create_tf_dataset_for_client(x))
        for x in client_ids
    ]


# Use the first NUM_CLIENTS clients as the simulated cohort.
sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]
federated_train_data = make_federated_data(emnist_train, sample_clients)
federated_test_data = make_federated_data(emnist_test, sample_clients)
Case
Reconstruction Model
In this case, we use matrix factorization to learn two matrices: a user matrix and an item matrix. Note that the user matrix is private and stored locally on each client, whereas the item matrix is shared globally.
We use tff.learning.reconstruction.Model to build the model, and the federated computation builders are replaced with tff.learning.reconstruction.build_training_process and tff.learning.reconstruction.build_federated_evaluation.
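The private/shared split can be sketched in plain Python before any TFF code. In this minimal illustration, which is not the TFF implementation, a client first reconstructs its own user vector with the shared item matrix held fixed, then computes an update for the item matrix only. The function names, the learning rates and the tiny two-item example are all assumptions.

```python
def predict(user, item):
    """Predicted rating: dot product of user and item embeddings."""
    return sum(u * v for u, v in zip(user, item))


def reconstruct_user_vector(item_matrix, ratings, num_factors, lr=0.1, steps=50):
    """Fits the private user vector by SGD; the item matrix is not modified."""
    user = [0.1] * num_factors
    for _ in range(steps):
        for item_id, rating in ratings:
            item = item_matrix[item_id]
            error = predict(user, item) - rating
            user = [u - lr * error * v for u, v in zip(user, item)]
    return user


def item_update(item_matrix, user, ratings, lr=0.1):
    """Computes the delta for the shared item matrix; the user vector stays local."""
    delta = [[0.0] * len(user) for _ in item_matrix]
    for item_id, rating in ratings:
        error = predict(user, item_matrix[item_id]) - rating
        for k, u in enumerate(user):
            delta[item_id][k] -= lr * error * u
    return delta


# Hypothetical shared item matrix (2 items, 2 latent factors) and one
# client's (item_id, rating) pairs.
item_matrix = [[1.0, 0.0], [0.0, 1.0]]
ratings = [(0, 4.0), (1, 2.0)]
user_vector = reconstruct_user_vector(item_matrix, ratings, num_factors=2)
delta = item_update(item_matrix, user_vector, ratings)
print(user_vector, delta)
```

Only delta would be sent to the server in this sketch; user_vector is rebuilt from local data whenever it is needed, so it never has to leave the device.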
Data preparation
import functools
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd


# Build one tf.data.Dataset per user (client).
def create_tf_datasets(ratings_df: pd.DataFrame,
                       batch_size: int = 1,
                       max_examples_per_user: Optional[int] = None,
                       max_clients: Optional[int] = None) -> List[tf.data.Dataset]:
    num_users = len(set(ratings_df.UserID))
    if max_clients is not None:
        num_users = min(num_users, max_clients)

    def rating_batch_map_fn(rating_batch):
        """Maps a rating batch to an OrderedDict with tensor values."""
        # Each example looks like: {x: movie_id, y: rating}.
        # We won't need the UserID since each client will only look at their own
        # data.
        return collections.OrderedDict([
            ("x", tf.cast(rating_batch[:, 1:2], tf.int64)),
            ("y", tf.cast(rating_batch[:, 2:3], tf.float32))
        ])

    tf_datasets = []
    for user_id in range(num_users):
        user_ratings_df = ratings_df[ratings_df.UserID == user_id]
        tf_dataset = tf.data.Dataset.from_tensor_slices(user_ratings_df)
        tf_dataset = tf_dataset.take(max_examples_per_user).shuffle(
            buffer_size=max_examples_per_user, seed=42).batch(batch_size).map(
                rating_batch_map_fn,
                num_parallel_calls=tf.data.experimental.AUTOTUNE)
        tf_datasets.append(tf_dataset)
    return tf_datasets


# Split the client datasets into train, validation and test sets.
def split_tf_datasets(
    tf_datasets: List[tf.data.Dataset],
    train_fraction: float = 0.8,
    val_fraction: float = 0.1,
) -> Tuple[List[tf.data.Dataset], List[tf.data.Dataset], List[tf.data.Dataset]]:
    np.random.seed(42)
    np.random.shuffle(tf_datasets)
    train_idx = int(len(tf_datasets) * train_fraction)
    val_idx = int(len(tf_datasets) * (train_fraction + val_fraction))
    return (tf_datasets[:train_idx], tf_datasets[train_idx:val_idx],
            tf_datasets[val_idx:])
Build Model
There are again two ways to build the model: converting a Keras model with from_keras_model, or subclassing tff.learning.reconstruction.Model. Here, the first approach is used.
# User matrix: a single local embedding row for this client's user.
class UserEmbedding(tf.keras.layers.Layer):

    def __init__(self, num_latent_factors, **kwargs):
        super().__init__(**kwargs)
        self.num_latent_factors = num_latent_factors

    def build(self, input_shape):
        self.embedding = self.add_weight(
            shape=(1, self.num_latent_factors),
            initializer='uniform',
            dtype=tf.float32,
            name='UserEmbeddingKernel')
        super().build(input_shape)

    def call(self, inputs):
        return self.embedding  # one row only: each client holds a single user

    def compute_output_shape(self):
        return (1, self.num_latent_factors)
def get_matrix_factorization_model(
        num_items: int,
        num_latent_factors: int) -> tff.learning.reconstruction.Model:
    global_layers = []
    local_layers = []
    # Extract the item embedding (shared, global).
    item_input = tf.keras.layers.Input(shape=[1], name='Item')
    item_embedding_layer = tf.keras.layers.Embedding(
        num_items,
        num_latent_factors,
        name="ItemEmbedding")
    global_layers.append(item_embedding_layer)
    flat_item_vec = tf.keras.layers.Flatten(name="FlattenItem")(
        item_embedding_layer(item_input))
    # Extract the user embedding (private, local).
    user_embedding_layer = UserEmbedding(
        num_latent_factors,
        name='UserEmbedding')
    local_layers.append(user_embedding_layer)
    flat_user_vec = user_embedding_layer(item_input)
    pred = tf.keras.layers.Dot(
        1, normalize=False, name='Dot')([flat_user_vec, flat_item_vec])
    input_spec = collections.OrderedDict(
        x=tf.TensorSpec(shape=[None, 1], dtype=tf.int64),
        y=tf.TensorSpec(shape=[None, 1], dtype=tf.float32))
    model = tf.keras.Model(inputs=item_input, outputs=pred)
    return tff.learning.reconstruction.from_keras_model(
        keras_model=model,
        global_layers=global_layers,
        local_layers=local_layers,
        input_spec=input_spec)


model_fn = functools.partial(
    get_matrix_factorization_model,
    num_items=3706,
    num_latent_factors=50)
class RatingAccuracy(tf.keras.metrics.Mean):
    """Keras metric computing accuracy of reconstructed ratings."""

    def __init__(self,
                 name: str = 'rating_accuracy',
                 **kwargs):
        super().__init__(name=name, **kwargs)

    def update_state(self,
                     y_true: tf.Tensor,
                     y_pred: tf.Tensor,
                     sample_weight: Optional[tf.Tensor] = None):
        absolute_diffs = tf.abs(y_true - y_pred)
        # A [batch_size, 1] tf.bool tensor indicating correctness within the
        # threshold for each example in a batch. A 0.5 threshold corresponds
        # to correctness when predictions are rounded to the nearest whole
        # number.
        example_accuracies = tf.less_equal(absolute_diffs, 0.5)
        super().update_state(example_accuracies, sample_weight=sample_weight)
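# loss_fn and metrics_fn are not defined in the original notes. The two
# definitions below are assumptions: mean squared error as the rating loss,
# and the RatingAccuracy metric defined above.
loss_fn = lambda: tf.keras.losses.MeanSquaredError()
metrics_fn = lambda: [RatingAccuracy()]

training_process = tff.learning.reconstruction.build_training_process(
    model_fn=model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(1.0),
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.5),
    reconstruction_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.1))
evaluation_computation = tff.learning.reconstruction.build_federated_evaluation(
    model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    reconstruction_optimizer_fn=functools.partial(
        tf.keras.optimizers.SGD, 0.1))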
Training
state = training_process.initialize()
print(state.model)
print('Item variables shape:', state.model.trainable[0].shape)
eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Initial Eval:', eval_metrics['eval'])
# Select the clients for a single round.
federated_train_data = np.random.choice(tf_train_datasets, size=50, replace=False).tolist()
state, metrics = training_process.next(state, federated_train_data)
print('Train metrics:', metrics['train'])
NUM_ROUNDS = 20
train_losses = []
train_accs = []
# Restart from a fresh state and train for NUM_ROUNDS rounds.
state = training_process.initialize()
for i in range(NUM_ROUNDS):
    federated_train_data = np.random.choice(tf_train_datasets, size=50, replace=False).tolist()
    state, metrics = training_process.next(state, federated_train_data)
    print('Train metrics:', metrics['train'])
    train_losses.append(metrics['train']['loss'])
    train_accs.append(metrics['train']['rating_accuracy'])
eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Final Eval:', eval_metrics['eval'])
eval_metrics = evaluation_computation(state.model, tf_test_datasets)
print('Final Test:', eval_metrics['eval'])
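Summary
The overall workflow is roughly as follows:
- Build the data: convert the data into a sequence/list of tf.data.Dataset objects.
- Build the model. TFF offers two routes. The first is to call tff.learning.from_keras_model directly, which converts a tf.keras.Model instance into a tff.learning.Model; you still have to pass input_spec to describe the input types. The second is to subclass tff.learning.Model and override forward_pass and report_local_outputs, and finally provide the federated_output_computation property for the federated aggregation. The usual pattern for this route is to write a function that creates and initializes the variables, then run the forward pass and compute the metrics as you would in TensorFlow while updating the corresponding variables. federated_output_computation must be decorated with tff.federated_computation and call TFF's built-in federated operators.
- Build the federated computation. In TFF it is an instance of tff.templates.IterativeProcess, which has two methods, initialize and next. The builder takes the corresponding model function, metrics and optimizers.
- Training. This part of the code needs no further changes; an existing training loop can be reused as is.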