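Model
Architectural assumptions
TFF's basic assumption is that model code must be serializable as a TensorFlow graph, so that it can run on resource-constrained mobile and embedded devices. Serialization in TFF currently follows the TF 1.0 pattern, in which all code must be constructed inside a tf.Graph that TFF controls. Because TFF is a strongly typed environment, it also needs some additional metadata, such as a specification of your model's input type.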
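There are at least two layers of aggregation in federated learning: local on-device aggregation and cross-device (federated) aggregation.
- Local aggregation: this level of aggregation refers to aggregation across the multiple batches of examples owned by an individual client.
  - The model first constructs tf.Variables to hold the aggregates, such as the number of examples processed and the sum of the per-example losses.
  - TFF invokes the forward_pass method on your Model multiple times, sequentially over subsequent batches of client data, which lets you update the variables holding the various aggregates as a side effect.
  - Finally, TFF invokes the report_local_outputs method on your model, which lets the model compactly report all the summary statistics it collected as a set of metrics exported by the client.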
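- Federated aggregation: this level of aggregation refers to aggregation across multiple clients (devices) in the system.
  - The initial model (and any parameters required for training) is distributed by the server to the subset of clients that will participate in a round of training or evaluation.
  - On each client, independently and in parallel, your model code is invoked repeatedly on a stream of local data batches to produce a new set of model parameters (when training) and a new set of local metrics, as in local aggregation above.
  - TFF runs a distributed aggregation protocol to accumulate and aggregate the model parameters and the locally exported metrics across the system. This logic is expressed declaratively in the model's federated_output_computation, using TFF's own federated computation language (not TensorFlow).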
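The two levels can be mimicked in plain Python to see how they fit together. This is only an illustration: `local_aggregate` and `federated_aggregate` are hypothetical helpers, not TFF APIs. They mirror the roles of forward_pass/report_local_outputs and federated_output_computation, with lists of per-example losses standing in for a real model.

```python
from typing import Dict, List, Sequence


def local_aggregate(batches: Sequence[Sequence[float]]) -> Dict[str, float]:
    """On-device aggregation over one client's batches of per-example losses."""
    num_examples = 0.0  # plays the role of a tf.Variable updated by forward_pass
    loss_sum = 0.0
    for batch in batches:
        batch_loss = sum(batch) / len(batch)  # mean loss of this batch
        num_examples += len(batch)
        loss_sum += batch_loss * len(batch)
    # Compact summary exported by the client (cf. report_local_outputs).
    return {"num_examples": num_examples, "loss": loss_sum / num_examples}


def federated_aggregate(client_metrics: List[Dict[str, float]]) -> Dict[str, float]:
    """Cross-device aggregation: summed counts and an example-weighted mean loss."""
    total = sum(m["num_examples"] for m in client_metrics)
    loss = sum(m["loss"] * m["num_examples"] for m in client_metrics) / total
    return {"num_examples": total, "loss": loss}


client_a = local_aggregate([[1.0, 3.0], [2.0]])
client_b = local_aggregate([[4.0]])
print(federated_aggregate([client_a, client_b]))  # {'num_examples': 4.0, 'loss': 2.5}
```

Weighting by the number of examples matters: the unweighted mean of the two client losses would be 3.0, but client A contributed three times as many examples as client B.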
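Abstract interface
This basic constructor + metadata interface is represented by the interface tff.learning.Model:
- The constructor and the forward_pass and report_local_outputs methods should construct the model variables, the forward pass, and the statistics you wish to report, respectively. As discussed above, the TensorFlow constructed by these methods must be serializable.
- The input_spec property, together with the 3 properties that return the trainable, non-trainable, and local variables, represents the metadata. TFF uses this information to determine how to connect the parts of your model to the federated optimization algorithms, and to define internal type signatures that help verify the correctness of the constructed system.
The abstract interface tff.learning.Model also exposes the property federated_output_computation, which, together with report_local_outputs, controls how the summary statistics are aggregated across clients.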
```python
import collections
from typing import Callable, Dict, List

import tensorflow as tf
import tensorflow_federated as tff

# Variable container: `weights` and `bias` are model parameters, the rest are metrics.
MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')


# All state that your model uses must be captured as TensorFlow variables.
def create_mnist_variable():
    return MnistVariables(
        weights=tf.Variable(
            lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
            name='weights',
            trainable=True),
        bias=tf.Variable(
            lambda: tf.zeros(dtype=tf.float32, shape=(10,)),
            name='bias',
            trainable=True),
        num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
        loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
        accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))


# Per-batch prediction, written exactly as in plain TensorFlow.
def predict_on_batch(variables, x):
    return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)


# Forward pass shared by all clients; updates the metric variables as a side effect.
def mnist_forward_pass(variables, batch):
    y = predict_on_batch(variables, batch['x'])
    predictions = tf.cast(tf.argmax(y, 1), tf.int32)

    flat_labels = tf.reshape(batch['y'], [-1])
    loss = -tf.reduce_mean(
        tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
    accuracy = tf.reduce_mean(
        tf.cast(tf.equal(predictions, flat_labels), tf.float32))

    num_examples = tf.cast(tf.size(batch['y']), tf.float32)
    variables.num_examples.assign_add(num_examples)
    variables.loss_sum.assign_add(loss * num_examples)
    variables.accuracy_sum.assign_add(accuracy * num_examples)
    return loss, predictions


# Local (per-client) metrics computed from the accumulated sums.
def get_local_mnist_metrics(variables):
    return collections.OrderedDict(
        num_examples=variables.num_examples,
        loss=variables.loss_sum / variables.num_examples,
        accuracy=variables.accuracy_sum / variables.num_examples)


# Cross-client aggregation protocol (used as federated_output_computation),
# written with TFF federated operators rather than TensorFlow.
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
    return collections.OrderedDict(
        num_examples=tff.federated_sum(metrics.num_examples),
        loss=tff.federated_mean(metrics.loss, metrics.num_examples),
        accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))


# The model itself, subclassing tff.learning.Model.
class MnistModel(tff.learning.Model):

    def __init__(self) -> None:
        self._variables = create_mnist_variable()  # initialize variables

    @property
    def trainable_variables(self):
        return [self._variables.weights, self._variables.bias]

    @property
    def non_trainable_variables(self):
        return []

    @property
    def local_variables(self):
        return [self._variables.num_examples, self._variables.loss_sum,
                self._variables.accuracy_sum]

    # Describes the form of the data the model accepts.
    @property
    def input_spec(self):
        return collections.OrderedDict(
            x=tf.TensorSpec([None, 784], tf.float32),
            y=tf.TensorSpec([None, 1], tf.int32))

    # tf.function lets TFF trace these methods into a serializable graph.
    @tf.function
    def predict_on_batch(self, x, training=True):
        del training  # unused
        return predict_on_batch(self._variables, x)

    # Local training step on one batch.
    @tf.function
    def forward_pass(self, batch, training=True):
        del training  # unused
        loss, predictions = mnist_forward_pass(self._variables, batch)
        num_examples = tf.shape(batch['x'])[0]
        return tff.learning.BatchOutput(
            loss=loss, predictions=predictions, num_examples=num_examples)

    # Summarizes the metrics collected on this client.
    @tf.function
    def report_local_outputs(self):
        return get_local_mnist_metrics(self._variables)

    # Protocol for aggregating the local outputs across clients.
    @property
    def federated_output_computation(self):
        return aggregate_mnist_metrics_across_clients

    # Newer TFF releases use the next two methods instead of
    # report_local_outputs / federated_output_computation.
    @tf.function
    def report_local_unfinalized_metrics(self) -> Dict[str, List[tf.Tensor]]:
        """Creates an `OrderedDict` of metric names to unfinalized values."""
        return collections.OrderedDict(
            num_examples=[self._variables.num_examples],
            loss=[self._variables.loss_sum, self._variables.num_examples],
            accuracy=[self._variables.accuracy_sum, self._variables.num_examples])

    def metric_finalizers(self) -> Dict[str, Callable[[List[tf.Tensor]], tf.Tensor]]:
        """Creates an `OrderedDict` of metric names to finalizers."""
        return collections.OrderedDict(
            num_examples=tf.function(func=lambda x: x[0]),
            loss=tf.function(func=lambda x: x[0] / x[1]),
            accuracy=tf.function(func=lambda x: x[0] / x[1]))
```
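report_local_unfinalized_metrics returns sums and counts rather than ratios because sums can be added across clients and divided only once at the end, which gives the same example-weighted means as aggregate_mnist_metrics_across_clients. The plain-Python sketch below shows that sum-then-finalize idea, assuming the unfinalized values are simply summed across clients before finalizing. `FINALIZERS`, `sum_unfinalized` and `finalize` are hypothetical names, not TFF APIs.

```python
from typing import Callable, Dict, List

# Mirrors metric_finalizers() above: each finalizer turns summed state into a value.
FINALIZERS: Dict[str, Callable[[List[float]], float]] = {
    "num_examples": lambda x: x[0],
    "loss": lambda x: x[0] / x[1],
    "accuracy": lambda x: x[0] / x[1],
}


def sum_unfinalized(client_states: List[Dict[str, List[float]]]) -> Dict[str, List[float]]:
    """Element-wise sum of every metric's unfinalized state across clients."""
    summed: Dict[str, List[float]] = {}
    for state in client_states:
        for name, values in state.items():
            previous = summed.get(name, [0.0] * len(values))
            summed[name] = [p + v for p, v in zip(previous, values)]
    return summed


def finalize(summed: Dict[str, List[float]]) -> Dict[str, float]:
    return {name: FINALIZERS[name](values) for name, values in summed.items()}


# Unfinalized states shaped like the output of report_local_unfinalized_metrics().
client_1 = {"num_examples": [3.0], "loss": [6.0, 3.0], "accuracy": [2.0, 3.0]}
client_2 = {"num_examples": [1.0], "loss": [4.0, 1.0], "accuracy": [1.0, 1.0]}
print(finalize(sum_unfinalized([client_1, client_2])))
# {'num_examples': 4.0, 'loss': 2.5, 'accuracy': 0.75}
```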
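Keras converter
Nearly all the information TFF needs can be obtained by calling tf.keras interfaces, so if you have a Keras model, you can use tff.learning.from_keras_model to construct a tff.learning.Model. It is a higher-level API than subclassing tff.learning.Model and overriding forward_pass, report_local_outputs and federated_output_computation yourself.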
```python
# Template
def model_fn():
    keras_model = ...
    return tff.learning.from_keras_model(keras_model, loss=..., input_spec=...)
```
```python
import tensorflow as tf
import tensorflow_federated as tff


# Returns a fresh Keras model instance.
def create_keras_model():
    return tf.keras.models.Sequential([
        tf.keras.layers.InputLayer(input_shape=(784,)),
        tf.keras.layers.Dense(10, kernel_initializer='zeros'),
        tf.keras.layers.Softmax(),
    ])


def model_fn():
    # We _must_ create a new model here, and _not_ capture it from an external
    # scope. TFF will call this within different graph contexts.
    keras_model = create_keras_model()
    return tff.learning.from_keras_model(
        keras_model,
        # preprocessed_example_dataset = preprocess(example_dataset), see the Datasets section.
        input_spec=preprocessed_example_dataset.element_spec,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
```
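Federated computation builders
Architectural assumptions
Running a federated computation involves two distinct phases:
- Compilation: TFF first compiles the federated learning algorithm into an abstract serialized representation of the entire distributed computation. This is when TensorFlow serialization happens; we refer to the serialized representation produced by the compiler as a federated computation.
- Execution: TFF provides various ways to execute these computations. At the time of writing, execution is only supported via local simulation.
Because TFF is a functional programming environment, the computation takes the current state as input and returns the updated state as output. To fully define a stateful process, you also need to specify where the initial state comes from (otherwise the process cannot be started). This is captured by the helper class tff.templates.IterativeProcess, which has two properties corresponding to initialization and iteration: initialize and next.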
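The functional contract (state in, new state out, with initialize supplying the starting point) can be sketched without TFF. `ToyIterativeProcess` and its counter logic are hypothetical; only the initialize/next shape follows the description above, with next returning (state, metrics) as the learning builders' processes do.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class ToyIterativeProcess:
    """Plain-Python stand-in with the same two members as an IterativeProcess."""
    initialize: Callable[[], Any]                  # () -> initial state
    next: Callable[[Any, Any], Tuple[Any, Any]]    # (state, data) -> (new state, metrics)


def _initialize() -> Tuple[int, float]:
    return (0, 0.0)  # (rounds completed, running total)


def _next(state: Tuple[int, float],
          client_values: List[float]) -> Tuple[Tuple[int, float], Dict[str, int]]:
    rounds, total = state
    new_state = (rounds + 1, total + sum(client_values))  # the old state is left untouched
    return new_state, {"round": rounds + 1, "clients": len(client_values)}


process = ToyIterativeProcess(initialize=_initialize, next=_next)
state = process.initialize()
for round_data in ([1.0, 2.0], [3.0]):
    state, metrics = process.next(state, round_data)
print(state, metrics)  # (2, 6.0) {'round': 2, 'clients': 1}
```

Because nothing is mutated, the caller owns the state between rounds, which is exactly why the training loops below keep reassigning `state`.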
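Aggregation: how the clients' model updates are aggregated is set through the model_update_aggregation_factory argument of tff.learning.build_federated_averaging_process.
```python
import tensorflow_federated as tff

mean = tff.aggregators.MeanFactory()
iterative_process = tff.learning.build_federated_averaging_process(
    ...,  # model_fn, client_optimizer_fn, etc.
    model_update_aggregation_factory=mean)
```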
Besides MeanFactory, TFF also provides aggregators for:
- quantile matching
- zeroing of excessively large values
- clipping
- differential privacy (DP)
- lossy compression
- secure aggregation
These aggregators can also be composed.
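Composition means one aggregator wraps another and delegates to it. The sketch below imitates that nesting in plain Python with hypothetical `weighted_mean`, `clipping` and `zeroing` helpers (not the tff.aggregators API), to show how outer stages preprocess client updates before the inner mean sees them.

```python
import math
from typing import Callable, List, Sequence

Vector = List[float]
Aggregator = Callable[[Sequence[Vector], Sequence[float]], Vector]


def l2_norm(v: Sequence[float]) -> float:
    return math.sqrt(sum(x * x for x in v))


def weighted_mean() -> Aggregator:
    """Innermost aggregator: weighted mean of the client updates."""
    def aggregate(updates: Sequence[Vector], weights: Sequence[float]) -> Vector:
        total = sum(weights)
        return [sum(w * u[i] for u, w in zip(updates, weights)) / total
                for i in range(len(updates[0]))]
    return aggregate


def clipping(clip_norm: float, inner: Aggregator) -> Aggregator:
    """Scales each update down to L2 norm <= clip_norm, then delegates to `inner`."""
    def aggregate(updates: Sequence[Vector], weights: Sequence[float]) -> Vector:
        clipped = []
        for u in updates:
            n = l2_norm(u)
            scale = min(1.0, clip_norm / n) if n > 0 else 1.0
            clipped.append([scale * x for x in u])
        return inner(clipped, weights)
    return aggregate


def zeroing(zero_norm: float, inner: Aggregator) -> Aggregator:
    """Replaces updates with L2 norm > zero_norm by zeros, then delegates to `inner`."""
    def aggregate(updates: Sequence[Vector], weights: Sequence[float]) -> Vector:
        kept = [u if l2_norm(u) <= zero_norm else [0.0] * len(u) for u in updates]
        return inner(kept, weights)
    return aggregate


updates = [[3.0, 4.0], [0.0, 1.0], [300.0, 400.0]]  # the last client is an outlier
weights = [1.0, 1.0, 2.0]
print(weighted_mean()(updates, weights))  # [150.75, 201.25]
robust = zeroing(100.0, clipping(2.5, weighted_mean()))
print(robust(updates, weights))  # [0.375, 0.75]
```

Without the outer stages, the single outlier drags the mean to [150.75, 201.25]. The nesting order is a design choice: here zeroing runs first, so the outlier is discarded rather than merely shrunk, and clipping then bounds the remaining updates.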
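Available builders
TFF provides two builder functions that generate federated computations for federated training and evaluation:
- tff.learning.build_federated_averaging_process takes a model function and a client optimizer, and returns a stateful tff.templates.IterativeProcess.
- tff.learning.build_federated_evaluation takes a model function and returns a single federated computation for federated evaluation of models, since evaluation is not stateful.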
```python
import tensorflow as tf
import tensorflow_federated as tff

NUM_ROUNDS = 11  # example value

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,  # must be a model constructor, not an already-constructed instance
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),  # local (client) optimizer
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),  # global (server) optimizer
    use_experimental_simulation_loop=True)

state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))

for round_num in range(2, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    print('round {:2d}, metrics={}'.format(round_num, metrics))

# Evaluation is stateless: build it once and pass in the trained model weights.
evaluation = tff.learning.build_federated_evaluation(
    MnistModel, use_experimental_simulation_loop=True)
test_metrics = evaluation(state.model, federated_test_data)
```
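client_optimizer_fn and server_optimizer_fn act at different levels: the client optimizer runs local steps on each client's data, while the server optimizer is applied to the example-weighted average of the client model deltas, treated as a pseudo-gradient. The scalar toy below (plain Python; the quadratic per-client loss, targets and example counts are invented for illustration) shows why server SGD with learning_rate=1.0 reduces to plain model averaging.

```python
from typing import Sequence, Tuple


def client_update(w: float, target: float, lr: float, steps: int) -> float:
    """Local SGD on the toy loss 0.5 * (w - target)**2; returns the model delta."""
    local = w
    for _ in range(steps):
        grad = local - target
        local -= lr * grad
    return local - w


def fedavg_round(w: float, clients: Sequence[Tuple[float, int]],
                 client_lr: float, server_lr: float, steps: int = 1) -> float:
    """One FedAvg round; each client is (target, num_examples)."""
    deltas = [client_update(w, target, client_lr, steps) for target, _ in clients]
    counts = [n for _, n in clients]
    avg_delta = sum(d * n for d, n in zip(deltas, counts)) / sum(counts)
    # Server SGD step with -avg_delta as the pseudo-gradient: w - lr * (-avg_delta).
    return w + server_lr * avg_delta


clients = [(4.0, 1), (8.0, 3)]
print(fedavg_round(0.0, clients, client_lr=0.25, server_lr=1.0))  # 1.75
print(fedavg_round(0.0, clients, client_lr=0.25, server_lr=0.5))  # 0.875
```

The two clients end their local step at 1.0 and 2.0, and their example-weighted average (1 * 1.0 + 3 * 2.0) / 4 = 1.75 is exactly the new global model when the server learning rate is 1.0; smaller server rates move only part of the way.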
To evaluate the global model with Keras after every round, the server state can be exchanged with a Keras model. The snippet below first seeds the server state with the weights of an existing (pretrained) Keras model, then runs a Keras evaluation before each training round:
```python
import tensorflow as tf
import tensorflow_federated as tff

# Assumed to be defined elsewhere: `fed_avg` (an IterativeProcess built as above),
# `keras_model` (a pretrained Keras model), `load_model`, `FlattenedCategoricalAccuracy`,
# `example_dataset`, `train_datasets`, `BATCH_SIZE` and `NUM_ROUNDS`.
state = fed_avg.initialize()

# Returns a server state whose model weights are replaced by the Keras model's weights.
state = tff.learning.state_with_new_model_weights(
    state,
    trainable_weights=[v.numpy() for v in keras_model.trainable_weights],
    non_trainable_weights=[
        v.numpy() for v in keras_model.non_trainable_weights
    ])


def keras_evaluate(state, round_num):
    # Take our global model weights and push them back into a Keras model to
    # use its standard `.evaluate()` method.
    keras_model = load_model(batch_size=BATCH_SIZE)  # load the pretrained model
    keras_model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[FlattenedCategoricalAccuracy()])
    # `state.model` is a tff.learning.ModelWeights; assign_weights_to copies the
    # weights into a Keras model (or a tff.learning.Model).
    state.model.assign_weights_to(keras_model)
    # Older alternative: tff.learning.assign_weights_to_keras_model(keras_model, state.model)
    loss, accuracy = keras_model.evaluate(example_dataset, steps=2, verbose=0)
    print('\tEval: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))


for round_num in range(NUM_ROUNDS):
    print('Round {r}'.format(r=round_num))
    keras_evaluate(state, round_num)
    state, metrics = fed_avg.next(state, train_datasets)
    print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(
        l=metrics['train']['loss'], a=metrics['train']['accuracy']))
```
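Datasets
Architectural assumptions
To simulate a realistic deployment of federated learning code, you would typically write a training loop like this: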
```python
trainer = tff.learning.build_federated_averaging_process(...)
state = trainer.initialize()
federated_training_data = ...


def sample(federated_data):
    return ...


while True:
    data_for_this_round = sample(federated_training_data)
    state, metrics = trainer.next(state, data_for_this_round)
```
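To make this possible, when TFF is used in simulation, federated data is represented as a Python list in which each element is the local tf.data.Dataset of one participating device.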
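The `sample` placeholder can be filled in many ways. One simple option in simulation (an assumption, not something TFF prescribes) is uniform sampling without replacement. The version below adds two hypothetical parameters, the number of clients per round and a seeded random.Random, so runs are reproducible; strings stand in for the per-client tf.data.Dataset objects.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def sample(federated_data: Sequence[T], clients_per_round: int,
           rng: random.Random) -> List[T]:
    """Uniformly picks `clients_per_round` distinct clients' data for one round."""
    return rng.sample(list(federated_data), clients_per_round)


# Placeholder per-client data; in TFF each element would be a tf.data.Dataset.
federated_training_data = [[f"client{i}-example{j}" for j in range(3)] for i in range(10)]
rng = random.Random(0)  # fixed seed for a reproducible simulation
for round_num in range(3):  # a bounded stand-in for `while True`
    data_for_this_round = sample(federated_training_data, 4, rng)
    print(round_num, [d[0] for d in data_for_this_round])
```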
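Abstract interface
To standardize the handling of simulated federated datasets, TFF provides the abstract interface tff.simulation.ClientData. It lets you enumerate the set of clients and construct a tf.data.Dataset containing the data of a particular client. In eager mode, these tf.data.Dataset objects can be fed directly as input to the generated federated computations.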
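To make the two responsibilities of ClientData concrete (enumerating client IDs and building one client's dataset), here is a hypothetical in-memory stand-in. It is not part of TFF and returns plain lists instead of tf.data.Dataset objects; the client IDs and labels are made up.

```python
from typing import Dict, List, Sequence


class InMemoryClientData:
    """Hypothetical dict-backed stand-in for the two ClientData members used here."""

    def __init__(self, data: Dict[str, Sequence[dict]]):
        self._data = {cid: list(examples) for cid, examples in data.items()}

    @property
    def client_ids(self) -> List[str]:
        return sorted(self._data)  # sorted only to make the order deterministic

    def create_tf_dataset_for_client(self, client_id: str) -> List[dict]:
        # The real method returns a tf.data.Dataset; a list keeps this sketch dependency-free.
        return list(self._data[client_id])


client_data = InMemoryClientData({
    "f0001": [{"label": 3}, {"label": 7}],
    "f0000": [{"label": 1}],
})
# Same pattern as make_federated_data below: one entry per selected client.
federated = [client_data.create_tf_dataset_for_client(cid) for cid in client_data.client_ids]
print(client_data.client_ids, [len(ds) for ds in federated])  # ['f0000', 'f0001'] [1, 2]
```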
```python
import collections

import tensorflow as tf
import tensorflow_federated as tff

# Both are instances of tff.simulation.ClientData.
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

len(emnist_train.client_ids)  # 3383 clients
emnist_train.element_type_structure
# OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)),
#              ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

# create_tf_dataset_for_client returns the tf.data.Dataset of a single client.
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])
example_element = next(iter(example_dataset))
example_element['label'].numpy()

# One way to feed federated data to TFF in a simulation is simply a Python list,
# with each element holding the data of an individual user, whether as a list
# or as a tf.data.Dataset.
NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10


def preprocess(dataset):

    def batch_format_fn(element):
        """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
        return collections.OrderedDict(
            x=tf.reshape(element['pixels'], [-1, 784]),
            y=tf.reshape(element['label'], [-1, 1]))

    return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
        BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)


preprocessed_example_dataset = preprocess(example_dataset)  # provides input_spec


def make_federated_data(client_data, client_ids):
    return [
        preprocess(client_data.create_tf_dataset_for_client(x))
        for x in client_ids
    ]


sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]
federated_train_data = make_federated_data(emnist_train, sample_clients)
federated_test_data = make_federated_data(emnist_test, sample_clients)
```
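Since preprocess calls repeat before batch, the epochs are concatenated first and only the very last batch can be partial. The plain-Python arithmetic below (hypothetical helper names, with an invented client size of 93 examples) counts the batches one client yields and contrasts it with the batch-then-repeat order; shuffle, map and prefetch do not change the counts.

```python
from typing import List


def batch_sizes_repeat_then_batch(num_examples: int, num_epochs: int,
                                  batch_size: int) -> List[int]:
    """Batch sizes from dataset.repeat(num_epochs).batch(batch_size)."""
    full, rest = divmod(num_examples * num_epochs, batch_size)
    # batch() keeps the trailing partial batch unless drop_remainder=True.
    return [batch_size] * full + ([rest] if rest else [])


def batch_sizes_batch_then_repeat(num_examples: int, num_epochs: int,
                                  batch_size: int) -> List[int]:
    """Batch sizes from dataset.batch(batch_size).repeat(num_epochs), for contrast."""
    full, rest = divmod(num_examples, batch_size)
    one_epoch = [batch_size] * full + ([rest] if rest else [])
    return one_epoch * num_epochs


# A client with 93 examples, NUM_EPOCHS = 5, BATCH_SIZE = 20.
print(len(batch_sizes_repeat_then_batch(93, 5, 20)))  # 24 batches, the last one has 5 examples
print(len(batch_sizes_batch_then_repeat(93, 5, 20)))  # 25 batches, a 13-example batch every epoch
```

Whatever the batch size b, batch_format_fn then reshapes pixels from (b, 28, 28) to (b, 784) and labels from (b,) to (b, 1), which matches the input_spec of MnistModel.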
Case
Reconstruction Model
In this case, we use matrix factorization to learn two matrices: a user matrix and an item matrix. Note that the user matrix is private and stays on each client, whereas the item matrix is shared by all clients.
We use tff.learning.reconstruction.Model to build the model, and the federated computation builders are replaced by tff.learning.reconstruction.build_training_process and tff.learning.reconstruction.build_federated_evaluation.
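To make the roles of the two matrices concrete, here is a plain-Python sketch of one client's side: a rating is predicted as the dot product of a user vector and an item vector, and the client reconstructs its private user vector from its own ratings while the shared item vectors stay frozen. In TFF this reconstruction step is driven by reconstruction_optimizer_fn; the sketch swaps in full-batch gradient descent, and the item vectors, ratings, learning rate and step count are invented for illustration.

```python
from typing import Dict, List, Sequence, Tuple


def predict(user: Sequence[float], item: Sequence[float]) -> float:
    """Predicted rating: dot product of a user vector and an item vector."""
    return sum(u * v for u, v in zip(user, item))


def mse(user: Sequence[float], items: Dict[int, List[float]],
        ratings: Sequence[Tuple[int, float]]) -> float:
    return sum((predict(user, items[i]) - r) ** 2 for i, r in ratings) / len(ratings)


def reconstruct_user(items: Dict[int, List[float]], ratings: Sequence[Tuple[int, float]],
                     num_factors: int, lr: float = 0.1, steps: int = 200) -> List[float]:
    """Fits this client's private user vector by gradient descent, item vectors frozen."""
    user = [0.1] * num_factors
    for _ in range(steps):
        grad = [0.0] * num_factors
        for item_id, rating in ratings:
            err = predict(user, items[item_id]) - rating
            for k in range(num_factors):
                grad[k] += 2.0 * err * items[item_id][k] / len(ratings)
        user = [u - lr * g for u, g in zip(user, grad)]
    return user


# Shared (global) item vectors and one client's private ratings: (item_id, rating).
items = {0: [1.0, 0.0], 1: [0.0, 1.0], 2: [1.0, 1.0]}
ratings = [(0, 4.0), (1, 2.0), (2, 6.0)]
user = reconstruct_user(items, ratings, num_factors=2)
print([round(u, 3) for u in user], round(mse(user, items, ratings), 6))  # [4.0, 2.0] 0.0
```

Only the item vectors would ever be sent back to the server; the reconstructed user vector never leaves the client.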
Data preparation
```python
import collections
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd
import tensorflow as tf


# Builds one tf.data.Dataset per user (client).
# Assumes UserID and MovieID in `ratings_df` are contiguous 0-based indices.
def create_tf_datasets(ratings_df: pd.DataFrame,
                       batch_size: int = 1,
                       max_examples_per_user: Optional[int] = None,
                       max_clients: Optional[int] = None) -> List[tf.data.Dataset]:
    num_users = len(set(ratings_df.UserID))
    if max_clients is not None:
        num_users = min(num_users, max_clients)

    def rating_batch_map_fn(rating_batch):
        """Maps a rating batch to an OrderedDict with tensor values."""
        # Each example looks like: {x: movie_id, y: rating}.
        # We won't need the UserID since each client will only look at their own
        # data.
        return collections.OrderedDict([
            ('x', tf.cast(rating_batch[:, 1:2], tf.int64)),
            ('y', tf.cast(rating_batch[:, 2:3], tf.float32))
        ])

    tf_datasets = []
    for user_id in range(num_users):
        user_ratings_df = ratings_df[ratings_df.UserID == user_id]
        tf_dataset = tf.data.Dataset.from_tensor_slices(user_ratings_df)
        # take() and shuffle() need concrete sizes, so fall back to all of this user's rows.
        num_examples = (len(user_ratings_df) if max_examples_per_user is None
                        else max_examples_per_user)
        tf_dataset = tf_dataset.take(num_examples).shuffle(
            buffer_size=max(num_examples, 1), seed=42).batch(batch_size).map(
                rating_batch_map_fn,
                num_parallel_calls=tf.data.experimental.AUTOTUNE)
        tf_datasets.append(tf_dataset)
    return tf_datasets


# Splits the clients (not individual examples) into train / validation / test sets.
def split_tf_datasets(
    tf_datasets: List[tf.data.Dataset],
    train_fraction: float = 0.8,
    val_fraction: float = 0.1,
) -> Tuple[List[tf.data.Dataset], List[tf.data.Dataset], List[tf.data.Dataset]]:
    np.random.seed(42)
    np.random.shuffle(tf_datasets)  # shuffles the list in place
    train_idx = int(len(tf_datasets) * train_fraction)
    val_idx = int(len(tf_datasets) * (train_fraction + val_fraction))
    return (tf_datasets[:train_idx], tf_datasets[train_idx:val_idx],
            tf_datasets[val_idx:])
```
Build Model
As before, there are two ways to build the model: converting a Keras model with tff.learning.reconstruction.from_keras_model, or subclassing tff.learning.reconstruction.Model. Here, the first approach is used.
```python
import collections
import functools

import tensorflow as tf
import tensorflow_federated as tff


# User embedding: this client's row of the user matrix, kept local (private).
class UserEmbedding(tf.keras.layers.Layer):

    def __init__(self, num_latent_factors, **kwargs):
        super().__init__(**kwargs)
        self.num_latent_factors = num_latent_factors

    def build(self, input_shape):
        self.embedding = self.add_weight(
            shape=(1, self.num_latent_factors),
            initializer='uniform',
            dtype=tf.float32,
            name='UserEmbeddingKernel')
        super().build(input_shape)

    def call(self, inputs):
        # Each client holds a single user, so the inputs are ignored and the one
        # (1, num_latent_factors) user vector is returned.
        return self.embedding

    def compute_output_shape(self, input_shape):
        return (1, self.num_latent_factors)


def get_matrix_factorization_model(
        num_items: int,
        num_latent_factors: int) -> tff.learning.reconstruction.Model:
    # Layers with variables are split into global (shared) and local (private) layers.
    global_layers = []
    local_layers = []

    # Extract the item embedding (global: part of the shared item matrix).
    item_input = tf.keras.layers.Input(shape=[1], name='Item')
    item_embedding_layer = tf.keras.layers.Embedding(
        num_items,
        num_latent_factors,
        name='ItemEmbedding')
    global_layers.append(item_embedding_layer)
    flat_item_vec = tf.keras.layers.Flatten(name='FlattenItem')(
        item_embedding_layer(item_input))

    # Extract the user embedding (local: never leaves the client).
    user_embedding_layer = UserEmbedding(
        num_latent_factors,
        name='UserEmbedding')
    local_layers.append(user_embedding_layer)
    # item_input is not used by the layer; passing it only wires the layer into the model.
    flat_user_vec = user_embedding_layer(item_input)

    # Predicted rating = dot product of the user vector and the item vector.
    pred = tf.keras.layers.Dot(
        1, normalize=False, name='Dot')([flat_user_vec, flat_item_vec])

    input_spec = collections.OrderedDict(
        x=tf.TensorSpec(shape=[None, 1], dtype=tf.int64),
        y=tf.TensorSpec(shape=[None, 1], dtype=tf.float32))

    model = tf.keras.Model(inputs=item_input, outputs=pred)

    return tff.learning.reconstruction.from_keras_model(
        keras_model=model,
        global_layers=global_layers,
        local_layers=local_layers,
        input_spec=input_spec)


# 3706 items (movies), 50 latent factors per user/item vector.
model_fn = functools.partial(
    get_matrix_factorization_model,
    num_items=3706,
    num_latent_factors=50)
```
```python
from typing import Optional

import tensorflow as tf


class RatingAccuracy(tf.keras.metrics.Mean):
    """Keras metric computing accuracy of reconstructed ratings."""

    def __init__(self,
                 name: str = 'rating_accuracy',
                 **kwargs):
        super().__init__(name=name, **kwargs)

    def update_state(self,
                     y_true: tf.Tensor,
                     y_pred: tf.Tensor,
                     sample_weight: Optional[tf.Tensor] = None):
        absolute_diffs = tf.abs(y_true - y_pred)
        # A [batch_size, 1] tf.bool tensor indicating correctness within the
        # threshold for each example in a batch. A 0.5 threshold corresponds
        # to correctness when predictions are rounded to the nearest whole
        # number.
        example_accuracies = tf.less_equal(absolute_diffs, 0.5)
        super().update_state(example_accuracies, sample_weight=sample_weight)
```
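The metric reduces to a simple rule: a prediction counts as correct when it is within 0.5 of the true rating (inclusive, as with tf.less_equal), and the Mean base class averages these 0/1 values. Below is a plain-Python equivalent for checking cases by hand; `rating_accuracy` is a hypothetical helper and ignores sample_weight.

```python
from typing import Sequence


def rating_accuracy(y_true: Sequence[float], y_pred: Sequence[float],
                    threshold: float = 0.5) -> float:
    """Fraction of predictions within `threshold` of the true rating (inclusive)."""
    hits = [abs(t - p) <= threshold for t, p in zip(y_true, y_pred)]
    return sum(hits) / len(hits)


# 3.6 vs 4 is a hit, 3.4 vs 4 a miss, |2.0 - 2.5| == 0.5 still counts, 4.0 vs 5.0 misses.
print(rating_accuracy([4.0, 4.0, 2.0, 5.0], [3.6, 3.4, 2.5, 4.0]))  # 0.5
```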
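```python
import functools

import tensorflow as tf
import tensorflow_federated as tff

# Loss and metric constructors used by both builders.
loss_fn = lambda: tf.keras.losses.MeanSquaredError()
metrics_fn = lambda: [RatingAccuracy()]

training_process = tff.learning.reconstruction.build_training_process(
    model_fn=model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    # Applies the aggregated update to the global (item) variables.
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(1.0),
    # Trains the global (item) variables on each client.
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.5),
    # Reconstructs the local (user) variables on each client.
    reconstruction_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.1))

evaluation_computation = tff.learning.reconstruction.build_federated_evaluation(
    model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    reconstruction_optimizer_fn=functools.partial(
        tf.keras.optimizers.SGD, 0.1))
```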
Training
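```python
import numpy as np

# `ratings_df` is assumed to be the loaded MovieLens ratings; the values below are illustrative.
tf_datasets = create_tf_datasets(
    ratings_df=ratings_df, batch_size=5, max_examples_per_user=300)
tf_train_datasets, tf_val_datasets, tf_test_datasets = split_tf_datasets(tf_datasets)

state = training_process.initialize()
print(state.model)
print('Item variables shape:', state.model.trainable[0].shape)

eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Initial Eval:', eval_metrics['eval'])

# Select 50 training clients at random for this round.
federated_train_data = np.random.choice(
    tf_train_datasets, size=50, replace=False).tolist()
state, metrics = training_process.next(state, federated_train_data)
print('Train metrics:', metrics['train'])
```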
```python
import numpy as np

NUM_ROUNDS = 20

train_losses = []
train_accs = []

state = training_process.initialize()
for i in range(NUM_ROUNDS):
    federated_train_data = np.random.choice(
        tf_train_datasets, size=50, replace=False).tolist()
    state, metrics = training_process.next(state, federated_train_data)
    print(f'Train metrics (round {i}):', metrics['train'])
    train_losses.append(metrics['train']['loss'])
    train_accs.append(metrics['train']['rating_accuracy'])

eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Final Eval:', eval_metrics['eval'])

eval_metrics = evaluation_computation(state.model, tf_test_datasets)
print('Final Test:', eval_metrics['eval'])
```
Summary
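The overall workflow is roughly:
- Build the data: turn it into a sequence/list of tf.data.Dataset objects, one per client.
- Build the Model. TFF offers two routes. One is tff.learning.from_keras_model, which converts a tf.keras.Model instance into a tff.learning.Model; you still have to pass input_spec to describe the input type. The other is to subclass tff.learning.Model, override forward_pass and report_local_outputs, and provide a federated_output_computation property for the federated computation. With the second route, the usual steps are: write a function that instantiates the variables; run the forward pass and compute the metrics as you would in TensorFlow, updating the corresponding variables; and implement federated_output_computation as a function decorated with tff.federated_computation that calls TFF's federated operators (such as tff.federated_sum and tff.federated_mean).
- Build the federated computation: TFF's builders return a tff.templates.IterativeProcess, which has two methods, initialize and next. Building it requires passing the model function, the optimizers and, where applicable, the metrics.
- Training: this part needs no extra changes; you can simply copy an existing training loop and use it as-is.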
