由於隨着神經網絡層數的增多,需要訓練的參數也會增多,隨之而來需要的數據集就會很大,這樣會造成需要更大的運算資源,而且還要消耗很長的運算時間。TensorFlow提供了一個可以分布式部署的模式,將一個訓練任務拆分成多個小任務,配置到不同的計算機上完成協同運算,這樣使用計算機群運算來代替單機運算,可以使訓練時間大幅度縮短。
一 分布式TensorFlow角色以及原理
要想配置TensorFlow為分布訓練,首先需要了解TensorFlow中關於分布式的角色分配。
- ps:作為分布式訓練的服務端,等到各個終端(supervisors)來連接。
- worker:在TensorFlow的代碼注釋中被稱為supervisors,作為分布式訓練的運算終端。
- chief supervisors:在眾多運算終端中必須選中一個作為主要的運算終端。該終端是在運算終端中最先啟動的,它的功能是合並各個終端運算后的學習參數,將其保存再寫入。
每個具體角色網絡標識都是唯一的,即分布在不同IP的機器上(或者同一主機但不同端口號)。
在實際運行中,各個角色的網絡構建部分代碼必須完全相同。三者的分工如下:
- 服務器端作為一個多方協調者,等待各個運算終端來連接。
- cheif supervisors會在啟動時統一管理全局的學習參數,進行初始化或從模型載入。
- 其它的運算終端只是負責得到其相應的任務並進行計算,並不會保存檢查點以及用於TensorBoard可視化的summary日志等任何參數信息。
二 分布部署TensorFlow的具體方法
配置過程中,首先創建一個server,在server中會將ps以及所有worker的ip端口准備好,接着使用tf.train.Supervisor中的managed_seesion來管理打開的session,session只負責運算,而通信協調的事情就都交給Supervisor來管理了。
三 使用TensorFlow實現分布式部署訓練
下面開始實現一個分布式訓練的網絡模型,仍然以線性回歸的模型作為原型,並將其改為分布式。使我們需要在本機通過3個端口來建立3個終端,分別是ps,兩個worker。代碼主要分為以下幾部分:
1.為每個角色創建IP地址和端口,創建server
首先創建集群(cluster), ClusterSpec的定義,需要把你要跑這個任務的所有的ps和worker 的節點的ip和端口的信息都包含進去, 所有的角色都要執行這段代碼, 就大家互相知道了, 這個集群里面都有哪些成員,不同的成員的類型是什么, 是ps還是worker。
然后創建一個server,在server中會將ps以及所有worker的ip端口准備好,在同一台電腦開三個不同的端口,分別代表ps,chief supervisors和worker。角色的名稱用strjob_name表示。從 tf.train.Server這個的定義開始,就每個角色不一樣了。 如果角色名字是ps的話, 程序就join到這里,作為參數更新的服務, 等待其他worker角色給它提交參數更新的數據。如果是worker角色,就執行后面的計算任務。以ps為例(先創建ps文件):
''' (1)為每個角色添加IP地址和端口,創建server ''' '''定義IP和端口號''' #指定服務器ip和port strps_hosts = '127.0.0.1:1234' #指定兩個終端的ip和port strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236' #定義角色名稱 strjob_name = 'ps' task_index = 0 #將字符串轉為數組 ps_hosts = strps_hosts.split(',') worker_hosts = strworker_hosts.split(',') cluster_spec= tf.train.ClusterSpec({'ps':ps_hosts,'worker':worker_hosts}) #創建Server server = tf.train.Server( cluster_spec, job_name = strjob_name, task_index = task_index)
2.為ps角色添加等待函數
ps角色使用server.join()函數進行線程掛起,開始接受連接消息。
''' (2) 為ps角色添加等待函數 ''' #ps角色處於監聽狀態,等待終端連接 if strjob_name == 'ps': print('waiting....') server.join()
3.創建網絡結構
與正常的程序不同,在創建網絡結構時,使用tf.device()函數將全部的節點都放在當前任務下。task:0對應worker1(可以理解為任務0對應着角色1),task:1對應worker2。
在rf.device()函數中的任務是通過tf.train.replica_device_setter()來指定的。
在tf.train.replica_device_setter()中使用worker_device()來定義具體任務名稱:使用cluster的配置來指定角色和對應的ip地址,從而實現整個任務下的圖節點,
''' (3) 創建網絡結構 ''' #設定訓練集數據長度 n_train = 100 #生成x數據,[-1,1]之間,均分成n_train個數據 train_x = np.linspace(-1,1,n_train).reshape(n_train,1) #把x乘以2,在加入(0,0.3)的高斯正太分布 train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1]) #繪制x,y波形 plt.figure() plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點標記一個點 plt.legend() plt.show() #創建網絡結構時,通過tf.device()函數將全部的節點都放在當前任務下 task:0對應worker1 task:1對應worker2 with tf.device(tf.train.replica_device_setter( worker_device = '/job:worker/task:{0}'.format(task_index), cluster = cluster_spec)): ''' 前向反饋 ''' #創建占位符 input_x = tf.placeholder(dtype=tf.float32) input_y = tf.placeholder(dtype=tf.float32) #模型參數 w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設置正太分布參數 初始化權重 b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設置正太分布參數 初始化偏置 #創建一個global_step變量 global_step = tf.train.get_or_create_global_step() #前向結構 pred = tf.multiply(w,input_x) + b #將預測值以直方圖形式顯示,給直方圖命名為'pred' tf.summary.histogram('pred',pred) ''' 反向傳播bp ''' #定義代價函數 選取二次代價函數 cost = tf.reduce_mean(tf.square(input_y - pred)) #將損失以標量形式顯示 該變量命名為loss_function tf.summary.scalar('loss_function',cost) #設置求解器 采用梯度下降法 學習了設置為0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,globle_step就會自動獲得當前迭代的次數 train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step) saver = tf.train.Saver(max_to_keep = 1) #合並所有的summary merged_summary_op = tf.summary.merge_all() #初始化所有變量,因此變量需要放在其前面定義 init =tf.global_variables_initializer()
為了使載入檢查點文件能夠同步循環次數,這里添加了一個global_step變量,將其放到優化器中。這樣每運行一次優化器,global_step就會自動加1.
4.創建Supervisor,管理session
''' (4)創建Supervisor,管理session ''' training_epochs = 2000 display_step = 20 sv = tf.train.Supervisor(is_chief = (task_index == 0), #0號worker為chief logdir='./LinearRegression/super/', #檢查點和summary文件保存的路徑 init_op = init, #初始化所有變量 summary_op = None, #summary_op用於自動保存summary文件,設置為None,表示不自動保存 saver = saver, #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。否則設置為None global_step = global_step, save_model_secs = 50 #保存檢查點文件的時間間隔 )
- 在tf.train.Supervisor()函數中,is_cheif表明了是否為cheif supervisors角色,這里將task_index = 0的worker設置成chief supervisors。
- logdir:為檢查點和summary日志文件的保存路徑。不過這個似乎啟動就會去這個logdir的目錄去看有沒有checkpoint的文件, 有的話就自動裝載了,沒有就用init_op指定的初始化參數。
- init_op:表示使用初始化變量的函數。
- summary_op:將保存summary的對象傳入,就會自動保存summary文件。這里設置為None,表示不自動保存。
- saver:將保存檢查點的saver對象傳入,Supervisor就會自動保存檢查點文件。如果不想自動保存,就設置為None。
- global_step:為迭代次數。
- save_model_op:為保存檢查點文件的時間間隔,這里設置成50,表明每50秒自動保存一次檢查點文件。為了使程序運行時間長一些,我們更改了training_epochs參數。
5.迭代訓練
session中的內容和之前的一樣,直接迭代訓練即可,由於使用了Supervisor管理session,將使用sv.summary_computed函數來保存summary文件,同樣,如果想要手動保存監測點文件,也可以使用sv.saver.save()函數。
''' (5) 迭代訓練 ''' #連接目標角色創建session with sv.managed_session(server.target) as sess: print("sess ok:") print(global_step.eval(session=sess)) print('開始迭代:') #存放批次值和代價值 plotdata = {'batch_size':[],'loss':[]} #開始迭代 這里step表示當前執行步數,迭代training_epochs輪 需要執行training_epochs*n_train步 for step in range(training_epochs*n_train): for (x,y) in zip(train_x,train_y): #開始執行圖 並返回當前步數 _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y}) #生成summary summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y}) #將summary寫入文件 手動保存summary日志文件 sv.summary_computed(sess,summary_str,global_step = step) #一輪訓練完成后 打印輸出信息 if step % display_step == 0: #計算代價值 loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y}) print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b))) #保存每display_step輪訓練后的代價值以及當前迭代輪數 if not loss == np.nan: plotdata['batch_size'].append(step) plotdata['loss'].append(loss) print('Finished!') #手動保存檢查點文件 #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step) sv.stop()
- 在設置了自動保存檢查點文件后,手動保存仍然有效。程序里我們在Supervisor對象創建的時候指定了自動保存檢查點文件,程序里被我注釋掉的最后一行是采用手動保存檢查點文件。
- 在Supervisor對象創建的時候指定了不自動保存summary日志文件,我們采用了手動保存,調用了sv.summary_computed()函數。
- 在運行一半后終止,再運行Supervisor時會自動載入模型的參數,不需要手動調用saver.restore()。
- 在session中,不需要再運行tf.global_variables_initializer()函數。因為在Supervisor建立的時候回調用傳入的init_op進行初始化,如果加了sess.run(tf.global_variables_initializer()),則會導致所載入模型的變量被二次清空。
6.建立worker文件
將ps.py文件復制兩份,一個叫worker1.py,一個叫worker2.py。將角色名稱修改為worker,並將worker2.py中的task_index修改為1。同時需要將worker2.py文件中手動保存summary日志的代碼注釋掉。
worker1.py文件修改如下:
#定義角色名稱 strjob_name = 'worker' task_index = 0
worker2.py文件修改如下:
#定義角色名稱 strjob_name = 'worker' task_index = 1
在這個程序中使用了sv.summary_computed()函數手動將運行時動態的數據保存下來,以便於在TensorBoard中查看,但是在分布式部署的時候,使用該功能還需要注意以下幾點:
- worker2文件中不能使用sv.summary_computed()函數,因為worker2不是chief supervisors,在worker2中是不會為Supervisor對象構造默認summary_writer(所有的summary日志信息都要通過該對象進行寫)對象的,所以即使程序調用sv.summary_computed()也無法執行下去,程序會報錯。
- 手寫控制summary日志和檢查點文件保存時,需要將chief supervisors以外的worker全部去掉才可以,可以使用Supervisor按時間間隔保存的形式來管理,這樣用一套代碼就可以解決了。
7.部署運行
在spyder中先將ps.py文件運行起來,選擇菜單Consoles->Open an Ipython console,新打開一個Consoles,如下圖
在spider面板右下角,可以看到在原有標簽為'Console 1/A'標簽又多了一個‘Console 2/A’標簽,選中這個標簽,就激活了這個標簽。
運行worker2.py文件。同理,啟動'Console 3/A'運行worker1.py文件。
下面我們可以看到worker1.py文件的輸出:
我們在程序中設置display_step為20,即迭代20次輸出一次信息,我們可能看到這個輸出並不是連續的,這是因為跳過的步驟被分配到了worker2中去運算了。
worker2.py文件對應的窗口顯示的信息如下:
從圖中可以看到worker2和chief supervisors的迭代順序是互補,但也有可能是沒有絕對互補的,但是為什么有時候沒有絕對互補?可能與Supervisor中的同步算法有關。
分布運算的目的是為了提高整體運算速度,如果同步epoch的准確度需要以犧牲總體運算速度為代價,自然很不合適。所以更合理的推斷是因為單機單次運算太快迫使算法使用了更寬松的同步機制。
重要的一點是對於指定步數的學習參數w和b是一致的。即統一迭代論述的值是一樣的,這表明兩個終端是在相同的起點上進行運算的。
對於ps.py文件,其對應的窗口一直默默的只顯示打印的那句話waiting....,因為它只負責連接參與運算。
四 最后再補充一些名詞解釋
客戶端(Client)
- 客戶端是一個用於建立TensorFlow計算圖並創立與集群進行交互的會話層
tensorflow::Session
的程序。一般客戶端是通過python或C++實現的。一個獨立的客戶端進程可以同時與多個TensorFlow的服務端相連 ,同時一個獨立的服務端也可以與多個客戶端相連。
集群(Cluster)
- 一個TensorFlow的集群里包含了一個或多個作業(job), 每一個作業又可以拆分成一個或多個任務(task)。集群的概念主要用與一個特定的高層次對象中,比如說訓練神經網絡,並行化操作多台機器等等。集群對象可以通過
tf.train.ClusterSpec
來定義。
作業(Job)
- 一個作業可以拆封成多個具有相同目的的任務(task),比如說,一個稱之為ps(parameter server,參數服務器)的作業中的任務主要是保存和更新變量,而一個名為worker(工作)的作業一般是管理無狀態且主要從事計算的任務。一個作業中的任務可以運行於不同的機器上,作業的角色也是靈活可變的,比如說稱之為”worker”的作業可以保存一些狀態。
任務(Task)
- 任務相當於是一個特定的TesnsorFlow服務端,其相當於一個獨立的進程,該進程屬於特定的作業並在作業中擁有對應的序號。
TensorFlow服務端(TensorFlow server) 。
ps.py完整代碼:

# -*- coding: utf-8 -*- """ Created on Thu Apr 19 08:52:30 2018 @author: zy """ import tensorflow as tf import numpy as np import os import matplotlib.pyplot as plt ''' 分布式計算 ''' ''' (1)為每個角色添加IP地址和端口,創建server ''' '''定義IP和端口號''' #指定服務器ip和port strps_hosts = '127.0.0.1:1234' #指定兩個終端的ip和port strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236' #定義角色名稱 strjob_name = 'ps' task_index = 0 #將字符串轉為數組 ps_hosts = strps_hosts.split(',') worker_hosts = strworker_hosts.split(',') cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts}) #創建server server = tf.train.Server( cluster_spec, job_name = strjob_name, task_index = task_index) ''' (2) 為ps角色添加等待函數 ''' #ps角色處於監聽狀態,等待終端連接 if strjob_name == 'ps': print('waiting....') server.join() ''' (3) 創建網絡結構 ''' #設定訓練集數據長度 n_train = 100 #生成x數據,[-1,1]之間,均分成n_train個數據 train_x = np.linspace(-1,1,n_train).reshape(n_train,1) #把x乘以2,在加入(0,0.3)的高斯正太分布 train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1]) #繪制x,y波形 plt.figure() plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點標記一個點 plt.legend() plt.show() #創建網絡結構時,通過tf.device()函數將全部的節點都放在當前任務下 with tf.device(tf.train.replica_device_setter( worker_device = '/job:worker/task:{0}'.format(task_index), cluster = cluster_spec)): ''' 前向反饋 ''' #創建占位符 input_x = tf.placeholder(dtype=tf.float32) input_y = tf.placeholder(dtype=tf.float32) #模型參數 w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設置正太分布參數 初始化權重 b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設置正太分布參數 初始化偏置 #創建一個global_step變量 global_step = tf.train.get_or_create_global_step() #前向結構 pred = tf.multiply(w,input_x) + b #將預測值以直方圖形式顯示,給直方圖命名為'pred' tf.summary.histogram('pred',pred) ''' 反向傳播bp ''' #定義代價函數 選取二次代價函數 cost = tf.reduce_mean(tf.square(input_y - pred)) #將損失以標量形式顯示 該變量命名為loss_function tf.summary.scalar('loss_function',cost) #設置求解器 采用梯度下降法 學習了設置為0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,global_step就會自動獲得當前迭代的次數 train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step) saver = tf.train.Saver(max_to_keep = 1) #合並所有的summary merged_summary_op = tf.summary.merge_all() #初始化所有變量,因此變量需要放在其前面定義 init =tf.global_variables_initializer() ''' (4)創建Supervisor,管理session ''' training_epochs = 2000 display_step = 20 sv = tf.train.Supervisor(is_chief = (task_index == 0), #0號worker為chief logdir='./LinearRegression/super/', #檢查點和summary文件保存的路徑 init_op = init, #初始化所有變量 summary_op = None, #summary_op用於自動保存summary文件,設置為None,表示不自動保存 saver = saver, #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。否則設置為None global_step = global_step, save_model_secs = 50 #保存檢查點文件的時間間隔 ) ''' (5) 迭代訓練 ''' #連接目標角色創建session with sv.managed_session(server.target) as sess: print("sess ok:") print(global_step.eval(session=sess)) print('開始迭代:') #存放批次值和代價值 plotdata = {'batch_size':[],'loss':[]} #開始迭代 這里step表示當前執行步數,迭代training_epochs輪 需要執行training_epochs*n_train步 for step in range(training_epochs*n_train): for (x,y) in zip(train_x,train_y): #開始執行圖 並返回當前步數 _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y}) #生成summary summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y}) #將summary寫入文件 手動保存summary日志文件 sv.summary_computed(sess,summary_str,global_step = step) #一輪訓練完成后 打印輸出信息 if step % display_step == 0: #計算代價值 loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y}) print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b))) #保存每display_step輪訓練后的代價值以及當前迭代輪數 if not loss == np.nan: plotdata['batch_size'].append(step) plotdata['loss'].append(loss) print('Finished!') #手動保存檢查點文件 #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step) sv.stop()
worker1.py完整代碼:

# -*- coding: utf-8 -*- """ Created on Thu Apr 19 08:52:30 2018 @author: zy """ import tensorflow as tf import numpy as np import os import matplotlib.pyplot as plt ''' 分布式計算 ''' ''' (1)為每個角色添加IP地址和端口,創建worker ''' '''定義IP和端口號''' #指定服務器ip和port strps_hosts = '127.0.0.1:1234' #指定兩個終端的ip和port strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236' #定義角色名稱 strjob_name = 'worker' task_index = 0 #將字符串轉為數組 ps_hosts = strps_hosts.split(',') worker_hosts = strworker_hosts.split(',') cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts}) #創建server server = tf.train.Server( cluster_spec, job_name = strjob_name, task_index = task_index) ''' (2) 為ps角色添加等待函數 ''' #ps角色處於監聽狀態,等待終端連接 if strjob_name == 'ps': print('waiting....') server.join() ''' (3) 創建網絡結構 ''' #設定訓練集數據長度 n_train = 100 #生成x數據,[-1,1]之間,均分成n_train個數據 train_x = np.linspace(-1,1,n_train).reshape(n_train,1) #把x乘以2,在加入(0,0.3)的高斯正太分布 train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1]) #繪制x,y波形 plt.figure() plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點標記一個點 plt.legend() plt.show() #創建網絡結構時,通過tf.device()函數將全部的節點都放在當前任務下 with tf.device(tf.train.replica_device_setter( worker_device = '/job:worker/task:{0}'.format(task_index), cluster = cluster_spec)): ''' 前向反饋 ''' #創建占位符 input_x = tf.placeholder(dtype=tf.float32) input_y = tf.placeholder(dtype=tf.float32) #模型參數 w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設置正太分布參數 初始化權重 b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設置正太分布參數 初始化偏置 #創建一個global_step變量 global_step = tf.train.get_or_create_global_step() #前向結構 pred = tf.multiply(w,input_x) + b #將預測值以直方圖形式顯示,給直方圖命名為'pred' tf.summary.histogram('pred',pred) ''' 反向傳播bp ''' #定義代價函數 選取二次代價函數 cost = tf.reduce_mean(tf.square(input_y - pred)) #將損失以標量形式顯示 該變量命名為loss_function tf.summary.scalar('loss_function',cost) #設置求解器 采用梯度下降法 學習了設置為0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,global_step就會自動獲得當前迭代的次數 train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step) saver = tf.train.Saver(max_to_keep = 1) #合並所有的summary merged_summary_op = tf.summary.merge_all() #初始化所有變量,因此變量需要放在其前面定義 init =tf.global_variables_initializer() ''' (4)創建Supervisor,管理session ''' training_epochs = 2000 display_step = 20 sv = tf.train.Supervisor(is_chief = (task_index == 0), #0號worker為chief logdir='./LinearRegression/super/', #檢查點和summary文件保存的路徑 init_op = init, #初始化所有變量 summary_op = None, #summary_op用於自動保存summary文件,設置為None,表示不自動保存 saver = saver, #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。否則設置為None global_step = global_step, save_model_secs = 50 #保存檢查點文件的時間間隔 ) ''' (5) 迭代訓練 ''' #連接目標角色創建session with sv.managed_session(server.target) as sess: print("sess ok:") print(global_step.eval(session=sess)) print('開始迭代:') #存放批次值和代價值 plotdata = {'batch_size':[],'loss':[]} #開始迭代 這里step表示當前執行步數,迭代training_epochs輪 需要執行training_epochs*n_train步 for step in range(training_epochs*n_train): for (x,y) in zip(train_x,train_y): #開始執行圖 並返回當前步數 _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y}) #生成summary summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y}) #將summary寫入文件 手動保存summary日志文件 sv.summary_computed(sess,summary_str,global_step = step) #一輪訓練完成后 打印輸出信息 if step % display_step == 0: #計算代價值 loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y}) print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b))) #保存每display_step輪訓練后的代價值以及當前迭代輪數 if not loss == np.nan: plotdata['batch_size'].append(step) plotdata['loss'].append(loss) print('Finished!') #手動保存檢查點文件 #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step) sv.stop()
worker2.py完整代碼:

# -*- coding: utf-8 -*- """ Created on Thu Apr 19 08:52:30 2018 @author: zy """ import tensorflow as tf import numpy as np import os import matplotlib.pyplot as plt ''' 分布式計算 ''' ''' (1)為每個角色添加IP地址和端口,創建worker ''' '''定義IP和端口號''' #指定服務器ip和port strps_hosts = '127.0.0.1:1234' #指定兩個終端的ip和port strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236' #定義角色名稱 strjob_name = 'worker' task_index = 1 #將字符串轉為數組 ps_hosts = strps_hosts.split(',') worker_hosts = strworker_hosts.split(',') cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts}) #創建server server = tf.train.Server( cluster_spec, job_name = strjob_name, task_index = task_index) ''' (2) 為ps角色添加等待函數 ''' #ps角色處於監聽狀態,等待終端連接 if strjob_name == 'ps': print('waiting....') server.join() ''' (3) 創建網絡結構 ''' #設定訓練集數據長度 n_train = 100 #生成x數據,[-1,1]之間,均分成n_train個數據 train_x = np.linspace(-1,1,n_train).reshape(n_train,1) #把x乘以2,在加入(0,0.3)的高斯正太分布 train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1]) #繪制x,y波形 plt.figure() plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點標記一個點 plt.legend() plt.show() #創建網絡結構時,通過tf.device()函數將全部的節點都放在當前任務下 with tf.device(tf.train.replica_device_setter( worker_device = '/job:worker/task:{0}'.format(task_index), cluster = cluster_spec)): ''' 前向反饋 ''' #創建占位符 input_x = tf.placeholder(dtype=tf.float32) input_y = tf.placeholder(dtype=tf.float32) #模型參數 w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設置正太分布參數 初始化權重 b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設置正太分布參數 初始化偏置 #創建一個global_step變量 global_step = tf.train.get_or_create_global_step() #前向結構 pred = tf.multiply(w,input_x) + b #將預測值以直方圖形式顯示,給直方圖命名為'pred' tf.summary.histogram('pred',pred) ''' 反向傳播bp ''' #定義代價函數 選取二次代價函數 cost = tf.reduce_mean(tf.square(input_y - pred)) #將損失以標量形式顯示 該變量命名為loss_function tf.summary.scalar('loss_function',cost) #設置求解器 采用梯度下降法 學習了設置為0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,global_step就會自動獲得當前迭代的次數 train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step) saver = tf.train.Saver(max_to_keep = 1) #合並所有的summary merged_summary_op = tf.summary.merge_all() #初始化所有變量,因此變量需要放在其前面定義 init =tf.global_variables_initializer() ''' (4)創建Supervisor,管理session ''' training_epochs = 2000 display_step = 20 sv = tf.train.Supervisor(is_chief = (task_index == 0), #0號worker為chief logdir='./LinearRegression/super/', #檢查點和summary文件保存的路徑 init_op = init, #初始化所有變量 summary_op = None, #summary_op用於自動保存summary文件,設置為None,表示不自動保存 saver = saver, #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。否則設置為None global_step = global_step, save_model_secs = 50 #保存檢查點文件的時間間隔 ) ''' (5) 迭代訓練 ''' #連接目標角色創建session with sv.managed_session(server.target) as sess: print("sess ok:") print(global_step.eval(session=sess)) print('開始迭代:') #存放批次值和代價值 plotdata = {'batch_size':[],'loss':[]} #開始迭代 這里step表示當前執行步數,迭代training_epochs輪 需要執行training_epochs*n_train步 for step in range(training_epochs*n_train): for (x,y) in zip(train_x,train_y): #開始執行圖 並返回當前步數 _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y}) #生成summary summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y}) #將summary寫入文件 手動保存summary日志文件 #sv.summary_computed(sess,summary_str,global_step = step) #一輪訓練完成后 打印輸出信息 if step % display_step == 0: #計算代價值 loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y}) print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b))) #保存每display_step輪訓練后的代價值以及當前迭代輪數 if not loss == np.nan: plotdata['batch_size'].append(step) plotdata['loss'].append(loss) print('Finished!') #手動保存檢查點文件 #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step) sv.stop()