Experiment overview
The dataset is Criteo Display Ads: about 11 GB in total, with 13 integer features and 26 categorical features.
Spark
Since the data is fairly large and sits in a single txt file, split it first with
split -l 400000 train.txt
to cut it into 400,000-line chunks before processing.
The continuous features are transformed with a log. Seen from the angle of online (real-time) training, the usual normalization schemes such as Z-score and min-max scaling rely on statistics of a whole batch of data, so the same value would be normalized differently once the batch changes.
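To make the batch dependence concrete: Z-score normalization rescales a raw value x with the mean and standard deviation of whichever batch B it arrives in, whereas a pointwise log transform (the one applied in the Spark code below) depends only on x itself:

$$ z_B(x) = \frac{x - \mu_B}{\sigma_B} \qquad \text{vs.} \qquad \tilde{x} = \log(x + 10) $$

The same x is therefore mapped to different values as \mu_B and \sigma_B drift from batch to batch, while \log(x + 10) never changes; the +10 offset simply keeps the argument positive even for zero or small negative feature values.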
As for the categorical features, most companies would already maintain category vocabularies rather than deriving them from the data itself (this saves computation, and under stream processing you can only assign integer codes to the categories that appear within a given batch or time window, so new categories outside that window could not be encoded at all). Still, if you are working offline and genuinely need to extract and encode the categories from the data, as is the case here, the most direct option is StringIndexer from Spark's ML module. It is convenient to use, but it easily runs into OOM when a column has too many distinct categories or when many columns need to be indexed.

From the StringIndexer source code, its implementation first calls the RDD's countByValue to build a frequency map for the column, then assigns code 0 to the most frequent category, 1 to the second most frequent, and so on. It also copies this map, and StringIndexer itself offers no way to release it, so with very high-cardinality columns or many indexed columns these maps pile up. This dataset happens to have 26 categorical columns, some of which have over three million distinct values, so another route is needed. The approach below imitates part of StringIndexer's implementation to reach the same goal and runs much faster. Since some categories occur very rarely, a cutoff can also be applied, for example switching to countByValue and keeping only the top n categories, or keeping only the categories whose frequency is above some threshold.
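For comparison, here is a minimal sketch of that direct StringIndexer route (assuming the same DataFrame df and the _c14.._c39 column names read in below; this is not the approach actually used here):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer

// One StringIndexer per categorical column. Each fitted indexer materialises the
// full list of categories for its column on the driver, which is what blows up
// the memory at this cardinality.
val indexers = (14 until 40).map { i =>
  new StringIndexer()
    .setInputCol(s"_c$i")
    .setOutputCol(s"_c${i}_e")
    .setHandleInvalid("keep") // nulls and unseen categories get one extra index
}
val indexed = new Pipeline()
  .setStages(indexers.toArray)
  .fit(df)
  .transform(df)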
The implementation that follows applies such a cutoff: categories appearing in less than 0.01% of the rows are all mapped to a single UNK bucket.
import java.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

val spark = SparkSession
  .builder()
  .master("local[*]")
  // spark.driver.memory and spark.memory.fraction are shown here for illustration only; in practice they
  // must be set before the driver starts to take effect. So to enlarge the driver when running from IDEA,
  // set the heap size in the VM options. In local mode it is enough to enlarge the driver, since the
  // executor runs in the same JVM process.
  .config("spark.driver.memory", "5g")
  .config("spark.sql.shuffle.partitions", 12)
  .config("spark.default.parallelism", 12)
  .config("spark.memory.fraction", 0.75)
  .getOrCreate()
import spark.implicits._
val path = ""
// The source is a txt file, but it can be read through the csv reader with schema inference
val df = spark.read
.option("header", false)
.option("delimiter", "\t")
.option("inferSchema", true)
.format("csv")
.load(path + "..")
// If memory allows, cache the whole DataFrame first to cut down on IO
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
val dataSize = df.count()
val cutoff = dataSize * 0.0001
val numCols = (1 until 14).map(i => s"_c$i").toArray
var df1 = df
numCols.foreach(column => {
df1 = df1.withColumn(column, when(col(column).isNull, 0).otherwise(log(col(column) + 10)))
})
val catCols = (14 until 40).map(i => s"_c$i")
var df2 = df1
// Encode all categorical columns into one shared index space
// Use Java maps: Java collections are usually more efficient than Scala's, and java.util.HashMap can be pre-sized
val inderMap: util.HashMap[String, util.HashMap[String, Int]] = new util.HashMap(catCols.length)
var i = 0
for (column <- catCols) {
val uniqueElem = df2.select(column)
.groupBy(column)
.agg(count(column))
.filter(col(s"count($column)") >= cutoff)
.select(column)
.map(_.getAs[String](0))
.collect()
val len = uniqueElem.length
var index = 0
val freqMap = new util.HashMap[String, Int](len)
while (index < len){
freqMap.put(uniqueElem(index), i)
index += 1
i += 1
}
freqMap.put("UNK", i)
i += 1
inderMap.put(column, freqMap)
}
val bcMap = spark.sparkContext.broadcast(inderMap)
for (column <- catCols) {
val Indexer = udf { elem: String =>
val curMap = bcMap.value.get(column)
if (elem == null || !curMap.containsKey(elem)) curMap.get("UNK")
else curMap.get(elem)
}
df2 = df2.withColumn(column + "_e", Indexer(col(column))).drop(column)
}
// If a train/test split is needed
val Array(train, test) = df2.randomSplit(Array(0.9, 0.1))
// Parquet output
df2.write
.mode("overwrite")
.save(path + "res/")
// Text output
df2.map(x => x.mkString(","))
.write
.mode("overwrite")
.format("text")
.save(path + "res/txt")
// Needed later by the TensorFlow part
print("The total dimension of all categorical features is " + i) // 14670
TensorFlow
The code below sketches the overall deep-learning workflow; production code would need some adjustments, see the implementations at "https://github.com/yangxudong/deeplearning/tree/master/DCN" and "https://github.com/lambdaji/tf_repos/tree/master/deep_ctr" on GitHub for reference.
Rough flow: define the data input function input_fn, then lay out the model together with its training and evaluation operations, and finally write the execution-phase code.
Input function
import os

import numpy as np
import tensorflow as tf


def input_fn(filenames, batch_size=32):
    def _parse_line(line):
        # decode_csv also accepts a whole batch of lines, so it is applied after batch()
        fields = tf.decode_csv(line, FIELD_DEFAULTS)
        label = fields[0]
        # stack the per-column tensors into [batch_size, 13] and [batch_size, 26]
        num_features = tf.stack(fields[1:14], axis=-1)
        cat_features = tf.stack(fields[14:], axis=-1)
        return num_features, cat_features, label

    num_features, cat_features, label = tf.data.TextLineDataset(filenames)\
        .repeat(2)\
        .prefetch(1024)\
        .batch(batch_size)\
        .map(_parse_line, num_parallel_calls=2)\
        .make_one_shot_iterator()\
        .get_next()
    return num_features, cat_features, label
Data and model variables
# Parameters used by the input pipeline.
NUM_COLUMNS = ["c%d" % i for i in range(14)]
CAT_COLUMNS = ["c%d" % i for i in range(14,40)]
FIELD_DEFAULTS = []
for i in range(14):
FIELD_DEFAULTS.append([0.0])
for i in range(14,40):
FIELD_DEFAULTS.append([0])
filenames = []
for i in range(24):
...
# Settings for local debugging
num_col = 13
cat_col = 26
cat_size = 14670
embedding_size = 12
cross_layers = 3
deep_layers = [200,100,33]
label_size = 1
learning_rate = 0.0005
DCN model
# Build the DCN model with the low-level API; in practice, following the custom Estimator workflow would be better.
with tf.name_scope("DCN_model"):
he_init = tf.variance_scaling_initializer()
with tf.name_scope("Embedding_layer"):
x1, x2, label = input_fn(filenames,32)
Embed_W = tf.get_variable(name='embed_w', shape=[cat_size, embedding_size],
initializer=he_init) # TC * E
embeddings = tf.nn.embedding_lookup(Embed_W, x2) # ? * C * E
oned_embed = tf.reshape(embeddings, shape=[-1, cat_col * embedding_size]) # ? * (C * E)
embed_layer_res = tf.concat([x1, oned_embed], 1) # ? * (N + C * E)
with tf.name_scope("Cross_Network"):
x0 = embed_layer_res
cross_x = embed_layer_res
for level in range(cross_layers):
Cross_W = tf.get_variable(name='cross_w%s' % level, shape=[num_col + cat_col * embedding_size, 1],
initializer=he_init) # (N + C * E) * 1
            Cross_B = tf.get_variable(name='cross_b%s' % level, shape=[1, num_col + cat_col * embedding_size],
                                      initializer=he_init)  # 1 * (N + C * E)
            xtw = tf.matmul(cross_x, Cross_W)  # ? * 1
            cross_x = x0 * xtw + cross_x + Cross_B  # ? * (N + C * E)
with tf.name_scope("Deep_Network"):
deep_x = embed_layer_res
for neurons in deep_layers:
deep_x = tf.layers.dense(inputs=deep_x, units=neurons, name='deep_%s' % neurons,
activation=tf.nn.selu, kernel_initializer=he_init)
with tf.variable_scope("Output-layer"):
x_stack = tf.concat([cross_x, deep_x], 1) # ? * ((N + C * E) + deep_layers[-1])
logits = tf.layers.dense(inputs=x_stack, units=label_size, name="outputs")
z = tf.reshape(logits, shape=[-1])
pred = tf.sigmoid(z)
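For reference, each pass through the loop above is a standard DCN cross layer. Writing cross_x as x_l, Cross_W as w_l and Cross_B as b_l (per example, with x a row vector of width N + C*E):

$$ x_{l+1} = x_0 \,(x_l w_l) + b_l + x_l $$

The scalar x_l w_l is xtw in the code, so the interaction degree grows by one per layer while each layer only adds 2(N + C*E) parameters.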
Training and evaluation metrics
with tf.name_scope("loss"):
xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=label, logits=z)
loss = tf.reduce_mean(xentropy, name="loss")
loss_summary = tf.summary.scalar('log_loss', loss)
with tf.name_scope("train"):
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
training_op = optimizer.minimize(loss)
with tf.name_scope("eval"):
acc, upacc = tf.metrics.accuracy(label, tf.math.round(pred))
auc, upauc = tf.metrics.auc(label, pred)
acc_summary = tf.summary.scalar('accuracy', upacc)
auc_summary = tf.summary.scalar('auc', upauc)
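For reference, xentropy above is the element-wise log loss of the raw logit z against the label y in {0, 1},

$$ \ell(y, z) = -\,y \log \sigma(z) - (1 - y)\log\bigl(1 - \sigma(z)\bigr), \qquad \sigma(z) = \frac{1}{1 + e^{-z}} $$

which is why the scalar summary is named log_loss; sigmoid_cross_entropy_with_logits evaluates this in a numerically stable form rather than going through the sigmoid explicitly.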
TensorBoard setup (optional)
from datetime import datetime
def log_dir(prefix=""):
now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
root_logdir = "tf_logs"
if prefix:
prefix += "-"
name = prefix + "run-" + now
return "{}/{}/".format(root_logdir, name)
logdir = log_dir("my_dcn")
file_writer = tf.summary.FileWriter(logdir, tf.get_default_graph())
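While training runs, the summaries written by file_writer can be viewed by pointing TensorBoard at the same log directory, e.g. tensorboard --logdir tf_logs, and opening the address it prints (http://localhost:6006 by default).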
Execution phase
# Includes checkpointing and early stopping. As above, this uses the low-level API; the custom Estimator workflow would be a better fit in practice.
n_epochs = 2
data_size = 12000000
batch_size = 32  # keep in sync with the batch size passed to input_fn in the model section
n_batches = int(np.ceil(data_size / batch_size))
checkpoint_path = ".../model/my_dcn_model.ckpt"
checkpoint_epoch_path = checkpoint_path + ".epoch"
final_model_path = "./my_dcn_model"
best_auc = 0.0  # AUC: higher is better
epochs_without_progress = 0
max_epochs_without_progress = 20
saver = tf.train.Saver()
gb_init = tf.global_variables_initializer()
lc_init = tf.local_variables_initializer()
with tf.Session() as sess:
    if os.path.isfile(checkpoint_epoch_path):
        with open(checkpoint_epoch_path, "rb") as f:
            start_epoch = int(f.read())
        print("Training was interrupted. Continuing at epoch", start_epoch)
        saver.restore(sess, checkpoint_path)
        sess.run(lc_init)  # metric variables are local variables, which the checkpoint does not restore
    else:
        start_epoch = 0
        sess.run([gb_init, lc_init])
for epoch in range(start_epoch, n_epochs):
for batch_index in range(n_batches):
            # Evaluate once every 2000 batches
if batch_index % 2000 != 0:
sess.run(training_op)
else:
loss_tr, loss_summary_str, up1, up2, acc_summary_str, auc_summary_str = sess.run([loss, loss_summary, upacc, upauc, acc_summary, auc_summary])
print("Epoch:", epoch, ",Batch_index:", batch_index,
"\tLoss: {:.5f}".format(loss_tr),
"\tACC: ", up1,
"\tAUC", up2)
file_writer.add_summary(acc_summary_str, batch_index)
file_writer.add_summary(auc_summary_str, batch_index)
file_writer.add_summary(loss_summary_str, batch_index)
if batch_index % 5000 == 0:
saver.save(sess, checkpoint_path)
with open(checkpoint_epoch_path, "wb") as f:
f.write(b"%d" % (epoch + 1))
                    if up2 > best_auc:
                        saver.save(sess, final_model_path)
                        best_auc = up2
else:
epochs_without_progress += 1
if epochs_without_progress > max_epochs_without_progress:
print("Early stopping")
break
References: