利用hadoop分布式生成tfrecord格式文件
由於到處理的數據量較大,億為單位的數據條數,所以提前利用hadoop的分布式的優勢處理成tfrecord格式供tf訓練。
hadoop streaming執行的是mapper reduce流處理。
完整腳本文件放在了github上,
input data format:([one,another'\t'label],split(',' and '\t'))
1 2 3 4 5 6 7 0 8 9 10,1 2 3 4 5 6 7 0 8 9 10 0
mapper
shuffle data
利用mapper的自動排序功能做數據shuffle。
也就是在每一條數據前增加一個隨機數,然后使用其自動排序的機制達到shuffle數據的功能。
def mapper():
for line in sys.stdin:
print('{}\t{}'.format(random.random(), line.strip()))
reducer
generate tfrecord
其中有一個小技巧就是所有的reducer任務是在集群上的機器上運行的,在執行生成tfrecord文件后,要使用hadoop fs -put .....
命令將本機生成的文件上傳到hadoop上。
def reducer(save_path, hadoop_path):
writer = tf.python_io.TFRecordWriter(save_path)
for line in sys.stdin:
line = line.strip().split('\t')
if len(line) != 3:
print(line)
continue
#info = np.array(line[1],dtype='int') #not work
#label = np.array(line[2],dtype='int')
info = line[1].strip().split(',')
pair_info = []
label_info = []
for i in xrange(len(info)):
one_info = info[i].strip().split(' ')
one_info_str = []
for j in xrange(len(one_info)):
one_info_str.append(float(one_info[j]))
pair_info.append(one_info_str)
info = np.array(pair_info, np.float32)
label = line[2].strip()
label_info = int(label)
example = tf.train.Example(
features=tf.train.Features(
feature={'pair' : bytes_feature(info.tobytes()),
'label' : int64_feature(label_info)}))
writer.write(example.SerializeToString())
writer.close()
os.system('hadoop fs -put '+save_path+' '+hadoop_path)
問題記錄
在reducer階段,直接將stdin的數據轉np.array
后用tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
轉換后是能夠生成tfrecord文件的,但是在模型訓練輸入階段卻解析不出來,不知道為啥。
#info = np.array(line[1],dtype='int') #not work
#label = np.array(line[2],dtype='int')
因此選擇將stdin的數據切分開之后再轉換,就能用了。
Amazing!