TensorFlow Federated (TFF) Framework Notes (Part 1)


Model

Architectural Assumptions

TFF's basic assumption: model code must be serializable as a TensorFlow computation graph, in order to support devices with limited resources such as mobile and embedded devices. Serialization in TFF currently follows the TF 1.0 pattern, where all code must be constructed inside a tf.Graph that TFF controls. As a strongly typed environment, TFF also requires some additional metadata, such as a specification of your model's input type.
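
As a small illustration of this strong typing, TFF can wrap ordinary TensorFlow code into a serializable, explicitly typed computation. A minimal sketch (add_half is a hypothetical toy function, assuming standard imports):

import tensorflow as tf
import tensorflow_federated as tff

# TFF traces the function body into a serialized TensorFlow graph and
# assigns it an explicit type signature.
@tff.tf_computation(tf.float32)
def add_half(x):
  return x + 0.5

print(add_half.type_signature)  # (float32 -> float32)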

Federated learning involves at least two layers of aggregation: local on-device aggregation and cross-device aggregation.

  • Local aggregation: aggregation across multiple batches of examples owned by an individual client.
  1. The model first constructs tf.Variable objects to hold the aggregates.
  2. TFF invokes the forward_pass method on your Model multiple times, sequentially over subsequent batches of client data, which allows you to update the variables holding the various aggregates as a side effect.
  3. Finally, TFF invokes the report_local_outputs method on your Model, allowing it to compile all the summary statistics it collected into a compact set of metrics to be exported by the client.
  • Federated aggregation: aggregation across multiple clients (devices) in the system.
  1. The initial model (and any parameters needed for training) is distributed by the server to a subset of clients that will participate in a round of training or evaluation.
  2. On each client, your model code is invoked repeatedly (independently or in parallel) over a stream of local data batches, producing a new set of model parameters (when training) and a new set of locally derived metrics.
  3. TFF runs a distributed aggregation protocol to accumulate and aggregate the model parameters and the locally exported metrics across the system. This logic is expressed declaratively, in TFF's own federated computation language (not in TensorFlow), in the model's federated_output_computation.

Abstract Interface

This basic constructor-plus-metadata interface is represented by the interface tff.learning.Model:

  1. The constructor, forward_pass, and report_local_outputs methods should construct the model variables, the forward pass, and the statistics to report, respectively. As noted above, the TensorFlow constructed by these methods must be serializable.
  2. Metadata is expressed via the input_spec property and the three properties that return the trainable, non-trainable, and local variables. TFF uses this information to determine how to wire parts of the model into the federated optimization algorithms, and to define internal type signatures that help verify the correctness of the constructed system.

The abstract interface tff.learning.Model also exposes the property federated_output_computation.

import collections
import tensorflow as tf
import tensorflow_federated as tff

# Variable namedtuple/container: weights and bias are model variables, the rest are metrics.
MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

# All state that your model will use must be captured as TensorFlow variables.
def create_mnist_variable():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10,)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

# Per-batch prediction logic (the same as in plain TensorFlow).
def predict_on_batch(variables, x):
  return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)

# Forward-pass logic, reused across clients.
def mnist_forward_pass(variables, batch):
  y = predict_on_batch(variables, batch['x'])
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))
  num_examples = tf.cast(tf.size(batch['y']), tf.float32)
  # Update the local metric accumulators as a side effect.
  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)
  return loss, predictions

# Return the locally collected (per-client) metrics.
def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

# The protocol behind federated_output_computation: expressed in TFF's
# federated computation language, not in TensorFlow.
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

# Create the model by inheriting from tff.learning.Model.
class MnistModel(tff.learning.Model):
    def __init__(self) -> None:
        self._variables = create_mnist_variable()  # initialize variables
    
    @property
    def trainable_variables(self):
        return [self._variables.weights, self._variables.bias]

    @property
    def non_trainable_variables(self):
        return []
    
    @property
    def local_variables(self):
        return [self._variables.num_examples, self._variables.loss_sum,
                self._variables.accuracy_sum]
    
    # Describes what form of data the model accepts.
    @property
    def input_spec(self):
        return collections.OrderedDict(
            x=tf.TensorSpec([None, 784], tf.float32),
            y=tf.TensorSpec([None, 1], tf.int32)
        )
    
    # Wrapped with tf.function so the logic can be serialized as a TF graph (the TF 1.0-style requirement above).
    @tf.function
    def predict_on_batch(self, x, training=True):
        del training
        return predict_on_batch(self._variables, x)
    
    # The forward_pass method, which implements the local training logic.
    @tf.function
    def forward_pass(self, batch, training=True):
        del training
        loss, predictions = mnist_forward_pass(self._variables, batch)
        num_examples = tf.shape(batch['x'])[0]
        return tff.learning.BatchOutput(
            loss=loss, predictions=predictions, num_examples=num_examples)
    
    # The report_local_outputs method, which summarizes the locally collected metrics.
    @tf.function
    def report_local_outputs(self):
        return get_local_mnist_metrics(self._variables)

    # Expose federated_output_computation, the cross-client aggregation protocol.
    @property
    def federated_output_computation(self): 
        return aggregate_mnist_metrics_across_clients
    

    @tf.function
    def report_local_unfinalized_metrics(self):
        """Creates an `OrderedDict` of metric names to unfinalized values."""
        return collections.OrderedDict(
            num_examples=[self._variables.num_examples],
            loss=[self._variables.loss_sum, self._variables.num_examples],
            accuracy=[self._variables.accuracy_sum, self._variables.num_examples])

    def metric_finalizers(self):
        """Creates an `OrderedDict` of metric names to finalizers."""
        return collections.OrderedDict(
            num_examples=tf.function(func=lambda x: x[0]),
            loss=tf.function(func=lambda x: x[0] / x[1]),
            accuracy=tf.function(func=lambda x: x[0] / x[1]))
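
As a quick sanity check (not part of the original tutorial), the model can be instantiated and run on a dummy batch matching input_spec; with zero-initialized weights the softmax output is uniform over 10 classes, so the loss should be about -log(0.1) ≈ 2.30:

model = MnistModel()
dummy_batch = collections.OrderedDict(
    x=tf.zeros([2, 784], tf.float32),
    y=tf.zeros([2, 1], tf.int32))
output = model.forward_pass(dummy_batch)
print(output.loss)  # ~2.30 with zero-initialized weights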

Converters for Keras

Almost all of the information TFF needs can be obtained by calling the tf.keras interfaces, so if you have a Keras model, you can use tff.learning.from_keras_model to construct a tff.learning.Model. Compared with inheriting from tff.learning.Model and overriding forward_pass, report_local_outputs, and federated_output_computation, this is a higher-level API.

# Template:
#
# def model_fn():
#   keras_model = ...
#   return tff.learning.from_keras_model(keras_model, sample_batch, loss=...)
# return model instance
def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Federated Computation Builders

Architectural Assumptions

Running a federated computation involves two distinct phases:

  1. Compilation: TFF first compiles the federated learning algorithm into an abstract, serialized representation of the entire distributed computation. This is when TensorFlow serialization happens; we call the serialized representation produced by the compiler a federated computation.
  2. Execution: TFF provides various ways to execute these computations. Currently, execution is supported only via local simulation.

Since TFF is a functional programming environment, such a computation accepts the current state as input and produces the updated state as output. To fully define stateful processing, one must also specify where the initial state comes from (otherwise the processing cannot be bootstrapped). This is captured by the helper class tff.templates.IterativeProcess, whose two properties initialize and next correspond to initialization and iteration, respectively.
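
For intuition, here is a minimal toy sketch of this state-in/state-out pattern (a counter, not a learning algorithm; it assumes a TFF version where tff.type_at_server exists and tff.federated_map accepts server-placed values):

@tff.tf_computation(tf.int32)
def add_one(x):
  return x + 1

@tff.federated_computation
def initialize_fn():
  return tff.federated_value(0, tff.SERVER)

@tff.federated_computation(tff.type_at_server(tf.int32))
def next_fn(state):
  return tff.federated_map(add_one, state)

counter = tff.templates.IterativeProcess(initialize_fn, next_fn)
state = counter.initialize()  # state == 0
state = counter.next(state)   # state == 1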

The aggregation method, i.e., how the clients' model updates are aggregated, is configured via the model_update_aggregation_factory argument of tff.learning.build_federated_averaging_process.

mean = tff.aggregators.MeanFactory()
iterative_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=mean)

Besides MeanFactory, TFF also provides aggregators for:

  • quantile matching
  • zeroing out excessively large values
  • clipping
  • differential privacy (DP)
  • lossy compression
  • secure aggregation

These aggregation methods can also be composed, as in the sketch below.
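
For example, zeroing, clipping, and mean can be nested. The sketch below follows the tff.aggregators tutorial; exact factory signatures may differ across TFF versions:

# Zero out client updates with norm above 100, clip the rest to norm 5,
# then take a weighted mean.
robust_mean = tff.aggregators.MeanFactory(
    tff.aggregators.zeroing_factory(
        zeroing_norm=100.0,
        inner_agg_factory=tff.aggregators.clipping_factory(
            clipping_norm=5.0,
            inner_agg_factory=tff.aggregators.SumFactory())))

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    model_update_aggregation_factory=robust_mean)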

Available Builders

TFF provides two builder functions that generate federated computations for federated training and evaluation:

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,  # must be a constructor, not an already-constructed instance
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),  # local optimizer
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),  # global optimizer
    use_experimental_simulation_loop=True)

state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))

for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))

evaluation = tff.learning.build_federated_evaluation(MnistModel, use_experimental_simulation_loop=True)
test_metrics = evaluation(state.model, federated_test_data)

# Optionally replace the model weights in the server state with externally trained weights.
state = tff.learning.state_with_new_model_weights(
    state,
    trainable_weights=[v.numpy() for v in keras_model.trainable_weights],
    non_trainable_weights=[
        v.numpy() for v in keras_model.non_trainable_weights
    ])  # Returns a ServerState with updated model weights.

def keras_evaluate(state, round_num):
    # Take our global model weights and push them back into a Keras model to
    # use its standard `.evaluate()` method.
    keras_model = load_model(batch_size=BATCH_SIZE)  # load pretrained model
    keras_model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[FlattenedCategoricalAccuracy()])
    # assign_weights_to is a tff.learning.ModelWeights method that copies the
    # server model weights into the Keras model.
    state.model.assign_weights_to(keras_model)
    # Older equivalent: tff.learning.assign_weights_to_keras_model(keras_model, state.model)
    loss, accuracy = keras_model.evaluate(example_dataset, steps=2, verbose=0)  # example_dataset is a specific dataset.
    print('\tEval: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))

for round_num in range(NUM_ROUNDS):
  print('Round {r}'.format(r=round_num))
  keras_evaluate(state, round_num)
  state, metrics = fed_avg.next(state, train_datasets)
  print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(
      l=metrics["train"]["loss"], a=metrics["train"]["accuracy"]))

Datasets

Architectural Assumptions

To simulate a realistic deployment of federated learning code, you would typically write a training loop like this:

trainer = tff.learning.build_federated_averaging_process(...)
state = trainer.initialize()
federated_training_data = ...

def sample(federated_data):
  return ...

while True:
  data_for_this_round = sample(federated_training_data)
  state, metrics = trainer.next(state, data_for_this_round)

To make this possible, when TFF is used in simulation, federated data is represented as a Python list whose elements are the local tf.data.Datasets of the participating devices.

Abstract Interface

To standardize working with simulated federated datasets, TFF provides an abstract interface, tff.simulation.ClientData. It lets you enumerate the set of clients and construct a tf.data.Dataset containing the data of a particular client. In eager mode, these tf.data.Datasets can be fed directly as input to the generated federated computations.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()  # an instance of tff.simulation.ClientData class

len(emnist_train.client_ids)  # 3383 clients

emnist_train.element_type_structure  # OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

example_dataset = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[0])  # create dataset for client by tff.simulation.ClientData method create_tf_dataset_for_client -> tf.data.Dataset
example_element = next(iter(example_dataset))
example_element['label'].numpy()

# One way to feed federated data to TFF in a simulation is simply as a Python
# list, with each element of the list holding the data of an individual user,
# whether as a list or as a tf.data.Dataset.

NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]  # fixed client sample, as in the TFF EMNIST tutorial
federated_train_data = make_federated_data(emnist_train, sample_clients)
federated_test_data = make_federated_data(emnist_test, sample_clients)

Case

Reconstruction Model

In this case, we will use matrix factorization to learn two matrices: a user matrix and an item matrix. Note that the user matrix should remain private and be stored locally, whereas the item matrix is shared globally.
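
As a toy NumPy sketch of the idea (the sizes follow the MovieLens-1M setup used below; this sketch is illustrative only):

import numpy as np

num_users, num_items, k = 6040, 3706, 50
U = np.random.randn(num_users, k)  # user matrix: each row stays on its own device
V = np.random.randn(num_items, k)  # item matrix: shared globally across devices
pred = U[0] @ V[42]                # predicted rating of user 0 for item 42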

We will use tff.learning.reconstruction.Model to build the model, and the federated computation builders are correspondingly replaced with tff.learning.reconstruction.build_training_process and tff.learning.reconstruction.build_federated_evaluation.

Data preparation

import functools
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd

# Build one tf.data.Dataset per user (client).
def create_tf_datasets(ratings_df: pd.DataFrame,
                       batch_size: int = 1,
                       max_examples_per_user: Optional[int] = None,
                       max_clients: Optional[int] = None) -> List[tf.data.Dataset]:
    num_users = len(set(ratings_df.UserID))
    if max_clients is not None:
        num_users = min(num_users, max_clients)
    def rating_batch_map_fn(rating_batch):
        """Maps a rating batch to an OrderedDict with tensor values."""
        # Each example looks like: {x: movie_id, y: rating}.
        # We won't need the UserID since each client will only look at their own
        # data.
        return collections.OrderedDict([
            ("x", tf.cast(rating_batch[:, 1:2], tf.int64)),
            ("y", tf.cast(rating_batch[:, 2:3], tf.float32))
        ])
    
    tf_datasets = []
    for user_id in range(num_users):
        user_ratings_df = ratings_df[ratings_df.UserID == user_id]
        tf_dataset = tf.data.Dataset.from_tensor_slices(user_ratings_df)
        tf_dataset = tf_dataset.take(max_examples_per_user).shuffle(buffer_size=max_examples_per_user, seed=42).batch(batch_size).map(
            rating_batch_map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE)
        tf_datasets.append(tf_dataset)
    return tf_datasets
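
A hypothetical usage example (assuming ratings_df has been loaded, e.g. from MovieLens-1M, with columns ordered UserID, MovieID, Rating; the parameter values follow the TFF reconstruction tutorial):

tf_datasets = create_tf_datasets(
    ratings_df=ratings_df,
    batch_size=5,
    max_examples_per_user=300,
    max_clients=2000)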

# Split clients into train/validation/test sets.
def split_tf_datasets(
    tf_datasets: List[tf.data.Dataset], 
    train_fraction: float = 0.8, 
    val_fraction: float = 0.1,
) -> Tuple[List[tf.data.Dataset], List[tf.data.Dataset], List[tf.data.Dataset]]:
    np.random.seed(42)
    np.random.shuffle(tf_datasets)

    train_idx = int(len(tf_datasets) * train_fraction)
    val_idx = int(len(tf_datasets) * (train_fraction + val_fraction))
    return (tf_datasets[:train_idx], tf_datasets[train_idx:val_idx],
            tf_datasets[val_idx:])
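
The later training and evaluation code refers to tf_train_datasets, tf_val_datasets, and tf_test_datasets; a usage sketch that binds the three splits accordingly:

tf_train_datasets, tf_val_datasets, tf_test_datasets = split_tf_datasets(
    tf_datasets, train_fraction=0.8, val_fraction=0.1)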

Build Model

Here, too, there are two ways to build the model: converting a Keras model with tff.learning.reconstruction.from_keras_model, or inheriting from the tff.learning.reconstruction.Model class. The first approach is adopted below.

# user matrix
class UserEmbedding(tf.keras.layers.Layer):
    def __init__(self, num_latent_factors, **kwargs):
        super().__init__(**kwargs)
        self.num_latent_factors = num_latent_factors
    
    def build(self, input_shape):
        self.embedding = self.add_weight(
            shape=(1, self.num_latent_factors),
            initializer='uniform',
            dtype=tf.float32,
            name='UserEmbeddingKernel')
        super().build(input_shape)

    def call(self, inputs):
        return self.embedding  # the same single user embedding is returned for any input
    
    def compute_output_shape(self):
        return (1, self.num_latent_factors)

def get_matrix_factorization_model(
    num_items: int,
    num_latent_factors: int) -> tff.learning.reconstruction.Model:
    global_layers = []
    local_layers = []

    # extract the item embedding
    item_input = tf.keras.layers.Input(shape=[1], name='Item')
    item_embedding_layer = tf.keras.layers.Embedding(
        num_items, 
        num_latent_factors, 
        name="ItemEmbedding")
    global_layers.append(item_embedding_layer)
    flat_item_vec = tf.keras.layers.Flatten(name="FlattenItem")(
        item_embedding_layer(item_input))
    
    # extract the user embedding; UserEmbedding ignores item_input, which is
    # only wired in so the layer participates in the Keras graph
    user_embedding_layer = UserEmbedding(
        num_latent_factors, 
        name='UserEmbedding')
    local_layers.append(user_embedding_layer)
    flat_user_vec = user_embedding_layer(item_input)

    pred = tf.keras.layers.Dot(
        1, normalize=False, name='Dot')([flat_user_vec, flat_item_vec])
    
    input_spec = collections.OrderedDict(x=tf.TensorSpec(shape=[None, 1], dtype=tf.int64), 
                                         y=tf.TensorSpec(shape=[None, 1], dtype=tf.float32))
    model = tf.keras.Model(inputs=item_input, outputs=pred)
    return tff.learning.reconstruction.from_keras_model(
        keras_model=model, 
        global_layers=global_layers, 
        local_layers=local_layers, 
        input_spec=input_spec
    )

model_fn = functools.partial(
    get_matrix_factorization_model, 
    num_items=3706, 
    num_latent_factors=50
)

class RatingAccuracy(tf.keras.metrics.Mean):
  """Keras metric computing accuracy of reconstructed ratings."""

  def __init__(self,
               name: str = 'rating_accuracy',
               **kwargs):
    super().__init__(name=name, **kwargs)

  def update_state(self,
                   y_true: tf.Tensor,
                   y_pred: tf.Tensor,
                   sample_weight: Optional[tf.Tensor] = None):
    absolute_diffs = tf.abs(y_true - y_pred)
    # A [batch_size, 1] tf.bool tensor indicating correctness within the
    # threshold for each example in a batch. A 0.5 threshold corresponds
    # to correctness when predictions are rounded to the nearest whole
    # number.
    example_accuracies = tf.less_equal(absolute_diffs, 0.5)
    super().update_state(example_accuracies, sample_weight=sample_weight)
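
The builders below reference loss_fn and metrics_fn, which this snippet never defines. Following the TFF reconstruction tutorial, they are zero-argument callables returning a Keras loss and a list of Keras metrics:

loss_fn = lambda: tf.keras.losses.MeanSquaredError()
metrics_fn = lambda: [RatingAccuracy()]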
training_process = tff.learning.reconstruction.build_training_process(
    model_fn=model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(1.0),
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.5),
    reconstruction_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.1))
    
evaluation_computation = tff.learning.reconstruction.build_federated_evaluation(
    model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    reconstruction_optimizer_fn=functools.partial(
            tf.keras.optimizers.SGD, 0.1))

Training

state = training_process.initialize()
print(state.model)
print('Item variables shape:', state.model.trainable[0].shape)

eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Initial Eval:', eval_metrics['eval'])

# select clients
federated_train_data = np.random.choice(tf_train_datasets, size=50, replace=False).tolist()
state, metrics = training_process.next(state, federated_train_data)
print('Train metrics:', metrics['train'])


NUM_ROUNDS = 20
train_losses = []
train_accs = [] 

state = training_process.initialize()

for i in range(NUM_ROUNDS):
    federated_train_data = np.random.choice(tf_train_datasets, size=50, replace=False).tolist()
    state, metrics = training_process.next(state, federated_train_data)
    print('Train metrics:', metrics['train'])
    train_losses.append(metrics['train']['loss'])
    train_accs.append(metrics['train']['rating_accuracy'])

eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Final Eval:', eval_metrics['eval'])
eval_metrics = evaluation_computation(state.model, tf_test_datasets)
print('Final Test:', eval_metrics['eval'])

Summary

The overall workflow:

  1. Prepare the data: turn it into a sequence/list of tf.data.Dataset objects.
  2. Build the model. TFF offers two routes. One is to call tff.learning.from_keras_model to convert a tf.keras.Model instance into a tff.learning.Model; you still need to supply input_spec to describe the input types. The other is to inherit from tff.learning.Model, override the forward_pass and report_local_outputs methods, and provide the federated_output_computation property for federated aggregation. With the second route, the usual pattern is to write a function that creates the variables, then run the forward pass and compute the metrics just as in plain TensorFlow while updating those variables accordingly; federated_output_computation must be decorated with tff.federated_computation and call TFF's intrinsic functions.
  3. Build the federated computation. TFF's federated computations derive from the tff.templates.IterativeProcess class, which has the two methods initialize and next; instantiating one requires passing the corresponding model, metrics, and optimizers.
  4. Training: this part of the code needs no special changes; reusing existing example code works fine.

