High level api
custom model
定義模型需要繼承自tff.learning.Model
類,同時根據聯邦學習的流程需要定義好,模型訓練和輔助訓練變量。變量必須要使用tf
的類型,因為在實際環境中,聯邦學習是要部署到移動端的,調用的不一定是python。
MnistVariables = collections.namedtuple('MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')
# total variable
def create_mnist_variables():
return MnistVariables(
weights=tf.Variable(
lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
name='weights',
trainable=True),
bias=tf.Variable(
lambda: tf.zeros(dtype=tf.float32, shape=(10)),
name='bias',
trainable=True),
num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))
有了模型的變量之后定義模型的前向傳播過程,注意在前向傳播過程中variable
的loss
等參數都進行了修改,同時定義Server
從Client
得到的數據。
def mnist_forward_pass(variables, batch):
y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
predictions = tf.cast(tf.argmax(y, 1), tf.int32)
flat_labels = tf.reshape(batch['y'], [-1])
loss = -tf.reduce_mean(
tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
accuracy = tf.reduce_mean(
tf.cast(tf.equal(predictions, flat_labels), tf.float32))
num_examples = tf.cast(tf.size(batch['y']), tf.float32)
variables.num_examples.assign_add(num_examples)
variables.loss_sum.assign_add(loss * num_examples)
variables.accuracy_sum.assign_add(accuracy * num_examples)
return loss, predictions
def get_local_mnist_metrics(variables):
return collections.OrderedDict(
num_examples=variables.num_examples,
loss=variables.loss_sum / variables.num_examples,
accuracy=variables.accuracy_sum / variables.num_examples)
在從client
得到數據后,server
要做的就是對數據進行整合。這里metrics
參數對應的是get_local_mnist_metrics
的所有結果。tff
是面向所有client
的,我理解的下面的操作都是從一個list dict中做加權平均,這里的metrics
參數沒有體現list。
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
return collections.OrderedDict(
num_examples=tff.federated_sum(metrics.num_examples),
loss=tff.federated_mean(metrics.loss, metrics.num_examples),
accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))
有了上面的模型參數
、前向傳播
、返回結果
和聚合結果
后,定義模型。這個定義模型,我理解的是對一個client
的模型,上述的模型參數
、前向傳播
、返回結果
和聚合結果
都是針對client
而言的,猜測tff
從一個client
到多個clients
實現了一些包裝,是這個過程沒有體現在代碼里。這里model
要實現定義模型參數,可訓練參數,不可訓練參數,前向傳播,本地變量和指定輸入數據類型,匯報結果和結果整合。其中tff.learning.BatchOutput
是tff
中封裝輸出結果的結構。
class MnistModel(tff.learning.Model):
def __init__(self):
self._variables = create_mnist_variables()
@property
def trainable_variables(self):
return [self._variables.weights, self._variables.bias]
@property
def non_trainable_variables(self):
return []
@property
def local_variables(self):
return [
self._variables.num_examples, self._variables.loss_sum,
self._variables.accuracy_sum
]
@property
def input_spec(self):
return collections.OrderedDict(
x=tf.TensorSpec([None, 784], tf.float32),
y=tf.TensorSpec([None, 1], tf.int32))
@tf.function
def forward_pass(self, batch, training=True):
del training
loss, predictions = mnist_forward_pass(self._variables, batch)
num_exmaples = tf.shape(batch['x'])[0]
return tff.learning.BatchOutput(
loss=loss, predictions=predictions, num_examples=num_exmaples)
@tf.function
def report_local_outputs(self):
return get_local_mnist_metrics(self._variables)
@property
def federated_output_computation(self):
return aggregate_mnist_metrics_across_clients
建立好模型之后進行模型訓練:
iterative_process = tff.learning.build_federated_averaging_process(
MnistModel,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round 1, metrics={}'.format(metrics))
FC core
這部分是如何從底層創建聯邦學習模型,首先要准備要了解一些概念。
數據類型
要顯示的指出數據存儲在C/S端,是全局唯一的還是多份拷貝的。注意的是使用print
函數輸出時,會將數據類型和值一起輸出,成為compat notation
。
-
與端無關的數據
- 張量類型,
tff.TensorType
。需要指定它的元素數據類型dtype
和形狀shape
- 列表類型,
tff.SequenceType
。其中的元素類型應當為TFF的tff.Type
或者是能轉換成tff.Type
的東西。print
打印列表類型數據時,會出現*
表示列表。 - 元組類型,
tff.NamedTupleType
。tff.NamedTupleType
接受三種類型的輸入:list
,tuple
和collections.OrderedDict
。print
打印元組類型以<>
作為標記。 - 函數類型,
tff.FunctionType
。tff.FunctionType
需要指定函數的輸入類型,且只能有一個輸入值,和一個函數返回值。
- 張量類型,
-
與端有關的數據類型
端有關的類型,主要完成兩件任務:
- 顯式地定義數據值應該存放在C端還是S端(
Placement
)tff.SERVER
ortff.CLIENTS
- 定義這個數據是否全局一致(
All equal
)
- 顯式地定義數據值應該存放在C端還是S端(
-
聯邦數據類型
tff.FederatedType
把上面提到的端無關類型包裝起來,並增加placement
和all_equal
兩個屬性。其中all_equal
可選,如果placement=tff.SERVER
,則默認為True
。使用print
函數打印變量時,花括號{}
表示非全局唯一,而沒有花括號就表示全局唯一。 -
變量聲明
定義變量類型后,聲明變量使用
tff.utils.create_variables(name, type)
,如OUR_TYPE = tff.TensorType(tf.int8, shape=[10]) var = tff.utils.create_variables('var_name', OUR_TYPE) print(OUR_TYPE) print(var)
函數定義
-
與端無關的函數
函數需要使用
tff.tf_computation(type)
來wrap up
函數,其中type
表示函數傳入形參x
的類型。@tff.tf_computation(tff.SequenceType(tf.int32)) def add_up_integeres(x): return x.reduce(np.int32(0), lambda x, y: x+y)
-
與端有關的函數
與端有關的函數不僅需要指定類型,還需要指定
placement
。裝飾器也變為tff.federated_computation
@tff.federated_computation(tff.FederatedType(tf.float32, tff.Clients)) def get_average_temperature(sensor_readings): return tff.federated_mean(sensor_readings) print(get_average_temperature.type_signature)
邏輯回歸實例
下面以邏輯回歸為例,整理數據准備到模型訓練的過程。跟上面的high-level api
的明顯區別是,從底層構建聯邦學習要明確定義好,函數的輸入輸出類型。
-
准備數據
數據存放是長度為10的list->每個數字user個batch這樣的格式。例如
federated_train_data[5]
表示就是數字都為5的batch list。import collections import numpy as np import tensorflow as tf import tensorflow_federated as tff tf.compat.v1.enable_v2_behavior() tff.framework.set_default_executor(tff.framework.ReferenceExecutor()) mnist_train, mnist_test = tf.keras.datasets.mnist.load_data() NUM_EXAMPLES_PER_USER = 1000 BATCH_SIZE = 50 def get_data_for_digit(source, digit): output_sequence = [] all_samples = [i for i, d in enumerate(source[1]) if d == digit] for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE): batch_samples = all_samples[i:i + BATCH_SIZE] output_sequence.append({ 'x': np.array([source[0][i].flatten() / 255.0 for i in batch_samples], dtype=np.float32), 'y': np.array([source[1][i] for i in batch_samples], dtype=np.int32) }) return output_sequence federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)] federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]
在整理好訓練數據后,定義每個batch的數據類型
BATCH_SPEC = collections.OrderedDict( x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32), y=tf.TensorSpec(shape=[None], dtype=tf.int32)) BATCH_TYPE = tff.to_type(BATCH_SPEC)
-
batch的前向傳播計算
在計算前向傳播的時候需要模型的參數,先定義好參數數據類型
MODEL_SPEC = collection.OrderedDict( weights = tf.TensorSpec(shape=[784, 10], dtype=tf.float32), bias = tf.TensorSpec(shape=[10], dtype=tf.float32)) MODEL_TYPEP = tff.to_type(MODEL_SPEC)
在封裝前向傳播的時候,使用了tf定義的前向傳播函數。
@tf.function def forward_pass(model, batch): predicted_y = tf.nn.softmax(tf.matmul(batch['x'], model['weights'])+model['bias']) return -tf.reduce_mean(tf.reduce_sum(tf.onehot(batch['y'], 10)*tf.math.log(predicted_y), axis=[1])) @tff.tf.computation(MODEL_TYPE, BATCH_TYPE) def batch_loss(model, batch): return forward_pass(model, batch) ''' # check initial_model = collections.OrderedDict( weights=np.zeros([784, 10], dtype=np.float32), bias=np.zeros([10], dtype=np.float32)) sample_batch = federated_train_data[5][-1] print(batch_loss(initial_model, sample_batch)) '''
-
batch optimization
batch優化的時候,仍然使用
tf
的優化器,返回的是一個參數修改后的model
# optimizer @tff.tf_computation(MODEL_TYPE, BATCH_TYPE, tf.float32) def batch_train(initial_model, batch, learning_rate): # Define a group of model variables and set them to `initial_model`. Must # be defined outside the @tf.function. model_vars = collections.OrderedDict([ (name, tf.Variable(name=name, initial_value=value)) for name, value in initial_model.items() ]) optimizer = tf.keras.optimizers.SGD(learning_rate) @tf.function def _train_on_batch(model_vars, batch): # Perform one step of gradient descent using loss from `batch_loss`. with tf.GradientTape() as tape: loss = forward_pass(model_vars, batch) grads = tape.gradient(loss, model_vars) optimizer.apply_gradients( zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars))) return model_vars return _train_on_batch(model_vars, batch) print(str(batch_train.type_signature)) ''' # check model = initial_model losses = [] for _ in range(5): model = batch_train(model, sample_batch, 0.1) losses.append(batch_loss(model, sample_batch)) print("5 loops loss:", losses) '''
-
sequence batch optimization
上面的
batch_train
是針對一個batch而言的,tff
是要面對多個clients
的,調用tff
的api構建sequence batch訓練函數。LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE) @tff.federated_computation(MODEL_TYPE, tf.float32, LOCAL_DATA_TYPE) def local_train(initial_model, learning_rate, all_batches): # Mapping function to apply to each batch. @tff.federated_computation(MODEL_TYPE, BATCH_TYPE) def batch_fn(model, batch): return batch_train(model, batch, learning_rate) return tff.sequence_reduce(all_batches, initial_model, batch_fn) print(str(local_train.type_signature)) ''' # check locally_trained_model = local_train(initial_model, 0.1, federated_train_data[5]) '''
tff.sequence_reduce
的參數是value, zero, op
,其中op
是一個輸入<U, T> -> U
的函數。 -
sequence_metric
同上面一樣,對sequence進行評估
# metric @tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE) def local_eval(model, all_batches): return tff.sequence_sum( tff.sequence_map( tff.federated_computation(lambda b: batch_loss(model, b), BATCH_TYPE), all_batches))
-
federated train
到目前為止,我們只是對某一種數字圖片
sequence batch
進行了訓練,下面進行對所有類型圖片進行評估和學習。SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER) CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS) @tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE) def federated_eval(model, data): return tff.federated_mean( tff.federated_map(local_eval, [tff.federated_broadcast(model), data])) ''' print('initial_model loss =', federated_eval(initial_model, federated_train_data)) print('locally_trained_model loss =', federated_eval(locally_trained_model, federated_train_data)) ''' SERVER_FLOAT_TYPE = tff.FederatedType(tf.float32, tff.SERVER) @tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE, CLIENT_DATA_TYPE) def federated_train(model, learning_rate, data): return tff.federated_mean( tff.federated_map(local_train, [ tff.federated_broadcast(model), tff.federated_broadcast(learning_rate), data ])) ''' # implement model = initial_model learning_rate = 0.1 for round_num in range(5): # 每一輪,把大家的模型分別更新一下,取平均之后拿回來(做賦值替換) model = federated_train(model, learning_rate, federated_train_data) # 把學習率減小一點 learning_rate = learning_rate * 0.9 # 算個loss輸出一下 loss = federated_eval(model, federated_train_data) print('round {}, loss={}'.format(round_num, loss)) '''
最后
tff.federated_mean
對MODEL_TYPE
類型進行了平均。
例子總結
這個例子有點奇奇怪怪的,比如它的數據的分割方式,是client
只有一種類型圖片的batch,最后聯邦學習average的竟然是從不同數據學習到的model
,這和聯邦學習有點不一樣。之后改進的話
- 數據分割方式。應該改是
clients->batchs
- 訓練方式。應該是先定義
batch
再到一個client``batches
的完整訓練,然后在調用tff.federated_map
、tff.federated_mean
進行聚合。
編寫算法的時候也可以借鑒它的流程,首先完成對batch的優化,然后對client的優化,然后對所有的client的結果聚合。最近自己也會編一個類似的代碼(可能不是基於tff的),嘗試一下。