背景
手上有个 Libsvm 格式数据集,已经跑过 LR 和 GBDT,想快速看下 DNN 的效果?那本文正适合你。
尽管深度学习研究和应用的热潮已持续高涨多年,TensorFlow 早已为算法同学所熟知,但并非所有人都对这个工具驾轻就熟,要在个人数据集上跑个简易 DNN 模型出来也不是顷刻间的事,特别是当数据集是 Libsvm 格式时。Libsvm 是机器学习常用格式,很多工具包括 Liblinear、XGBoost、LightGBM、ytk-learn、xlearn 都支持,但 Tensorflow 官方及民间均未见提供优雅的解决方案,这给新手造成了诸多不便,对应用如此广泛的工具来说是个遗憾。对此,本文提供了经过充分验证的解决方案(some code),相信可以帮助新同学节省些时间。
简介
本文代码可用于:
- 快速验证 Libsvm 数据集在 DNN 上的效果,以与其它线性模型或树模型做对比,探索模型的极限。
- 对高维特征做降维,可取第一隐层的输出作为 embedding,加入到其它训练过程中。
- 新手入门,学习 Tensorflow keras、Estimator 和 Dataset 的使用。
本次编码遵循如下原则:
- 尽量不自己造轮子,尽量用官方的或其它公认性能最好的代码,除非迫不得已。
- 代码尽量精简。
- 追求极致的时间复杂度和空间复杂度。
本文只介绍最初级的 DNN 多分类训练评估代码,其它更高阶复杂模型可参考 DeepCTR 等优秀的开源项目,后续会另外发文分享这些复杂模型在实际调研中的应用。
下面是 Tensorflow 针对 Libsvm 数据训练 DNN 的四个进阶代码及其思路,推荐使用后两者。
Keras generator
这里面临三个选择:
- Tensorflow API:要用 Tensorflow 构建个 DNN 模型,对熟手来说很容易,用低阶 API 也能马上建个 DNN,只是代码略显杂乱,相比之下,高阶 API Keras 就贴心得多,代码极度精简,一目了然。
- Libsvm 数据读取:手写个 Libsvm 格式数据的读取很容易,读取稀疏编码转成稠密编码,但既然 sklearn 已经有 load_svmlight_file 了为什么不用呢,该函数会读进整个文件,当然小数据量不是问题。
- fit 和 fit_generator:Keras 模型训练只接收稠密编码,而 Libsvm 是稀疏编码,如果数据集不算太大,通过 load_svmlight_file 全部读进内存也能接受,但要先全部转成稠密编码再喂给 fit,那内存可能会爆掉;理想方案是用多少读多少,读进来再转换,此处图省事就先用 load_svmlight_file 全部读进来以稀疏编码保存,使用时再分批喂给 fit_generator。
代码如下:
import numpy as np
from sklearn.datasets import load_svmlight_file
from tensorflow import keras
import tensorflow as tf
feature_len = 100000 # 特征维度,下面使用时可替换成 X_train.shape[1]
n_epochs = 1
batch_size = 256
train_file_path = './data/train_libsvm.txt'
test_file_path = './data/test_libsvm.txt'
def batch_generator(X_data, y_data, batch_size):
number_of_batches = X_data.shape[0]/batch_size
counter=0
index = np.arange(np.shape(y_data)[0])
while True:
index_batch = index[batch_size*counter:batch_size*(counter+1)]
X_batch = X_data[index_batch,:].todense()
y_batch = y_data[index_batch]
counter += 1
yield np.array(X_batch),y_batch
if (counter > number_of_batches):
counter=0
def create_keras_model(feature_len):
model = keras.Sequential([
# 可在此添加隐层
keras.layers.Dense(64, input_shape=[feature_len], activation=tf.nn.tanh),
keras.layers.Dense(6, activation=tf.nn.softmax)
])
model.compile(optimizer=tf.train.AdamOptimizer(),
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
return model
if __name__ == "__main__":
X_train, y_train = load_svmlight_file(train_file_path)
X_test, y_test = load_svmlight_file(test_file_path)
keras_model = create_keras_model(X_train.shape[1])
keras_model.fit_generator(generator=batch_generator(X_train, y_train, batch_size = batch_size),
steps_per_epoch=int(X_train.shape[0]/batch_size),
epochs=n_epochs)
test_loss, test_acc = keras_model.evaluate_generator(generator=batch_generator(X_test, y_test, batch_size = batch_size),
steps=int(X_test.shape[0]/batch_size))
print('Test accuracy:', test_acc)
以上即早前实际调研中使用的代码,完成当时的训练任务够使了,但该代码的缺点显而易见,一方面空间复杂度太差,大数据常驻内存会影响其它进程,当遇到大数据集时就无能为力了,另一方面可用性差,数据分批需在 batch_generator 手动编码实现,调试耗费时间,也容易出错。
Tensorflow Dataset 是个完美的解决方案,不过由于之前对 Dataset 不熟,也不知道如何用 TF 低阶 API 解析 libsvm 并把 SparseTensor 转成 DenseTensor,当时时间有限就搁置了,后来才解决该问题,重点即下面代码中的 decode_libsvm 函数。
把 libsvm 转成 Dataset 后,DNN 才得到解锁,可以自由运行在任意大数据集上了。
下面依次介绍了 Dataset 应用在 Keras model、Keras to estimator、DNNClassifier。
附 embedding 代码,第一个隐层的输出作为 embedding:
def save_output_file(output_array, filename):
result = list()
for row_data in output_array:
line = ','.join([str(x) for x in row_data.tolist()])
result.append(line)
with open(filename,'w') as fw:
fw.write('%s' % '\n'.join(result))
X_test, y_test = load_svmlight_file("./data/test_libsvm.txt")
model = load_model('./dnn_onelayer_tanh.model')
dense1_layer_model = Model(inputs=model.input, outputs=model.layers[0].output)
dense1_output = dense1_layer_model.predict(X_test)
save_output_file(dense1_output, './hidden_output/hidden_output_test.txt')
Keras Dataset
将 libsvm 数据读取从 load_svmlight_file 改成 dataset 并 decode_libsvm。
import numpy as np
from sklearn.datasets import load_svmlight_file
from tensorflow import keras
import tensorflow as tf
feature_len = 138830
n_epochs = 1
batch_size = 256
train_file_path = './data/train_libsvm.txt'
test_file_path = './data/test_libsvm.txt'
def decode_libsvm(line):
columns = tf.string_split([line], ' ')
labels = tf.string_to_number(columns.values[0], out_type=tf.int32)
labels = tf.reshape(labels,[-1])
splits = tf.string_split(columns.values[1:], ':')
id_vals = tf.reshape(splits.values,splits.dense_shape)
feat_ids, feat_vals = tf.split(id_vals,num_or_size_splits=2,axis=1)
feat_ids = tf.string_to_number(feat_ids, out_type=tf.int64)
feat_vals = tf.string_to_number(feat_vals, out_type=tf.float32)
# 由于 libsvm 特征编码从 1 开始,这里需要将 feat_ids 减 1
sparse_feature = tf.SparseTensor(feat_ids-1, tf.reshape(feat_vals,[-1]), [feature_len])
dense_feature = tf.sparse.to_dense(sparse_feature)
return dense_feature, labels
def create_keras_model():
model = keras.Sequential([
keras.layers.Dense(64, input_shape=[feature_len], activation=tf.nn.tanh),
keras.layers.Dense(6, activation=tf.nn.softmax)
])
model.compile(optimizer=tf.train.AdamOptimizer(),
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
return model
if __name__ == "__main__":
dataset_train = tf.data.TextLineDataset([train_file_path]).map(decode_libsvm).batch(batch_size).repeat()
dataset_test = tf.data.TextLineDataset([test_file_path]).map(decode_libsvm).batch(batch_size).repeat()
keras_model = create_keras_model()
sample_size = 10000 # 由于训练函数必须要指定 steps_per_epoch,所以这里需要先获取到样本数
keras_model.fit(dataset_train, steps_per_epoch=int(sample_size/batch_size), epochs=n_epochs)
test_loss, test_acc = keras_model.evaluate(dataset_test, steps=int(sample_size/batch_size))
print('Test accuracy:', test_acc)
解决了空间复杂度高的问题,数据轻轻地来,轻轻地去,不占用大量内存。
不过可用性上仍有两点不便:
- keras fit 时需指定 steps_per_epoch,为了保证每一轮走完整批数据,需要实现计算 sample size,不合理,其实 dataset 的 repeat 就可以保证,用 Estimator 就没有必须指定 steps_per_epoch 的限制。
- 需事先计算特征维度 feature_len,由于 libsvm 是稀疏编码,只读取一行或几行无法推断特征维度,可先离线用 load_svmlight_file 获取特征维度 feature_len=X_train.shape[1],然后写死在代码里。这是 libsvm 的固有特点,只能如此处理了。
Keras model to Estimator
Tensorflow 的另一个高阶 API 是 Estimator,更加灵活,据说单机和分布式代码一致,且不用考虑底层的硬件设施,可以比较方便地和一些分布式调度框架(e.g. xlearning)结合使用,在工作中也发现 Estimator 比 Keras 能得到平台更全面的支持。
Estimator 是跟 Keras 相互独立的高阶 API,如果之前用的是 Keras,一时半会不能全部重构成 Estimator, TF 还提供了 Keras 的 model_to_estimator 接口,也可以享受到 Estimator 带来的好处。
from tensorflow import keras
import tensorflow as tf
from tensorflow.python.platform import tf_logging
# 打开 estimator 日志,可在训练时输出日志,了解进度
tf_logging.set_verbosity('INFO')
feature_len = 100000
n_epochs = 1
batch_size = 256
train_file_path = './data/train_libsvm.txt'
test_file_path = './data/test_libsvm.txt'
# 注意这里多了个参数 input_name,返回值也与上不同
def decode_libsvm(line, input_name):
columns = tf.string_split([line], ' ')
labels = tf.string_to_number(columns.values[0], out_type=tf.int32)
labels = tf.reshape(labels,[-1])
splits = tf.string_split(columns.values[1:], ':')
id_vals = tf.reshape(splits.values,splits.dense_shape)
feat_ids, feat_vals = tf.split(id_vals,num_or_size_splits=2,axis=1)
feat_ids = tf.string_to_number(feat_ids, out_type=tf.int64)
feat_vals = tf.string_to_number(feat_vals, out_type=tf.float32)
sparse_feature = tf.SparseTensor(feat_ids-1, tf.reshape(feat_vals,[-1]),[feature_len])
dense_feature = tf.sparse.to_dense(sparse_feature)
return {input_name: dense_feature}, labels
def input_train(input_name):
# 这里使用 lambda 来给 map 中的 decode_libsvm 函数添加除 line 之的参数
return tf.data.TextLineDataset([train_file_path]).map(lambda line: decode_libsvm(line, input_name)).batch(batch_size).repeat(n_epochs).make_one_shot_iterator().get_next()
def input_test(input_name):
return tf.data.TextLineDataset([train_file_path]).map(lambda line: decode_libsvm(line, input_name)).batch(batch_size).make_one_shot_iterator().get_next()
def create_keras_model(feature_len):
model = keras.Sequential([
# 可在此添加隐层
keras.layers.Dense(64, input_shape=[feature_len], activation=tf.nn.tanh),
keras.layers.Dense(6, activation=tf.nn.softmax)
])
model.compile(optimizer=tf.train.AdamOptimizer(),
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
return model
def create_keras_estimator():
model = create_keras_model()
input_name = model.input_names[0]
estimator = tf.keras.estimator.model_to_estimator(model)
return estimator, input_name
if __name__ == "__main__":
keras_estimator, input_name = create_keras_estimator(feature_len)
keras_estimator.train(input_fn=lambda:input_train(input_name))
eval_result = keras_estimator.evaluate(input_fn=lambda:input_train(input_name))
print(eval_result)
这里不用 sample_size 了,但 feature_len 还是须事先计算。注意到 Estimator 的 input_fn 返回的 dict key 需要跟 model 的输入名保持一致,这里通过 input_name 传递该值。
用 Keras 的人很多,很多开源项目也用 Keras 来搭建复杂模型,由于 Keras 的模型格式特别,部分平台不支持保存,但提供了对 Estimator 的模型保存支持,这时正好可以使用 model_to_estimator 来保存 Keras 模型,非常方便。
DNNClassifier
最后来直接使用 Tensorflow 预创建的 Estimator:DNNClassifier。
import tensorflow as tf
from tensorflow.python.platform import tf_logging
# 打开 estimator 日志,可在训练时输出日志,了解进度
tf_logging.set_verbosity('INFO')
feature_len = 100000
n_epochs = 1
batch_size = 256
train_file_path = './data/train_libsvm.txt'
test_file_path = './data/test_libsvm.txt'
def decode_libsvm(line, input_name):
columns = tf.string_split([line], ' ')
labels = tf.string_to_number(columns.values[0], out_type=tf.int32)
labels = tf.reshape(labels,[-1])
splits = tf.string_split(columns.values[1:], ':')
id_vals = tf.reshape(splits.values,splits.dense_shape)
feat_ids, feat_vals = tf.split(id_vals,num_or_size_splits=2,axis=1)
feat_ids = tf.string_to_number(feat_ids, out_type=tf.int64)
feat_vals = tf.string_to_number(feat_vals, out_type=tf.float32)
sparse_feature = tf.SparseTensor(feat_ids-1,tf.reshape(feat_vals,[-1]),[feature_len])
dense_feature = tf.sparse.to_dense(sparse_feature)
return {input_name: dense_feature}, labels
def input_train(input_name):
return tf.data.TextLineDataset([train_file_path]).map(lambda line: decode_libsvm(line, input_name)).batch(batch_size).repeat(n_epochs).make_one_shot_iterator().get_next()
def input_test(input_name):
return tf.data.TextLineDataset([train_file_path]).map(lambda line: decode_libsvm(line, input_name)).batch(batch_size).make_one_shot_iterator().get_next()
def create_dnn_estimator():
input_name = "dense_input"
feature_columns = tf.feature_column.numeric_column(input_name, shape=[feature_len])
estimator = tf.estimator.DNNClassifier(hidden_units=[64],
n_classes=6,
feature_columns=[feature_columns])
return estimator, input_name
if __name__ == "__main__":
dnn_estimator, input_name = create_dnn_estimator()
dnn_estimator.train(input_fn=lambda:input_train(input_name))
eval_result = dnn_estimator.evaluate(input_fn=lambda:input_test(input_name))
print('\nTest set accuracy: {accuracy:0.3f}\n'.format(**eval_result))
Estimator 代码逻辑清晰,使用简单,功能也很强大,关于 Estimator 的更多信息可参考官方文档,这里不再赘述。
以上方案除第一个不便处理大数据,其它均可在单机运行,使用时可根据需求修改网络结构、目标函数等。
本文代码源自一个调研,耗费数小时调试,调研完成代码即闲置,现不计鄙陋,抛砖引玉,希望能对其它同学有所帮助。