tensorflow聯邦學習框架整理


High level api

custom model

定義模型需要繼承自tff.learning.Model類,同時根據聯邦學習的流程需要定義好,模型訓練和輔助訓練變量。變量必須要使用tf的類型,因為在實際環境中,聯邦學習是要部署到移動端的,調用的不一定是python。

MnistVariables = collections.namedtuple('MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')  

# total variable
def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

有了模型的變量之后定義模型的前向傳播過程,注意在前向傳播過程中variableloss等參數都進行了修改,同時定義ServerClient得到的數據。

def mnist_forward_pass(variables, batch):
  y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions
def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

在從client得到數據后,server要做的就是對數據進行整合。這里metrics參數對應的是get_local_mnist_metrics的所有結果。tff是面向所有client的,我理解的下面的操作都是從一個list dict中做加權平均,這里的metrics參數沒有體現list。

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

有了上面的模型參數前向傳播返回結果聚合結果后,定義模型。這個定義模型,我理解的是對一個client的模型,上述的模型參數前向傳播返回結果聚合結果都是針對client而言的,猜測tff從一個client到多個clients實現了一些包裝,是這個過程沒有體現在代碼里。這里model要實現定義模型參數,可訓練參數,不可訓練參數,前向傳播,本地變量和指定輸入數據類型,匯報結果和結果整合。其中tff.learning.BatchOutputtff中封裝輸出結果的結構。

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

建立好模型之后進行模型訓練:

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))

state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))

FC core

這部分是如何從底層創建聯邦學習模型,首先要准備要了解一些概念。

數據類型

要顯示的指出數據存儲在C/S端,是全局唯一的還是多份拷貝的。注意的是使用print函數輸出時,會將數據類型和值一起輸出,成為compat notation

  • 與端無關的數據

    • 張量類型,tff.TensorType。需要指定它的元素數據類型dtype和形狀shape
    • 列表類型,tff.SequenceType。其中的元素類型應當為TFF的tff.Type或者是能轉換成tff.Type的東西。print打印列表類型數據時,會出現*表示列表。
    • 元組類型,tff.NamedTupleTypetff.NamedTupleType接受三種類型的輸入:listtuplecollections.OrderedDictprint打印元組類型以<>作為標記。
    • 函數類型,tff.FunctionTypetff.FunctionType需要指定函數的輸入類型,且只能有一個輸入值,和一個函數返回值。
  • 與端有關的數據類型

    端有關的類型,主要完成兩件任務:

    • 顯式地定義數據值應該存放在C端還是S端(Placementtff.SERVERortff.CLIENTS
    • 定義這個數據是否全局一致(All equal
  • 聯邦數據類型

    tff.FederatedType把上面提到的端無關類型包裝起來,並增加placementall_equal兩個屬性。其中all_equal可選,如果placement=tff.SERVER,則默認為True。使用print函數打印變量時,花括號{}表示非全局唯一,而沒有花括號就表示全局唯一。

  • 變量聲明

    定義變量類型后,聲明變量使用tff.utils.create_variables(name, type),如

    OUR_TYPE = tff.TensorType(tf.int8, shape=[10])
    var = tff.utils.create_variables('var_name', OUR_TYPE)
    print(OUR_TYPE)
    print(var)
    

函數定義

  • 與端無關的函數

    函數需要使用tff.tf_computation(type)wrap up函數,其中type表示函數傳入形參x的類型。

    @tff.tf_computation(tff.SequenceType(tf.int32))
    def add_up_integeres(x):
        return x.reduce(np.int32(0), lambda x, y: x+y)
    
  • 與端有關的函數

    與端有關的函數不僅需要指定類型,還需要指定placement。裝飾器也變為tff.federated_computation

    @tff.federated_computation(tff.FederatedType(tf.float32, tff.Clients))
    def get_average_temperature(sensor_readings):
        return tff.federated_mean(sensor_readings)
    
    print(get_average_temperature.type_signature)
    

邏輯回歸實例

下面以邏輯回歸為例,整理數據准備到模型訓練的過程。跟上面的high-level api的明顯區別是,從底層構建聯邦學習要明確定義好,函數的輸入輸出類型。

  • 准備數據

    數據存放是長度為10的list->每個數字user個batch這樣的格式。例如federated_train_data[5]表示就是數字都為5的batch list。

    import collections
    
    import numpy as np
    import tensorflow as tf
    import tensorflow_federated as tff
    
    tf.compat.v1.enable_v2_behavior()
    tff.framework.set_default_executor(tff.framework.ReferenceExecutor())
    
    mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
    
    NUM_EXAMPLES_PER_USER = 1000
    BATCH_SIZE = 50
    
    
    def get_data_for_digit(source, digit):
        output_sequence = []
        all_samples = [i for i, d in enumerate(source[1]) if d == digit]
        for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE):
            batch_samples = all_samples[i:i + BATCH_SIZE]
            output_sequence.append({
                'x':
                    np.array([source[0][i].flatten() / 255.0 for i in batch_samples],
                             dtype=np.float32),
                'y':
                    np.array([source[1][i] for i in batch_samples], dtype=np.int32)
            })
        return output_sequence
    
    
    federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)]
    
    federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]
    

    在整理好訓練數據后,定義每個batch的數據類型

    BATCH_SPEC = collections.OrderedDict(
        x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
        y=tf.TensorSpec(shape=[None], dtype=tf.int32))
    BATCH_TYPE = tff.to_type(BATCH_SPEC)
    
  • batch的前向傳播計算

    在計算前向傳播的時候需要模型的參數,先定義好參數數據類型

    MODEL_SPEC = collection.OrderedDict(
        weights = tf.TensorSpec(shape=[784, 10], dtype=tf.float32), 
    	bias = tf.TensorSpec(shape=[10], dtype=tf.float32))
    MODEL_TYPEP = tff.to_type(MODEL_SPEC)
    

    在封裝前向傳播的時候,使用了tf定義的前向傳播函數。

    @tf.function
    def forward_pass(model, batch):
        predicted_y = tf.nn.softmax(tf.matmul(batch['x'], model['weights'])+model['bias'])
        return -tf.reduce_mean(tf.reduce_sum(tf.onehot(batch['y'], 10)*tf.math.log(predicted_y), axis=[1]))
    
    @tff.tf.computation(MODEL_TYPE, BATCH_TYPE)
    def batch_loss(model, batch):
        return forward_pass(model, batch)
    '''
    # check
    initial_model = collections.OrderedDict(
        weights=np.zeros([784, 10], dtype=np.float32),
        bias=np.zeros([10], dtype=np.float32))
    
    sample_batch = federated_train_data[5][-1]
    
    print(batch_loss(initial_model, sample_batch))
    '''
    
  • batch optimization

    batch優化的時候,仍然使用tf的優化器,返回的是一個參數修改后的model

    # optimizer
    
    @tff.tf_computation(MODEL_TYPE, BATCH_TYPE, tf.float32)
    def batch_train(initial_model, batch, learning_rate):
        # Define a group of model variables and set them to `initial_model`. Must
        # be defined outside the @tf.function.
        model_vars = collections.OrderedDict([
          (name, tf.Variable(name=name, initial_value=value))
          for name, value in initial_model.items()
        ])
        optimizer = tf.keras.optimizers.SGD(learning_rate)
    
        @tf.function
        def _train_on_batch(model_vars, batch):
            # Perform one step of gradient descent using loss from `batch_loss`.
            with tf.GradientTape() as tape:
                loss = forward_pass(model_vars, batch)
            grads = tape.gradient(loss, model_vars)
            optimizer.apply_gradients(
                zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars)))
            return model_vars
    
        return _train_on_batch(model_vars, batch)
    
    print(str(batch_train.type_signature))
    
    '''
    # check
    model = initial_model
    losses = []
    for _ in range(5):
        model = batch_train(model, sample_batch, 0.1)
        losses.append(batch_loss(model, sample_batch))
    
    print("5 loops loss:", losses)
    '''
    
  • sequence batch optimization

    上面的batch_train是針對一個batch而言的,tff是要面對多個clients的,調用tff的api構建sequence batch訓練函數。

    LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE)
    
    @tff.federated_computation(MODEL_TYPE, tf.float32, LOCAL_DATA_TYPE)
    def local_train(initial_model, learning_rate, all_batches):
    
        # Mapping function to apply to each batch.
        @tff.federated_computation(MODEL_TYPE, BATCH_TYPE)
        def batch_fn(model, batch):
            return batch_train(model, batch, learning_rate)
    
        return tff.sequence_reduce(all_batches, initial_model, batch_fn)
    
    print(str(local_train.type_signature))
    '''
    # check
    locally_trained_model = local_train(initial_model, 0.1, federated_train_data[5])
    '''
    

    tff.sequence_reduce的參數是value, zero, op,其中op是一個輸入<U, T> -> U的函數。

  • sequence_metric

    同上面一樣,對sequence進行評估

    # metric
    @tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE)
    def local_eval(model, all_batches):
        return tff.sequence_sum(
            tff.sequence_map(
                tff.federated_computation(lambda b: batch_loss(model, b), BATCH_TYPE),
                all_batches))
    
  • federated train

    到目前為止,我們只是對某一種數字圖片sequence batch進行了訓練,下面進行對所有類型圖片進行評估和學習。

    SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER)
    CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS)
    
    @tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE)
    def federated_eval(model, data):
        return tff.federated_mean(
            tff.federated_map(local_eval, [tff.federated_broadcast(model), data]))
    
    '''
    print('initial_model loss =', federated_eval(initial_model,
                                                 federated_train_data))
    print('locally_trained_model loss =',
          federated_eval(locally_trained_model, federated_train_data))
    '''
    
    SERVER_FLOAT_TYPE = tff.FederatedType(tf.float32, tff.SERVER)
    
    @tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE,
                               CLIENT_DATA_TYPE)
    def federated_train(model, learning_rate, data):
        return tff.federated_mean(
            tff.federated_map(local_train, [
                tff.federated_broadcast(model),
                tff.federated_broadcast(learning_rate), data
            ]))
    '''
    # implement
    model = initial_model
    learning_rate = 0.1
    for round_num in range(5):
        # 每一輪,把大家的模型分別更新一下,取平均之后拿回來(做賦值替換)
        model = federated_train(model, learning_rate, federated_train_data)
        # 把學習率減小一點
        learning_rate = learning_rate * 0.9
        # 算個loss輸出一下
        loss = federated_eval(model, federated_train_data)
        print('round {}, loss={}'.format(round_num, loss))
    '''
    

    最后tff.federated_meanMODEL_TYPE類型進行了平均。

例子總結

這個例子有點奇奇怪怪的,比如它的數據的分割方式,是client只有一種類型圖片的batch,最后聯邦學習average的竟然是從不同數據學習到的model,這和聯邦學習有點不一樣。之后改進的話

  • 數據分割方式。應該改是clients->batchs
  • 訓練方式。應該是先定義batch再到一個client``batches的完整訓練,然后在調用tff.federated_maptff.federated_mean進行聚合。

編寫算法的時候也可以借鑒它的流程,首先完成對batch的優化,然后對client的優化,然后對所有的client的結果聚合。最近自己也會編一個類似的代碼(可能不是基於tff的),嘗試一下。

參考資料

  1. Tensorflow Federated Tutorial: Federated Learning for Image Classification
  2. Tensorflow Federated Tutorial: Custom Federated Algorithms
  3. Zing22, Tensorflow Federated Framework 谷歌聯邦學習框架:自底向上簡明入門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM