In recent years, the rapid growth of the information age has produced massive volumes of data and given rise to countless cutting-edge big-data technologies and applications. In today's big-data industry, business decisions are increasingly made on the basis of data analysis. Once data grows beyond a certain scale, machine learning over large, complex datasets delivers far more value, and deep learning is especially good at uncovering the internal structure of data in big-data scenarios. This article takes big data as its setting and walks, bottom-up, through how to apply deep learning within a big-data architecture. The big-data side uses a Hadoop cluster with Kerberos authentication; the deep-learning side uses distributed TensorFlow. Hadoop solves the storage problem, while distributed TensorFlow solves the problem of training on large datasets. This tutorial summarizes the experience our team gained while deploying the deep-learning module of a real-time fraud-alerting service; readers who are interested are welcome to get in touch for a deeper exchange.
Installing TensorFlow
We install TensorFlow on CentOS 7: TensorFlow's prebuilt binaries depend on a newer GNU libc than CentOS 6 ships, so CentOS 6 was ruled out. The official site has a fairly detailed guide for installing TensorFlow online; this tutorial focuses on the offline installation path, for which material online is scarce. When building the whole stack, what needs the most attention is version consistency across the components, and the steps below are the scheme we arrived at after resolving a number of version conflicts. Let's start by building up the system.
1. Install the Python 3.5 interpreter: download the source from the official site, extract it, and run the following install commands:
./configure
make
make test
sudo make install
2. Install python-numpy, the scientific-computing package for Python: download it from the official site, extract it, and run:
python setup.py install
3. Install wheel, the tool for handling Python's built-package format: download it from the official site and run:
pip install wheel-0.30.0a0-py2.py3-none-any.whl
4. Install setuptools, the tool that downloads, builds, installs, and manages Python modules: download it from the official site, extract it, and run:
python setup.py install
5. Install the Python development package python-devel: download it from the official site and run:
sudo rpm -i --nodeps python3-devel-3.5.2-4.fc25.x86_64.rpm
6. Install six, the Python 2/3 compatibility library: download it from the official site and run:
sudo pip install six-1.10.0-py2.py3-none-any.whl
7. Install JDK 8, the Java development environment: download it from the official site, extract it, and move it into place:
mv java1.8 /usr/local/software/jdk
Set the JDK environment variables by editing .bashrc and adding:
export JAVA_HOME=/usr/local/software/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=$CLASSPATH:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=$PATH:${JAVA_HOME}/bin
Switch the active Java version, selecting the one just installed:
sudo update-alternatives --config java
sudo update-alternatives --config javac
8. Install Bazel: Bazel is a Make-like build tool that Google tailored to its own internal development workflow, and it is used to build TensorFlow projects. Download it from the official site and run:
chmod +x bazel-0.4.3-installer-linux-x86_64.sh
./bazel-0.4.3-installer-linux-x86_64.sh --user
9. Install TensorFlow: download the wheel from the official site and run:
pip install --upgrade tensorflow-0.12.1-cp35-cp35m-linux_x86_64.whl
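To confirm that the installation works, a quick sanity check is to start Python, print the version, and run a one-operation graph; the exact version string you see depends on the wheel you installed:
# Quick post-install check: print the TensorFlow version and run a trivial graph.
import tensorflow as tf

print(tf.__version__)                        # expect something like 0.12.1
hello = tf.constant('Hello, TensorFlow!')
with tf.Session() as sess:
    print(sess.run(hello))                   # prints b'Hello, TensorFlow!' on Python 3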
Setting up TensorFlow access to HDFS
1. First install the Hadoop client. Download it from the official site, then extract it and move it into place:
tar zxvf hadoop-2.6.0.tar.gz
mv hadoop-2.6.0 /usr/local/software/hadoop
Configure the environment variables by adding the following to /etc/profile:
export PATH=$PATH:/usr/local/software/hadoop/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server
export HADOOP_HOME=/usr/local/software/hadoop
export HADOOP_HDFS_HOME=/usr/local/software/hadoop
After editing, reload the configuration with source /etc/profile.
2. Next, with the client installed, copy in your own Hadoop cluster's configuration files (core-site.xml, hdfs-site.xml, and so on) so that the client points at your cluster.
Setting up TensorFlow with Kerberos authentication
TensorFlow 0.12 already supports Kerberos authentication; once the Kerberos files on the local machine are configured, it works out of the box. This article does not describe the Kerberos configuration in detail, but the related steps are, in outline:
- First, configure the servers and the authentication policy in /etc/krb5.conf;
- Then generate a user keytab file on the Kerberos server and copy it to this machine;
- Finally, authenticate the Kerberos client and set up a scheduled task to keep the ticket fresh (a minimal sketch follows this list).
A deep-learning example with distributed TensorFlow in a big-data setting
I. Converting the data format
The example in this article builds a recognition model on the MNIST data. To read the data more efficiently and make better use of memory, we convert the local GZ files into TFRecord, TensorFlow's standard built-in format, and then upload the converted files to HDFS. In the real application we used Spark for the large-scale conversion job (a rough sketch of that variant follows the local-conversion code below). The conversion code for the local data is:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os

import tensorflow as tf
from tensorflow.contrib.learn.python.learn.datasets import mnist

SOURCE_URL = 'http://yann.lecun.com/exdb/mnist/'
TRAIN_IMAGES = 'train-images-idx3-ubyte.gz'  # MNIST filenames
TRAIN_LABELS = 'train-labels-idx1-ubyte.gz'
TEST_IMAGES = 't10k-images-idx3-ubyte.gz'
TEST_LABELS = 't10k-labels-idx1-ubyte.gz'

FLAGS = None


def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def convert_to(data_set, name):
    images = data_set.images
    labels = data_set.labels
    num_examples = data_set.num_examples

    if images.shape[0] != num_examples:
        raise ValueError('Images size %d does not match label size %d.' %
                         (images.shape[0], num_examples))
    rows = images.shape[1]
    cols = images.shape[2]
    depth = images.shape[3]

    filename = os.path.join(FLAGS.directory, name + '.tfrecords')
    print('Writing', filename)
    writer = tf.python_io.TFRecordWriter(filename)
    for index in range(num_examples):
        image_raw = images[index].tostring()
        example = tf.train.Example(features=tf.train.Features(feature={
            'height': _int64_feature(rows),
            'width': _int64_feature(cols),
            'depth': _int64_feature(depth),
            'label': _int64_feature(int(labels[index])),
            'image_raw': _bytes_feature(image_raw)}))
        writer.write(example.SerializeToString())
    writer.close()


def main(argv):
    # Get the data.
    data_sets = mnist.read_data_sets(FLAGS.directory,
                                     dtype=tf.uint8,
                                     reshape=False,
                                     validation_size=FLAGS.validation_size)

    # Convert to Examples and write the result to TFRecords.
    convert_to(data_sets.train, 'train')
    convert_to(data_sets.validation, 'validation')
    convert_to(data_sets.test, 'test')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--directory',
        type=str,
        default='/tmp/data',
        help='Directory to download data files and write the converted result'
    )
    parser.add_argument(
        '--validation_size',
        type=int,
        default=5000,
        help="""\
        Number of examples to separate from the training data for the validation
        set.\
        """
    )
    FLAGS = parser.parse_args()
    tf.app.run()
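For reference, the Spark-based variant mentioned above could look roughly like the sketch below. This is a simplified illustration rather than our production job: the input RDD of (label, raw_image_bytes) pairs, the output directory, and the partition file naming are all assumptions.
# Rough sketch of a distributed conversion with PySpark: each partition of the
# input RDD writes its own TFRecord file. Paths and the input RDD are placeholders.
import tensorflow as tf
from pyspark import SparkContext

def write_partition(index, records):
    # records is an iterator of (label, raw_image_bytes) pairs for this partition
    filename = '/tmp/data/mnist-part-%05d.tfrecords' % index
    writer = tf.python_io.TFRecordWriter(filename)
    for label, image_raw in records:
        example = tf.train.Example(features=tf.train.Features(feature={
            'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(label)])),
            'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw]))}))
        writer.write(example.SerializeToString())
    writer.close()
    return [filename]

sc = SparkContext(appName='mnist-to-tfrecords')
# input_rdd is assumed to already hold (label, raw_image_bytes) pairs
# written = input_rdd.mapPartitionsWithIndex(write_partition).collect()
The resulting .tfrecords files are then pushed to HDFS (for example with hadoop fs -put) so that the training job below can read them.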
II. Configuring TensorFlow to read data from HDFS
The sections above covered the HDFS configuration and uploading the converted data to HDFS. Having TensorFlow read from HDFS takes only two simple steps. First, prefix the command that launches the job with the Hadoop classpath environment:
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python example.py
Second, when reading data, prefix the data path with the HDFS scheme, for example:
hdfs://default/user/data/example.txt
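Putting the two together, a minimal sketch of reading one of the converted TFRecord files directly from HDFS could look like the following; the path is only an example, and it assumes the classpath and Hadoop environment variables above are in place so that TensorFlow 0.12 can resolve the hdfs:// scheme:
# Minimal sketch: pull one serialized record from a TFRecord file on HDFS.
# The path is illustrative; replace it with your own HDFS location.
import tensorflow as tf

filename = 'hdfs://default/user/bdusr01/asy/MNIST_data/train.tfrecords'
filename_queue = tf.train.string_input_producer([filename], num_epochs=1)
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)

with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())  # the epoch counter is a local variable
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    print(len(sess.run(serialized_example)))    # size in bytes of the first record
    coord.request_stop()
    coord.join(threads)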
III. Example code for the distributed model
The example code reads the MNIST data from HDFS and builds the corresponding parameter-server and worker cluster to train a three-layer deep network: two fully connected hidden layers followed by a softmax layer. The code is as follows:
from __future__ import print_function

import math
import os

import tensorflow as tf

flags = tf.app.flags

# Flags for configuring the task
flags.DEFINE_string("job_name", None, "job name: worker or ps")
flags.DEFINE_integer("task_index", 0,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the chief worker task that performs the variable "
                     "initialization")
flags.DEFINE_string("ps_hosts", "",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "",
                    "Comma-separated list of hostname:port pairs")

# Training related flags
flags.DEFINE_string("data_dir", None,
                    "Directory where the mnist data is stored")
flags.DEFINE_string("train_dir", None,
                    "Directory for storing the checkpoints")
flags.DEFINE_integer("hidden1", 128,
                     "Number of units in the 1st hidden layer of the NN")
flags.DEFINE_integer("hidden2", 128,
                     "Number of units in the 2nd hidden layer of the NN")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")

FLAGS = flags.FLAGS
TRAIN_FILE = "train.tfrecords"
NUM_CLASSES = 10
IMAGE_SIZE = 28
IMAGE_PIXELS = IMAGE_SIZE * IMAGE_SIZE


def inference(images, hidden1_units, hidden2_units):
    with tf.name_scope('hidden1'):
        weights = tf.Variable(
            tf.truncated_normal([IMAGE_PIXELS, hidden1_units],
                                stddev=1.0 / math.sqrt(float(IMAGE_PIXELS))),
            name='weights')
        biases = tf.Variable(tf.zeros([hidden1_units]), name='biases')
        hidden1 = tf.nn.relu(tf.matmul(images, weights) + biases)
    with tf.name_scope('hidden2'):
        weights = tf.Variable(
            tf.truncated_normal([hidden1_units, hidden2_units],
                                stddev=1.0 / math.sqrt(float(hidden1_units))),
            name='weights')
        biases = tf.Variable(tf.zeros([hidden2_units]), name='biases')
        hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
    with tf.name_scope('softmax_linear'):
        weights = tf.Variable(
            tf.truncated_normal([hidden2_units, NUM_CLASSES],
                                stddev=1.0 / math.sqrt(float(hidden2_units))),
            name='weights')
        biases = tf.Variable(tf.zeros([NUM_CLASSES]), name='biases')
        logits = tf.matmul(hidden2, weights) + biases
    return logits


def lossFunction(logits, labels):
    labels = tf.to_int64(labels)
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
        logits, labels, name='xentropy')
    loss = tf.reduce_mean(cross_entropy, name='xentropy_mean')
    return loss


def training(loss, learning_rate):
    tf.summary.scalar(loss.op.name, loss)
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    global_step = tf.Variable(0, name='global_step', trainable=False)
    train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op


def read_and_decode(filename_queue):
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(
        serialized_example,
        # Defaults are not specified since both keys are required.
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'label': tf.FixedLenFeature([], tf.int64),
        })

    # Convert from a scalar string tensor (whose single string has
    # length mnist.IMAGE_PIXELS) to a uint8 tensor with shape
    # [mnist.IMAGE_PIXELS].
    image = tf.decode_raw(features['image_raw'], tf.uint8)
    image.set_shape([IMAGE_PIXELS])
    image = tf.cast(image, tf.float32) * (1. / 255) - 0.5

    # Convert label from a scalar uint8 tensor to an int32 scalar.
    label = tf.cast(features['label'], tf.int32)
    return image, label


def inputs(batch_size):
    """Reads input data.

    Args:
      batch_size: Number of examples per returned batch.
    Returns:
      A tuple (images, labels), where:
      * images is a float tensor with shape [batch_size, mnist.IMAGE_PIXELS]
        in the range [-0.5, 0.5].
      * labels is an int32 tensor with shape [batch_size] with the true label,
        a number in the range [0, mnist.NUM_CLASSES).
    """
    filename = os.path.join(FLAGS.data_dir, TRAIN_FILE)

    with tf.name_scope('input'):
        filename_queue = tf.train.string_input_producer([filename])
        # Even when reading in multiple threads, share the filename
        # queue.
        image, label = read_and_decode(filename_queue)
        # Shuffle the examples and collect them into batch_size batches.
        # (Internally uses a RandomShuffleQueue.)
        # We run this in two threads to avoid being a bottleneck.
        images, sparse_labels = tf.train.shuffle_batch(
            [image, label], batch_size=batch_size, num_threads=2,
            capacity=1000 + 3 * batch_size,
            # Ensures a minimum amount of shuffling of examples.
            min_after_dequeue=1000)
        return images, sparse_labels


def device_and_target():
    # If FLAGS.job_name is not set, we're running single-machine TensorFlow.
    # Don't set a device.
    if FLAGS.job_name is None:
        raise ValueError("Must specify an explicit `job_name`")
    # Otherwise we're running distributed TensorFlow.
    print("Running distributed training")
    if FLAGS.task_index is None or FLAGS.task_index == "":
        raise ValueError("Must specify an explicit `task_index`")
    if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
        raise ValueError("Must specify an explicit `ps_hosts`")
    if FLAGS.worker_hosts is None or FLAGS.worker_hosts == "":
        raise ValueError("Must specify an explicit `worker_hosts`")

    cluster_spec = tf.train.ClusterSpec({
        "ps": FLAGS.ps_hosts.split(","),
        "worker": FLAGS.worker_hosts.split(","),
    })
    server = tf.train.Server(
        cluster_spec, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    return (
        cluster_spec,
        server,
    )


def main(unused_argv):
    if FLAGS.data_dir is None or FLAGS.data_dir == "":
        raise ValueError("Must specify an explicit `data_dir`")
    if FLAGS.train_dir is None or FLAGS.train_dir == "":
        raise ValueError("Must specify an explicit `train_dir`")

    cluster_spec, server = device_and_target()
    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:{}".format(FLAGS.task_index),
                cluster=cluster_spec)):
            images, labels = inputs(FLAGS.batch_size)
            logits = inference(images, FLAGS.hidden1, FLAGS.hidden2)
            loss = lossFunction(logits, labels)
            train_op = training(loss, FLAGS.learning_rate)
        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=(FLAGS.task_index == 0),
                checkpoint_dir=FLAGS.train_dir) as sess:
            while not sess.should_stop():
                sess.run(train_op)


if __name__ == "__main__":
    tf.app.run()
IV. Launching the distributed model
First, turn off the firewall:
sudo iptables -F
Then start the services on the different machines:

# Run the parameter server on the 246.1 machine:
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=ps --task_index=0

# Run worker0 on the 78.41 machine:
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=0

# Run worker1 on the 78.45 machine:
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=1

# Run the monitoring on the 78.41 machine:
tensorboard --logdir=/home/bdusr01/checkpoint/
V. Monitoring the model
Since we started TensorBoard on the 78.41 machine in the previous step, the model can be monitored at http://10.142.78.41:6006/. The training parameters can be watched changing dynamically while the model trains, and the graph view lets you inspect every detail of the network structure.
Once training on the big-data set with distributed TensorFlow has finished, Bazel can be used to build TensorFlow Serving, a flexible and highly available serving system that makes it straightforward to take deep-learning models into production and removes the problem of a trained model not being able to serve requests. That concludes this walkthrough of one basic module of our project; the parts with more depth are the model design, the engineering work, and the business logic, which we would be happy to discuss in more detail if the opportunity arises.

Author: Ding Tinghe. During his master's studies he worked on data mining in the Shanghai Key Laboratory of Intelligent Information Processing at the School of Computer Science, Fudan University. He now works at the headquarters of a state-owned enterprise in Shanghai, developing and researching the Spark stack, machine learning, and deep learning in the big-data field.
